001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
 * #L%
019 */
020package ca.uhn.fhir.jpa.search;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.i18n.Msg;
024import ca.uhn.fhir.interceptor.api.HookParams;
025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
026import ca.uhn.fhir.interceptor.api.Pointcut;
027import ca.uhn.fhir.interceptor.model.RequestPartitionId;
028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
029import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
030import ca.uhn.fhir.jpa.api.dao.IDao;
031import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
032import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
033import ca.uhn.fhir.jpa.config.SearchConfig;
034import ca.uhn.fhir.jpa.dao.BaseStorageDao;
035import ca.uhn.fhir.jpa.dao.ISearchBuilder;
036import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
037import ca.uhn.fhir.jpa.dao.search.ResourceNotFoundInIndexException;
038import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
039import ca.uhn.fhir.jpa.entity.Search;
040import ca.uhn.fhir.jpa.model.dao.JpaPid;
041import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
042import ca.uhn.fhir.jpa.search.builder.StorageInterceptorHooksFacade;
043import ca.uhn.fhir.jpa.search.builder.tasks.SearchContinuationTask;
044import ca.uhn.fhir.jpa.search.builder.tasks.SearchTask;
045import ca.uhn.fhir.jpa.search.builder.tasks.SearchTaskParameters;
046import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
047import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
048import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum;
049import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
050import ca.uhn.fhir.jpa.util.QueryParameterUtils;
051import ca.uhn.fhir.model.api.Include;
052import ca.uhn.fhir.rest.api.CacheControlDirective;
053import ca.uhn.fhir.rest.api.Constants;
054import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum;
055import ca.uhn.fhir.rest.api.SearchTotalModeEnum;
056import ca.uhn.fhir.rest.api.server.IBundleProvider;
057import ca.uhn.fhir.rest.api.server.RequestDetails;
058import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
059import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
060import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
061import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
062import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
063import ca.uhn.fhir.util.AsyncUtil;
064import ca.uhn.fhir.util.StopWatch;
065import ca.uhn.fhir.util.UrlUtil;
066import com.google.common.annotations.VisibleForTesting;
067import jakarta.annotation.Nonnull;
068import jakarta.annotation.Nullable;
069import org.apache.commons.lang3.time.DateUtils;
070import org.hl7.fhir.instance.model.api.IBaseResource;
071import org.springframework.beans.factory.BeanFactory;
072import org.springframework.data.domain.PageRequest;
073import org.springframework.data.domain.Pageable;
074import org.springframework.data.domain.Sort;
075import org.springframework.stereotype.Component;
076import org.springframework.transaction.support.TransactionSynchronizationManager;
077
078import java.time.Instant;
079import java.time.temporal.ChronoUnit;
080import java.util.List;
081import java.util.Optional;
082import java.util.Set;
083import java.util.UUID;
084import java.util.concurrent.Callable;
085import java.util.concurrent.ConcurrentHashMap;
086import java.util.concurrent.TimeUnit;
087import java.util.function.Consumer;
088import java.util.stream.Collectors;
089
090import static ca.uhn.fhir.jpa.util.QueryParameterUtils.DEFAULT_SYNC_SIZE;
091import static org.apache.commons.lang3.StringUtils.isBlank;
092import static org.apache.commons.lang3.StringUtils.isNotBlank;
093
094@Component("mySearchCoordinatorSvc")
095public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc<JpaPid> {
096
097        private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);
098
099        private final FhirContext myContext;
100        private final JpaStorageSettings myStorageSettings;
101        private final IInterceptorBroadcaster myInterceptorBroadcaster;
102        private final HapiTransactionService myTxService;
103        private final ISearchCacheSvc mySearchCacheSvc;
104        private final ISearchResultCacheSvc mySearchResultCacheSvc;
105        private final DaoRegistry myDaoRegistry;
106        private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory;
107        private final ISynchronousSearchSvc mySynchronousSearchSvc;
108        private final PersistedJpaBundleProviderFactory myPersistedJpaBundleProviderFactory;
109        private final ISearchParamRegistry mySearchParamRegistry;
110        private final SearchStrategyFactory mySearchStrategyFactory;
111        private final ExceptionService myExceptionSvc;
112        private final BeanFactory myBeanFactory;
113        private ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>();
114
115        private final Consumer<String> myOnRemoveSearchTask = myIdToSearchTask::remove;
116
117        private final StorageInterceptorHooksFacade myStorageInterceptorHooks;
118        private Integer myLoadingThrottleForUnitTests = null;
119        private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE;
120        private boolean myNeverUseLocalSearchForUnitTests;
121        private int mySyncSize = DEFAULT_SYNC_SIZE;
122
123        /**
124         * Constructor
125         */
126        public SearchCoordinatorSvcImpl(
127                        FhirContext theContext,
128                        JpaStorageSettings theStorageSettings,
129                        IInterceptorBroadcaster theInterceptorBroadcaster,
130                        HapiTransactionService theTxService,
131                        ISearchCacheSvc theSearchCacheSvc,
132                        ISearchResultCacheSvc theSearchResultCacheSvc,
133                        DaoRegistry theDaoRegistry,
134                        SearchBuilderFactory<JpaPid> theSearchBuilderFactory,
135                        ISynchronousSearchSvc theSynchronousSearchSvc,
136                        PersistedJpaBundleProviderFactory thePersistedJpaBundleProviderFactory,
137                        ISearchParamRegistry theSearchParamRegistry,
138                        SearchStrategyFactory theSearchStrategyFactory,
139                        ExceptionService theExceptionSvc,
140                        BeanFactory theBeanFactory) {
141                super();
142                myContext = theContext;
143                myStorageSettings = theStorageSettings;
144                myInterceptorBroadcaster = theInterceptorBroadcaster;
145                myTxService = theTxService;
146                mySearchCacheSvc = theSearchCacheSvc;
147                mySearchResultCacheSvc = theSearchResultCacheSvc;
148                myDaoRegistry = theDaoRegistry;
149                mySearchBuilderFactory = theSearchBuilderFactory;
150                mySynchronousSearchSvc = theSynchronousSearchSvc;
151                myPersistedJpaBundleProviderFactory = thePersistedJpaBundleProviderFactory;
152                mySearchParamRegistry = theSearchParamRegistry;
153                mySearchStrategyFactory = theSearchStrategyFactory;
154                myExceptionSvc = theExceptionSvc;
155                myBeanFactory = theBeanFactory;
156
157                myStorageInterceptorHooks = new StorageInterceptorHooksFacade(myInterceptorBroadcaster);
158        }
159
160        @VisibleForTesting
161        Set<String> getActiveSearchIds() {
162                return myIdToSearchTask.keySet();
163        }
164
165        @VisibleForTesting
166        public void setIdToSearchTaskMapForUnitTests(ConcurrentHashMap<String, SearchTask> theIdToSearchTaskMap) {
167                myIdToSearchTask = theIdToSearchTaskMap;
168        }
169
170        @VisibleForTesting
171        public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) {
172                myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests;
173        }
174
175        @VisibleForTesting
176        public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) {
177                myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests;
178        }
179
180        @VisibleForTesting
181        public void setSyncSizeForUnitTests(int theSyncSize) {
182                mySyncSize = theSyncSize;
183        }
184
185        @Override
186        public void cancelAllActiveSearches() {
187                for (SearchTask next : myIdToSearchTask.values()) {
188                        ourLog.info(
189                                        "Requesting immediate abort of search: {}", next.getSearch().getUuid());
190                        next.requestImmediateAbort();
191                        AsyncUtil.awaitLatchAndIgnoreInterrupt(next.getCompletionLatch(), 30, TimeUnit.SECONDS);
192                }
193        }
194
195        @SuppressWarnings("SameParameterValue")
196        @VisibleForTesting
197        public void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) {
198                myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults;
199        }
200
201        /**
202         * This method is called by the HTTP client processing thread in order to
203         * fetch resources.
204         * <p>
205         * This method must not be called from inside a transaction. The rationale is that
206         * the {@link Search} entity is treated as a piece of shared state across client threads
207         * accessing the same thread, so we need to be able to update that table in a transaction
208         * and commit it right away in order for that to work. Examples of needing to do this
209         * include if two different clients request the same search and are both paging at the
210         * same time, but also includes clients that are hacking the paging links to
211         * fetch multiple pages of a search result in parallel. In both cases we need to only
212         * let one of them actually activate the search, or we will have conflicts. The other thread
213         * just needs to wait until the first one actually fetches more results.
214         */
215        @Override
216        public List<JpaPid> getResources(
217                        final String theUuid,
218                        int theFrom,
219                        int theTo,
220                        @Nullable RequestDetails theRequestDetails,
221                        RequestPartitionId theRequestPartitionId) {
222                assert !TransactionSynchronizationManager.isActualTransactionActive();
223
224                // If we're actively searching right now, don't try to do anything until at least one batch has been
225                // persisted in the DB
226                SearchTask searchTask = myIdToSearchTask.get(theUuid);
227                if (searchTask != null) {
228                        searchTask.awaitInitialSync();
229                }
230
231                ourLog.trace("About to start looking for resources {}-{}", theFrom, theTo);
232
233                Search search;
234                StopWatch sw = new StopWatch();
235                while (true) {
236
237                        if (myNeverUseLocalSearchForUnitTests == false) {
238                                if (searchTask != null) {
239                                        ourLog.trace("Local search found");
240                                        List<JpaPid> resourcePids = searchTask.getResourcePids(theFrom, theTo);
241                                        ourLog.trace(
242                                                        "Local search returned {} pids, wanted {}-{} - Search: {}",
243                                                        resourcePids.size(),
244                                                        theFrom,
245                                                        theTo,
246                                                        searchTask.getSearch());
247
248                                        /*
249                                         * Generally, if a search task is open, the fastest possible thing is to just return its results. This
250                                         * will work most of the time, but can fail if the task hit a search threshold and the client is requesting
251                                         * results beyond that threashold. In that case, we'll keep going below, since that will trigger another
252                                         * task.
253                                         */
254                                        if ((searchTask.getSearch().getNumFound()
255                                                                                        - searchTask.getSearch().getNumBlocked())
256                                                                        >= theTo
257                                                        || resourcePids.size() == (theTo - theFrom)) {
258                                                return resourcePids;
259                                        }
260                                }
261                        }
262
263                        Callable<Search> searchCallback = () -> mySearchCacheSvc
264                                        .fetchByUuid(theUuid, theRequestPartitionId)
265                                        .orElseThrow(() -> myExceptionSvc.newUnknownSearchException(theUuid));
266                        search = myTxService
267                                        .withRequest(theRequestDetails)
268                                        .withRequestPartitionId(theRequestPartitionId)
269                                        .execute(searchCallback);
270                        QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search);
271
272                        if (search.getStatus() == SearchStatusEnum.FINISHED) {
273                                ourLog.trace("Search entity marked as finished with {} results", search.getNumFound());
274                                break;
275                        }
276                        if ((search.getNumFound() - search.getNumBlocked()) >= theTo) {
277                                ourLog.trace("Search entity has {} results so far", search.getNumFound());
278                                break;
279                        }
280
281                        if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) {
282                                ourLog.error(
283                                                "Search {} of type {} for {}{} timed out after {}ms with status {}.",
284                                                search.getId(),
285                                                search.getSearchType(),
286                                                search.getResourceType(),
287                                                search.getSearchQueryString(),
288                                                sw.getMillis(),
289                                                search.getStatus());
290                                throw new InternalErrorException(Msg.code(1163) + "Request timed out after " + sw.getMillis() + "ms");
291                        }
292
293                        // If the search was saved in "pass complete mode" it's probably time to
294                        // start a new pass
295                        if (search.getStatus() == SearchStatusEnum.PASSCMPLET) {
296                                ourLog.trace("Going to try to start next search");
297                                Optional<Search> newSearch =
298                                                mySearchCacheSvc.tryToMarkSearchAsInProgress(search, theRequestPartitionId);
299                                if (newSearch.isPresent()) {
300                                        ourLog.trace("Launching new search");
301                                        search = newSearch.get();
302                                        String resourceType = search.getResourceType();
303                                        SearchParameterMap params = search.getSearchParameterMap()
304                                                        .orElseThrow(() -> new IllegalStateException("No map in PASSCOMPLET search"));
305                                        IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType);
306
307                                        SearchTaskParameters parameters = new SearchTaskParameters(
308                                                        search,
309                                                        resourceDao,
310                                                        params,
311                                                        resourceType,
312                                                        theRequestDetails,
313                                                        theRequestPartitionId,
314                                                        myOnRemoveSearchTask,
315                                                        mySyncSize);
316                                        parameters.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests);
317                                        SearchContinuationTask task =
318                                                        (SearchContinuationTask) myBeanFactory.getBean(SearchConfig.CONTINUE_TASK, parameters);
319                                        myIdToSearchTask.put(search.getUuid(), task);
320                                        task.call();
321                                }
322                        }
323
324                        if (!search.getStatus().isDone()) {
325                                AsyncUtil.sleep(500);
326                        }
327                }
328
329                ourLog.trace("Finished looping");
330
331                List<JpaPid> pids = fetchResultPids(theUuid, theFrom, theTo, theRequestDetails, search, theRequestPartitionId);
332
333                ourLog.trace("Fetched {} results", pids.size());
334
335                return pids;
336        }
337
338        @Nonnull
339        private List<JpaPid> fetchResultPids(
340                        String theUuid,
341                        int theFrom,
342                        int theTo,
343                        @Nullable RequestDetails theRequestDetails,
344                        Search theSearch,
345                        RequestPartitionId theRequestPartitionId) {
346                List<JpaPid> pids = mySearchResultCacheSvc.fetchResultPids(
347                                theSearch, theFrom, theTo, theRequestDetails, theRequestPartitionId);
348                if (pids == null) {
349                        throw myExceptionSvc.newUnknownSearchException(theUuid);
350                }
351                return pids;
352        }
353
354        @Override
355        public IBundleProvider registerSearch(
356                        final IFhirResourceDao<?> theCallingDao,
357                        final SearchParameterMap theParams,
358                        String theResourceType,
359                        CacheControlDirective theCacheControlDirective,
360                        RequestDetails theRequestDetails,
361                        RequestPartitionId theRequestPartitionId) {
362                final String searchUuid = UUID.randomUUID().toString();
363
364                final String queryString = theParams.toNormalizedQueryString(myContext);
365                ourLog.debug("Registering new search {}", searchUuid);
366
367                Search search = new Search();
368                QueryParameterUtils.populateSearchEntity(
369                                theParams, theResourceType, searchUuid, queryString, search, theRequestPartitionId);
370
371                myStorageInterceptorHooks.callStoragePresearchRegistered(
372                                theRequestDetails, theParams, search, theRequestPartitionId);
373
374                validateSearch(theParams);
375
376                Class<? extends IBaseResource> resourceTypeClass =
377                                myContext.getResourceDefinition(theResourceType).getImplementingClass();
378                final ISearchBuilder<JpaPid> sb = mySearchBuilderFactory.newSearchBuilder(theResourceType, resourceTypeClass);
379                sb.setFetchSize(mySyncSize);
380                sb.setRequireTotal(theParams.getCount() != null);
381
382                final Integer loadSynchronousUpTo = getLoadSynchronousUpToOrNull(theCacheControlDirective);
383                boolean isOffsetQuery = theParams.isOffsetQuery();
384
385                // todo someday - not today.
386                //              SearchStrategyFactory.ISearchStrategy searchStrategy = mySearchStrategyFactory.pickStrategy(theResourceType,
387                // theParams, theRequestDetails);
388                //              return searchStrategy.get();
389
390                if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null || isOffsetQuery) {
391                        if (mySearchStrategyFactory.isSupportsHSearchDirect(theResourceType, theParams, theRequestDetails)) {
392                                ourLog.info("Search {} is using direct load strategy", searchUuid);
393                                SearchStrategyFactory.ISearchStrategy direct = mySearchStrategyFactory.makeDirectStrategy(
394                                                searchUuid, theResourceType, theParams, theRequestDetails);
395
396                                try {
397                                        return direct.get();
398                                } catch (ResourceNotFoundInIndexException theE) {
399                                        // some resources were not found in index, so we will inform this and resort to JPA search
400                                        ourLog.warn(
401                                                        "Some resources were not found in index. Make sure all resources were indexed. Resorting to database search.");
402                                }
403                        }
404
405                        // we need a max to fetch for synchronous searches;
406                        // otherwise we'll explode memory.
407                        Integer maxToLoad = getSynchronousMaxResultsToFetch(theParams, loadSynchronousUpTo);
408                        ourLog.debug("Setting a max fetch value of {} for synchronous search", maxToLoad);
409                        sb.setMaxResultsToFetch(maxToLoad);
410
411                        ourLog.debug("Search {} is loading in synchronous mode", searchUuid);
412                        return mySynchronousSearchSvc.executeQuery(
413                                        theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, theRequestPartitionId);
414                }
415
416                /*
417                 * See if there are any cached searches whose results we can return
418                 * instead
419                 */
420                SearchCacheStatusEnum cacheStatus = SearchCacheStatusEnum.MISS;
421                if (theCacheControlDirective != null && theCacheControlDirective.isNoCache()) {
422                        cacheStatus = SearchCacheStatusEnum.NOT_TRIED;
423                }
424
425                if (cacheStatus != SearchCacheStatusEnum.NOT_TRIED) {
426                        if (theParams.getEverythingMode() == null) {
427                                if (myStorageSettings.getReuseCachedSearchResultsForMillis() != null) {
428                                        PersistedJpaBundleProvider foundSearchProvider = findCachedQuery(
429                                                        theParams, theResourceType, theRequestDetails, queryString, theRequestPartitionId);
430                                        if (foundSearchProvider != null) {
431                                                foundSearchProvider.setCacheStatus(SearchCacheStatusEnum.HIT);
432                                                return foundSearchProvider;
433                                        }
434                                }
435                        }
436                }
437
438                PersistedJpaSearchFirstPageBundleProvider retVal = submitSearch(
439                                theCallingDao, theParams, theResourceType, theRequestDetails, sb, theRequestPartitionId, search);
440                retVal.setCacheStatus(cacheStatus);
441                return retVal;
442        }
443
444        /**
445         *      The max results to return if this is a synchronous search.
446         *
447         * We'll look in this order:
448         * * load synchronous up to (on params)
449         * * param count (+ offset)
450         * * StorageSettings fetch size default max
451         * *
452         */
453        private Integer getSynchronousMaxResultsToFetch(SearchParameterMap theParams, Integer theLoadSynchronousUpTo) {
454                if (theLoadSynchronousUpTo != null) {
455                        return theLoadSynchronousUpTo;
456                }
457
458                if (theParams.getCount() != null) {
459                        int valToReturn = theParams.getCount() + 1;
460                        if (theParams.getOffset() != null) {
461                                valToReturn += theParams.getOffset();
462                        }
463                        return valToReturn;
464                }
465
466                if (myStorageSettings.getFetchSizeDefaultMaximum() != null) {
467                        return myStorageSettings.getFetchSizeDefaultMaximum();
468                }
469
470                return myStorageSettings.getInternalSynchronousSearchSize();
471        }
472
473        private void validateSearch(SearchParameterMap theParams) {
474                validateIncludes(theParams.getIncludes(), Constants.PARAM_INCLUDE);
475                validateIncludes(theParams.getRevIncludes(), Constants.PARAM_REVINCLUDE);
476        }
477
478        private void validateIncludes(Set<Include> includes, String name) {
479                for (Include next : includes) {
480                        String value = next.getValue();
481                        if (value.equals(Constants.INCLUDE_STAR) || isBlank(value)) {
482                                continue;
483                        }
484
485                        String paramType = next.getParamType();
486                        String paramName = next.getParamName();
487                        String paramTargetType = next.getParamTargetType();
488
489                        if (isBlank(paramType) || isBlank(paramName)) {
490                                String msg = myContext
491                                                .getLocalizer()
492                                                .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidInclude", name, value, "");
493                                throw new InvalidRequestException(Msg.code(2018) + msg);
494                        }
495
496                        if (!myDaoRegistry.isResourceTypeSupported(paramType)) {
497                                String resourceTypeMsg = myContext
498                                                .getLocalizer()
499                                                .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramType);
500                                String msg = myContext
501                                                .getLocalizer()
502                                                .getMessage(
503                                                                SearchCoordinatorSvcImpl.class,
504                                                                "invalidInclude",
505                                                                UrlUtil.sanitizeUrlPart(name),
506                                                                UrlUtil.sanitizeUrlPart(value),
507                                                                resourceTypeMsg); // last param is pre-sanitized
508                                throw new InvalidRequestException(Msg.code(2017) + msg);
509                        }
510
511                        if (isNotBlank(paramTargetType) && !myDaoRegistry.isResourceTypeSupported(paramTargetType)) {
512                                String resourceTypeMsg = myContext
513                                                .getLocalizer()
514                                                .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramTargetType);
515                                String msg = myContext
516                                                .getLocalizer()
517                                                .getMessage(
518                                                                SearchCoordinatorSvcImpl.class,
519                                                                "invalidInclude",
520                                                                UrlUtil.sanitizeUrlPart(name),
521                                                                UrlUtil.sanitizeUrlPart(value),
522                                                                resourceTypeMsg); // last param is pre-sanitized
523                                throw new InvalidRequestException(Msg.code(2016) + msg);
524                        }
525
526                        if (!Constants.INCLUDE_STAR.equals(paramName)
527                                        && mySearchParamRegistry.getActiveSearchParam(
528                                                                        paramType, paramName, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH)
529                                                        == null) {
530                                List<String> validNames = mySearchParamRegistry
531                                                .getActiveSearchParams(paramType, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH)
532                                                .values()
533                                                .stream()
534                                                .filter(t -> t.getParamType() == RestSearchParameterTypeEnum.REFERENCE)
535                                                .map(t -> UrlUtil.sanitizeUrlPart(t.getName()))
536                                                .sorted()
537                                                .collect(Collectors.toList());
538                                String searchParamMessage = myContext
539                                                .getLocalizer()
540                                                .getMessage(
541                                                                BaseStorageDao.class,
542                                                                "invalidSearchParameter",
543                                                                UrlUtil.sanitizeUrlPart(paramName),
544                                                                UrlUtil.sanitizeUrlPart(paramType),
545                                                                validNames);
546                                String msg = myContext
547                                                .getLocalizer()
548                                                .getMessage(
549                                                                SearchCoordinatorSvcImpl.class,
550                                                                "invalidInclude",
551                                                                UrlUtil.sanitizeUrlPart(name),
552                                                                UrlUtil.sanitizeUrlPart(value),
553                                                                searchParamMessage); // last param is pre-sanitized
554                                throw new InvalidRequestException(Msg.code(2015) + msg);
555                        }
556                }
557        }
558
559        @Override
560        public Optional<Integer> getSearchTotal(
561                        String theUuid, @Nullable RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) {
562                SearchTask task = myIdToSearchTask.get(theUuid);
563                if (task != null) {
564                        return Optional.ofNullable(task.awaitInitialSync());
565                }
566
567                /*
568                 * In case there is no running search, if the total is listed as accurate we know one is coming
569                 * so let's wait a bit for it to show up
570                 */
571                Optional<Search> search = myTxService
572                                .withRequest(theRequestDetails)
573                                .execute(() -> mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId));
574                if (search.isPresent()) {
575                        Optional<SearchParameterMap> searchParameterMap = search.get().getSearchParameterMap();
576                        if (searchParameterMap.isPresent()
577                                        && searchParameterMap.get().getSearchTotalMode() == SearchTotalModeEnum.ACCURATE) {
578                                for (int i = 0; i < 10; i++) {
579                                        if (search.isPresent()) {
580                                                QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search.get());
581                                                if (search.get().getTotalCount() != null) {
582                                                        return Optional.of(search.get().getTotalCount());
583                                                }
584                                        }
585                                        search = mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId);
586                                }
587                        }
588                }
589
590                return Optional.empty();
591        }
592
593        @Nonnull
594        private PersistedJpaSearchFirstPageBundleProvider submitSearch(
595                        IDao theCallingDao,
596                        SearchParameterMap theParams,
597                        String theResourceType,
598                        RequestDetails theRequestDetails,
599                        ISearchBuilder<JpaPid> theSb,
600                        RequestPartitionId theRequestPartitionId,
601                        Search theSearch) {
602                StopWatch w = new StopWatch();
603
604                SearchTaskParameters stp = new SearchTaskParameters(
605                                theSearch,
606                                theCallingDao,
607                                theParams,
608                                theResourceType,
609                                theRequestDetails,
610                                theRequestPartitionId,
611                                myOnRemoveSearchTask,
612                                mySyncSize);
613                stp.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests);
614                SearchTask task = (SearchTask) myBeanFactory.getBean(SearchConfig.SEARCH_TASK, stp);
615                myIdToSearchTask.put(theSearch.getUuid(), task);
616                task.call();
617
618                PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage(
619                                theRequestDetails, task, theSb, theRequestPartitionId);
620
621                ourLog.debug("Search initial phase completed in {}ms", w.getMillis());
622                return retVal;
623        }
624
625        @Nullable
626        private PersistedJpaBundleProvider findCachedQuery(
627                        SearchParameterMap theParams,
628                        String theResourceType,
629                        RequestDetails theRequestDetails,
630                        String theQueryString,
631                        RequestPartitionId theRequestPartitionId) {
632                // May be null
633                return myTxService
634                                .withRequest(theRequestDetails)
635                                .withRequestPartitionId(theRequestPartitionId)
636                                .execute(() -> {
637                                        IInterceptorBroadcaster compositeBroadcaster =
638                                                        CompositeInterceptorBroadcaster.newCompositeBroadcaster(
639                                                                        myInterceptorBroadcaster, theRequestDetails);
640
641                                        // Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH
642
643                                        HookParams params = new HookParams()
644                                                        .add(SearchParameterMap.class, theParams)
645                                                        .add(RequestDetails.class, theRequestDetails)
646                                                        .addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
647                                        boolean canUseCache =
648                                                        compositeBroadcaster.callHooks(Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH, params);
649                                        if (!canUseCache) {
650                                                return null;
651                                        }
652
653                                        // Check for a search matching the given hash
654                                        Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType, theRequestPartitionId);
655                                        if (searchToUse == null) {
656                                                return null;
657                                        }
658
659                                        ourLog.debug("Reusing search {} from cache", searchToUse.getUuid());
660                                        // Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED
661                                        params = new HookParams()
662                                                        .add(SearchParameterMap.class, theParams)
663                                                        .add(RequestDetails.class, theRequestDetails)
664                                                        .addIfMatchesType(ServletRequestDetails.class, theRequestDetails);
665                                        compositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, params);
666
667                                        return myPersistedJpaBundleProviderFactory.newInstance(theRequestDetails, searchToUse.getUuid());
668                                });
669        }
670
671        @Nullable
672        private Search findSearchToUseOrNull(
673                        String theQueryString, String theResourceType, RequestPartitionId theRequestPartitionId) {
674                // createdCutoff is in recent past
675                final Instant createdCutoff =
676                                Instant.now().minus(myStorageSettings.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS);
677
678                Optional<Search> candidate = mySearchCacheSvc.findCandidatesForReuse(
679                                theResourceType, theQueryString, createdCutoff, theRequestPartitionId);
680                return candidate.orElse(null);
681        }
682
683        @Nullable
684        private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) {
685                final Integer loadSynchronousUpTo;
686                if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) {
687                        if (theCacheControlDirective.getMaxResults() != null) {
688                                loadSynchronousUpTo = theCacheControlDirective.getMaxResults();
689                                if (loadSynchronousUpTo > myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()) {
690                                        throw new InvalidRequestException(Msg.code(1165) + Constants.HEADER_CACHE_CONTROL + " header "
691                                                        + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed "
692                                                        + myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit());
693                                }
694                        } else {
695                                loadSynchronousUpTo = 100;
696                        }
697                } else {
698                        loadSynchronousUpTo = null;
699                }
700                return loadSynchronousUpTo;
701        }
702
703        /**
704         * Creates a {@link Pageable} using a start and end index
705         */
706        @SuppressWarnings("WeakerAccess")
707        @Nullable
708        public static Pageable toPage(final int theFromIndex, int theToIndex) {
709                int pageSize = theToIndex - theFromIndex;
710                if (pageSize < 1) {
711                        return null;
712                }
713
714                int pageIndex = theFromIndex / pageSize;
715
716                return new PageRequest(pageIndex, pageSize, Sort.unsorted()) {
717                        private static final long serialVersionUID = 1L;
718
719                        @Override
720                        public long getOffset() {
721                                return theFromIndex;
722                        }
723                };
724        }
725}