001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.reindex;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.context.RuntimeResourceDefinition;
024import ca.uhn.fhir.i18n.Msg;
025import ca.uhn.fhir.interceptor.model.RequestPartitionId;
026import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
027import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
028import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
029import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
030import ca.uhn.fhir.jpa.api.pid.StreamTemplate;
031import ca.uhn.fhir.jpa.api.pid.TypedResourcePid;
032import ca.uhn.fhir.jpa.api.pid.TypedResourceStream;
033import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
034import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
035import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
036import ca.uhn.fhir.jpa.model.dao.JpaPid;
037import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
038import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
039import ca.uhn.fhir.rest.api.Constants;
040import ca.uhn.fhir.rest.api.SortSpec;
041import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
042import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
043import ca.uhn.fhir.util.DateRangeUtil;
044import jakarta.annotation.Nonnull;
045import jakarta.annotation.Nullable;
046import org.apache.commons.lang3.Validate;
047
048import java.util.Date;
049import java.util.function.Supplier;
050import java.util.stream.Stream;
051
052public class Batch2DaoSvcImpl implements IBatch2DaoSvc {
053        private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(Batch2DaoSvcImpl.class);
054
055        private final IResourceTableDao myResourceTableDao;
056
057        private final MatchUrlService myMatchUrlService;
058
059        private final DaoRegistry myDaoRegistry;
060
061        private final FhirContext myFhirContext;
062
063        private final IHapiTransactionService myTransactionService;
064
065        @Override
066        public boolean isAllResourceTypeSupported() {
067                return true;
068        }
069
070        public Batch2DaoSvcImpl(
071                        IResourceTableDao theResourceTableDao,
072                        MatchUrlService theMatchUrlService,
073                        DaoRegistry theDaoRegistry,
074                        FhirContext theFhirContext,
075                        IHapiTransactionService theTransactionService) {
076                myResourceTableDao = theResourceTableDao;
077                myMatchUrlService = theMatchUrlService;
078                myDaoRegistry = theDaoRegistry;
079                myFhirContext = theFhirContext;
080                myTransactionService = theTransactionService;
081        }
082
083        @Override
084        public IResourcePidStream fetchResourceIdStream(
085                        Date theStart, Date theEnd, RequestPartitionId theRequestPartitionId, String theUrl) {
086                if (theUrl == null) {
087                        return makeStreamResult(
088                                        theRequestPartitionId, () -> streamResourceIdsNoUrl(theStart, theEnd, theRequestPartitionId));
089                } else {
090                        return makeStreamResult(
091                                        theRequestPartitionId,
092                                        () -> streamResourceIdsWithUrl(theStart, theEnd, theUrl, theRequestPartitionId));
093                }
094        }
095
096        private Stream<TypedResourcePid> streamResourceIdsWithUrl(
097                        Date theStart, Date theEnd, String theUrl, RequestPartitionId theRequestPartitionId) {
098                validateUrl(theUrl);
099
100                SearchParameterMap searchParamMap = parseQuery(theUrl);
101                searchParamMap.setLastUpdated(DateRangeUtil.narrowDateRange(searchParamMap.getLastUpdated(), theStart, theEnd));
102
103                String resourceType = theUrl.substring(0, theUrl.indexOf('?'));
104                IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceType);
105
106                SystemRequestDetails request = new SystemRequestDetails().setRequestPartitionId(theRequestPartitionId);
107
108                return dao.searchForIdStream(searchParamMap, request, null).map(pid -> new TypedResourcePid(resourceType, pid));
109        }
110
111        private static TypedResourcePid typedPidFromQueryArray(Object[] thePidTypeDateArray) {
112                String resourceType = (String) thePidTypeDateArray[1];
113                Long pid = (Long) thePidTypeDateArray[0];
114                return new TypedResourcePid(resourceType, JpaPid.fromId(pid));
115        }
116
117        @Nonnull
118        private TypedResourceStream makeStreamResult(
119                        RequestPartitionId theRequestPartitionId, Supplier<Stream<TypedResourcePid>> streamSupplier) {
120
121                IHapiTransactionService.IExecutionBuilder txSettings =
122                                myTransactionService.withSystemRequest().withRequestPartitionId(theRequestPartitionId);
123
124                StreamTemplate<TypedResourcePid> streamTemplate =
125                                StreamTemplate.fromSupplier(streamSupplier).withTransactionAdvice(txSettings);
126
127                return new TypedResourceStream(theRequestPartitionId, streamTemplate);
128        }
129
130        @Nonnull
131        private Stream<TypedResourcePid> streamResourceIdsNoUrl(
132                        Date theStart, Date theEnd, RequestPartitionId theRequestPartitionId) {
133                Stream<Object[]> rowStream;
134                if (theRequestPartitionId == null || theRequestPartitionId.isAllPartitions()) {
135                        ourLog.debug("Search for resources - all partitions");
136                        rowStream = myResourceTableDao.streamIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldest(
137                                        theStart, theEnd);
138                } else if (theRequestPartitionId.isDefaultPartition()) {
139                        ourLog.debug("Search for resources - default partition");
140                        rowStream =
141                                        myResourceTableDao
142                                                        .streamIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldestForDefaultPartition(
143                                                                        theStart, theEnd);
144                } else {
145                        ourLog.debug("Search for resources - partition {}", theRequestPartitionId);
146                        rowStream =
147                                        myResourceTableDao
148                                                        .streamIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldestForPartitionIds(
149                                                                        theStart, theEnd, theRequestPartitionId.getPartitionIds());
150                }
151
152                return rowStream.map(Batch2DaoSvcImpl::typedPidFromQueryArray);
153        }
154
155        @Deprecated(since = "6.11", forRemoval = true) // delete once the default method in the interface is gone.
156        @Override
157        public IResourcePidList fetchResourceIdsPage(
158                        Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, @Nullable String theUrl) {
159                Validate.isTrue(false, "Unimplemented");
160                return null;
161        }
162
163        private static void validateUrl(@Nonnull String theUrl) {
164                if (!theUrl.contains("?")) {
165                        throw new InternalErrorException(Msg.code(2422) + "this should never happen: URL is missing a '?'");
166                }
167        }
168
169        @Nonnull
170        private SearchParameterMap parseQuery(String theUrl) {
171                String resourceType = theUrl.substring(0, theUrl.indexOf('?'));
172                RuntimeResourceDefinition def = myFhirContext.getResourceDefinition(resourceType);
173
174                SearchParameterMap searchParamMap = myMatchUrlService.translateMatchUrl(theUrl, def);
175                // this matches idx_res_type_del_updated
176                searchParamMap.setSort(new SortSpec(Constants.PARAM_LASTUPDATED).setChain(new SortSpec(Constants.PARAM_PID)));
177                // TODO this limits us to 2G resources.
178                searchParamMap.setLoadSynchronousUpTo(Integer.MAX_VALUE);
179                return searchParamMap;
180        }
181}