001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
 * #L%
019 */
020package ca.uhn.fhir.jpa.search;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.i18n.Msg;
024import ca.uhn.fhir.interceptor.api.HookParams;
025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
026import ca.uhn.fhir.interceptor.api.Pointcut;
027import ca.uhn.fhir.interceptor.model.RequestPartitionId;
028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
029import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
030import ca.uhn.fhir.jpa.api.dao.IDao;
031import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
032import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
033import ca.uhn.fhir.jpa.config.SearchConfig;
034import ca.uhn.fhir.jpa.dao.BaseStorageDao;
035import ca.uhn.fhir.jpa.dao.ISearchBuilder;
036import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
037import ca.uhn.fhir.jpa.dao.search.ResourceNotFoundInIndexException;
038import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
039import ca.uhn.fhir.jpa.entity.Search;
040import ca.uhn.fhir.jpa.model.dao.JpaPid;
041import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
042import ca.uhn.fhir.jpa.search.builder.StorageInterceptorHooksFacade;
043import ca.uhn.fhir.jpa.search.builder.tasks.SearchContinuationTask;
044import ca.uhn.fhir.jpa.search.builder.tasks.SearchTask;
045import ca.uhn.fhir.jpa.search.builder.tasks.SearchTaskParameters;
046import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
047import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
048import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum;
049import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
050import ca.uhn.fhir.jpa.util.QueryParameterUtils;
051import ca.uhn.fhir.model.api.Include;
052import ca.uhn.fhir.rest.api.CacheControlDirective;
053import ca.uhn.fhir.rest.api.Constants;
054import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum;
055import ca.uhn.fhir.rest.api.SearchTotalModeEnum;
056import ca.uhn.fhir.rest.api.server.IBundleProvider;
057import ca.uhn.fhir.rest.api.server.RequestDetails;
058import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
059import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
060import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
061import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
062import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
063import ca.uhn.fhir.util.AsyncUtil;
064import ca.uhn.fhir.util.StopWatch;
065import ca.uhn.fhir.util.UrlUtil;
066import com.google.common.annotations.VisibleForTesting;
067import jakarta.annotation.Nonnull;
068import jakarta.annotation.Nullable;
069import org.apache.commons.lang3.time.DateUtils;
070import org.hl7.fhir.instance.model.api.IBaseResource;
071import org.springframework.beans.factory.BeanFactory;
072import org.springframework.data.domain.PageRequest;
073import org.springframework.data.domain.Pageable;
074import org.springframework.data.domain.Sort;
075import org.springframework.stereotype.Component;
076import org.springframework.transaction.support.TransactionSynchronizationManager;
077
078import java.time.Instant;
079import java.time.temporal.ChronoUnit;
080import java.util.List;
081import java.util.Optional;
082import java.util.Set;
083import java.util.UUID;
084import java.util.concurrent.Callable;
085import java.util.concurrent.ConcurrentHashMap;
086import java.util.concurrent.TimeUnit;
087import java.util.function.Consumer;
088import java.util.stream.Collectors;
089
090import static ca.uhn.fhir.jpa.util.QueryParameterUtils.DEFAULT_SYNC_SIZE;
091import static org.apache.commons.lang3.StringUtils.isBlank;
092import static org.apache.commons.lang3.StringUtils.isNotBlank;
093
094@Component("mySearchCoordinatorSvc")
095public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc<JpaPid> {
096
097        private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);
098
099        private final FhirContext myContext;
100        private final JpaStorageSettings myStorageSettings;
101        private final IInterceptorBroadcaster myInterceptorBroadcaster;
102        private final HapiTransactionService myTxService;
103        private final ISearchCacheSvc mySearchCacheSvc;
104        private final ISearchResultCacheSvc mySearchResultCacheSvc;
105        private final DaoRegistry myDaoRegistry;
106        private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory;
107        private final ISynchronousSearchSvc mySynchronousSearchSvc;
108        private final PersistedJpaBundleProviderFactory myPersistedJpaBundleProviderFactory;
109        private final ISearchParamRegistry mySearchParamRegistry;
110        private final SearchStrategyFactory mySearchStrategyFactory;
111        private final ExceptionService myExceptionSvc;
112        private final BeanFactory myBeanFactory;
113        private ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>();
114
115        private final Consumer<String> myOnRemoveSearchTask = myIdToSearchTask::remove;
116
117        private final StorageInterceptorHooksFacade myStorageInterceptorHooks;
118        private Integer myLoadingThrottleForUnitTests = null;
119        private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE;
120        private boolean myNeverUseLocalSearchForUnitTests;
121        private int mySyncSize = DEFAULT_SYNC_SIZE;
122
123        /**
124         * Constructor
125         */
126        public SearchCoordinatorSvcImpl(
127                        FhirContext theContext,
128                        JpaStorageSettings theStorageSettings,
129                        IInterceptorBroadcaster theInterceptorBroadcaster,
130                        HapiTransactionService theTxService,
131                        ISearchCacheSvc theSearchCacheSvc,
132                        ISearchResultCacheSvc theSearchResultCacheSvc,
133                        DaoRegistry theDaoRegistry,
134                        SearchBuilderFactory<JpaPid> theSearchBuilderFactory,
135                        ISynchronousSearchSvc theSynchronousSearchSvc,
136                        PersistedJpaBundleProviderFactory thePersistedJpaBundleProviderFactory,
137                        ISearchParamRegistry theSearchParamRegistry,
138                        SearchStrategyFactory theSearchStrategyFactory,
139                        ExceptionService theExceptionSvc,
140                        BeanFactory theBeanFactory) {
141                super();
142                myContext = theContext;
143                myStorageSettings = theStorageSettings;
144                myInterceptorBroadcaster = theInterceptorBroadcaster;
145                myTxService = theTxService;
146                mySearchCacheSvc = theSearchCacheSvc;
147                mySearchResultCacheSvc = theSearchResultCacheSvc;
148                myDaoRegistry = theDaoRegistry;
149                mySearchBuilderFactory = theSearchBuilderFactory;
150                mySynchronousSearchSvc = theSynchronousSearchSvc;
151                myPersistedJpaBundleProviderFactory = thePersistedJpaBundleProviderFactory;
152                mySearchParamRegistry = theSearchParamRegistry;
153                mySearchStrategyFactory = theSearchStrategyFactory;
154                myExceptionSvc = theExceptionSvc;
155                myBeanFactory = theBeanFactory;
156
157                myStorageInterceptorHooks = new StorageInterceptorHooksFacade(myInterceptorBroadcaster);
158        }
159
160        @VisibleForTesting
161        Set<String> getActiveSearchIds() {
162                return myIdToSearchTask.keySet();
163        }
164
165        @VisibleForTesting
166        public void setIdToSearchTaskMapForUnitTests(ConcurrentHashMap<String, SearchTask> theIdToSearchTaskMap) {
167                myIdToSearchTask = theIdToSearchTaskMap;
168        }
169
170        @VisibleForTesting
171        public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) {
172                myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests;
173        }
174
175        @VisibleForTesting
176        public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) {
177                myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests;
178        }
179
180        @VisibleForTesting
181        public void setSyncSizeForUnitTests(int theSyncSize) {
182                mySyncSize = theSyncSize;
183        }
184
185        @Override
186        public void cancelAllActiveSearches() {
187                for (SearchTask next : myIdToSearchTask.values()) {
188                        ourLog.info(
189                                        "Requesting immediate abort of search: {}", next.getSearch().getUuid());
190                        next.requestImmediateAbort();
191                        AsyncUtil.awaitLatchAndIgnoreInterrupt(next.getCompletionLatch(), 30, TimeUnit.SECONDS);
192                }
193        }
194
195        @SuppressWarnings("SameParameterValue")
196        @VisibleForTesting
197        void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) {
198                myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults;
199        }
200
201        /**
202         * This method is called by the HTTP client processing thread in order to
203         * fetch resources.
204         * <p>
205         * This method must not be called from inside a transaction. The rationale is that
206         * the {@link Search} entity is treated as a piece of shared state across client threads
207         * accessing the same thread, so we need to be able to update that table in a transaction
208         * and commit it right away in order for that to work. Examples of needing to do this
209         * include if two different clients request the same search and are both paging at the
210         * same time, but also includes clients that are hacking the paging links to
211         * fetch multiple pages of a search result in parallel. In both cases we need to only
212         * let one of them actually activate the search, or we will have conficts. The other thread
213         * just needs to wait until the first one actually fetches more results.
214         */
215        @Override
216        public List<JpaPid> getResources(
217                        final String theUuid,
218                        int theFrom,
219                        int theTo,
220                        @Nullable RequestDetails theRequestDetails,
221                        RequestPartitionId theRequestPartitionId) {
222                assert !TransactionSynchronizationManager.isActualTransactionActive();
223
224                // If we're actively searching right now, don't try to do anything until at least one batch has been
225                // persisted in the DB
226                SearchTask searchTask = myIdToSearchTask.get(theUuid);
227                if (searchTask != null) {
228                        searchTask.awaitInitialSync();
229                }
230
231                ourLog.trace("About to start looking for resources {}-{}", theFrom, theTo);
232
233                Search search;
234                StopWatch sw = new StopWatch();
235                while (true) {
236
237                        if (myNeverUseLocalSearchForUnitTests == false) {
238                                if (searchTask != null) {
239                                        ourLog.trace("Local search found");
240                                        List<JpaPid> resourcePids = searchTask.getResourcePids(theFrom, theTo);
241                                        ourLog.trace(
242                                                        "Local search returned {} pids, wanted {}-{} - Search: {}",
243                                                        resourcePids.size(),
244                                                        theFrom,
245                                                        theTo,
246                                                        searchTask.getSearch());
247
248                                        /*
249                                         * Generally, if a search task is open, the fastest possible thing is to just return its results. This
250                                         * will work most of the time, but can fail if the task hit a search threshold and the client is requesting
251                                         * results beyond that threashold. In that case, we'll keep going below, since that will trigger another
252                                         * task.
253                                         */
254                                        if ((searchTask.getSearch().getNumFound()
255                                                                                        - searchTask.getSearch().getNumBlocked())
256                                                                        >= theTo
257                                                        || resourcePids.size() == (theTo - theFrom)) {
258                                                return resourcePids;
259                                        }
260                                }
261                        }
262
263                        Callable<Search> searchCallback = () -> mySearchCacheSvc
264                                        .fetchByUuid(theUuid, theRequestPartitionId)
265                                        .orElseThrow(() -> myExceptionSvc.newUnknownSearchException(theUuid));
266                        search = myTxService
267                                        .withRequest(theRequestDetails)
268                                        .withRequestPartitionId(theRequestPartitionId)
269                                        .execute(searchCallback);
270                        QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search);
271
272                        if (search.getStatus() == SearchStatusEnum.FINISHED) {
273                                ourLog.trace("Search entity marked as finished with {} results", search.getNumFound());
274                                break;
275                        }
276                        if ((search.getNumFound() - search.getNumBlocked()) >= theTo) {
277                                ourLog.trace("Search entity has {} results so far", search.getNumFound());
278                                break;
279                        }
280
281                        if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) {
282                                ourLog.error(
283                                                "Search {} of type {} for {}{} timed out after {}ms",
284                                                search.getId(),
285                                                search.getSearchType(),
286                                                search.getResourceType(),
287                                                search.getSearchQueryString(),
288                                                sw.getMillis());
289                                throw new InternalErrorException(Msg.code(1163) + "Request timed out after " + sw.getMillis() + "ms");
290                        }
291
292                        // If the search was saved in "pass complete mode" it's probably time to
293                        // start a new pass
294                        if (search.getStatus() == SearchStatusEnum.PASSCMPLET) {
295                                ourLog.trace("Going to try to start next search");
296                                Optional<Search> newSearch =
297                                                mySearchCacheSvc.tryToMarkSearchAsInProgress(search, theRequestPartitionId);
298                                if (newSearch.isPresent()) {
299                                        ourLog.trace("Launching new search");
300                                        search = newSearch.get();
301                                        String resourceType = search.getResourceType();
302                                        SearchParameterMap params = search.getSearchParameterMap()
303                                                        .orElseThrow(() -> new IllegalStateException("No map in PASSCOMPLET search"));
304                                        IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType);
305
306                                        SearchTaskParameters parameters = new SearchTaskParameters(
307                                                        search,
308                                                        resourceDao,
309                                                        params,
310                                                        resourceType,
311                                                        theRequestDetails,
312                                                        theRequestPartitionId,
313                                                        myOnRemoveSearchTask,
314                                                        mySyncSize);
315                                        parameters.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests);
316                                        SearchContinuationTask task =
317                                                        (SearchContinuationTask) myBeanFactory.getBean(SearchConfig.CONTINUE_TASK, parameters);
318                                        myIdToSearchTask.put(search.getUuid(), task);
319                                        task.call();
320                                }
321                        }
322
323                        if (!search.getStatus().isDone()) {
324                                AsyncUtil.sleep(500);
325                        }
326                }
327
328                ourLog.trace("Finished looping");
329
330                List<JpaPid> pids = fetchResultPids(theUuid, theFrom, theTo, theRequestDetails, search, theRequestPartitionId);
331
332                ourLog.trace("Fetched {} results", pids.size());
333
334                return pids;
335        }
336
337        @Nonnull
338        private List<JpaPid> fetchResultPids(
339                        String theUuid,
340                        int theFrom,
341                        int theTo,
342                        @Nullable RequestDetails theRequestDetails,
343                        Search theSearch,
344                        RequestPartitionId theRequestPartitionId) {
345                List<JpaPid> pids = mySearchResultCacheSvc.fetchResultPids(
346                                theSearch, theFrom, theTo, theRequestDetails, theRequestPartitionId);
347                if (pids == null) {
348                        throw myExceptionSvc.newUnknownSearchException(theUuid);
349                }
350                return pids;
351        }
352
353        @Override
354        public IBundleProvider registerSearch(
355                        final IFhirResourceDao<?> theCallingDao,
356                        final SearchParameterMap theParams,
357                        String theResourceType,
358                        CacheControlDirective theCacheControlDirective,
359                        RequestDetails theRequestDetails,
360                        RequestPartitionId theRequestPartitionId) {
361                final String searchUuid = UUID.randomUUID().toString();
362
363                final String queryString = theParams.toNormalizedQueryString(myContext);
364                ourLog.debug("Registering new search {}", searchUuid);
365
366                Search search = new Search();
367                QueryParameterUtils.populateSearchEntity(
368                                theParams, theResourceType, searchUuid, queryString, search, theRequestPartitionId);
369
370                myStorageInterceptorHooks.callStoragePresearchRegistered(
371                                theRequestDetails, theParams, search, theRequestPartitionId);
372
373                validateSearch(theParams);
374
375                Class<? extends IBaseResource> resourceTypeClass =
376                                myContext.getResourceDefinition(theResourceType).getImplementingClass();
377                final ISearchBuilder<JpaPid> sb = mySearchBuilderFactory.newSearchBuilder(theResourceType, resourceTypeClass);
378                sb.setFetchSize(mySyncSize);
379
380                final Integer loadSynchronousUpTo = getLoadSynchronousUpToOrNull(theCacheControlDirective);
381                boolean isOffsetQuery = theParams.isOffsetQuery();
382
383                // todo someday - not today.
384                //              SearchStrategyFactory.ISearchStrategy searchStrategy = mySearchStrategyFactory.pickStrategy(theResourceType,
385                // theParams, theRequestDetails);
386                //              return searchStrategy.get();
387
388                if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null || isOffsetQuery) {
389                        if (mySearchStrategyFactory.isSupportsHSearchDirect(theResourceType, theParams, theRequestDetails)) {
390                                ourLog.info("Search {} is using direct load strategy", searchUuid);
391                                SearchStrategyFactory.ISearchStrategy direct = mySearchStrategyFactory.makeDirectStrategy(
392                                                searchUuid, theResourceType, theParams, theRequestDetails);
393
394                                try {
395                                        return direct.get();
396
397                                } catch (ResourceNotFoundInIndexException theE) {
398                                        // some resources were not found in index, so we will inform this and resort to JPA search
399                                        ourLog.warn(
400                                                        "Some resources were not found in index. Make sure all resources were indexed. Resorting to database search.");
401                                }
402                        }
403
404                        ourLog.debug("Search {} is loading in synchronous mode", searchUuid);
405                        return mySynchronousSearchSvc.executeQuery(
406                                        theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, theRequestPartitionId);
407                }
408
409                /*
410                 * See if there are any cached searches whose results we can return
411                 * instead
412                 */
413                SearchCacheStatusEnum cacheStatus = SearchCacheStatusEnum.MISS;
414                if (theCacheControlDirective != null && theCacheControlDirective.isNoCache()) {
415                        cacheStatus = SearchCacheStatusEnum.NOT_TRIED;
416                }
417
418                if (cacheStatus != SearchCacheStatusEnum.NOT_TRIED) {
419                        if (theParams.getEverythingMode() == null) {
420                                if (myStorageSettings.getReuseCachedSearchResultsForMillis() != null) {
421                                        PersistedJpaBundleProvider foundSearchProvider = findCachedQuery(
422                                                        theParams, theResourceType, theRequestDetails, queryString, theRequestPartitionId);
423                                        if (foundSearchProvider != null) {
424                                                foundSearchProvider.setCacheStatus(SearchCacheStatusEnum.HIT);
425                                                return foundSearchProvider;
426                                        }
427                                }
428                        }
429                }
430
431                PersistedJpaSearchFirstPageBundleProvider retVal = submitSearch(
432                                theCallingDao, theParams, theResourceType, theRequestDetails, sb, theRequestPartitionId, search);
433                retVal.setCacheStatus(cacheStatus);
434                return retVal;
435        }
436
437        private void validateSearch(SearchParameterMap theParams) {
438                validateIncludes(theParams.getIncludes(), Constants.PARAM_INCLUDE);
439                validateIncludes(theParams.getRevIncludes(), Constants.PARAM_REVINCLUDE);
440        }
441
442        private void validateIncludes(Set<Include> includes, String name) {
443                for (Include next : includes) {
444                        String value = next.getValue();
445                        if (value.equals(Constants.INCLUDE_STAR) || isBlank(value)) {
446                                continue;
447                        }
448
449                        String paramType = next.getParamType();
450                        String paramName = next.getParamName();
451                        String paramTargetType = next.getParamTargetType();
452
453                        if (isBlank(paramType) || isBlank(paramName)) {
454                                String msg = myContext
455                                                .getLocalizer()
456                                                .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidInclude", name, value, "");
457                                throw new InvalidRequestException(Msg.code(2018) + msg);
458                        }
459
460                        if (!myDaoRegistry.isResourceTypeSupported(paramType)) {
461                                String resourceTypeMsg = myContext
462                                                .getLocalizer()
463                                                .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramType);
464                                String msg = myContext
465                                                .getLocalizer()
466                                                .getMessage(
467                                                                SearchCoordinatorSvcImpl.class,
468                                                                "invalidInclude",
469                                                                UrlUtil.sanitizeUrlPart(name),
470                                                                UrlUtil.sanitizeUrlPart(value),
471                                                                resourceTypeMsg); // last param is pre-sanitized
472                                throw new InvalidRequestException(Msg.code(2017) + msg);
473                        }
474
475                        if (isNotBlank(paramTargetType) && !myDaoRegistry.isResourceTypeSupported(paramTargetType)) {
476                                String resourceTypeMsg = myContext
477                                                .getLocalizer()
478                                                .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramTargetType);
479                                String msg = myContext
480                                                .getLocalizer()
481                                                .getMessage(
482                                                                SearchCoordinatorSvcImpl.class,
483                                                                "invalidInclude",
484                                                                UrlUtil.sanitizeUrlPart(name),
485                                                                UrlUtil.sanitizeUrlPart(value),
486                                                                resourceTypeMsg); // last param is pre-sanitized
487                                throw new InvalidRequestException(Msg.code(2016) + msg);
488                        }
489
490                        if (!Constants.INCLUDE_STAR.equals(paramName)
491                                        && mySearchParamRegistry.getActiveSearchParam(
492                                                                        paramType, paramName, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH)
493                                                        == null) {
494                                List<String> validNames = mySearchParamRegistry
495                                                .getActiveSearchParams(paramType, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH)
496                                                .values()
497                                                .stream()
498                                                .filter(t -> t.getParamType() == RestSearchParameterTypeEnum.REFERENCE)
499                                                .map(t -> UrlUtil.sanitizeUrlPart(t.getName()))
500                                                .sorted()
501                                                .collect(Collectors.toList());
502                                String searchParamMessage = myContext
503                                                .getLocalizer()
504                                                .getMessage(
505                                                                BaseStorageDao.class,
506                                                                "invalidSearchParameter",
507                                                                UrlUtil.sanitizeUrlPart(paramName),
508                                                                UrlUtil.sanitizeUrlPart(paramType),
509                                                                validNames);
510                                String msg = myContext
511                                                .getLocalizer()
512                                                .getMessage(
513                                                                SearchCoordinatorSvcImpl.class,
514                                                                "invalidInclude",
515                                                                UrlUtil.sanitizeUrlPart(name),
516                                                                UrlUtil.sanitizeUrlPart(value),
517                                                                searchParamMessage); // last param is pre-sanitized
518                                throw new InvalidRequestException(Msg.code(2015) + msg);
519                        }
520                }
521        }
522
523        @Override
524        public Optional<Integer> getSearchTotal(
525                        String theUuid, @Nullable RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) {
526                SearchTask task = myIdToSearchTask.get(theUuid);
527                if (task != null) {
528                        return Optional.ofNullable(task.awaitInitialSync());
529                }
530
531                /*
532                 * In case there is no running search, if the total is listed as accurate we know one is coming
533                 * so let's wait a bit for it to show up
534                 */
535                Optional<Search> search = myTxService
536                                .withRequest(theRequestDetails)
537                                .execute(() -> mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId));
538                if (search.isPresent()) {
539                        Optional<SearchParameterMap> searchParameterMap = search.get().getSearchParameterMap();
540                        if (searchParameterMap.isPresent()
541                                        && searchParameterMap.get().getSearchTotalMode() == SearchTotalModeEnum.ACCURATE) {
542                                for (int i = 0; i < 10; i++) {
543                                        if (search.isPresent()) {
544                                                QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search.get());
545                                                if (search.get().getTotalCount() != null) {
546                                                        return Optional.of(search.get().getTotalCount());
547                                                }
548                                        }
549                                        search = mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId);
550                                }
551                        }
552                }
553
554                return Optional.empty();
555        }
556
557        @Nonnull
558        private PersistedJpaSearchFirstPageBundleProvider submitSearch(
559                        IDao theCallingDao,
560                        SearchParameterMap theParams,
561                        String theResourceType,
562                        RequestDetails theRequestDetails,
563                        ISearchBuilder<JpaPid> theSb,
564                        RequestPartitionId theRequestPartitionId,
565                        Search theSearch) {
566                StopWatch w = new StopWatch();
567
568                SearchTaskParameters stp = new SearchTaskParameters(
569                                theSearch,
570                                theCallingDao,
571                                theParams,
572                                theResourceType,
573                                theRequestDetails,
574                                theRequestPartitionId,
575                                myOnRemoveSearchTask,
576                                mySyncSize);
577                stp.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests);
578                SearchTask task = (SearchTask) myBeanFactory.getBean(SearchConfig.SEARCH_TASK, stp);
579                myIdToSearchTask.put(theSearch.getUuid(), task);
580                task.call();
581
582                PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage(
583                                theRequestDetails, task, theSb, theRequestPartitionId);
584
585                ourLog.debug("Search initial phase completed in {}ms", w.getMillis());
586                return retVal;
587        }
588
589        @Nullable
590        private PersistedJpaBundleProvider findCachedQuery(
591                        SearchParameterMap theParams,
592                        String theResourceType,
593                        RequestDetails theRequestDetails,
594                        String theQueryString,
595                        RequestPartitionId theRequestPartitionId) {
596                // May be null
597                return myTxService
598                                .withRequest(theRequestDetails)
599                                .withRequestPartitionId(theRequestPartitionId)
600                                .execute(() -> {
601                                        IInterceptorBroadcaster compositeBroadcaster =
602                                                        CompositeInterceptorBroadcaster.newCompositeBroadcaster(
603                                                                        myInterceptorBroadcaster, theRequestDetails);
604
605                                        // Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH
606
607                                        HookParams params = new HookParams()
608                                                        .add(SearchParameterMap.class, theParams)
609                                                        .add(RequestDetails.class, theRequestDetails)
610                                                        .addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
611                                        boolean canUseCache =
612                                                        compositeBroadcaster.callHooks(Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH, params);
613                                        if (!canUseCache) {
614                                                return null;
615                                        }
616
617                                        // Check for a search matching the given hash
618                                        Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType, theRequestPartitionId);
619                                        if (searchToUse == null) {
620                                                return null;
621                                        }
622
623                                        ourLog.debug("Reusing search {} from cache", searchToUse.getUuid());
624                                        // Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED
625                                        params = new HookParams()
626                                                        .add(SearchParameterMap.class, theParams)
627                                                        .add(RequestDetails.class, theRequestDetails)
628                                                        .addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
629                                        compositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, params);
630
631                                        return myPersistedJpaBundleProviderFactory.newInstance(theRequestDetails, searchToUse.getUuid());
632                                });
633        }
634
635        @Nullable
636        private Search findSearchToUseOrNull(
637                        String theQueryString, String theResourceType, RequestPartitionId theRequestPartitionId) {
638                // createdCutoff is in recent past
639                final Instant createdCutoff =
640                                Instant.now().minus(myStorageSettings.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS);
641
642                Optional<Search> candidate = mySearchCacheSvc.findCandidatesForReuse(
643                                theResourceType, theQueryString, createdCutoff, theRequestPartitionId);
644                return candidate.orElse(null);
645        }
646
647        @Nullable
648        private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) {
649                final Integer loadSynchronousUpTo;
650                if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) {
651                        if (theCacheControlDirective.getMaxResults() != null) {
652                                loadSynchronousUpTo = theCacheControlDirective.getMaxResults();
653                                if (loadSynchronousUpTo > myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()) {
654                                        throw new InvalidRequestException(Msg.code(1165) + Constants.HEADER_CACHE_CONTROL + " header "
655                                                        + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed "
656                                                        + myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit());
657                                }
658                        } else {
659                                loadSynchronousUpTo = 100;
660                        }
661                } else {
662                        loadSynchronousUpTo = null;
663                }
664                return loadSynchronousUpTo;
665        }
666
667        /**
668         * Creates a {@link Pageable} using a start and end index
669         */
670        @SuppressWarnings("WeakerAccess")
671        @Nullable
672        public static Pageable toPage(final int theFromIndex, int theToIndex) {
673                int pageSize = theToIndex - theFromIndex;
674                if (pageSize < 1) {
675                        return null;
676                }
677
678                int pageIndex = theFromIndex / pageSize;
679
680                return new PageRequest(pageIndex, pageSize, Sort.unsorted()) {
681                        private static final long serialVersionUID = 1L;
682
683                        @Override
684                        public long getOffset() {
685                                return theFromIndex;
686                        }
687                };
688        }
689}