001package ca.uhn.fhir.jpa.search;
002
003/*-
004 * #%L
005 * HAPI FHIR JPA Server
006 * %%
007 * Copyright (C) 2014 - 2022 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%family
021 */
022
023import ca.uhn.fhir.context.FhirContext;
024import ca.uhn.fhir.i18n.Msg;
025import ca.uhn.fhir.interceptor.api.HookParams;
026import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
027import ca.uhn.fhir.interceptor.api.Pointcut;
028import ca.uhn.fhir.interceptor.model.RequestPartitionId;
029import ca.uhn.fhir.jpa.api.config.DaoConfig;
030import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
031import ca.uhn.fhir.jpa.api.dao.IDao;
032import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
033import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
034import ca.uhn.fhir.jpa.dao.BaseStorageDao;
035import ca.uhn.fhir.jpa.dao.IResultIterator;
036import ca.uhn.fhir.jpa.dao.ISearchBuilder;
037import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
038import ca.uhn.fhir.jpa.entity.Search;
039import ca.uhn.fhir.jpa.entity.SearchInclude;
040import ca.uhn.fhir.jpa.entity.SearchTypeEnum;
041import ca.uhn.fhir.jpa.interceptor.JpaPreResourceAccessDetails;
042import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
043import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
044import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
045import ca.uhn.fhir.jpa.search.builder.predicate.ResourceLinkPredicateBuilder;
046import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
047import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
048import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum;
049import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
050import ca.uhn.fhir.model.api.Include;
051import ca.uhn.fhir.rest.api.CacheControlDirective;
052import ca.uhn.fhir.rest.api.Constants;
053import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum;
054import ca.uhn.fhir.rest.api.SearchTotalModeEnum;
055import ca.uhn.fhir.rest.api.server.IBundleProvider;
056import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails;
057import ca.uhn.fhir.rest.api.server.RequestDetails;
058import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
059import ca.uhn.fhir.rest.server.IPagingProvider;
060import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
061import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
062import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
063import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
064import ca.uhn.fhir.rest.server.method.PageMethodBinding;
065import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
066import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
067import ca.uhn.fhir.rest.server.util.ICachedSearchDetails;
068import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
069import ca.uhn.fhir.util.AsyncUtil;
070import ca.uhn.fhir.util.StopWatch;
071import ca.uhn.fhir.util.UrlUtil;
072import co.elastic.apm.api.ElasticApm;
073import co.elastic.apm.api.Span;
074import co.elastic.apm.api.Transaction;
075import com.google.common.annotations.VisibleForTesting;
076import org.apache.commons.lang3.Validate;
077import org.apache.commons.lang3.exception.ExceptionUtils;
078import org.apache.commons.lang3.time.DateUtils;
079import org.hl7.fhir.instance.model.api.IBaseResource;
080import org.springframework.beans.factory.annotation.Autowired;
081import org.springframework.data.domain.AbstractPageRequest;
082import org.springframework.data.domain.Pageable;
083import org.springframework.data.domain.Sort;
084import org.springframework.orm.jpa.JpaDialect;
085import org.springframework.orm.jpa.JpaTransactionManager;
086import org.springframework.orm.jpa.vendor.HibernateJpaDialect;
087import org.springframework.stereotype.Component;
088import org.springframework.transaction.PlatformTransactionManager;
089import org.springframework.transaction.TransactionDefinition;
090import org.springframework.transaction.TransactionStatus;
091import org.springframework.transaction.annotation.Propagation;
092import org.springframework.transaction.annotation.Transactional;
093import org.springframework.transaction.support.TransactionCallbackWithoutResult;
094import org.springframework.transaction.support.TransactionTemplate;
095
096import javax.annotation.Nonnull;
097import javax.annotation.Nullable;
098import javax.annotation.PostConstruct;
099import java.io.IOException;
100import java.time.Instant;
101import java.time.temporal.ChronoUnit;
102import java.util.ArrayList;
103import java.util.Date;
104import java.util.Iterator;
105import java.util.List;
106import java.util.Optional;
107import java.util.Set;
108import java.util.UUID;
109import java.util.concurrent.Callable;
110import java.util.concurrent.ConcurrentHashMap;
111import java.util.concurrent.CountDownLatch;
112import java.util.concurrent.TimeUnit;
113import java.util.stream.Collectors;
114
115import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantCount;
116import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantOnlyCount;
117import static java.util.Objects.nonNull;
118import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
119import static org.apache.commons.lang3.StringUtils.isBlank;
120import static org.apache.commons.lang3.StringUtils.isNotBlank;
121
122@Component("mySearchCoordinatorSvc")
123public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
124        public static final int DEFAULT_SYNC_SIZE = 250;
125        public static final String UNIT_TEST_CAPTURE_STACK = "unit_test_capture_stack";
126
127        private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);
128        private final ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>();
129        @Autowired
130        private FhirContext myContext;
131        @Autowired
132        private DaoConfig myDaoConfig;
133
134        private Integer myLoadingThrottleForUnitTests = null;
135        private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE;
136        private boolean myNeverUseLocalSearchForUnitTests;
137        @Autowired
138        private IInterceptorBroadcaster myInterceptorBroadcaster;
139        @Autowired
140        private PlatformTransactionManager myManagedTxManager;
141        @Autowired
142        private ISearchCacheSvc mySearchCacheSvc;
143        @Autowired
144        private ISearchResultCacheSvc mySearchResultCacheSvc;
145        @Autowired
146        private DaoRegistry myDaoRegistry;
147        @Autowired
148        private IPagingProvider myPagingProvider;
149        @Autowired
150        private SearchBuilderFactory mySearchBuilderFactory;
151
152        @Autowired
153        private ISynchronousSearchSvc mySynchronousSearchSvc;
154
155        private int mySyncSize = DEFAULT_SYNC_SIZE;
156        /**
157         * Set in {@link #start()}
158         */
159        private boolean myCustomIsolationSupported;
160        @Autowired
161        private PersistedJpaBundleProviderFactory myPersistedJpaBundleProviderFactory;
162        @Autowired
163        private IRequestPartitionHelperSvc myRequestPartitionHelperService;
164        @Autowired
165        private ISearchParamRegistry mySearchParamRegistry;
166        @Autowired
167        private SearchStrategyFactory mySearchStrategyFactory;
168
169        /**
170         * Constructor
171         */
172        @Autowired
173        public SearchCoordinatorSvcImpl() {
174                super();
175        }
176
177        @VisibleForTesting
178        Set<String> getActiveSearchIds() {
179                return myIdToSearchTask.keySet();
180        }
181
182        @VisibleForTesting
183        public void setSearchCacheServicesForUnitTest(ISearchCacheSvc theSearchCacheSvc, ISearchResultCacheSvc theSearchResultCacheSvc) {
184                mySearchCacheSvc = theSearchCacheSvc;
185                mySearchResultCacheSvc = theSearchResultCacheSvc;
186        }
187
188        @PostConstruct
189        public void start() {
190                if (myManagedTxManager instanceof JpaTransactionManager) {
191                        JpaDialect jpaDialect = ((JpaTransactionManager) myManagedTxManager).getJpaDialect();
192                        if (jpaDialect instanceof HibernateJpaDialect) {
193                                myCustomIsolationSupported = true;
194                        }
195                }
196                if (myCustomIsolationSupported == false) {
197                        ourLog.warn("JPA dialect does not support transaction isolation! This can have an impact on search performance.");
198                }
199        }
200
201        @Override
202        public void cancelAllActiveSearches() {
203                for (SearchTask next : myIdToSearchTask.values()) {
204                        ourLog.info("Requesting immediate abort of search: {}", next.getSearch().getUuid());
205                        next.requestImmediateAbort();
206                        AsyncUtil.awaitLatchAndIgnoreInterrupt(next.getCompletionLatch(), 30, TimeUnit.SECONDS);
207                }
208        }
209
210        @SuppressWarnings("SameParameterValue")
211        @VisibleForTesting
212        void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) {
213                myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults;
214        }
215
216        /**
217         * facade over raw hook intererface
218         */
219        public class StorageInterceptorHooks {
220                /**
221                 * Interceptor call: STORAGE_PRESEARCH_REGISTERED
222                 *
223                 * @param theRequestDetails
224                 * @param theParams
225                 * @param search
226                 */
227                private void callStoragePresearchRegistered(RequestDetails theRequestDetails, SearchParameterMap theParams, Search search) {
228                        HookParams params = new HookParams()
229                                .add(ICachedSearchDetails.class, search)
230                                .add(RequestDetails.class, theRequestDetails)
231                                .addIfMatchesType(ServletRequestDetails.class, theRequestDetails)
232                                .add(SearchParameterMap.class, theParams);
233                        CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PRESEARCH_REGISTERED, params);
234                }
235                //private IInterceptorBroadcaster myInterceptorBroadcaster;
236        }
237        private StorageInterceptorHooks myStorageInterceptorHooks = new StorageInterceptorHooks();
238
239        /**
240         * This method is called by the HTTP client processing thread in order to
241         * fetch resources.
242         */
243        @Override
244        @Transactional(propagation = Propagation.NEVER)
245        public List<ResourcePersistentId> getResources(final String theUuid, int theFrom, int theTo, @Nullable RequestDetails theRequestDetails) {
246                TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
247                txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
248
249                // If we're actively searching right now, don't try to do anything until at least one batch has been
250                // persisted in the DB
251                SearchTask searchTask = myIdToSearchTask.get(theUuid);
252                if (searchTask != null) {
253                        searchTask.awaitInitialSync();
254                }
255
256                ourLog.trace("About to start looking for resources {}-{}", theFrom, theTo);
257
258                Search search;
259                StopWatch sw = new StopWatch();
260                while (true) {
261
262                        if (myNeverUseLocalSearchForUnitTests == false) {
263                                if (searchTask != null) {
264                                        ourLog.trace("Local search found");
265                                        List<ResourcePersistentId> resourcePids = searchTask.getResourcePids(theFrom, theTo);
266                                        ourLog.trace("Local search returned {} pids, wanted {}-{} - Search: {}", resourcePids.size(), theFrom, theTo, searchTask.getSearch());
267
268                                        /*
269                                         * Generally, if a search task is open, the fastest possible thing is to just return its results. This
270                                         * will work most of the time, but can fail if the task hit a search threshold and the client is requesting
271                                         * results beyond that threashold. In that case, we'll keep going below, since that will trigger another
272                                         * task.
273                                         */
274                                        if ((searchTask.getSearch().getNumFound() - searchTask.getSearch().getNumBlocked()) >= theTo || resourcePids.size() == (theTo - theFrom)) {
275                                                return resourcePids;
276                                        }
277                                }
278                        }
279
280                        search = mySearchCacheSvc
281                                .fetchByUuid(theUuid)
282                                .orElseThrow(() -> newResourceGoneException(theUuid));
283
284                        verifySearchHasntFailedOrThrowInternalErrorException(search);
285                        if (search.getStatus() == SearchStatusEnum.FINISHED) {
286                                ourLog.trace("Search entity marked as finished with {} results", search.getNumFound());
287                                break;
288                        }
289                        if ((search.getNumFound() - search.getNumBlocked()) >= theTo) {
290                                ourLog.trace("Search entity has {} results so far", search.getNumFound());
291                                break;
292                        }
293
294                        if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) {
295                                ourLog.error("Search {} of type {} for {}{} timed out after {}ms", search.getId(), search.getSearchType(), search.getResourceType(), search.getSearchQueryString(), sw.getMillis());
296                                throw new InternalErrorException(Msg.code(1163) + "Request timed out after " + sw.getMillis() + "ms");
297                        }
298
299                        // If the search was saved in "pass complete mode" it's probably time to
300                        // start a new pass
301                        if (search.getStatus() == SearchStatusEnum.PASSCMPLET) {
302                                ourLog.trace("Going to try to start next search");
303                                Optional<Search> newSearch = mySearchCacheSvc.tryToMarkSearchAsInProgress(search);
304                                if (newSearch.isPresent()) {
305                                        ourLog.trace("Launching new search");
306                                        search = newSearch.get();
307                                        String resourceType = search.getResourceType();
308                                        SearchParameterMap params = search.getSearchParameterMap().orElseThrow(() -> new IllegalStateException("No map in PASSCOMPLET search"));
309                                        IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType);
310                                        RequestPartitionId requestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequestForSearchType(theRequestDetails, resourceType, params, null);
311                                        SearchContinuationTask task = new SearchContinuationTask(search, resourceDao, params, resourceType, theRequestDetails, requestPartitionId);
312                                        myIdToSearchTask.put(search.getUuid(), task);
313                                        task.call();
314                                }
315                        }
316
317                        AsyncUtil.sleep(500);
318                }
319
320                ourLog.trace("Finished looping");
321
322                List<ResourcePersistentId> pids = mySearchResultCacheSvc.fetchResultPids(search, theFrom, theTo);
323                if (pids == null) {
324                        throw newResourceGoneException(theUuid);
325                }
326
327                ourLog.trace("Fetched {} results", pids.size());
328
329                return pids;
330        }
331
332        @Nonnull
333        private ResourceGoneException newResourceGoneException(String theUuid) {
334                ourLog.trace("Client requested unknown paging ID[{}]", theUuid);
335                String msg = myContext.getLocalizer().getMessage(PageMethodBinding.class, "unknownSearchId", theUuid);
336                return new ResourceGoneException(msg);
337        }
338
339        @Override
340        public IBundleProvider registerSearch(final IFhirResourceDao<?> theCallingDao, final SearchParameterMap theParams, String theResourceType, CacheControlDirective theCacheControlDirective, RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) {
341                final String searchUuid = UUID.randomUUID().toString();
342
343                final String queryString = theParams.toNormalizedQueryString(myContext);
344                ourLog.debug("Registering new search {}", searchUuid);
345
346                Search search = new Search();
347                populateSearchEntity(theParams, theResourceType, searchUuid, queryString, search, theRequestPartitionId);
348
349                myStorageInterceptorHooks.callStoragePresearchRegistered(theRequestDetails, theParams, search);
350
351                validateSearch(theParams);
352
353                Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(theResourceType).getImplementingClass();
354                final ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(theCallingDao, theResourceType, resourceTypeClass);
355                sb.setFetchSize(mySyncSize);
356
357                final Integer loadSynchronousUpTo = getLoadSynchronousUpToOrNull(theCacheControlDirective);
358                boolean isOffsetQuery = theParams.isOffsetQuery();
359
360                // todo someday - not today.
361//              SearchStrategyFactory.ISearchStrategy searchStrategy = mySearchStrategyFactory.pickStrategy(theResourceType, theParams, theRequestDetails);
362//              return searchStrategy.get();
363
364                if (mySearchStrategyFactory.isSupportsHSearchDirect(theResourceType, theParams, theRequestDetails)) {
365                        ourLog.info("Search {} is using direct load strategy", searchUuid);
366                        SearchStrategyFactory.ISearchStrategy direct =  mySearchStrategyFactory.makeDirectStrategy(searchUuid, theResourceType, theParams, theRequestDetails);
367                        return direct.get();
368                }
369
370                if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null || isOffsetQuery) {
371                        ourLog.debug("Search {} is loading in synchronous mode", searchUuid);
372                        return mySynchronousSearchSvc.executeQuery(theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, theRequestPartitionId);
373                }
374
375                /*
376                 * See if there are any cached searches whose results we can return
377                 * instead
378                 */
379                SearchCacheStatusEnum cacheStatus = SearchCacheStatusEnum.MISS;
380                if (theCacheControlDirective != null && theCacheControlDirective.isNoCache() == true) {
381                        cacheStatus = SearchCacheStatusEnum.NOT_TRIED;
382                }
383
384                if (cacheStatus != SearchCacheStatusEnum.NOT_TRIED) {
385                        if (theParams.getEverythingMode() == null) {
386                                if (myDaoConfig.getReuseCachedSearchResultsForMillis() != null) {
387                                        PersistedJpaBundleProvider foundSearchProvider = findCachedQuery(theParams, theResourceType, theRequestDetails, queryString, theRequestPartitionId);
388                                        if (foundSearchProvider != null) {
389                                                foundSearchProvider.setCacheStatus(SearchCacheStatusEnum.HIT);
390                                                return foundSearchProvider;
391                                        }
392                                }
393                        }
394                }
395
396                PersistedJpaSearchFirstPageBundleProvider retVal = submitSearch(theCallingDao, theParams, theResourceType, theRequestDetails, searchUuid, sb, queryString, theRequestPartitionId, search);
397                retVal.setCacheStatus(cacheStatus);
398                return retVal;
399        }
400
401        private void validateSearch(SearchParameterMap theParams) {
402                validateIncludes(theParams.getIncludes(), Constants.PARAM_INCLUDE);
403                validateIncludes(theParams.getRevIncludes(), Constants.PARAM_REVINCLUDE);
404        }
405
406        private void validateIncludes(Set<Include> includes, String name) {
407                for (Include next : includes) {
408                        String value = next.getValue();
409                        if (value.equals(Constants.INCLUDE_STAR) || isBlank(value)) {
410                                continue;
411                        }
412
413                        String paramType = next.getParamType();
414                        String paramName = next.getParamName();
415                        String paramTargetType = next.getParamTargetType();
416
417                        if (isBlank(paramType) || isBlank(paramName)) {
418                                String msg = myContext.getLocalizer().getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidInclude", name, value, "");
419                                throw new InvalidRequestException(Msg.code(2018) + msg);
420                        }
421
422                        if (!myDaoRegistry.isResourceTypeSupported(paramType)) {
423                                String resourceTypeMsg = myContext.getLocalizer().getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramType);
424                                String msg = myContext.getLocalizer().getMessage(SearchCoordinatorSvcImpl.class, "invalidInclude", UrlUtil.sanitizeUrlPart(name), UrlUtil.sanitizeUrlPart(value), resourceTypeMsg); // last param is pre-sanitized
425                                throw new InvalidRequestException(Msg.code(2017) + msg);
426                        }
427
428                        if (isNotBlank(paramTargetType) && !myDaoRegistry.isResourceTypeSupported(paramTargetType)) {
429                                String resourceTypeMsg = myContext.getLocalizer().getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramTargetType);
430                                String msg = myContext.getLocalizer().getMessage(SearchCoordinatorSvcImpl.class, "invalidInclude", UrlUtil.sanitizeUrlPart(name), UrlUtil.sanitizeUrlPart(value), resourceTypeMsg); // last param is pre-sanitized
431                                throw new InvalidRequestException(Msg.code(2016) + msg);
432                        }
433
434                        if (!Constants.INCLUDE_STAR.equals(paramName) && mySearchParamRegistry.getActiveSearchParam(paramType, paramName) == null) {
435                                List<String> validNames = mySearchParamRegistry
436                                        .getActiveSearchParams(paramType)
437                                        .values()
438                                        .stream()
439                                        .filter(t -> t.getParamType() == RestSearchParameterTypeEnum.REFERENCE)
440                                        .map(t -> UrlUtil.sanitizeUrlPart(t.getName()))
441                                        .sorted()
442                                        .collect(Collectors.toList());
443                                String searchParamMessage = myContext.getLocalizer().getMessage(BaseStorageDao.class, "invalidSearchParameter", UrlUtil.sanitizeUrlPart(paramName), UrlUtil.sanitizeUrlPart(paramType), validNames);
444                                String msg = myContext.getLocalizer().getMessage(SearchCoordinatorSvcImpl.class, "invalidInclude", UrlUtil.sanitizeUrlPart(name), UrlUtil.sanitizeUrlPart(value), searchParamMessage); // last param is pre-sanitized
445                                throw new InvalidRequestException(Msg.code(2015) + msg);
446                        }
447
448                }
449        }
450
451        @Override
452        public Optional<Integer> getSearchTotal(String theUuid) {
453                SearchTask task = myIdToSearchTask.get(theUuid);
454                if (task != null) {
455                        return Optional.ofNullable(task.awaitInitialSync());
456                }
457
458                /*
459                 * In case there is no running search, if the total is listed as accurate we know one is coming
460                 * so let's wait a bit for it to show up
461                 */
462                TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
463                txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
464                Optional<Search> search = mySearchCacheSvc.fetchByUuid(theUuid);
465                if (search.isPresent()) {
466                        Optional<SearchParameterMap> searchParameterMap = search.get().getSearchParameterMap();
467                        if (searchParameterMap.isPresent() && searchParameterMap.get().getSearchTotalMode() == SearchTotalModeEnum.ACCURATE) {
468                                for (int i = 0; i < 10; i++) {
469                                        if (search.isPresent()) {
470                                                verifySearchHasntFailedOrThrowInternalErrorException(search.get());
471                                                if (search.get().getTotalCount() != null) {
472                                                        return Optional.of(search.get().getTotalCount());
473                                                }
474                                        }
475                                        search = mySearchCacheSvc.fetchByUuid(theUuid);
476                                }
477                        }
478                }
479
480                return Optional.empty();
481        }
482
483        @Nonnull
484        private PersistedJpaSearchFirstPageBundleProvider submitSearch(IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequestDetails, String theSearchUuid, ISearchBuilder theSb, String theQueryString, RequestPartitionId theRequestPartitionId, Search theSearch) {
485                StopWatch w = new StopWatch();
486                SearchTask task = new SearchTask(theSearch, theCallingDao, theParams, theResourceType, theRequestDetails, theRequestPartitionId);
487                myIdToSearchTask.put(theSearch.getUuid(), task);
488                task.call();
489
490                PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage(theRequestDetails, theSearch, task, theSb);
491
492                ourLog.debug("Search initial phase completed in {}ms", w.getMillis());
493                return retVal;
494        }
495
496        @Nullable
497        private PersistedJpaBundleProvider findCachedQuery(SearchParameterMap theParams, String theResourceType, RequestDetails theRequestDetails, String theQueryString, RequestPartitionId theRequestPartitionId) {
498                TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
499
500                // May be null
501                return txTemplate.execute(t -> {
502
503                        // Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH
504                        HookParams params = new HookParams()
505                                .add(SearchParameterMap.class, theParams)
506                                .add(RequestDetails.class, theRequestDetails)
507                                .addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
508                        Object outcome = CompositeInterceptorBroadcaster.doCallHooksAndReturnObject(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH, params);
509                        if (Boolean.FALSE.equals(outcome)) {
510                                return null;
511                        }
512
513                        // Check for a search matching the given hash
514                        Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType, theRequestPartitionId);
515                        if (searchToUse == null) {
516                                return null;
517                        }
518
519                        ourLog.debug("Reusing search {} from cache", searchToUse.getUuid());
520                        // Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED
521                        params = new HookParams()
522                                .add(SearchParameterMap.class, theParams)
523                                .add(RequestDetails.class, theRequestDetails)
524                                .addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
525                        CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, params);
526
527                        return myPersistedJpaBundleProviderFactory.newInstance(theRequestDetails, searchToUse.getUuid());
528                });
529        }
530
531        @Nullable
532        private Search findSearchToUseOrNull(String theQueryString, String theResourceType, RequestPartitionId theRequestPartitionId) {
533                // createdCutoff is in recent past
534                final Instant createdCutoff = Instant.now().minus(myDaoConfig.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS);
535
536                Optional<Search> candidate = mySearchCacheSvc.findCandidatesForReuse(theResourceType, theQueryString, createdCutoff, theRequestPartitionId);
537                return candidate.orElse(null);
538        }
539
540        @Nullable
541        private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) {
542                final Integer loadSynchronousUpTo;
543                if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) {
544                        if (theCacheControlDirective.getMaxResults() != null) {
545                                loadSynchronousUpTo = theCacheControlDirective.getMaxResults();
546                                if (loadSynchronousUpTo > myDaoConfig.getCacheControlNoStoreMaxResultsUpperLimit()) {
547                                        throw new InvalidRequestException(Msg.code(1165) + Constants.HEADER_CACHE_CONTROL + " header " + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed " + myDaoConfig.getCacheControlNoStoreMaxResultsUpperLimit());
548                                }
549                        } else {
550                                loadSynchronousUpTo = 100;
551                        }
552                } else {
553                        loadSynchronousUpTo = null;
554                }
555                return loadSynchronousUpTo;
556        }
557
558        @VisibleForTesting
559        void setContextForUnitTest(FhirContext theCtx) {
560                myContext = theCtx;
561        }
562
563        @VisibleForTesting
564        void setDaoConfigForUnitTest(DaoConfig theDaoConfig) {
565                myDaoConfig = theDaoConfig;
566        }
567
568        @VisibleForTesting
569        public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) {
570                myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests;
571        }
572
573        @VisibleForTesting
574        public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) {
575                myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests;
576        }
577
578        @VisibleForTesting
579        public void setSyncSizeForUnitTests(int theSyncSize) {
580                mySyncSize = theSyncSize;
581        }
582
583        @VisibleForTesting
584        void setTransactionManagerForUnitTest(PlatformTransactionManager theTxManager) {
585                myManagedTxManager = theTxManager;
586        }
587
588        @VisibleForTesting
589        void setDaoRegistryForUnitTest(DaoRegistry theDaoRegistry) {
590                myDaoRegistry = theDaoRegistry;
591        }
592
593        @VisibleForTesting
594        void setInterceptorBroadcasterForUnitTest(IInterceptorBroadcaster theInterceptorBroadcaster) {
595                myInterceptorBroadcaster = theInterceptorBroadcaster;
596        }
597
598        @VisibleForTesting
599        public void setSearchBuilderFactoryForUnitTest(SearchBuilderFactory theSearchBuilderFactory) {
600                mySearchBuilderFactory = theSearchBuilderFactory;
601        }
602
603        @VisibleForTesting
604        public void setPersistedJpaBundleProviderFactoryForUnitTest(PersistedJpaBundleProviderFactory thePersistedJpaBundleProviderFactory) {
605                myPersistedJpaBundleProviderFactory = thePersistedJpaBundleProviderFactory;
606        }
607
608        @VisibleForTesting
609        public void setRequestPartitionHelperService(IRequestPartitionHelperSvc theRequestPartitionHelperService) {
610                myRequestPartitionHelperService = theRequestPartitionHelperService;
611        }
612
613        @VisibleForTesting
614        public void setSynchronousSearchSvc(ISynchronousSearchSvc theSynchronousSearchSvc) {
615                mySynchronousSearchSvc = theSynchronousSearchSvc;
616        }
617
618        public static void populateSearchEntity(SearchParameterMap theParams, String theResourceType, String theSearchUuid, String theQueryString, Search theSearch, RequestPartitionId theRequestPartitionId) {
619                theSearch.setDeleted(false);
620                theSearch.setUuid(theSearchUuid);
621                theSearch.setCreated(new Date());
622                theSearch.setTotalCount(null);
623                theSearch.setNumFound(0);
624                theSearch.setPreferredPageSize(theParams.getCount());
625                theSearch.setSearchType(theParams.getEverythingMode() != null ? SearchTypeEnum.EVERYTHING : SearchTypeEnum.SEARCH);
626                theSearch.setLastUpdated(theParams.getLastUpdated());
627                theSearch.setResourceType(theResourceType);
628                theSearch.setStatus(SearchStatusEnum.LOADING);
629                theSearch.setSearchQueryString(theQueryString, theRequestPartitionId);
630
631                if (theParams.hasIncludes()) {
632                        for (Include next : theParams.getIncludes()) {
633                                theSearch.addInclude(new SearchInclude(theSearch, next.getValue(), false, next.isRecurse()));
634                        }
635                }
636
637                for (Include next : theParams.getRevIncludes()) {
638                        theSearch.addInclude(new SearchInclude(theSearch, next.getValue(), true, next.isRecurse()));
639                }
640        }
641
642        /**
643         * Creates a {@link Pageable} using a start and end index
644         */
645        @SuppressWarnings("WeakerAccess")
646        @Nullable
647        public static Pageable toPage(final int theFromIndex, int theToIndex) {
648                int pageSize = theToIndex - theFromIndex;
649                if (pageSize < 1) {
650                        return null;
651                }
652
653                int pageIndex = theFromIndex / pageSize;
654
655                Pageable page = new AbstractPageRequest(pageIndex, pageSize) {
656                        private static final long serialVersionUID = 1L;
657
658                        @Override
659                        public long getOffset() {
660                                return theFromIndex;
661                        }
662
663                        @Override
664                        public Sort getSort() {
665                                return Sort.unsorted();
666                        }
667
668                        @Override
669                        public Pageable next() {
670                                return null;
671                        }
672
673                        @Override
674                        public Pageable previous() {
675                                return null;
676                        }
677
678                        @Override
679                        public Pageable first() {
680                                return null;
681                        }
682
683                        @Override
684                        public Pageable withPage(int theI) {
685                                return null;
686                        }
687                };
688
689                return page;
690        }
691
692        static void verifySearchHasntFailedOrThrowInternalErrorException(Search theSearch) {
693                if (theSearch.getStatus() == SearchStatusEnum.FAILED) {
694                        Integer status = theSearch.getFailureCode();
695                        status = defaultIfNull(status, 500);
696
697                        String message = theSearch.getFailureMessage();
698                        throw BaseServerResponseException.newInstance(status, message);
699                }
700        }
701
702                /**
703         * A search task is a Callable task that runs in
704         * a thread pool to handle an individual search. One instance
705         * is created for any requested search and runs from the
706         * beginning to the end of the search.
707         * <p>
708         * Understand:
709         * This class executes in its own thread separate from the
710         * web server client thread that made the request. We do that
711         * so that we can return to the client as soon as possible,
712         * but keep the search going in the background (and have
713         * the next page of results ready to go when the client asks).
714         */
715        public class SearchTask implements Callable<Void> {
716                private final SearchParameterMap myParams;
717                private final IDao myCallingDao;
718                private final String myResourceType;
719                private final ArrayList<ResourcePersistentId> mySyncedPids = new ArrayList<>();
720                private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1);
721                private final CountDownLatch myCompletionLatch;
722                private final ArrayList<ResourcePersistentId> myUnsyncedPids = new ArrayList<>();
723                private final RequestDetails myRequest;
724                private final RequestPartitionId myRequestPartitionId;
725                private final SearchRuntimeDetails mySearchRuntimeDetails;
726                private final Transaction myParentTransaction;
727                private Search mySearch;
728                private boolean myAbortRequested;
729                private int myCountSavedTotal = 0;
730                private int myCountSavedThisPass = 0;
731                private int myCountBlockedThisPass = 0;
732                private boolean myAdditionalPrefetchThresholdsRemaining;
733                private List<ResourcePersistentId> myPreviouslyAddedResourcePids;
734                private Integer myMaxResultsToFetch;
735
736                /**
737                 * Constructor
738                 */
739                protected SearchTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequest, RequestPartitionId theRequestPartitionId) {
740                        mySearch = theSearch;
741                        myCallingDao = theCallingDao;
742                        myParams = theParams;
743                        myResourceType = theResourceType;
744                        myCompletionLatch = new CountDownLatch(1);
745                        mySearchRuntimeDetails = new SearchRuntimeDetails(theRequest, mySearch.getUuid());
746                        mySearchRuntimeDetails.setQueryString(theParams.toNormalizedQueryString(theCallingDao.getContext()));
747                        myRequestPartitionId = theRequestPartitionId;
748                        myRequest = theRequest;
749                        myParentTransaction = ElasticApm.currentTransaction();
750                }
751
752                /**
753                 * This method is called by the server HTTP thread, and
754                 * will block until at least one page of results have been
755                 * fetched from the DB, and will never block after that.
756                 */
757                Integer awaitInitialSync() {
758                        ourLog.trace("Awaiting initial sync");
759                        do {
760                                ourLog.trace("Search {} aborted: {}", getSearch().getUuid(), !isNotAborted());
761                                if (AsyncUtil.awaitLatchAndThrowInternalErrorExceptionOnInterrupt(getInitialCollectionLatch(), 250L, TimeUnit.MILLISECONDS)) {
762                                        break;
763                                }
764                        } while (getSearch().getStatus() == SearchStatusEnum.LOADING);
765                        ourLog.trace("Initial sync completed");
766
767                        return getSearch().getTotalCount();
768                }
769
770                protected Search getSearch() {
771                        return mySearch;
772                }
773
774                CountDownLatch getInitialCollectionLatch() {
775                        return myInitialCollectionLatch;
776                }
777
778                void setPreviouslyAddedResourcePids(List<ResourcePersistentId> thePreviouslyAddedResourcePids) {
779                        myPreviouslyAddedResourcePids = thePreviouslyAddedResourcePids;
780                        myCountSavedTotal = myPreviouslyAddedResourcePids.size();
781                }
782
783                private ISearchBuilder newSearchBuilder() {
784                        Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
785                        return mySearchBuilderFactory.newSearchBuilder(myCallingDao, myResourceType, resourceTypeClass);
786                }
787
788                @Nonnull
789                List<ResourcePersistentId> getResourcePids(int theFromIndex, int theToIndex) {
790                        ourLog.debug("Requesting search PIDs from {}-{}", theFromIndex, theToIndex);
791
792                        boolean keepWaiting;
793                        do {
794                                synchronized (mySyncedPids) {
795                                        ourLog.trace("Search status is {}", mySearch.getStatus());
796                                        boolean haveEnoughResults = mySyncedPids.size() >= theToIndex;
797                                        if (!haveEnoughResults) {
798                                                switch (mySearch.getStatus()) {
799                                                        case LOADING:
800                                                                keepWaiting = true;
801                                                                break;
802                                                        case PASSCMPLET:
803                                                                /*
804                                                                 * If we get here, it means that the user requested resources that crossed the
805                                                                 * current pre-fetch boundary. For example, if the prefetch threshold is 50 and the
806                                                                 * user has requested resources 0-60, then they would get 0-50 back but the search
807                                                                 * coordinator would then stop searching.SearchCoordinatorSvcImplTest
808                                                                 */
809                                                                keepWaiting = false;
810                                                                break;
811                                                        case FAILED:
812                                                        case FINISHED:
813                                                        case GONE:
814                                                        default:
815                                                                keepWaiting = false;
816                                                                break;
817                                                }
818                                        } else {
819                                                keepWaiting = false;
820                                        }
821                                }
822
823                                if (keepWaiting) {
824                                        ourLog.info("Waiting as we only have {} results - Search status: {}", mySyncedPids.size(), mySearch.getStatus());
825                                        AsyncUtil.sleep(500L);
826                                }
827                        } while (keepWaiting);
828
829                        ourLog.debug("Proceeding, as we have {} results", mySyncedPids.size());
830
831                        ArrayList<ResourcePersistentId> retVal = new ArrayList<>();
832                        synchronized (mySyncedPids) {
833                                verifySearchHasntFailedOrThrowInternalErrorException(mySearch);
834
835                                int toIndex = theToIndex;
836                                if (mySyncedPids.size() < toIndex) {
837                                        toIndex = mySyncedPids.size();
838                                }
839                                for (int i = theFromIndex; i < toIndex; i++) {
840                                        retVal.add(mySyncedPids.get(i));
841                                }
842                        }
843
844                        ourLog.trace("Done syncing results - Wanted {}-{} and returning {} of {}", theFromIndex, theToIndex, retVal.size(), mySyncedPids.size());
845
846                        return retVal;
847                }
848
849                void saveSearch() {
850                        TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
851                        txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
852                        txTemplate.execute(new TransactionCallbackWithoutResult() {
853                                @Override
854                                protected void doInTransactionWithoutResult(@Nonnull TransactionStatus theArg0) {
855                                        doSaveSearch();
856                                }
857
858                        });
859                }
860
861                private void saveUnsynced(final IResultIterator theResultIter) {
862                        TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
863                        txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
864                        txTemplate.execute(new TransactionCallbackWithoutResult() {
865                                @Override
866                                protected void doInTransactionWithoutResult(@Nonnull TransactionStatus theArg0) {
867                                        if (mySearch.getId() == null) {
868                                                doSaveSearch();
869                                        }
870
871                                        ArrayList<ResourcePersistentId> unsyncedPids = myUnsyncedPids;
872                                        int countBlocked = 0;
873
874                                        // Interceptor call: STORAGE_PREACCESS_RESOURCES
875                                        // This can be used to remove results from the search result details before
876                                        // the user has a chance to know that they were in the results
877                                        if (mySearchRuntimeDetails.getRequestDetails() != null && unsyncedPids.isEmpty() == false) {
878                                                JpaPreResourceAccessDetails accessDetails = new JpaPreResourceAccessDetails(unsyncedPids, () -> newSearchBuilder());
879                                                HookParams params = new HookParams()
880                                                        .add(IPreResourceAccessDetails.class, accessDetails)
881                                                        .add(RequestDetails.class, mySearchRuntimeDetails.getRequestDetails())
882                                                        .addIfMatchesType(ServletRequestDetails.class, mySearchRuntimeDetails.getRequestDetails());
883                                                CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, myRequest, Pointcut.STORAGE_PREACCESS_RESOURCES, params);
884
885                                                for (int i = unsyncedPids.size() - 1; i >= 0; i--) {
886                                                        if (accessDetails.isDontReturnResourceAtIndex(i)) {
887                                                                unsyncedPids.remove(i);
888                                                                myCountBlockedThisPass++;
889                                                                myCountSavedTotal++;
890                                                                countBlocked++;
891                                                        }
892                                                }
893                                        }
894
895                                        // Actually store the results in the query cache storage
896                                        myCountSavedTotal += unsyncedPids.size();
897                                        myCountSavedThisPass += unsyncedPids.size();
898                                        mySearchResultCacheSvc.storeResults(mySearch, mySyncedPids, unsyncedPids);
899
900                                        synchronized (mySyncedPids) {
901                                                int numSyncedThisPass = unsyncedPids.size();
902                                                ourLog.trace("Syncing {} search results - Have more: {}", numSyncedThisPass, theResultIter.hasNext());
903                                                mySyncedPids.addAll(unsyncedPids);
904                                                unsyncedPids.clear();
905
906                                                if (theResultIter.hasNext() == false) {
907                                                        int skippedCount = theResultIter.getSkippedCount();
908                                                        int nonSkippedCount = theResultIter.getNonSkippedCount();
909                                                        int totalFetched = skippedCount + myCountSavedThisPass + myCountBlockedThisPass;
910                                                        ourLog.trace("MaxToFetch[{}] SkippedCount[{}] CountSavedThisPass[{}] CountSavedThisTotal[{}] AdditionalPrefetchRemaining[{}]", myMaxResultsToFetch, skippedCount, myCountSavedThisPass, myCountSavedTotal, myAdditionalPrefetchThresholdsRemaining);
911
912                                                        if (nonSkippedCount == 0 || (myMaxResultsToFetch != null && totalFetched < myMaxResultsToFetch)) {
913                                                                ourLog.trace("Setting search status to FINISHED");
914                                                                mySearch.setStatus(SearchStatusEnum.FINISHED);
915                                                                mySearch.setTotalCount(myCountSavedTotal - countBlocked);
916                                                        } else if (myAdditionalPrefetchThresholdsRemaining) {
917                                                                ourLog.trace("Setting search status to PASSCMPLET");
918                                                                mySearch.setStatus(SearchStatusEnum.PASSCMPLET);
919                                                                mySearch.setSearchParameterMap(myParams);
920                                                        } else {
921                                                                ourLog.trace("Setting search status to FINISHED");
922                                                                mySearch.setStatus(SearchStatusEnum.FINISHED);
923                                                                mySearch.setTotalCount(myCountSavedTotal - countBlocked);
924                                                        }
925                                                }
926                                        }
927
928                                        mySearch.setNumFound(myCountSavedTotal);
929                                        mySearch.setNumBlocked(mySearch.getNumBlocked() + countBlocked);
930
931                                        int numSynced;
932                                        synchronized (mySyncedPids) {
933                                                numSynced = mySyncedPids.size();
934                                        }
935
936                                        if (myDaoConfig.getCountSearchResultsUpTo() == null ||
937                                                myDaoConfig.getCountSearchResultsUpTo() <= 0 ||
938                                                myDaoConfig.getCountSearchResultsUpTo() <= numSynced) {
939                                                myInitialCollectionLatch.countDown();
940                                        }
941
942                                        doSaveSearch();
943
944                                        ourLog.trace("saveUnsynced() - pre-commit");
945                                }
946                        });
947                        ourLog.trace("saveUnsynced() - post-commit");
948
949                }
950
951                boolean isNotAborted() {
952                        return myAbortRequested == false;
953                }
954
955                void markComplete() {
956                        myCompletionLatch.countDown();
957                }
958
959                CountDownLatch getCompletionLatch() {
960                        return myCompletionLatch;
961                }
962
963                /**
964                 * Request that the task abort as soon as possible
965                 */
966                void requestImmediateAbort() {
967                        myAbortRequested = true;
968                }
969
970                /**
971                 * This is the method which actually performs the search.
972                 * It is called automatically by the thread pool.
973                 */
974                @Override
975                public Void call() {
976                        StopWatch sw = new StopWatch();
977                        Span span = myParentTransaction.startSpan("db", "query", "search");
978                        span.setName("FHIR Database Search");
979                        try {
980                                // Create an initial search in the DB and give it an ID
981                                saveSearch();
982
983                                TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
984                                txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
985
986                                if (myCustomIsolationSupported) {
987                                        txTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
988                                }
989
990                                txTemplate.execute(new TransactionCallbackWithoutResult() {
991                                        @Override
992                                        protected void doInTransactionWithoutResult(@Nonnull TransactionStatus theStatus) {
993                                                doSearch();
994                                        }
995                                });
996
997                                mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus());
998                                if (mySearch.getStatus() == SearchStatusEnum.FINISHED) {
999                                        HookParams params = new HookParams()
1000                                                .add(RequestDetails.class, myRequest)
1001                                                .addIfMatchesType(ServletRequestDetails.class, myRequest)
1002                                                .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
1003                                        CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_COMPLETE, params);
1004                                } else {
1005                                        HookParams params = new HookParams()
1006                                                .add(RequestDetails.class, myRequest)
1007                                                .addIfMatchesType(ServletRequestDetails.class, myRequest)
1008                                                .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
1009                                        CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_PASS_COMPLETE, params);
1010                                }
1011
1012                                ourLog.trace("Have completed search for [{}{}] and found {} resources in {}ms - Status is {}", mySearch.getResourceType(), mySearch.getSearchQueryString(), mySyncedPids.size(), sw.getMillis(), mySearch.getStatus());
1013
1014                        } catch (Throwable t) {
1015
1016                                /*
1017                                 * Don't print a stack trace for client errors (i.e. requests that
1018                                 * aren't valid because the client screwed up).. that's just noise
1019                                 * in the logs and who needs that.
1020                                 */
1021                                boolean logged = false;
1022                                if (t instanceof BaseServerResponseException) {
1023                                        BaseServerResponseException exception = (BaseServerResponseException) t;
1024                                        if (exception.getStatusCode() >= 400 && exception.getStatusCode() < 500) {
1025                                                logged = true;
1026                                                ourLog.warn("Failed during search due to invalid request: {}", t.toString());
1027                                        }
1028                                }
1029
1030                                if (!logged) {
1031                                        ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t);
1032                                }
1033                                myUnsyncedPids.clear();
1034                                Throwable rootCause = ExceptionUtils.getRootCause(t);
1035                                rootCause = defaultIfNull(rootCause, t);
1036
1037                                String failureMessage = rootCause.getMessage();
1038
1039                                int failureCode = InternalErrorException.STATUS_CODE;
1040                                if (t instanceof BaseServerResponseException) {
1041                                        failureCode = ((BaseServerResponseException) t).getStatusCode();
1042                                }
1043
1044                                if (System.getProperty(UNIT_TEST_CAPTURE_STACK) != null) {
1045                                        failureMessage += "\nStack\n" + ExceptionUtils.getStackTrace(rootCause);
1046                                }
1047
1048                                mySearch.setFailureMessage(failureMessage);
1049                                mySearch.setFailureCode(failureCode);
1050                                mySearch.setStatus(SearchStatusEnum.FAILED);
1051
1052                                mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus());
1053                                HookParams params = new HookParams()
1054                                        .add(RequestDetails.class, myRequest)
1055                                        .addIfMatchesType(ServletRequestDetails.class, myRequest)
1056                                        .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
1057                                CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_FAILED, params);
1058
1059                                saveSearch();
1060                                span.captureException(t);
1061                        } finally {
1062
1063                                myIdToSearchTask.remove(mySearch.getUuid());
1064                                myInitialCollectionLatch.countDown();
1065                                markComplete();
1066                                span.end();
1067
1068                        }
1069                        return null;
1070                }
1071
1072                private void doSaveSearch() {
1073                        Search newSearch = mySearchCacheSvc.save(mySearch);
1074
1075                        // mySearchDao.save is not supposed to return null, but in unit tests
1076                        // it can if the mock search dao isn't set up to handle that
1077                        if (newSearch != null) {
1078                                mySearch = newSearch;
1079                        }
1080                }
1081
1082                /**
1083                 * This method actually creates the database query to perform the
1084                 * search, and starts it.
1085                 */
1086                private void doSearch() {
1087                        /*
1088                         * If the user has explicitly requested a _count, perform a
1089                         *
1090                         * SELECT COUNT(*) ....
1091                         *
1092                         * before doing anything else.
1093                         */
1094                        boolean myParamWantOnlyCount = isWantOnlyCount(myParams);
1095                        boolean myParamOrDefaultWantCount = nonNull(myParams.getSearchTotalMode()) ? isWantCount(myParams) : isWantCount(myDaoConfig.getDefaultTotalMode());
1096
1097                        if (myParamWantOnlyCount || myParamOrDefaultWantCount) {
1098                                ourLog.trace("Performing count");
1099                                ISearchBuilder sb = newSearchBuilder();
1100
1101                                /*
1102                                 * createCountQuery
1103                                 * NB: (see createQuery below)
1104                                 * Because FulltextSearchSvcImpl will (internally)
1105                                 * mutate the myParams (searchmap),
1106                                 * (specifically removing the _content and _text filters)
1107                                 * we will have to clone those parameters here so that
1108                                 * the "correct" params are used in createQuery below
1109                                 */
1110                                Long count = sb.createCountQuery(myParams.clone(), mySearch.getUuid(), myRequest, myRequestPartitionId);
1111
1112                                ourLog.trace("Got count {}", count);
1113
1114                                TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
1115                                txTemplate.execute(new TransactionCallbackWithoutResult() {
1116                                        @Override
1117                                        protected void doInTransactionWithoutResult(@Nonnull TransactionStatus theArg0) {
1118                                                mySearch.setTotalCount(count.intValue());
1119                                                if (myParamWantOnlyCount) {
1120                                                        mySearch.setStatus(SearchStatusEnum.FINISHED);
1121                                                }
1122                                                doSaveSearch();
1123                                        }
1124                                });
1125                                if (myParamWantOnlyCount) {
1126                                        return;
1127                                }
1128                        }
1129
1130                        ourLog.trace("Done count");
1131                        ISearchBuilder sb = newSearchBuilder();
1132
1133                        /*
1134                         * Figure out how many results we're actually going to fetch from the
1135                         * database in this pass. This calculation takes into consideration the
1136                         * "pre-fetch thresholds" specified in DaoConfig#getSearchPreFetchThresholds()
1137                         * as well as the value of the _count parameter.
1138                         */
1139                        int currentlyLoaded = defaultIfNull(mySearch.getNumFound(), 0);
1140                        int minWanted = 0;
1141                        if (myParams.getCount() != null) {
1142                                minWanted = myParams.getCount();
1143                                minWanted = Math.min(minWanted, myPagingProvider.getMaximumPageSize());
1144                                minWanted += currentlyLoaded;
1145                        }
1146
1147                        for (Iterator<Integer> iter = myDaoConfig.getSearchPreFetchThresholds().iterator(); iter.hasNext(); ) {
1148                                int next = iter.next();
1149                                if (next != -1 && next <= currentlyLoaded) {
1150                                        continue;
1151                                }
1152
1153                                if (next == -1) {
1154                                        sb.setMaxResultsToFetch(null);
1155                                } else {
1156                                        myMaxResultsToFetch = Math.max(next, minWanted);
1157                                        sb.setMaxResultsToFetch(myMaxResultsToFetch);
1158                                }
1159
1160                                if (iter.hasNext()) {
1161                                        myAdditionalPrefetchThresholdsRemaining = true;
1162                                }
1163
1164                                // If we get here's we've found an appropriate threshold
1165                                break;
1166                        }
1167
1168                        /*
1169                         * Provide any PID we loaded in previous search passes to the
1170                         * SearchBuilder so that we don't get duplicates coming from running
1171                         * the same query again.
1172                         *
1173                         * We could possibly accomplish this in a different way by using sorted
1174                         * results in our SQL query and specifying an offset. I don't actually
1175                         * know if that would be faster or not. At some point should test this
1176                         * idea.
1177                         */
1178                        if (myPreviouslyAddedResourcePids != null) {
1179                                sb.setPreviouslyAddedResourcePids(myPreviouslyAddedResourcePids);
1180                                mySyncedPids.addAll(myPreviouslyAddedResourcePids);
1181                        }
1182
1183                        /*
1184                         * createQuery
1185                         * Construct the SQL query we'll be sending to the database
1186                         *
1187                         * NB: (See createCountQuery above)
1188                         * We will pass the original myParams here (not a copy)
1189                         * because we actually _want_ the mutation of the myParams to happen.
1190                         * Specifically because SearchBuilder itself will _expect_
1191                         * not to have these parameters when dumping back
1192                         * to our DB.
1193                         *
1194                         * This is an odd implementation behaviour, but the change
1195                         * for this will require a lot more handling at higher levels
1196                         */
1197                        try (IResultIterator resultIterator = sb.createQuery(myParams, mySearchRuntimeDetails, myRequest, myRequestPartitionId)) {
1198                                assert (resultIterator != null);
1199
1200                                /*
1201                                 * The following loop actually loads the PIDs of the resources
1202                                 * matching the search off of the disk and into memory. After
1203                                 * every X results, we commit to the HFJ_SEARCH table.
1204                                 */
1205                                int syncSize = mySyncSize;
1206                                while (resultIterator.hasNext()) {
1207                                        myUnsyncedPids.add(resultIterator.next());
1208
1209                                        boolean shouldSync = myUnsyncedPids.size() >= syncSize;
1210
1211                                        if (myDaoConfig.getCountSearchResultsUpTo() != null &&
1212                                                myDaoConfig.getCountSearchResultsUpTo() > 0 &&
1213                                                myDaoConfig.getCountSearchResultsUpTo() < myUnsyncedPids.size()) {
1214                                                shouldSync = false;
1215                                        }
1216
1217                                        if (myUnsyncedPids.size() > 50000) {
1218                                                shouldSync = true;
1219                                        }
1220
1221                                        // If no abort was requested, bail out
1222                                        Validate.isTrue(isNotAborted(), "Abort has been requested");
1223
1224                                        if (shouldSync) {
1225                                                saveUnsynced(resultIterator);
1226                                        }
1227
1228                                        if (myLoadingThrottleForUnitTests != null) {
1229                                                AsyncUtil.sleep(myLoadingThrottleForUnitTests);
1230                                        }
1231
1232                                }
1233
1234                                // If no abort was requested, bail out
1235                                Validate.isTrue(isNotAborted(), "Abort has been requested");
1236
1237                                saveUnsynced(resultIterator);
1238
1239                        } catch (IOException e) {
1240                                ourLog.error("IO failure during database access", e);
1241                                throw new InternalErrorException(Msg.code(1166) + e);
1242                        }
1243                }
1244        }
1245
1246        public class SearchContinuationTask extends SearchTask {
1247
1248                public SearchContinuationTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequest, RequestPartitionId theRequestPartitionId) {
1249                        super(theSearch, theCallingDao, theParams, theResourceType, theRequest, theRequestPartitionId);
1250                }
1251
1252                @Override
1253                public Void call() {
1254                        try {
1255                                TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
1256                                txTemplate.afterPropertiesSet();
1257                                txTemplate.execute(t -> {
1258                                        List<ResourcePersistentId> previouslyAddedResourcePids = mySearchResultCacheSvc.fetchAllResultPids(getSearch());
1259                                        if (previouslyAddedResourcePids == null) {
1260                                                throw newResourceGoneException(getSearch().getUuid());
1261                                        }
1262
1263                                        ourLog.trace("Have {} previously added IDs in search: {}", previouslyAddedResourcePids.size(), getSearch().getUuid());
1264                                        setPreviouslyAddedResourcePids(previouslyAddedResourcePids);
1265                                        return null;
1266                                });
1267                        } catch (Throwable e) {
1268                                ourLog.error("Failure processing search", e);
1269                                getSearch().setFailureMessage(e.getMessage());
1270                                getSearch().setStatus(SearchStatusEnum.FAILED);
1271                                if (e instanceof BaseServerResponseException) {
1272                                        getSearch().setFailureCode(((BaseServerResponseException) e).getStatusCode());
1273                                }
1274
1275                                saveSearch();
1276                                return null;
1277                        }
1278
1279                        return super.call();
1280                }
1281
1282        }
1283
1284}