001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.batch2;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.context.RuntimeResourceDefinition;
024import ca.uhn.fhir.i18n.Msg;
025import ca.uhn.fhir.interceptor.model.RequestPartitionId;
026import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
027import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
028import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
029import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
030import ca.uhn.fhir.jpa.api.pid.StreamTemplate;
031import ca.uhn.fhir.jpa.api.pid.TypedResourcePid;
032import ca.uhn.fhir.jpa.api.pid.TypedResourceStream;
033import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
034import ca.uhn.fhir.jpa.dao.data.IResourceLinkDao;
035import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
036import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
037import ca.uhn.fhir.jpa.model.config.PartitionSettings;
038import ca.uhn.fhir.jpa.model.dao.JpaPid;
039import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
040import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
041import ca.uhn.fhir.model.primitive.IdDt;
042import ca.uhn.fhir.rest.api.Constants;
043import ca.uhn.fhir.rest.api.SortSpec;
044import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
045import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
046import ca.uhn.fhir.util.DateRangeUtil;
047import ca.uhn.fhir.util.Logs;
048import jakarta.annotation.Nonnull;
049import jakarta.annotation.Nullable;
050import org.apache.commons.lang3.StringUtils;
051import org.apache.commons.lang3.Validate;
052import org.hl7.fhir.instance.model.api.IIdType;
053
054import java.util.Date;
055import java.util.function.Supplier;
056import java.util.stream.Stream;
057
058public class Batch2DaoSvcImpl implements IBatch2DaoSvc {
059        private static final org.slf4j.Logger ourLog = Logs.getBatchTroubleshootingLog();
060
061        private final IResourceTableDao myResourceTableDao;
062
063        private final IResourceLinkDao myResourceLinkDao;
064
065        private final MatchUrlService myMatchUrlService;
066
067        private final DaoRegistry myDaoRegistry;
068
069        private final FhirContext myFhirContext;
070
071        private final IHapiTransactionService myTransactionService;
072
073        private final PartitionSettings myPartitionSettings;
074
075        @Override
076        public boolean isAllResourceTypeSupported() {
077                return true;
078        }
079
080        public Batch2DaoSvcImpl(
081                        IResourceTableDao theResourceTableDao,
082                        IResourceLinkDao theResourceLinkDao,
083                        MatchUrlService theMatchUrlService,
084                        DaoRegistry theDaoRegistry,
085                        FhirContext theFhirContext,
086                        IHapiTransactionService theTransactionService,
087                        PartitionSettings thePartitionSettings) {
088                myResourceTableDao = theResourceTableDao;
089                myResourceLinkDao = theResourceLinkDao;
090                myMatchUrlService = theMatchUrlService;
091                myDaoRegistry = theDaoRegistry;
092                myFhirContext = theFhirContext;
093                myTransactionService = theTransactionService;
094                myPartitionSettings = thePartitionSettings;
095        }
096
097        @Override
098        public IResourcePidStream fetchResourceIdStream(
099                        Date theStart, Date theEnd, RequestPartitionId theRequestPartitionId, String theUrl) {
100                if (StringUtils.isBlank(theUrl)) {
101                        return makeStreamResult(
102                                        theRequestPartitionId, () -> streamResourceIdsNoUrl(theStart, theEnd, theRequestPartitionId));
103                } else {
104                        return makeStreamResult(
105                                        theRequestPartitionId,
106                                        () -> streamResourceIdsWithUrl(theStart, theEnd, theUrl, theRequestPartitionId));
107                }
108        }
109
110        @Override
111        public Stream<IdDt> streamSourceIdsThatReferenceTargetId(IIdType theTargetId) {
112                return myResourceLinkDao.streamSourceIdsForTargetFhirId(theTargetId.getResourceType(), theTargetId.getIdPart());
113        }
114
115        private Stream<TypedResourcePid> streamResourceIdsWithUrl(
116                        Date theStart, Date theEnd, String theUrl, RequestPartitionId theRequestPartitionId) {
117                validateUrl(theUrl);
118
119                SearchParameterMap searchParamMap = parseQuery(theUrl);
120                searchParamMap.setLastUpdated(DateRangeUtil.narrowDateRange(searchParamMap.getLastUpdated(), theStart, theEnd));
121
122                String resourceType = theUrl.substring(0, theUrl.indexOf('?'));
123                IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceType);
124
125                SystemRequestDetails request = new SystemRequestDetails().setRequestPartitionId(theRequestPartitionId);
126
127                return dao.searchForIdStream(searchParamMap, request, null).map(pid -> new TypedResourcePid(resourceType, pid));
128        }
129
130        private static TypedResourcePid typedPidFromQueryArray(Object[] thePidTypeDateArray) {
131                JpaPid pid = (JpaPid) thePidTypeDateArray[0];
132                String resourceType = (String) thePidTypeDateArray[1];
133                return new TypedResourcePid(resourceType, pid);
134        }
135
136        @Nonnull
137        private TypedResourceStream makeStreamResult(
138                        RequestPartitionId theRequestPartitionId, Supplier<Stream<TypedResourcePid>> streamSupplier) {
139
140                IHapiTransactionService.IExecutionBuilder txSettings =
141                                myTransactionService.withSystemRequest().withRequestPartitionId(theRequestPartitionId);
142
143                StreamTemplate<TypedResourcePid> streamTemplate =
144                                StreamTemplate.fromSupplier(streamSupplier).withTransactionAdvice(txSettings);
145
146                return new TypedResourceStream(theRequestPartitionId, streamTemplate);
147        }
148
149        /**
150         * At the moment there is no use-case for this method.
151         * This can be cleaned up at a later point in time if there is no use for it.
152         */
153        @Nonnull
154        private Stream<TypedResourcePid> streamResourceIdsNoUrl(
155                        Date theStart, Date theEnd, RequestPartitionId theRequestPartitionId) {
156                Integer defaultPartitionId = myPartitionSettings.getDefaultPartitionId();
157                Stream<Object[]> rowStream;
158                if (theRequestPartitionId == null || theRequestPartitionId.isAllPartitions()) {
159                        ourLog.debug("Search for resources - all partitions");
160                        rowStream = myResourceTableDao.streamIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldest(
161                                        theStart, theEnd);
162                } else if (theRequestPartitionId.isDefaultPartition(defaultPartitionId)) {
163                        ourLog.debug("Search for resources - default partition");
164                        rowStream =
165                                        myResourceTableDao
166                                                        .streamIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldestForDefaultPartition(
167                                                                        theStart, theEnd, defaultPartitionId);
168                } else {
169                        ourLog.debug("Search for resources - partition {}", theRequestPartitionId);
170                        rowStream =
171                                        myResourceTableDao
172                                                        .streamIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldestForPartitionIds(
173                                                                        theStart, theEnd, theRequestPartitionId.getPartitionIds());
174                }
175
176                return rowStream.map(Batch2DaoSvcImpl::typedPidFromQueryArray);
177        }
178
179        @Deprecated(since = "6.11", forRemoval = true) // delete once the default method in the interface is gone.
180        @Override
181        public IResourcePidList fetchResourceIdsPage(
182                        Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, @Nullable String theUrl) {
183                Validate.isTrue(false, "Unimplemented");
184                return null;
185        }
186
187        private static void validateUrl(@Nonnull String theUrl) {
188                if (!theUrl.contains("?")) {
189                        throw new InternalErrorException(Msg.code(2422) + "this should never happen: URL is missing a '?'");
190                }
191        }
192
193        @Nonnull
194        private SearchParameterMap parseQuery(String theUrl) {
195                String resourceType = theUrl.substring(0, theUrl.indexOf('?'));
196                RuntimeResourceDefinition def = myFhirContext.getResourceDefinition(resourceType);
197
198                SearchParameterMap searchParamMap = myMatchUrlService.translateMatchUrl(theUrl, def);
199                // this matches idx_res_type_del_updated
200                searchParamMap.setSort(new SortSpec(Constants.PARAM_LASTUPDATED).setChain(new SortSpec(Constants.PARAM_PID)));
201                // TODO this limits us to 2G resources.
202                searchParamMap.setLoadSynchronousUpTo(Integer.MAX_VALUE);
203                return searchParamMap;
204        }
205}