001package ca.uhn.fhir.jpa.search;
002
003/*-
004 * #%L
005 * HAPI FHIR JPA Server
006 * %%
007 * Copyright (C) 2014 - 2021 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
 * #L%
021 */
022
023import ca.uhn.fhir.context.FhirContext;
024import ca.uhn.fhir.interceptor.api.HookParams;
025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
026import ca.uhn.fhir.interceptor.api.Pointcut;
027import ca.uhn.fhir.interceptor.model.RequestPartitionId;
028import ca.uhn.fhir.jpa.api.config.DaoConfig;
029import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
030import ca.uhn.fhir.jpa.api.dao.IDao;
031import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
032import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
033import ca.uhn.fhir.jpa.dao.IResultIterator;
034import ca.uhn.fhir.jpa.dao.ISearchBuilder;
035import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
036import ca.uhn.fhir.jpa.entity.Search;
037import ca.uhn.fhir.jpa.entity.SearchInclude;
038import ca.uhn.fhir.jpa.entity.SearchTypeEnum;
039import ca.uhn.fhir.jpa.interceptor.JpaPreResourceAccessDetails;
040import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
041import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
042import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
043import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
044import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
045import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum;
046import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
047import ca.uhn.fhir.jpa.util.InterceptorUtil;
048import ca.uhn.fhir.model.api.IQueryParameterType;
049import ca.uhn.fhir.model.api.Include;
050import ca.uhn.fhir.rest.api.CacheControlDirective;
051import ca.uhn.fhir.rest.api.Constants;
052import ca.uhn.fhir.rest.api.SearchTotalModeEnum;
053import ca.uhn.fhir.rest.api.SummaryEnum;
054import ca.uhn.fhir.rest.api.server.IBundleProvider;
055import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails;
056import ca.uhn.fhir.rest.api.server.RequestDetails;
057import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
058import ca.uhn.fhir.rest.server.IPagingProvider;
059import ca.uhn.fhir.rest.server.SimpleBundleProvider;
060import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
061import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
062import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
063import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
064import ca.uhn.fhir.rest.server.method.PageMethodBinding;
065import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
066import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
067import ca.uhn.fhir.rest.server.util.ICachedSearchDetails;
068import ca.uhn.fhir.util.AsyncUtil;
069import ca.uhn.fhir.util.StopWatch;
070import co.elastic.apm.api.ElasticApm;
071import co.elastic.apm.api.Span;
072import co.elastic.apm.api.Transaction;
073import com.google.common.annotations.VisibleForTesting;
074import org.apache.commons.lang3.Validate;
075import org.apache.commons.lang3.exception.ExceptionUtils;
076import org.apache.commons.lang3.time.DateUtils;
077import org.hl7.fhir.instance.model.api.IBaseResource;
078import org.springframework.beans.factory.annotation.Autowired;
079import org.springframework.data.domain.AbstractPageRequest;
080import org.springframework.data.domain.Pageable;
081import org.springframework.data.domain.Sort;
082import org.springframework.orm.jpa.JpaDialect;
083import org.springframework.orm.jpa.JpaTransactionManager;
084import org.springframework.orm.jpa.vendor.HibernateJpaDialect;
085import org.springframework.stereotype.Component;
086import org.springframework.transaction.PlatformTransactionManager;
087import org.springframework.transaction.TransactionDefinition;
088import org.springframework.transaction.TransactionStatus;
089import org.springframework.transaction.annotation.Propagation;
090import org.springframework.transaction.annotation.Transactional;
091import org.springframework.transaction.support.TransactionCallbackWithoutResult;
092import org.springframework.transaction.support.TransactionSynchronizationManager;
093import org.springframework.transaction.support.TransactionTemplate;
094
095import javax.annotation.Nonnull;
096import javax.annotation.Nullable;
097import javax.annotation.PostConstruct;
098import javax.persistence.EntityManager;
099import javax.validation.constraints.NotNull;
100import java.io.IOException;
101import java.time.Instant;
102import java.time.temporal.ChronoUnit;
103import java.util.ArrayList;
104import java.util.Date;
105import java.util.Iterator;
106import java.util.List;
107import java.util.Optional;
108import java.util.Set;
109import java.util.UUID;
110import java.util.concurrent.Callable;
111import java.util.concurrent.ConcurrentHashMap;
112import java.util.concurrent.CountDownLatch;
113import java.util.concurrent.TimeUnit;
114
115import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
116
117@Component("mySearchCoordinatorSvc")
118public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
        /** Default value for {@link #mySyncSize}. */
        public static final int DEFAULT_SYNC_SIZE = 250;
        // NOTE(review): not referenced in the visible part of this class; presumably a key used by unit tests - confirm
        public static final String UNIT_TEST_CAPTURE_STACK = "unit_test_capture_stack";
        /** Shared boxed zero constant. */
        public static final Integer INTEGER_0 = 0;
        private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);
        /** Search tasks registered in this instance, keyed by the UUID of the search they belong to. */
        private final ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>();
        @Autowired
        private FhirContext myContext;
        @Autowired
        private DaoConfig myDaoConfig;
        @Autowired
        private EntityManager myEntityManager;
        // NOTE(review): not referenced in the visible part of this class; presumably a test-only throttle - confirm
        private Integer myLoadingThrottleForUnitTests = null;
        /** Maximum time, in milliseconds, that {@link #getResources} keeps polling before giving up. Defaults to one minute. */
        private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE;
        /** When {@code true}, {@link #getResources} never answers from a locally registered search task. */
        private boolean myNeverUseLocalSearchForUnitTests;
        @Autowired
        private IInterceptorBroadcaster myInterceptorBroadcaster;
        @Autowired
        private PlatformTransactionManager myManagedTxManager;
        @Autowired
        private ISearchCacheSvc mySearchCacheSvc;
        @Autowired
        private ISearchResultCacheSvc mySearchResultCacheSvc;
        @Autowired
        private DaoRegistry myDaoRegistry;
        @Autowired
        private IPagingProvider myPagingProvider;
        @Autowired
        private SearchBuilderFactory mySearchBuilderFactory;

        /** Fetch size handed to each search builder created by {@link #registerSearch}. */
        private int mySyncSize = DEFAULT_SYNC_SIZE;
        /**
         * Set in {@link #start()}: becomes {@code true} when the transaction manager is a
         * {@link JpaTransactionManager} whose dialect is a {@link HibernateJpaDialect}.
         */
        private boolean myCustomIsolationSupported;
        @Autowired
        private PersistedJpaBundleProviderFactory myPersistedJpaBundleProviderFactory;
        @Autowired
        private IRequestPartitionHelperSvc myRequestPartitionHelperService;
157
158        /**
159         * Constructor
160         */
161        @Autowired
162        public SearchCoordinatorSvcImpl() {
163                super();
164        }
165
166        @VisibleForTesting
167        Set<String> getActiveSearchIds() {
168                return myIdToSearchTask.keySet();
169        }
170
171        @VisibleForTesting
172        public void setSearchCacheServicesForUnitTest(ISearchCacheSvc theSearchCacheSvc, ISearchResultCacheSvc theSearchResultCacheSvc) {
173                mySearchCacheSvc = theSearchCacheSvc;
174                mySearchResultCacheSvc = theSearchResultCacheSvc;
175        }
176
177        @PostConstruct
178        public void start() {
179                if (myManagedTxManager instanceof JpaTransactionManager) {
180                        JpaDialect jpaDialect = ((JpaTransactionManager) myManagedTxManager).getJpaDialect();
181                        if (jpaDialect instanceof HibernateJpaDialect) {
182                                myCustomIsolationSupported = true;
183                        }
184                }
185                if (myCustomIsolationSupported == false) {
186                        ourLog.warn("JPA dialect does not support transaction isolation! This can have an impact on search performance.");
187                }
188        }
189
190        @Override
191        public void cancelAllActiveSearches() {
192                for (SearchTask next : myIdToSearchTask.values()) {
193                        ourLog.info("Requesting immediate abort of search: {}", next.getSearch().getUuid());
194                        next.requestImmediateAbort();
195                        AsyncUtil.awaitLatchAndIgnoreInterrupt(next.getCompletionLatch(), 30, TimeUnit.SECONDS);
196                }
197        }
198
199        @SuppressWarnings("SameParameterValue")
200        @VisibleForTesting
201        void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) {
202                myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults;
203        }
204
205        /**
206         * This method is called by the HTTP client processing thread in order to
207         * fetch resources.
208         */
209        @Override
210        @Transactional(propagation = Propagation.NEVER)
211        public List<ResourcePersistentId> getResources(final String theUuid, int theFrom, int theTo, @Nullable RequestDetails theRequestDetails) {
212                TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
213                txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
214
215                // If we're actively searching right now, don't try to do anything until at least one batch has been
216                // persisted in the DB
217                SearchTask searchTask = myIdToSearchTask.get(theUuid);
218                if (searchTask != null) {
219                        searchTask.awaitInitialSync();
220                }
221
222                ourLog.trace("About to start looking for resources {}-{}", theFrom, theTo);
223
224                Search search;
225                StopWatch sw = new StopWatch();
226                while (true) {
227
228                        if (myNeverUseLocalSearchForUnitTests == false) {
229                                if (searchTask != null) {
230                                        ourLog.trace("Local search found");
231                                        List<ResourcePersistentId> resourcePids = searchTask.getResourcePids(theFrom, theTo);
232                                        ourLog.trace("Local search returned {} pids, wanted {}-{} - Search: {}", resourcePids.size(), theFrom, theTo, searchTask.getSearch());
233
234                                        /*
235                                         * Generally, if a search task is open, the fastest possible thing is to just return its results. This
236                                         * will work most of the time, but can fail if the task hit a search threshold and the client is requesting
237                                         * results beyond that threashold. In that case, we'll keep going below, since that will trigger another
238                                         * task.
239                                         */
240                                        if ((searchTask.getSearch().getNumFound() - searchTask.getSearch().getNumBlocked()) >= theTo || resourcePids.size() == (theTo - theFrom)) {
241                                                return resourcePids;
242                                        }
243                                }
244                        }
245
246                        search = mySearchCacheSvc
247                                .fetchByUuid(theUuid)
248                                .orElseThrow(() -> newResourceGoneException(theUuid));
249
250                        verifySearchHasntFailedOrThrowInternalErrorException(search);
251                        if (search.getStatus() == SearchStatusEnum.FINISHED) {
252                                ourLog.trace("Search entity marked as finished with {} results", search.getNumFound());
253                                break;
254                        }
255                        if ((search.getNumFound() - search.getNumBlocked()) >= theTo) {
256                                ourLog.trace("Search entity has {} results so far", search.getNumFound());
257                                break;
258                        }
259
260                        if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) {
261                                ourLog.error("Search {} of type {} for {}{} timed out after {}ms", search.getId(), search.getSearchType(), search.getResourceType(), search.getSearchQueryString(), sw.getMillis());
262                                throw new InternalErrorException("Request timed out after " + sw.getMillis() + "ms");
263                        }
264
265                        // If the search was saved in "pass complete mode" it's probably time to
266                        // start a new pass
267                        if (search.getStatus() == SearchStatusEnum.PASSCMPLET) {
268                                ourLog.trace("Going to try to start next search");
269                                Optional<Search> newSearch = mySearchCacheSvc.tryToMarkSearchAsInProgress(search);
270                                if (newSearch.isPresent()) {
271                                        ourLog.trace("Launching new search");
272                                        search = newSearch.get();
273                                        String resourceType = search.getResourceType();
274                                        SearchParameterMap params = search.getSearchParameterMap().orElseThrow(() -> new IllegalStateException("No map in PASSCOMPLET search"));
275                                        IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType);
276                                        RequestPartitionId requestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequestForSearchType(theRequestDetails, resourceType, params, null);
277                                        SearchContinuationTask task = new SearchContinuationTask(search, resourceDao, params, resourceType, theRequestDetails, requestPartitionId);
278                                        myIdToSearchTask.put(search.getUuid(), task);
279                                        task.call();
280                                }
281                        }
282
283                        AsyncUtil.sleep(500);
284                }
285
286                ourLog.trace("Finished looping");
287
288                List<ResourcePersistentId> pids = mySearchResultCacheSvc.fetchResultPids(search, theFrom, theTo);
289                if (pids == null) {
290                        throw newResourceGoneException(theUuid);
291                }
292
293                ourLog.trace("Fetched {} results", pids.size());
294
295                return pids;
296        }
297
298        @Nonnull
299        private ResourceGoneException newResourceGoneException(String theUuid) {
300                ourLog.trace("Client requested unknown paging ID[{}]", theUuid);
301                String msg = myContext.getLocalizer().getMessage(PageMethodBinding.class, "unknownSearchId", theUuid);
302                return new ResourceGoneException(msg);
303        }
304
305        @Override
306        public IBundleProvider registerSearch(final IFhirResourceDao<?> theCallingDao, final SearchParameterMap theParams, String theResourceType, CacheControlDirective theCacheControlDirective, RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) {
307                final String searchUuid = UUID.randomUUID().toString();
308
309                final String queryString = theParams.toNormalizedQueryString(myContext);
310                ourLog.debug("Registering new search {}", searchUuid);
311
312                Search search = new Search();
313                populateSearchEntity(theParams, theResourceType, searchUuid, queryString, search, theRequestPartitionId);
314
315                // Interceptor call: STORAGE_PRESEARCH_REGISTERED
316                HookParams params = new HookParams()
317                        .add(ICachedSearchDetails.class, search)
318                        .add(RequestDetails.class, theRequestDetails)
319                        .addIfMatchesType(ServletRequestDetails.class, theRequestDetails)
320                        .add(SearchParameterMap.class, theParams);
321                CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PRESEARCH_REGISTERED, params);
322
323                Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(theResourceType).getImplementingClass();
324                final ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(theCallingDao, theResourceType, resourceTypeClass);
325                sb.setFetchSize(mySyncSize);
326
327                final Integer loadSynchronousUpTo = getLoadSynchronousUpToOrNull(theCacheControlDirective);
328                boolean isOffsetQuery = theParams.isOffsetQuery();
329
330                if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null || isOffsetQuery) {
331                        ourLog.debug("Search {} is loading in synchronous mode", searchUuid);
332                        return executeQuery(theResourceType, theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, theRequestPartitionId);
333                }
334
335                /*
336                 * See if there are any cached searches whose results we can return
337                 * instead
338                 */
339                SearchCacheStatusEnum cacheStatus = SearchCacheStatusEnum.MISS;
340                if (theCacheControlDirective != null && theCacheControlDirective.isNoCache() == true) {
341                        cacheStatus = SearchCacheStatusEnum.NOT_TRIED;
342                }
343
344                if (cacheStatus != SearchCacheStatusEnum.NOT_TRIED) {
345                        if (theParams.getEverythingMode() == null) {
346                                if (myDaoConfig.getReuseCachedSearchResultsForMillis() != null) {
347                                        PersistedJpaBundleProvider foundSearchProvider = findCachedQuery(theParams, theResourceType, theRequestDetails, queryString, theRequestPartitionId);
348                                        if (foundSearchProvider != null) {
349                                                foundSearchProvider.setCacheStatus(SearchCacheStatusEnum.HIT);
350                                                return foundSearchProvider;
351                                        }
352                                }
353                        }
354                }
355
356                PersistedJpaSearchFirstPageBundleProvider retVal = submitSearch(theCallingDao, theParams, theResourceType, theRequestDetails, searchUuid, sb, queryString, theRequestPartitionId, search);
357                retVal.setCacheStatus(cacheStatus);
358                return retVal;
359        }
360
361        @Override
362        public Optional<Integer> getSearchTotal(String theUuid) {
363                SearchTask task = myIdToSearchTask.get(theUuid);
364                if (task != null) {
365                        return Optional.ofNullable(task.awaitInitialSync());
366                }
367
368                /*
369                 * In case there is no running search, if the total is listed as accurate we know one is coming
370                 * so let's wait a bit for it to show up
371                 */
372                TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
373                txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
374                Optional<Search> search = mySearchCacheSvc.fetchByUuid(theUuid);
375                if (search.isPresent()) {
376                        Optional<SearchParameterMap> searchParameterMap = search.get().getSearchParameterMap();
377                        if (searchParameterMap.isPresent() && searchParameterMap.get().getSearchTotalMode() == SearchTotalModeEnum.ACCURATE) {
378                                for (int i = 0; i < 10; i++) {
379                                        if (search.isPresent()) {
380                                                verifySearchHasntFailedOrThrowInternalErrorException(search.get());
381                                                if (search.get().getTotalCount() != null) {
382                                                        return Optional.of(search.get().getTotalCount());
383                                                }
384                                        }
385                                        search = mySearchCacheSvc.fetchByUuid(theUuid);
386                                }
387                        }
388                }
389
390                return Optional.empty();
391        }
392
393        @NotNull
394        private PersistedJpaSearchFirstPageBundleProvider submitSearch(IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequestDetails, String theSearchUuid, ISearchBuilder theSb, String theQueryString, RequestPartitionId theRequestPartitionId, Search theSearch) {
395                StopWatch w = new StopWatch();
396//              Search search = new Search();
397                //TODO GGG MOVE THIS POPULATE AND ALSO THE HOOK CALL HIGHER UP IN THE STACK.
398//              populateSearchEntity(theParams, theResourceType, theSearchUuid, theQueryString, search, theRequestPartitionId);
399
400//               Interceptor call: STORAGE_PRESEARCH_REGISTERED
401//              HookParams params = new HookParams()
402//                      .add(ICachedSearchDetails.class, search)
403//                      .add(RequestDetails.class, theRequestDetails)
404//                      .addIfMatchesType(ServletRequestDetails.class, theRequestDetails)
405//                      .add(SearchParameterMap.class, theParams);
406//              CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PRESEARCH_REGISTERED, params);
407
408                SearchTask task = new SearchTask(theSearch, theCallingDao, theParams, theResourceType, theRequestDetails, theRequestPartitionId);
409                myIdToSearchTask.put(theSearch.getUuid(), task);
410                task.call();
411
412                PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage(theRequestDetails, theSearch, task, theSb);
413
414                ourLog.debug("Search initial phase completed in {}ms", w.getMillis());
415                return retVal;
416        }
417
418        @Nullable
419        private PersistedJpaBundleProvider findCachedQuery(SearchParameterMap theParams, String theResourceType, RequestDetails theRequestDetails, String theQueryString, RequestPartitionId theRequestPartitionId) {
420                TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
421
422                // May be null
423                return txTemplate.execute(t -> {
424
425                        // Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH
426                        HookParams params = new HookParams()
427                                .add(SearchParameterMap.class, theParams)
428                                .add(RequestDetails.class, theRequestDetails)
429                                .addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
430                        Object outcome = CompositeInterceptorBroadcaster.doCallHooksAndReturnObject(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH, params);
431                        if (Boolean.FALSE.equals(outcome)) {
432                                return null;
433                        }
434
435                        // Check for a search matching the given hash
436                        Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType, theRequestPartitionId);
437                        if (searchToUse == null) {
438                                return null;
439                        }
440
441                        ourLog.debug("Reusing search {} from cache", searchToUse.getUuid());
442                        // Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED
443                        params = new HookParams()
444                                .add(SearchParameterMap.class, theParams)
445                                .add(RequestDetails.class, theRequestDetails)
446                                .addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
447                        CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, params);
448
449                        return myPersistedJpaBundleProviderFactory.newInstance(theRequestDetails, searchToUse.getUuid());
450                });
451        }
452
453        @Nullable
454        private Search findSearchToUseOrNull(String theQueryString, String theResourceType, RequestPartitionId theRequestPartitionId) {
455                // createdCutoff is in recent past
456                final Instant createdCutoff = Instant.now().minus(myDaoConfig.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS);
457
458                Optional<Search> candidate = mySearchCacheSvc.findCandidatesForReuse(theResourceType, theQueryString, createdCutoff, theRequestPartitionId);
459                return candidate.orElse(null);
460        }
461
462        private IBundleProvider executeQuery(String theResourceType, SearchParameterMap theParams, RequestDetails theRequestDetails, String theSearchUuid, ISearchBuilder theSb, Integer theLoadSynchronousUpTo, RequestPartitionId theRequestPartitionId) {
463                SearchRuntimeDetails searchRuntimeDetails = new SearchRuntimeDetails(theRequestDetails, theSearchUuid);
464                searchRuntimeDetails.setLoadSynchronous(true);
465
466                boolean wantOnlyCount = isWantOnlyCount(theParams);
467                boolean wantCount = isWantCount(theParams, wantOnlyCount);
468
469                // Execute the query and make sure we return distinct results
470                TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
471                txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
472                txTemplate.setReadOnly(theParams.isLoadSynchronous() || theParams.isOffsetQuery());
473                return txTemplate.execute(t -> {
474
475                        // Load the results synchronously
476                        final List<ResourcePersistentId> pids = new ArrayList<>();
477
478                        Long count = 0L;
479                        if (wantCount) {
480                                ourLog.trace("Performing count");
481                                // TODO FulltextSearchSvcImpl will remove necessary parameters from the "theParams", this will cause actual query after count to
482                                //  return wrong response. This is some dirty fix to avoid that issue. Params should not be mutated?
483                                //  Maybe instead of removing them we could skip them in db query builder if full text search was used?
484                                List<List<IQueryParameterType>> contentAndTerms = theParams.get(Constants.PARAM_CONTENT);
485                                List<List<IQueryParameterType>> textAndTerms = theParams.get(Constants.PARAM_TEXT);
486
487                                Iterator<Long> countIterator = theSb.createCountQuery(theParams, theSearchUuid, theRequestDetails, theRequestPartitionId);
488
489                                if (contentAndTerms != null) theParams.put(Constants.PARAM_CONTENT, contentAndTerms);
490                                if (textAndTerms != null) theParams.put(Constants.PARAM_TEXT, textAndTerms);
491
492                                count = countIterator.next();
493                                ourLog.trace("Got count {}", count);
494                        }
495
496                        if (wantOnlyCount) {
497                                SimpleBundleProvider bundleProvider = new SimpleBundleProvider();
498                                bundleProvider.setSize(count.intValue());
499                                return bundleProvider;
500                        }
501
502                        try (IResultIterator resultIter = theSb.createQuery(theParams, searchRuntimeDetails, theRequestDetails, theRequestPartitionId)) {
503                                while (resultIter.hasNext()) {
504                                        pids.add(resultIter.next());
505                                        if (theLoadSynchronousUpTo != null && pids.size() >= theLoadSynchronousUpTo) {
506                                                break;
507                                        }
508                                        if (theParams.getLoadSynchronousUpTo() != null && pids.size() >= theParams.getLoadSynchronousUpTo()) {
509                                                break;
510                                        }
511                                }
512                        } catch (IOException e) {
513                                ourLog.error("IO failure during database access", e);
514                                throw new InternalErrorException(e);
515                        }
516
517                        JpaPreResourceAccessDetails accessDetails = new JpaPreResourceAccessDetails(pids, () -> theSb);
518                        HookParams params = new HookParams()
519                                .add(IPreResourceAccessDetails.class, accessDetails)
520                                .add(RequestDetails.class, theRequestDetails)
521                                .addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
522                        CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PREACCESS_RESOURCES, params);
523
524                        for (int i = pids.size() - 1; i >= 0; i--) {
525                                if (accessDetails.isDontReturnResourceAtIndex(i)) {
526                                        pids.remove(i);
527                                }
528                        }
529
530                        /*
531                         * For synchronous queries, we load all the includes right away
532                         * since we're returning a static bundle with all the results
533                         * pre-loaded. This is ok because synchronous requests are not
534                         * expected to be paged
535                         *
536                         * On the other hand for async queries we load includes/revincludes
537                         * individually for pages as we return them to clients
538                         */
539
540                        // _includes
541                        Integer maxIncludes = myDaoConfig.getMaximumIncludesToLoadPerPage();
542                        final Set<ResourcePersistentId> includedPids = theSb.loadIncludes(myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated(), "(synchronous)", theRequestDetails, maxIncludes);
543                        if (maxIncludes != null) {
544                                maxIncludes -= includedPids.size();
545                        }
546                        pids.addAll(includedPids);
547                        List<ResourcePersistentId> includedPidsList = new ArrayList<>(includedPids);
548
549                        // _revincludes
550                        if (theParams.getEverythingMode() == null && (maxIncludes == null || maxIncludes > 0)) {
551                                Set<ResourcePersistentId> revIncludedPids = theSb.loadIncludes(myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated(), "(synchronous)", theRequestDetails, maxIncludes);
552                                includedPids.addAll(revIncludedPids);
553                                pids.addAll(revIncludedPids);
554                                includedPidsList.addAll(revIncludedPids);
555                        }
556
557                        List<IBaseResource> resources = new ArrayList<>();
558                        theSb.loadResourcesByPid(pids, includedPidsList, resources, false, theRequestDetails);
559
560                        // Hook: STORAGE_PRESHOW_RESOURCES
561                        resources = InterceptorUtil.fireStoragePreshowResource(resources, theRequestDetails, myInterceptorBroadcaster);
562
563                        SimpleBundleProvider bundleProvider = new SimpleBundleProvider(resources);
564                        if (theParams.isOffsetQuery()) {
565                                bundleProvider.setCurrentPageOffset(theParams.getOffset());
566                                bundleProvider.setCurrentPageSize(theParams.getCount());
567                        }
568
569                        if (wantCount) {
570                                bundleProvider.setSize(count.intValue());
571                        } else {
572                                Integer queryCount = getQueryCount(theLoadSynchronousUpTo, theParams);
573                                if (queryCount == null || queryCount > resources.size()) {
574                                        // No limit, last page or everything was fetched within the limit
575                                        bundleProvider.setSize(getTotalCount(queryCount, theParams.getOffset(), resources.size()));
576                                } else {
577                                        bundleProvider.setSize(null);
578                                }
579                        }
580
581                        return bundleProvider;
582                });
583        }
584
585        private int getTotalCount(Integer queryCount, Integer offset, int queryResultCount) {
586                if (queryCount != null) {
587                        if (offset != null) {
588                                return offset + queryResultCount;
589                        } else {
590                                return queryResultCount;
591                        }
592                } else {
593                        return queryResultCount;
594                }
595        }
596
597        private Integer getQueryCount(Integer theLoadSynchronousUpTo, SearchParameterMap theParams) {
598                if (theLoadSynchronousUpTo != null) {
599                        return theLoadSynchronousUpTo;
600                } else if (theParams.getCount() != null) {
601                        return theParams.getCount();
602                } else if (myDaoConfig.getFetchSizeDefaultMaximum() != null) {
603                        return myDaoConfig.getFetchSizeDefaultMaximum();
604                }
605                return null;
606        }
607
608        @Nullable
609        private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) {
610                final Integer loadSynchronousUpTo;
611                if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) {
612                        if (theCacheControlDirective.getMaxResults() != null) {
613                                loadSynchronousUpTo = theCacheControlDirective.getMaxResults();
614                                if (loadSynchronousUpTo > myDaoConfig.getCacheControlNoStoreMaxResultsUpperLimit()) {
615                                        throw new InvalidRequestException(Constants.HEADER_CACHE_CONTROL + " header " + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed " + myDaoConfig.getCacheControlNoStoreMaxResultsUpperLimit());
616                                }
617                        } else {
618                                loadSynchronousUpTo = 100;
619                        }
620                } else {
621                        loadSynchronousUpTo = null;
622                }
623                return loadSynchronousUpTo;
624        }
625
626        @VisibleForTesting
627        void setContextForUnitTest(FhirContext theCtx) {
628                myContext = theCtx;
629        }
630
631        @VisibleForTesting
632        void setDaoConfigForUnitTest(DaoConfig theDaoConfig) {
633                myDaoConfig = theDaoConfig;
634        }
635
636        @VisibleForTesting
637        void setEntityManagerForUnitTest(EntityManager theEntityManager) {
638                myEntityManager = theEntityManager;
639        }
640
641        @VisibleForTesting
642        public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) {
643                myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests;
644        }
645
646        @VisibleForTesting
647        public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) {
648                myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests;
649        }
650
651        @VisibleForTesting
652        public void setSyncSizeForUnitTests(int theSyncSize) {
653                mySyncSize = theSyncSize;
654        }
655
656        @VisibleForTesting
657        void setTransactionManagerForUnitTest(PlatformTransactionManager theTxManager) {
658                myManagedTxManager = theTxManager;
659        }
660
661        @VisibleForTesting
662        void setDaoRegistryForUnitTest(DaoRegistry theDaoRegistry) {
663                myDaoRegistry = theDaoRegistry;
664        }
665
666        @VisibleForTesting
667        void setInterceptorBroadcasterForUnitTest(IInterceptorBroadcaster theInterceptorBroadcaster) {
668                myInterceptorBroadcaster = theInterceptorBroadcaster;
669        }
670
671        @VisibleForTesting
672        public void setSearchBuilderFactoryForUnitTest(SearchBuilderFactory theSearchBuilderFactory) {
673                mySearchBuilderFactory = theSearchBuilderFactory;
674        }
675
676        @VisibleForTesting
677        public void setPersistedJpaBundleProviderFactoryForUnitTest(PersistedJpaBundleProviderFactory thePersistedJpaBundleProviderFactory) {
678                myPersistedJpaBundleProviderFactory = thePersistedJpaBundleProviderFactory;
679        }
680
681        @VisibleForTesting
682        public void setRequestPartitionHelperService(IRequestPartitionHelperSvc theRequestPartitionHelperService) {
683                myRequestPartitionHelperService = theRequestPartitionHelperService;
684        }
685
686        private boolean isWantCount(SearchParameterMap myParams, boolean wantOnlyCount) {
687                return wantOnlyCount ||
688                        SearchTotalModeEnum.ACCURATE.equals(myParams.getSearchTotalMode()) ||
689                        (myParams.getSearchTotalMode() == null && SearchTotalModeEnum.ACCURATE.equals(myDaoConfig.getDefaultTotalMode()));
690        }
691
692        /**
693         * A search task is a Callable task that runs in
694         * a thread pool to handle an individual search. One instance
695         * is created for any requested search and runs from the
696         * beginning to the end of the search.
697         * <p>
698         * Understand:
699         * This class executes in its own thread separate from the
700         * web server client thread that made the request. We do that
701         * so that we can return to the client as soon as possible,
702         * but keep the search going in the background (and have
703         * the next page of results ready to go when the client asks).
704         */
705        public class SearchTask implements Callable<Void> {
706                private final SearchParameterMap myParams;
707                private final IDao myCallingDao;
708                private final String myResourceType;
709                private final ArrayList<ResourcePersistentId> mySyncedPids = new ArrayList<>();
710                private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1);
711                private final CountDownLatch myCompletionLatch;
712                private final ArrayList<ResourcePersistentId> myUnsyncedPids = new ArrayList<>();
713                private final RequestDetails myRequest;
714                private final RequestPartitionId myRequestPartitionId;
715                private final SearchRuntimeDetails mySearchRuntimeDetails;
716                private final Transaction myParentTransaction;
717                private Search mySearch;
718                private boolean myAbortRequested;
719                private int myCountSavedTotal = 0;
720                private int myCountSavedThisPass = 0;
721                private int myCountBlockedThisPass = 0;
722                private boolean myAdditionalPrefetchThresholdsRemaining;
723                private List<ResourcePersistentId> myPreviouslyAddedResourcePids;
724                private Integer myMaxResultsToFetch;
725
726                /**
727                 * Constructor
728                 */
729                protected SearchTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequest, RequestPartitionId theRequestPartitionId) {
730                        mySearch = theSearch;
731                        myCallingDao = theCallingDao;
732                        myParams = theParams;
733                        myResourceType = theResourceType;
734                        myCompletionLatch = new CountDownLatch(1);
735                        mySearchRuntimeDetails = new SearchRuntimeDetails(theRequest, mySearch.getUuid());
736                        mySearchRuntimeDetails.setQueryString(theParams.toNormalizedQueryString(theCallingDao.getContext()));
737                        myRequestPartitionId = theRequestPartitionId;
738                        myRequest = theRequest;
739                        myParentTransaction = ElasticApm.currentTransaction();
740                }
741
742                /**
743                 * This method is called by the server HTTP thread, and
744                 * will block until at least one page of results have been
745                 * fetched from the DB, and will never block after that.
746                 */
747                Integer awaitInitialSync() {
748                        ourLog.trace("Awaiting initial sync");
749                        do {
750                                ourLog.trace("Search {} aborted: {}", getSearch().getUuid(), !isNotAborted());
751                                if (AsyncUtil.awaitLatchAndThrowInternalErrorExceptionOnInterrupt(getInitialCollectionLatch(), 250L, TimeUnit.MILLISECONDS)) {
752                                        break;
753                                }
754                        } while (getSearch().getStatus() == SearchStatusEnum.LOADING);
755                        ourLog.trace("Initial sync completed");
756
757                        return getSearch().getTotalCount();
758                }
759
760                protected Search getSearch() {
761                        return mySearch;
762                }
763
764                CountDownLatch getInitialCollectionLatch() {
765                        return myInitialCollectionLatch;
766                }
767
768                void setPreviouslyAddedResourcePids(List<ResourcePersistentId> thePreviouslyAddedResourcePids) {
769                        myPreviouslyAddedResourcePids = thePreviouslyAddedResourcePids;
770                        myCountSavedTotal = myPreviouslyAddedResourcePids.size();
771                }
772
773                private ISearchBuilder newSearchBuilder() {
774                        Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
775                        return mySearchBuilderFactory.newSearchBuilder(myCallingDao, myResourceType, resourceTypeClass);
776                }
777
778                @Nonnull
779                List<ResourcePersistentId> getResourcePids(int theFromIndex, int theToIndex) {
780                        ourLog.debug("Requesting search PIDs from {}-{}", theFromIndex, theToIndex);
781
782                        boolean keepWaiting;
783                        do {
784                                synchronized (mySyncedPids) {
785                                        ourLog.trace("Search status is {}", mySearch.getStatus());
786                                        boolean haveEnoughResults = mySyncedPids.size() >= theToIndex;
787                                        if (!haveEnoughResults) {
788                                                switch (mySearch.getStatus()) {
789                                                        case LOADING:
790                                                                keepWaiting = true;
791                                                                break;
792                                                        case PASSCMPLET:
793                                                                /*
794                                                                 * If we get here, it means that the user requested resources that crossed the
795                                                                 * current pre-fetch boundary. For example, if the prefetch threshold is 50 and the
796                                                                 * user has requested resources 0-60, then they would get 0-50 back but the search
797                                                                 * coordinator would then stop searching.SearchCoordinatorSvcImplTest
798                                                                 */
799                                                                keepWaiting = false;
800                                                                break;
801                                                        case FAILED:
802                                                        case FINISHED:
803                                                        case GONE:
804                                                        default:
805                                                                keepWaiting = false;
806                                                                break;
807                                                }
808                                        } else {
809                                                keepWaiting = false;
810                                        }
811                                }
812
813                                if (keepWaiting) {
814                                        ourLog.info("Waiting as we only have {} results - Search status: {}", mySyncedPids.size(), mySearch.getStatus());
815                                        AsyncUtil.sleep(500L);
816                                }
817                        } while (keepWaiting);
818
819                        ourLog.debug("Proceeding, as we have {} results", mySyncedPids.size());
820
821                        ArrayList<ResourcePersistentId> retVal = new ArrayList<>();
822                        synchronized (mySyncedPids) {
823                                verifySearchHasntFailedOrThrowInternalErrorException(mySearch);
824
825                                int toIndex = theToIndex;
826                                if (mySyncedPids.size() < toIndex) {
827                                        toIndex = mySyncedPids.size();
828                                }
829                                for (int i = theFromIndex; i < toIndex; i++) {
830                                        retVal.add(mySyncedPids.get(i));
831                                }
832                        }
833
834                        ourLog.trace("Done syncing results - Wanted {}-{} and returning {} of {}", theFromIndex, theToIndex, retVal.size(), mySyncedPids.size());
835
836                        return retVal;
837                }
838
839                void saveSearch() {
840                        TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
841                        txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
842                        txTemplate.execute(new TransactionCallbackWithoutResult() {
843                                @Override
844                                protected void doInTransactionWithoutResult(@Nonnull @NotNull TransactionStatus theArg0) {
845                                        doSaveSearch();
846                                }
847
848                        });
849                }
850
851                private void saveUnsynced(final IResultIterator theResultIter) {
852                        TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
853                        txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
854                        txTemplate.execute(new TransactionCallbackWithoutResult() {
855                                @Override
856                                protected void doInTransactionWithoutResult(@Nonnull @NotNull TransactionStatus theArg0) {
857                                        if (mySearch.getId() == null) {
858                                                doSaveSearch();
859                                        }
860
861                                        ArrayList<ResourcePersistentId> unsyncedPids = myUnsyncedPids;
862                                        int countBlocked = 0;
863
864                                        // Interceptor call: STORAGE_PREACCESS_RESOURCES
865                                        // This can be used to remove results from the search result details before
866                                        // the user has a chance to know that they were in the results
867                                        if (mySearchRuntimeDetails.getRequestDetails() != null && unsyncedPids.isEmpty() == false) {
868                                                JpaPreResourceAccessDetails accessDetails = new JpaPreResourceAccessDetails(unsyncedPids, () -> newSearchBuilder());
869                                                HookParams params = new HookParams()
870                                                        .add(IPreResourceAccessDetails.class, accessDetails)
871                                                        .add(RequestDetails.class, mySearchRuntimeDetails.getRequestDetails())
872                                                        .addIfMatchesType(ServletRequestDetails.class, mySearchRuntimeDetails.getRequestDetails());
873                                                CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, myRequest, Pointcut.STORAGE_PREACCESS_RESOURCES, params);
874
875                                                for (int i = unsyncedPids.size() - 1; i >= 0; i--) {
876                                                        if (accessDetails.isDontReturnResourceAtIndex(i)) {
877                                                                unsyncedPids.remove(i);
878                                                                myCountBlockedThisPass++;
879                                                                myCountSavedTotal++;
880                                                                countBlocked++;
881                                                        }
882                                                }
883                                        }
884
885                                        // Actually store the results in the query cache storage
886                                        myCountSavedTotal += unsyncedPids.size();
887                                        myCountSavedThisPass += unsyncedPids.size();
888                                        mySearchResultCacheSvc.storeResults(mySearch, mySyncedPids, unsyncedPids);
889
890                                        synchronized (mySyncedPids) {
891                                                int numSyncedThisPass = unsyncedPids.size();
892                                                ourLog.trace("Syncing {} search results - Have more: {}", numSyncedThisPass, theResultIter.hasNext());
893                                                mySyncedPids.addAll(unsyncedPids);
894                                                unsyncedPids.clear();
895
896                                                if (theResultIter.hasNext() == false) {
897                                                        int skippedCount = theResultIter.getSkippedCount();
898                                                        int nonSkippedCount = theResultIter.getNonSkippedCount();
899                                                        int totalFetched = skippedCount + myCountSavedThisPass + myCountBlockedThisPass;
900                                                        ourLog.trace("MaxToFetch[{}] SkippedCount[{}] CountSavedThisPass[{}] CountSavedThisTotal[{}] AdditionalPrefetchRemaining[{}]", myMaxResultsToFetch, skippedCount, myCountSavedThisPass, myCountSavedTotal, myAdditionalPrefetchThresholdsRemaining);
901
902                                                        if (nonSkippedCount == 0 || (myMaxResultsToFetch != null && totalFetched < myMaxResultsToFetch)) {
903                                                                ourLog.trace("Setting search status to FINISHED");
904                                                                mySearch.setStatus(SearchStatusEnum.FINISHED);
905                                                                mySearch.setTotalCount(myCountSavedTotal - countBlocked);
906                                                        } else if (myAdditionalPrefetchThresholdsRemaining) {
907                                                                ourLog.trace("Setting search status to PASSCMPLET");
908                                                                mySearch.setStatus(SearchStatusEnum.PASSCMPLET);
909                                                                mySearch.setSearchParameterMap(myParams);
910                                                        } else {
911                                                                ourLog.trace("Setting search status to FINISHED");
912                                                                mySearch.setStatus(SearchStatusEnum.FINISHED);
913                                                                mySearch.setTotalCount(myCountSavedTotal - countBlocked);
914                                                        }
915                                                }
916                                        }
917
918                                        mySearch.setNumFound(myCountSavedTotal);
919                                        mySearch.setNumBlocked(mySearch.getNumBlocked() + countBlocked);
920
921                                        int numSynced;
922                                        synchronized (mySyncedPids) {
923                                                numSynced = mySyncedPids.size();
924                                        }
925
926                                        if (myDaoConfig.getCountSearchResultsUpTo() == null ||
927                                                myDaoConfig.getCountSearchResultsUpTo() <= 0 ||
928                                                myDaoConfig.getCountSearchResultsUpTo() <= numSynced) {
929                                                myInitialCollectionLatch.countDown();
930                                        }
931
932                                        doSaveSearch();
933
934                                }
935                        });
936
937                }
938
939                boolean isNotAborted() {
940                        return myAbortRequested == false;
941                }
942
943                void markComplete() {
944                        myCompletionLatch.countDown();
945                }
946
947                CountDownLatch getCompletionLatch() {
948                        return myCompletionLatch;
949                }
950
951                /**
952                 * Request that the task abort as soon as possible
953                 */
954                void requestImmediateAbort() {
955                        myAbortRequested = true;
956                }
957
958                /**
959                 * This is the method which actually performs the search.
960                 * It is called automatically by the thread pool.
961                 */
962                @Override
963                public Void call() {
964                        StopWatch sw = new StopWatch();
965                        Span span = myParentTransaction.startSpan("db", "query", "search");
966                        span.setName("FHIR Database Search");
967                        try {
968                                // Create an initial search in the DB and give it an ID
969                                saveSearch();
970
971                                TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
972                                txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
973
974                                if (myCustomIsolationSupported) {
975                                        txTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
976                                }
977
978                                txTemplate.execute(new TransactionCallbackWithoutResult() {
979                                        @Override
980                                        protected void doInTransactionWithoutResult(@Nonnull TransactionStatus theStatus) {
981                                                doSearch();
982                                        }
983                                });
984
985                                mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus());
986                                if (mySearch.getStatus() == SearchStatusEnum.FINISHED) {
987                                        HookParams params = new HookParams()
988                                                .add(RequestDetails.class, myRequest)
989                                                .addIfMatchesType(ServletRequestDetails.class, myRequest)
990                                                .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
991                                        CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_COMPLETE, params);
992                                } else {
993                                        HookParams params = new HookParams()
994                                                .add(RequestDetails.class, myRequest)
995                                                .addIfMatchesType(ServletRequestDetails.class, myRequest)
996                                                .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
997                                        CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_PASS_COMPLETE, params);
998                                }
999
1000                                ourLog.trace("Have completed search for [{}{}] and found {} resources in {}ms - Status is {}", mySearch.getResourceType(), mySearch.getSearchQueryString(), mySyncedPids.size(), sw.getMillis(), mySearch.getStatus());
1001
1002                        } catch (Throwable t) {
1003
1004                                /*
1005                                 * Don't print a stack trace for client errors (i.e. requests that
1006                                 * aren't valid because the client screwed up).. that's just noise
1007                                 * in the logs and who needs that.
1008                                 */
1009                                boolean logged = false;
1010                                if (t instanceof BaseServerResponseException) {
1011                                        BaseServerResponseException exception = (BaseServerResponseException) t;
1012                                        if (exception.getStatusCode() >= 400 && exception.getStatusCode() < 500) {
1013                                                logged = true;
1014                                                ourLog.warn("Failed during search due to invalid request: {}", t.toString());
1015                                        }
1016                                }
1017
1018                                if (!logged) {
1019                                        ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t);
1020                                }
1021                                myUnsyncedPids.clear();
1022                                Throwable rootCause = ExceptionUtils.getRootCause(t);
1023                                rootCause = defaultIfNull(rootCause, t);
1024
1025                                String failureMessage = rootCause.getMessage();
1026
1027                                int failureCode = InternalErrorException.STATUS_CODE;
1028                                if (t instanceof BaseServerResponseException) {
1029                                        failureCode = ((BaseServerResponseException) t).getStatusCode();
1030                                }
1031
1032                                if (System.getProperty(UNIT_TEST_CAPTURE_STACK) != null) {
1033                                        failureMessage += "\nStack\n" + ExceptionUtils.getStackTrace(rootCause);
1034                                }
1035
1036                                mySearch.setFailureMessage(failureMessage);
1037                                mySearch.setFailureCode(failureCode);
1038                                mySearch.setStatus(SearchStatusEnum.FAILED);
1039
1040                                mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus());
1041                                HookParams params = new HookParams()
1042                                        .add(RequestDetails.class, myRequest)
1043                                        .addIfMatchesType(ServletRequestDetails.class, myRequest)
1044                                        .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
1045                                CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_FAILED, params);
1046
1047                                saveSearch();
1048                                span.captureException(t);
1049                        } finally {
1050
1051                                myIdToSearchTask.remove(mySearch.getUuid());
1052                                myInitialCollectionLatch.countDown();
1053                                markComplete();
1054                                span.end();
1055
1056                        }
1057                        return null;
1058                }
1059
1060                private void doSaveSearch() {
1061                        Search newSearch = mySearchCacheSvc.save(mySearch);
1062
1063                        // mySearchDao.save is not supposed to return null, but in unit tests
1064                        // it can if the mock search dao isn't set up to handle that
1065                        if (newSearch != null) {
1066                                mySearch = newSearch;
1067                        }
1068                }
1069
1070                /**
1071                 * This method actually creates the database query to perform the
1072                 * search, and starts it.
1073                 */
1074                private void doSearch() {
1075
1076                        /*
1077                         * If the user has explicitly requested a _count, perform a
1078                         *
1079                         * SELECT COUNT(*) ....
1080                         *
1081                         * before doing anything else.
1082                         */
1083                        boolean wantOnlyCount = isWantOnlyCount(myParams);
1084                        boolean wantCount = isWantCount(myParams, wantOnlyCount);
1085                        if (wantCount) {
1086                                ourLog.trace("Performing count");
1087                                ISearchBuilder sb = newSearchBuilder();
1088                                Iterator<Long> countIterator = sb.createCountQuery(myParams, mySearch.getUuid(), myRequest, myRequestPartitionId);
1089                                Long count = countIterator.hasNext() ? countIterator.next() : 0L;
1090                                ourLog.trace("Got count {}", count);
1091
1092                                TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
1093                                txTemplate.execute(new TransactionCallbackWithoutResult() {
1094                                        @Override
1095                                        protected void doInTransactionWithoutResult(@Nonnull TransactionStatus theArg0) {
1096                                                mySearch.setTotalCount(count.intValue());
1097                                                if (wantOnlyCount) {
1098                                                        mySearch.setStatus(SearchStatusEnum.FINISHED);
1099                                                }
1100                                                doSaveSearch();
1101                                        }
1102                                });
1103                                if (wantOnlyCount) {
1104                                        return;
1105                                }
1106                        }
1107
1108                        ourLog.trace("Done count");
1109                        ISearchBuilder sb = newSearchBuilder();
1110
1111                        /*
1112                         * Figure out how many results we're actually going to fetch from the
1113                         * database in this pass. This calculation takes into consideration the
1114                         * "pre-fetch thresholds" specified in DaoConfig#getSearchPreFetchThresholds()
1115                         * as well as the value of the _count parameter.
1116                         */
1117                        int currentlyLoaded = defaultIfNull(mySearch.getNumFound(), 0);
1118                        int minWanted = 0;
1119                        if (myParams.getCount() != null) {
1120                                minWanted = myParams.getCount();
1121                                minWanted = Math.min(minWanted, myPagingProvider.getMaximumPageSize());
1122                                minWanted += currentlyLoaded;
1123                        }
1124
1125                        for (Iterator<Integer> iter = myDaoConfig.getSearchPreFetchThresholds().iterator(); iter.hasNext(); ) {
1126                                int next = iter.next();
1127                                if (next != -1 && next <= currentlyLoaded) {
1128                                        continue;
1129                                }
1130
1131                                if (next == -1) {
1132                                        sb.setMaxResultsToFetch(null);
1133                                } else {
1134                                        myMaxResultsToFetch = Math.max(next, minWanted);
1135                                        sb.setMaxResultsToFetch(myMaxResultsToFetch);
1136                                }
1137
1138                                if (iter.hasNext()) {
1139                                        myAdditionalPrefetchThresholdsRemaining = true;
1140                                }
1141
1142                                // If we get here's we've found an appropriate threshold
1143                                break;
1144                        }
1145
1146                        /*
1147                         * Provide any PID we loaded in previous search passes to the
1148                         * SearchBuilder so that we don't get duplicates coming from running
1149                         * the same query again.
1150                         *
1151                         * We could possibly accomplish this in a different way by using sorted
1152                         * results in our SQL query and specifying an offset. I don't actually
1153                         * know if that would be faster or not. At some point should test this
1154                         * idea.
1155                         */
1156                        if (myPreviouslyAddedResourcePids != null) {
1157                                sb.setPreviouslyAddedResourcePids(myPreviouslyAddedResourcePids);
1158                                mySyncedPids.addAll(myPreviouslyAddedResourcePids);
1159                        }
1160
1161                        /*
1162                         * Construct the SQL query we'll be sending to the database
1163                         */
1164                        try (IResultIterator resultIterator = sb.createQuery(myParams, mySearchRuntimeDetails, myRequest, myRequestPartitionId)) {
1165                                assert (resultIterator != null);
1166
1167                                /*
1168                                 * The following loop actually loads the PIDs of the resources
1169                                 * matching the search off of the disk and into memory. After
1170                                 * every X results, we commit to the HFJ_SEARCH table.
1171                                 */
1172                                int syncSize = mySyncSize;
1173                                while (resultIterator.hasNext()) {
1174                                        myUnsyncedPids.add(resultIterator.next());
1175
1176                                        boolean shouldSync = myUnsyncedPids.size() >= syncSize;
1177
1178                                        if (myDaoConfig.getCountSearchResultsUpTo() != null &&
1179                                                myDaoConfig.getCountSearchResultsUpTo() > 0 &&
1180                                                myDaoConfig.getCountSearchResultsUpTo() < myUnsyncedPids.size()) {
1181                                                shouldSync = false;
1182                                        }
1183
1184                                        if (myUnsyncedPids.size() > 50000) {
1185                                                shouldSync = true;
1186                                        }
1187
1188                                        // If no abort was requested, bail out
1189                                        Validate.isTrue(isNotAborted(), "Abort has been requested");
1190
1191                                        if (shouldSync) {
1192                                                saveUnsynced(resultIterator);
1193                                        }
1194
1195                                        if (myLoadingThrottleForUnitTests != null) {
1196                                                AsyncUtil.sleep(myLoadingThrottleForUnitTests);
1197                                        }
1198
1199                                }
1200
1201                                // If no abort was requested, bail out
1202                                Validate.isTrue(isNotAborted(), "Abort has been requested");
1203
1204                                saveUnsynced(resultIterator);
1205
1206                        } catch (IOException e) {
1207                                ourLog.error("IO failure during database access", e);
1208                                throw new InternalErrorException(e);
1209                        }
1210                }
1211        }
1212
1213        public class SearchContinuationTask extends SearchTask {
1214
1215                public SearchContinuationTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequest, RequestPartitionId theRequestPartitionId) {
1216                        super(theSearch, theCallingDao, theParams, theResourceType, theRequest, theRequestPartitionId);
1217                }
1218
1219                @Override
1220                public Void call() {
1221                        try {
1222                                TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
1223                                txTemplate.afterPropertiesSet();
1224                                txTemplate.execute(t -> {
1225                                        List<ResourcePersistentId> previouslyAddedResourcePids = mySearchResultCacheSvc.fetchAllResultPids(getSearch());
1226                                        if (previouslyAddedResourcePids == null) {
1227                                                throw newResourceGoneException(getSearch().getUuid());
1228                                        }
1229
1230                                        ourLog.trace("Have {} previously added IDs in search: {}", previouslyAddedResourcePids.size(), getSearch().getUuid());
1231                                        setPreviouslyAddedResourcePids(previouslyAddedResourcePids);
1232                                        return null;
1233                                });
1234                        } catch (Throwable e) {
1235                                ourLog.error("Failure processing search", e);
1236                                getSearch().setFailureMessage(e.getMessage());
1237                                getSearch().setStatus(SearchStatusEnum.FAILED);
1238                                if (e instanceof BaseServerResponseException) {
1239                                        getSearch().setFailureCode(((BaseServerResponseException) e).getStatusCode());
1240                                }
1241
1242                                saveSearch();
1243                                return null;
1244                        }
1245
1246                        return super.call();
1247                }
1248
1249        }
1250
1251        private static boolean isWantOnlyCount(SearchParameterMap myParams) {
1252                return SummaryEnum.COUNT.equals(myParams.getSummaryMode())
1253                        | INTEGER_0.equals(myParams.getCount());
1254        }
1255
1256        public static void populateSearchEntity(SearchParameterMap theParams, String theResourceType, String theSearchUuid, String theQueryString, Search theSearch, RequestPartitionId theRequestPartitionId) {
1257                theSearch.setDeleted(false);
1258                theSearch.setUuid(theSearchUuid);
1259                theSearch.setCreated(new Date());
1260                theSearch.setTotalCount(null);
1261                theSearch.setNumFound(0);
1262                theSearch.setPreferredPageSize(theParams.getCount());
1263                theSearch.setSearchType(theParams.getEverythingMode() != null ? SearchTypeEnum.EVERYTHING : SearchTypeEnum.SEARCH);
1264                theSearch.setLastUpdated(theParams.getLastUpdated());
1265                theSearch.setResourceType(theResourceType);
1266                theSearch.setStatus(SearchStatusEnum.LOADING);
1267                theSearch.setSearchQueryString(theQueryString, theRequestPartitionId);
1268
1269                if (theParams.hasIncludes()) {
1270                        for (Include next : theParams.getIncludes()) {
1271                                theSearch.addInclude(new SearchInclude(theSearch, next.getValue(), false, next.isRecurse()));
1272                        }
1273                }
1274
1275                for (Include next : theParams.getRevIncludes()) {
1276                        theSearch.addInclude(new SearchInclude(theSearch, next.getValue(), true, next.isRecurse()));
1277                }
1278        }
1279
1280        /**
1281         * Creates a {@link Pageable} using a start and end index
1282         */
1283        @SuppressWarnings("WeakerAccess")
1284        @Nullable
1285        public static Pageable toPage(final int theFromIndex, int theToIndex) {
1286                int pageSize = theToIndex - theFromIndex;
1287                if (pageSize < 1) {
1288                        return null;
1289                }
1290
1291                int pageIndex = theFromIndex / pageSize;
1292
1293                Pageable page = new AbstractPageRequest(pageIndex, pageSize) {
1294                        private static final long serialVersionUID = 1L;
1295
1296                        @Override
1297                        public long getOffset() {
1298                                return theFromIndex;
1299                        }
1300
1301                        @Override
1302                        public Sort getSort() {
1303                                return Sort.unsorted();
1304                        }
1305
1306                        @Override
1307                        public Pageable next() {
1308                                return null;
1309                        }
1310
1311                        @Override
1312                        public Pageable previous() {
1313                                return null;
1314                        }
1315
1316                        @Override
1317                        public Pageable first() {
1318                                return null;
1319                        }
1320
1321                        @Override
1322                        public Pageable withPage(int theI) {
1323                                return null;
1324                        }
1325                };
1326
1327                return page;
1328        }
1329
1330        static void verifySearchHasntFailedOrThrowInternalErrorException(Search theSearch) {
1331                if (theSearch.getStatus() == SearchStatusEnum.FAILED) {
1332                        Integer status = theSearch.getFailureCode();
1333                        status = defaultIfNull(status, 500);
1334
1335                        String message = theSearch.getFailureMessage();
1336                        throw BaseServerResponseException.newInstance(status, message);
1337                }
1338        }
1339
1340}