001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.search;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.i18n.Msg;
024import ca.uhn.fhir.interceptor.api.HookParams;
025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
026import ca.uhn.fhir.interceptor.api.Pointcut;
027import ca.uhn.fhir.interceptor.model.RequestPartitionId;
028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
029import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
030import ca.uhn.fhir.jpa.api.dao.IDao;
031import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
032import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
033import ca.uhn.fhir.jpa.config.SearchConfig;
034import ca.uhn.fhir.jpa.dao.BaseStorageDao;
035import ca.uhn.fhir.jpa.dao.ISearchBuilder;
036import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
037import ca.uhn.fhir.jpa.dao.search.ResourceNotFoundInIndexException;
038import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
039import ca.uhn.fhir.jpa.entity.Search;
040import ca.uhn.fhir.jpa.model.dao.JpaPid;
041import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
042import ca.uhn.fhir.jpa.search.builder.StorageInterceptorHooksFacade;
043import ca.uhn.fhir.jpa.search.builder.tasks.SearchContinuationTask;
044import ca.uhn.fhir.jpa.search.builder.tasks.SearchTask;
045import ca.uhn.fhir.jpa.search.builder.tasks.SearchTaskParameters;
046import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
047import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
048import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum;
049import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
050import ca.uhn.fhir.jpa.util.QueryParameterUtils;
051import ca.uhn.fhir.model.api.Include;
052import ca.uhn.fhir.rest.api.CacheControlDirective;
053import ca.uhn.fhir.rest.api.Constants;
054import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum;
055import ca.uhn.fhir.rest.api.SearchTotalModeEnum;
056import ca.uhn.fhir.rest.api.server.IBundleProvider;
057import ca.uhn.fhir.rest.api.server.RequestDetails;
058import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
059import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
060import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
061import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
062import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
063import ca.uhn.fhir.util.AsyncUtil;
064import ca.uhn.fhir.util.StopWatch;
065import ca.uhn.fhir.util.UrlUtil;
066import com.google.common.annotations.VisibleForTesting;
067import jakarta.annotation.Nonnull;
068import jakarta.annotation.Nullable;
069import org.apache.commons.lang3.time.DateUtils;
070import org.hl7.fhir.instance.model.api.IBaseResource;
071import org.springframework.beans.factory.BeanFactory;
072import org.springframework.data.domain.PageRequest;
073import org.springframework.data.domain.Pageable;
074import org.springframework.data.domain.Sort;
075import org.springframework.stereotype.Component;
076import org.springframework.transaction.support.TransactionSynchronizationManager;
077
078import java.time.Instant;
079import java.time.temporal.ChronoUnit;
080import java.util.List;
081import java.util.Optional;
082import java.util.Set;
083import java.util.UUID;
084import java.util.concurrent.Callable;
085import java.util.concurrent.ConcurrentHashMap;
086import java.util.concurrent.TimeUnit;
087import java.util.function.Consumer;
088import java.util.stream.Collectors;
089
090import static ca.uhn.fhir.jpa.util.QueryParameterUtils.DEFAULT_SYNC_SIZE;
091import static org.apache.commons.lang3.StringUtils.isBlank;
092import static org.apache.commons.lang3.StringUtils.isNotBlank;
093
094@Component("mySearchCoordinatorSvc")
095public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc<JpaPid> {
096
097        private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);
098
099        private final FhirContext myContext;
100        private final JpaStorageSettings myStorageSettings;
101        private final IInterceptorBroadcaster myInterceptorBroadcaster;
102        private final HapiTransactionService myTxService;
103        private final ISearchCacheSvc mySearchCacheSvc;
104        private final ISearchResultCacheSvc mySearchResultCacheSvc;
105        private final DaoRegistry myDaoRegistry;
106        private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory;
107        private final ISynchronousSearchSvc mySynchronousSearchSvc;
108        private final PersistedJpaBundleProviderFactory myPersistedJpaBundleProviderFactory;
109        private final ISearchParamRegistry mySearchParamRegistry;
110        private final SearchStrategyFactory mySearchStrategyFactory;
111        private final ExceptionService myExceptionSvc;
112        private final BeanFactory myBeanFactory;
113        private ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>();
114
115        private final Consumer<String> myOnRemoveSearchTask = myIdToSearchTask::remove;
116
117        private final StorageInterceptorHooksFacade myStorageInterceptorHooks;
118        private Integer myLoadingThrottleForUnitTests = null;
119        private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE;
120        private boolean myNeverUseLocalSearchForUnitTests;
121        private int mySyncSize = DEFAULT_SYNC_SIZE;
122
/**
 * Creates the coordinator from its collaborators and builds the interceptor hook facade
 * on top of the supplied broadcaster.
 *
 * @param theContext                           FHIR context used for resource definitions and localized messages
 * @param theStorageSettings                   storage settings (e.g. cached search reuse)
 * @param theInterceptorBroadcaster            broadcaster wrapped by the storage interceptor hook facade
 * @param theTxService                         transaction service used to read the search entity
 * @param theSearchCacheSvc                    service that stores and fetches search entities
 * @param theSearchResultCacheSvc              service that fetches the persisted result PIDs
 * @param theDaoRegistry                       registry used to look up resource DAOs and supported types
 * @param theSearchBuilderFactory              factory for search builders
 * @param theSynchronousSearchSvc              service executing synchronous searches
 * @param thePersistedJpaBundleProviderFactory factory for persisted bundle providers
 * @param theSearchParamRegistry               registry of active search parameters
 * @param theSearchStrategyFactory             factory used to pick a direct search strategy
 * @param theExceptionSvc                      factory for the "unknown search" exception
 * @param theBeanFactory                       bean factory from which search tasks are obtained
 */
public SearchCoordinatorSvcImpl(
                FhirContext theContext,
                JpaStorageSettings theStorageSettings,
                IInterceptorBroadcaster theInterceptorBroadcaster,
                HapiTransactionService theTxService,
                ISearchCacheSvc theSearchCacheSvc,
                ISearchResultCacheSvc theSearchResultCacheSvc,
                DaoRegistry theDaoRegistry,
                SearchBuilderFactory<JpaPid> theSearchBuilderFactory,
                ISynchronousSearchSvc theSynchronousSearchSvc,
                PersistedJpaBundleProviderFactory thePersistedJpaBundleProviderFactory,
                ISearchParamRegistry theSearchParamRegistry,
                SearchStrategyFactory theSearchStrategyFactory,
                ExceptionService theExceptionSvc,
                BeanFactory theBeanFactory) {
        // Context and configuration
        myContext = theContext;
        myStorageSettings = theStorageSettings;
        myBeanFactory = theBeanFactory;

        // Interceptors (the facade wraps the very same broadcaster instance)
        myInterceptorBroadcaster = theInterceptorBroadcaster;
        myStorageInterceptorHooks = new StorageInterceptorHooksFacade(theInterceptorBroadcaster);

        // Persistence and caching
        myTxService = theTxService;
        mySearchCacheSvc = theSearchCacheSvc;
        mySearchResultCacheSvc = theSearchResultCacheSvc;
        myDaoRegistry = theDaoRegistry;

        // Search execution
        mySearchBuilderFactory = theSearchBuilderFactory;
        mySynchronousSearchSvc = theSynchronousSearchSvc;
        myPersistedJpaBundleProviderFactory = thePersistedJpaBundleProviderFactory;
        mySearchParamRegistry = theSearchParamRegistry;
        mySearchStrategyFactory = theSearchStrategyFactory;
        myExceptionSvc = theExceptionSvc;
}
159
160        @VisibleForTesting
161        Set<String> getActiveSearchIds() {
162                return myIdToSearchTask.keySet();
163        }
164
165        @VisibleForTesting
166        public void setIdToSearchTaskMapForUnitTests(ConcurrentHashMap<String, SearchTask> theIdToSearchTaskMap) {
167                myIdToSearchTask = theIdToSearchTaskMap;
168        }
169
170        @VisibleForTesting
171        public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) {
172                myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests;
173        }
174
175        @VisibleForTesting
176        public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) {
177                myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests;
178        }
179
180        @VisibleForTesting
181        public void setSyncSizeForUnitTests(int theSyncSize) {
182                mySyncSize = theSyncSize;
183        }
184
185        @Override
186        public void cancelAllActiveSearches() {
187                for (SearchTask next : myIdToSearchTask.values()) {
188                        ourLog.info(
189                                        "Requesting immediate abort of search: {}", next.getSearch().getUuid());
190                        next.requestImmediateAbort();
191                        AsyncUtil.awaitLatchAndIgnoreInterrupt(next.getCompletionLatch(), 30, TimeUnit.SECONDS);
192                }
193        }
194
195        @SuppressWarnings("SameParameterValue")
196        @VisibleForTesting
197        void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) {
198                myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults;
199        }
200
201        /**
202         * This method is called by the HTTP client processing thread in order to
203         * fetch resources.
204         * <p>
205         * This method must not be called from inside a transaction. The rationale is that
206         * the {@link Search} entity is treated as a piece of shared state across client threads
207         * accessing the same thread, so we need to be able to update that table in a transaction
208         * and commit it right away in order for that to work. Examples of needing to do this
209         * include if two different clients request the same search and are both paging at the
210         * same time, but also includes clients that are hacking the paging links to
211         * fetch multiple pages of a search result in parallel. In both cases we need to only
212         * let one of them actually activate the search, or we will have conficts. The other thread
213         * just needs to wait until the first one actually fetches more results.
214         */
215        @Override
216        public List<JpaPid> getResources(
217                        final String theUuid,
218                        int theFrom,
219                        int theTo,
220                        @Nullable RequestDetails theRequestDetails,
221                        RequestPartitionId theRequestPartitionId) {
222                assert !TransactionSynchronizationManager.isActualTransactionActive();
223
224                // If we're actively searching right now, don't try to do anything until at least one batch has been
225                // persisted in the DB
226                SearchTask searchTask = myIdToSearchTask.get(theUuid);
227                if (searchTask != null) {
228                        searchTask.awaitInitialSync();
229                }
230
231                ourLog.trace("About to start looking for resources {}-{}", theFrom, theTo);
232
233                Search search;
234                StopWatch sw = new StopWatch();
235                while (true) {
236
237                        if (myNeverUseLocalSearchForUnitTests == false) {
238                                if (searchTask != null) {
239                                        ourLog.trace("Local search found");
240                                        List<JpaPid> resourcePids = searchTask.getResourcePids(theFrom, theTo);
241                                        ourLog.trace(
242                                                        "Local search returned {} pids, wanted {}-{} - Search: {}",
243                                                        resourcePids.size(),
244                                                        theFrom,
245                                                        theTo,
246                                                        searchTask.getSearch());
247
248                                        /*
249                                         * Generally, if a search task is open, the fastest possible thing is to just return its results. This
250                                         * will work most of the time, but can fail if the task hit a search threshold and the client is requesting
251                                         * results beyond that threashold. In that case, we'll keep going below, since that will trigger another
252                                         * task.
253                                         */
254                                        if ((searchTask.getSearch().getNumFound()
255                                                                                        - searchTask.getSearch().getNumBlocked())
256                                                                        >= theTo
257                                                        || resourcePids.size() == (theTo - theFrom)) {
258                                                return resourcePids;
259                                        }
260                                }
261                        }
262
263                        Callable<Search> searchCallback = () -> mySearchCacheSvc
264                                        .fetchByUuid(theUuid, theRequestPartitionId)
265                                        .orElseThrow(() -> myExceptionSvc.newUnknownSearchException(theUuid));
266                        search = myTxService
267                                        .withRequest(theRequestDetails)
268                                        .withRequestPartitionId(theRequestPartitionId)
269                                        .execute(searchCallback);
270                        QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search);
271
272                        if (search.getStatus() == SearchStatusEnum.FINISHED) {
273                                ourLog.trace("Search entity marked as finished with {} results", search.getNumFound());
274                                break;
275                        }
276                        if ((search.getNumFound() - search.getNumBlocked()) >= theTo) {
277                                ourLog.trace("Search entity has {} results so far", search.getNumFound());
278                                break;
279                        }
280
281                        if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) {
282                                ourLog.error(
283                                                "Search {} of type {} for {}{} timed out after {}ms",
284                                                search.getId(),
285                                                search.getSearchType(),
286                                                search.getResourceType(),
287                                                search.getSearchQueryString(),
288                                                sw.getMillis());
289                                throw new InternalErrorException(Msg.code(1163) + "Request timed out after " + sw.getMillis() + "ms");
290                        }
291
292                        // If the search was saved in "pass complete mode" it's probably time to
293                        // start a new pass
294                        if (search.getStatus() == SearchStatusEnum.PASSCMPLET) {
295                                ourLog.trace("Going to try to start next search");
296                                Optional<Search> newSearch =
297                                                mySearchCacheSvc.tryToMarkSearchAsInProgress(search, theRequestPartitionId);
298                                if (newSearch.isPresent()) {
299                                        ourLog.trace("Launching new search");
300                                        search = newSearch.get();
301                                        String resourceType = search.getResourceType();
302                                        SearchParameterMap params = search.getSearchParameterMap()
303                                                        .orElseThrow(() -> new IllegalStateException("No map in PASSCOMPLET search"));
304                                        IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType);
305
306                                        SearchTaskParameters parameters = new SearchTaskParameters(
307                                                        search,
308                                                        resourceDao,
309                                                        params,
310                                                        resourceType,
311                                                        theRequestDetails,
312                                                        theRequestPartitionId,
313                                                        myOnRemoveSearchTask,
314                                                        mySyncSize);
315                                        parameters.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests);
316                                        SearchContinuationTask task =
317                                                        (SearchContinuationTask) myBeanFactory.getBean(SearchConfig.CONTINUE_TASK, parameters);
318                                        myIdToSearchTask.put(search.getUuid(), task);
319                                        task.call();
320                                }
321                        }
322
323                        if (!search.getStatus().isDone()) {
324                                AsyncUtil.sleep(500);
325                        }
326                }
327
328                ourLog.trace("Finished looping");
329
330                List<JpaPid> pids = fetchResultPids(theUuid, theFrom, theTo, theRequestDetails, search, theRequestPartitionId);
331
332                ourLog.trace("Fetched {} results", pids.size());
333
334                return pids;
335        }
336
337        @Nonnull
338        private List<JpaPid> fetchResultPids(
339                        String theUuid,
340                        int theFrom,
341                        int theTo,
342                        @Nullable RequestDetails theRequestDetails,
343                        Search theSearch,
344                        RequestPartitionId theRequestPartitionId) {
345                List<JpaPid> pids = mySearchResultCacheSvc.fetchResultPids(
346                                theSearch, theFrom, theTo, theRequestDetails, theRequestPartitionId);
347                if (pids == null) {
348                        throw myExceptionSvc.newUnknownSearchException(theUuid);
349                }
350                return pids;
351        }
352
353        @Override
354        public IBundleProvider registerSearch(
355                        final IFhirResourceDao<?> theCallingDao,
356                        final SearchParameterMap theParams,
357                        String theResourceType,
358                        CacheControlDirective theCacheControlDirective,
359                        RequestDetails theRequestDetails,
360                        RequestPartitionId theRequestPartitionId) {
361                final String searchUuid = UUID.randomUUID().toString();
362
363                final String queryString = theParams.toNormalizedQueryString(myContext);
364                ourLog.debug("Registering new search {}", searchUuid);
365
366                Search search = new Search();
367                QueryParameterUtils.populateSearchEntity(
368                                theParams, theResourceType, searchUuid, queryString, search, theRequestPartitionId);
369
370                myStorageInterceptorHooks.callStoragePresearchRegistered(
371                                theRequestDetails, theParams, search, theRequestPartitionId);
372
373                validateSearch(theParams);
374
375                Class<? extends IBaseResource> resourceTypeClass =
376                                myContext.getResourceDefinition(theResourceType).getImplementingClass();
377                final ISearchBuilder<JpaPid> sb =
378                                mySearchBuilderFactory.newSearchBuilder(theCallingDao, theResourceType, resourceTypeClass);
379                sb.setFetchSize(mySyncSize);
380
381                final Integer loadSynchronousUpTo = getLoadSynchronousUpToOrNull(theCacheControlDirective);
382                boolean isOffsetQuery = theParams.isOffsetQuery();
383
384                // todo someday - not today.
385                //              SearchStrategyFactory.ISearchStrategy searchStrategy = mySearchStrategyFactory.pickStrategy(theResourceType,
386                // theParams, theRequestDetails);
387                //              return searchStrategy.get();
388
389                if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null || isOffsetQuery) {
390                        if (mySearchStrategyFactory.isSupportsHSearchDirect(theResourceType, theParams, theRequestDetails)) {
391                                ourLog.info("Search {} is using direct load strategy", searchUuid);
392                                SearchStrategyFactory.ISearchStrategy direct = mySearchStrategyFactory.makeDirectStrategy(
393                                                searchUuid, theResourceType, theParams, theRequestDetails);
394
395                                try {
396                                        return direct.get();
397
398                                } catch (ResourceNotFoundInIndexException theE) {
399                                        // some resources were not found in index, so we will inform this and resort to JPA search
400                                        ourLog.warn(
401                                                        "Some resources were not found in index. Make sure all resources were indexed. Resorting to database search.");
402                                }
403                        }
404
405                        ourLog.debug("Search {} is loading in synchronous mode", searchUuid);
406                        return mySynchronousSearchSvc.executeQuery(
407                                        theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, theRequestPartitionId);
408                }
409
410                /*
411                 * See if there are any cached searches whose results we can return
412                 * instead
413                 */
414                SearchCacheStatusEnum cacheStatus = SearchCacheStatusEnum.MISS;
415                if (theCacheControlDirective != null && theCacheControlDirective.isNoCache()) {
416                        cacheStatus = SearchCacheStatusEnum.NOT_TRIED;
417                }
418
419                if (cacheStatus != SearchCacheStatusEnum.NOT_TRIED) {
420                        if (theParams.getEverythingMode() == null) {
421                                if (myStorageSettings.getReuseCachedSearchResultsForMillis() != null) {
422                                        PersistedJpaBundleProvider foundSearchProvider = findCachedQuery(
423                                                        theParams, theResourceType, theRequestDetails, queryString, theRequestPartitionId);
424                                        if (foundSearchProvider != null) {
425                                                foundSearchProvider.setCacheStatus(SearchCacheStatusEnum.HIT);
426                                                return foundSearchProvider;
427                                        }
428                                }
429                        }
430                }
431
432                PersistedJpaSearchFirstPageBundleProvider retVal = submitSearch(
433                                theCallingDao, theParams, theResourceType, theRequestDetails, sb, theRequestPartitionId, search);
434                retVal.setCacheStatus(cacheStatus);
435                return retVal;
436        }
437
438        private void validateSearch(SearchParameterMap theParams) {
439                validateIncludes(theParams.getIncludes(), Constants.PARAM_INCLUDE);
440                validateIncludes(theParams.getRevIncludes(), Constants.PARAM_REVINCLUDE);
441        }
442
443        private void validateIncludes(Set<Include> includes, String name) {
444                for (Include next : includes) {
445                        String value = next.getValue();
446                        if (value.equals(Constants.INCLUDE_STAR) || isBlank(value)) {
447                                continue;
448                        }
449
450                        String paramType = next.getParamType();
451                        String paramName = next.getParamName();
452                        String paramTargetType = next.getParamTargetType();
453
454                        if (isBlank(paramType) || isBlank(paramName)) {
455                                String msg = myContext
456                                                .getLocalizer()
457                                                .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidInclude", name, value, "");
458                                throw new InvalidRequestException(Msg.code(2018) + msg);
459                        }
460
461                        if (!myDaoRegistry.isResourceTypeSupported(paramType)) {
462                                String resourceTypeMsg = myContext
463                                                .getLocalizer()
464                                                .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramType);
465                                String msg = myContext
466                                                .getLocalizer()
467                                                .getMessage(
468                                                                SearchCoordinatorSvcImpl.class,
469                                                                "invalidInclude",
470                                                                UrlUtil.sanitizeUrlPart(name),
471                                                                UrlUtil.sanitizeUrlPart(value),
472                                                                resourceTypeMsg); // last param is pre-sanitized
473                                throw new InvalidRequestException(Msg.code(2017) + msg);
474                        }
475
476                        if (isNotBlank(paramTargetType) && !myDaoRegistry.isResourceTypeSupported(paramTargetType)) {
477                                String resourceTypeMsg = myContext
478                                                .getLocalizer()
479                                                .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramTargetType);
480                                String msg = myContext
481                                                .getLocalizer()
482                                                .getMessage(
483                                                                SearchCoordinatorSvcImpl.class,
484                                                                "invalidInclude",
485                                                                UrlUtil.sanitizeUrlPart(name),
486                                                                UrlUtil.sanitizeUrlPart(value),
487                                                                resourceTypeMsg); // last param is pre-sanitized
488                                throw new InvalidRequestException(Msg.code(2016) + msg);
489                        }
490
491                        if (!Constants.INCLUDE_STAR.equals(paramName)
492                                        && mySearchParamRegistry.getActiveSearchParam(paramType, paramName) == null) {
493                                List<String> validNames = mySearchParamRegistry.getActiveSearchParams(paramType).values().stream()
494                                                .filter(t -> t.getParamType() == RestSearchParameterTypeEnum.REFERENCE)
495                                                .map(t -> UrlUtil.sanitizeUrlPart(t.getName()))
496                                                .sorted()
497                                                .collect(Collectors.toList());
498                                String searchParamMessage = myContext
499                                                .getLocalizer()
500                                                .getMessage(
501                                                                BaseStorageDao.class,
502                                                                "invalidSearchParameter",
503                                                                UrlUtil.sanitizeUrlPart(paramName),
504                                                                UrlUtil.sanitizeUrlPart(paramType),
505                                                                validNames);
506                                String msg = myContext
507                                                .getLocalizer()
508                                                .getMessage(
509                                                                SearchCoordinatorSvcImpl.class,
510                                                                "invalidInclude",
511                                                                UrlUtil.sanitizeUrlPart(name),
512                                                                UrlUtil.sanitizeUrlPart(value),
513                                                                searchParamMessage); // last param is pre-sanitized
514                                throw new InvalidRequestException(Msg.code(2015) + msg);
515                        }
516                }
517        }
518
519        @Override
520        public Optional<Integer> getSearchTotal(
521                        String theUuid, @Nullable RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) {
522                SearchTask task = myIdToSearchTask.get(theUuid);
523                if (task != null) {
524                        return Optional.ofNullable(task.awaitInitialSync());
525                }
526
527                /*
528                 * In case there is no running search, if the total is listed as accurate we know one is coming
529                 * so let's wait a bit for it to show up
530                 */
531                Optional<Search> search = myTxService
532                                .withRequest(theRequestDetails)
533                                .execute(() -> mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId));
534                if (search.isPresent()) {
535                        Optional<SearchParameterMap> searchParameterMap = search.get().getSearchParameterMap();
536                        if (searchParameterMap.isPresent()
537                                        && searchParameterMap.get().getSearchTotalMode() == SearchTotalModeEnum.ACCURATE) {
538                                for (int i = 0; i < 10; i++) {
539                                        if (search.isPresent()) {
540                                                QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search.get());
541                                                if (search.get().getTotalCount() != null) {
542                                                        return Optional.of(search.get().getTotalCount());
543                                                }
544                                        }
545                                        search = mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId);
546                                }
547                        }
548                }
549
550                return Optional.empty();
551        }
552
553        @Nonnull
554        private PersistedJpaSearchFirstPageBundleProvider submitSearch(
555                        IDao theCallingDao,
556                        SearchParameterMap theParams,
557                        String theResourceType,
558                        RequestDetails theRequestDetails,
559                        ISearchBuilder<JpaPid> theSb,
560                        RequestPartitionId theRequestPartitionId,
561                        Search theSearch) {
562                StopWatch w = new StopWatch();
563
564                SearchTaskParameters stp = new SearchTaskParameters(
565                                theSearch,
566                                theCallingDao,
567                                theParams,
568                                theResourceType,
569                                theRequestDetails,
570                                theRequestPartitionId,
571                                myOnRemoveSearchTask,
572                                mySyncSize);
573                stp.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests);
574                SearchTask task = (SearchTask) myBeanFactory.getBean(SearchConfig.SEARCH_TASK, stp);
575                myIdToSearchTask.put(theSearch.getUuid(), task);
576                task.call();
577
578                PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage(
579                                theRequestDetails, task, theSb, theRequestPartitionId);
580
581                ourLog.debug("Search initial phase completed in {}ms", w.getMillis());
582                return retVal;
583        }
584
585        @Nullable
586        private PersistedJpaBundleProvider findCachedQuery(
587                        SearchParameterMap theParams,
588                        String theResourceType,
589                        RequestDetails theRequestDetails,
590                        String theQueryString,
591                        RequestPartitionId theRequestPartitionId) {
592                // May be null
593                return myTxService
594                                .withRequest(theRequestDetails)
595                                .withRequestPartitionId(theRequestPartitionId)
596                                .execute(() -> {
597
598                                        // Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH
599                                        HookParams params = new HookParams()
600                                                        .add(SearchParameterMap.class, theParams)
601                                                        .add(RequestDetails.class, theRequestDetails)
602                                                        .addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
603                                        Object outcome = CompositeInterceptorBroadcaster.doCallHooksAndReturnObject(
604                                                        myInterceptorBroadcaster,
605                                                        theRequestDetails,
606                                                        Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH,
607                                                        params);
608                                        if (Boolean.FALSE.equals(outcome)) {
609                                                return null;
610                                        }
611
612                                        // Check for a search matching the given hash
613                                        Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType, theRequestPartitionId);
614                                        if (searchToUse == null) {
615                                                return null;
616                                        }
617
618                                        ourLog.debug("Reusing search {} from cache", searchToUse.getUuid());
619                                        // Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED
620                                        params = new HookParams()
621                                                        .add(SearchParameterMap.class, theParams)
622                                                        .add(RequestDetails.class, theRequestDetails)
623                                                        .addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
624                                        CompositeInterceptorBroadcaster.doCallHooks(
625                                                        myInterceptorBroadcaster,
626                                                        theRequestDetails,
627                                                        Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED,
628                                                        params);
629
630                                        return myPersistedJpaBundleProviderFactory.newInstance(theRequestDetails, searchToUse.getUuid());
631                                });
632        }
633
634        @Nullable
635        private Search findSearchToUseOrNull(
636                        String theQueryString, String theResourceType, RequestPartitionId theRequestPartitionId) {
637                // createdCutoff is in recent past
638                final Instant createdCutoff =
639                                Instant.now().minus(myStorageSettings.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS);
640
641                Optional<Search> candidate = mySearchCacheSvc.findCandidatesForReuse(
642                                theResourceType, theQueryString, createdCutoff, theRequestPartitionId);
643                return candidate.orElse(null);
644        }
645
646        @Nullable
647        private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) {
648                final Integer loadSynchronousUpTo;
649                if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) {
650                        if (theCacheControlDirective.getMaxResults() != null) {
651                                loadSynchronousUpTo = theCacheControlDirective.getMaxResults();
652                                if (loadSynchronousUpTo > myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()) {
653                                        throw new InvalidRequestException(Msg.code(1165) + Constants.HEADER_CACHE_CONTROL + " header "
654                                                        + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed "
655                                                        + myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit());
656                                }
657                        } else {
658                                loadSynchronousUpTo = 100;
659                        }
660                } else {
661                        loadSynchronousUpTo = null;
662                }
663                return loadSynchronousUpTo;
664        }
665
666        /**
667         * Creates a {@link Pageable} using a start and end index
668         */
669        @SuppressWarnings("WeakerAccess")
670        @Nullable
671        public static Pageable toPage(final int theFromIndex, int theToIndex) {
672                int pageSize = theToIndex - theFromIndex;
673                if (pageSize < 1) {
674                        return null;
675                }
676
677                int pageIndex = theFromIndex / pageSize;
678
679                return new PageRequest(pageIndex, pageSize, Sort.unsorted()) {
680                        private static final long serialVersionUID = 1L;
681
682                        @Override
683                        public long getOffset() {
684                                return theFromIndex;
685                        }
686                };
687        }
688}