001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.search.builder.tasks;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.i18n.Msg;
024import ca.uhn.fhir.interceptor.api.HookParams;
025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
026import ca.uhn.fhir.interceptor.api.Pointcut;
027import ca.uhn.fhir.interceptor.model.RequestPartitionId;
028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
029import ca.uhn.fhir.jpa.dao.IResultIterator;
030import ca.uhn.fhir.jpa.dao.ISearchBuilder;
031import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
032import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
033import ca.uhn.fhir.jpa.entity.Search;
034import ca.uhn.fhir.jpa.interceptor.JpaPreResourceAccessDetails;
035import ca.uhn.fhir.jpa.model.dao.JpaPid;
036import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
037import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
038import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
039import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
040import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
041import ca.uhn.fhir.jpa.util.QueryParameterUtils;
042import ca.uhn.fhir.jpa.util.SearchParameterMapCalculator;
043import ca.uhn.fhir.parser.DataFormatException;
044import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails;
045import ca.uhn.fhir.rest.api.server.RequestDetails;
046import ca.uhn.fhir.rest.server.IPagingProvider;
047import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
048import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
049import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
050import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
051import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
052import ca.uhn.fhir.system.HapiSystemProperties;
053import ca.uhn.fhir.util.AsyncUtil;
054import ca.uhn.fhir.util.StopWatch;
055import co.elastic.apm.api.ElasticApm;
056import co.elastic.apm.api.Span;
057import co.elastic.apm.api.Transaction;
058import jakarta.annotation.Nonnull;
059import org.apache.commons.lang3.Validate;
060import org.apache.commons.lang3.exception.ExceptionUtils;
061import org.hl7.fhir.instance.model.api.IBaseResource;
062import org.springframework.transaction.annotation.Propagation;
063
064import java.io.IOException;
065import java.util.ArrayList;
066import java.util.Iterator;
067import java.util.List;
068import java.util.concurrent.Callable;
069import java.util.concurrent.CountDownLatch;
070import java.util.concurrent.TimeUnit;
071import java.util.function.Consumer;
072
073import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantCount;
074import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantOnlyCount;
075import static java.util.Objects.nonNull;
076import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
077
078/**
079 * A search task is a Callable task that runs in
080 * a thread pool to handle an individual search. One instance
081 * is created for any requested search and runs from the
082 * beginning to the end of the search.
083 * <p>
084 * Understand:
085 * This class executes in its own thread separate from the
086 * web server client thread that made the request. We do that
087 * so that we can return to the client as soon as possible,
088 * but keep the search going in the background (and have
089 * the next page of results ready to go when the client asks).
090 */
091public class SearchTask implements Callable<Void> {
092
093        private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchTask.class);
094        // injected beans
095        protected final HapiTransactionService myTxService;
096        protected final FhirContext myContext;
097        protected final ISearchResultCacheSvc mySearchResultCacheSvc;
098        private final SearchParameterMap myParams;
099        private final String myResourceType;
100        private final ArrayList<JpaPid> mySyncedPids = new ArrayList<>();
101        private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1);
102        private final CountDownLatch myCompletionLatch;
103        private final ArrayList<JpaPid> myUnsyncedPids = new ArrayList<>();
104        private final RequestDetails myRequest;
105        private final RequestPartitionId myRequestPartitionId;
106        private final SearchRuntimeDetails mySearchRuntimeDetails;
107        private final Transaction myParentTransaction;
108        private final Consumer<String> myOnRemove;
109        private final int mySyncSize;
110        private final Integer myLoadingThrottleForUnitTests;
111        private final IInterceptorBroadcaster myInterceptorBroadcaster;
112        private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory;
113        private final JpaStorageSettings myStorageSettings;
114        private final ISearchCacheSvc mySearchCacheSvc;
115        private final IPagingProvider myPagingProvider;
116        private final IInterceptorBroadcaster myCompositeBroadcaster;
117        private Search mySearch;
118        private boolean myAbortRequested;
119        private int myCountSavedTotal = 0;
120        private int myCountSavedThisPass = 0;
121        private int myCountBlockedThisPass = 0;
122        private boolean myAdditionalPrefetchThresholdsRemaining;
123        private List<JpaPid> myPreviouslyAddedResourcePids;
124        // The max number of results that we request from the search
125        // Note that this is set using the configured pre-fetch thresholds, maximum page size, and/or the client provided
126        // _count parameter
127        private Integer myMaxResultsToFetch;
128
129        /**
130         * Constructor
131         */
132        @SuppressWarnings({"unchecked", "rawtypes"})
133        public SearchTask(
134                        SearchTaskParameters theCreationParams,
135                        HapiTransactionService theManagedTxManager,
136                        FhirContext theContext,
137                        IInterceptorBroadcaster theInterceptorBroadcaster,
138                        SearchBuilderFactory theSearchBuilderFactory,
139                        ISearchResultCacheSvc theSearchResultCacheSvc,
140                        JpaStorageSettings theStorageSettings,
141                        ISearchCacheSvc theSearchCacheSvc,
142                        IPagingProvider thePagingProvider) {
143                // beans
144                myTxService = theManagedTxManager;
145                myContext = theContext;
146                myInterceptorBroadcaster = theInterceptorBroadcaster;
147                mySearchBuilderFactory = theSearchBuilderFactory;
148                mySearchResultCacheSvc = theSearchResultCacheSvc;
149                myStorageSettings = theStorageSettings;
150                mySearchCacheSvc = theSearchCacheSvc;
151                myPagingProvider = thePagingProvider;
152
153                // values
154                myOnRemove = theCreationParams.OnRemove;
155                mySearch = theCreationParams.Search;
156                myParams = theCreationParams.Params;
157                myResourceType = theCreationParams.ResourceType;
158                myRequest = theCreationParams.Request;
159                myCompletionLatch = new CountDownLatch(1);
160                mySyncSize = theCreationParams.SyncSize;
161                myLoadingThrottleForUnitTests = theCreationParams.getLoadingThrottleForUnitTests();
162
163                mySearchRuntimeDetails = new SearchRuntimeDetails(myRequest, mySearch.getUuid());
164                mySearchRuntimeDetails.setQueryString(myParams.toNormalizedQueryString(myContext));
165                myRequestPartitionId = theCreationParams.RequestPartitionId;
166                myParentTransaction = ElasticApm.currentTransaction();
167                myCompositeBroadcaster =
168                                CompositeInterceptorBroadcaster.newCompositeBroadcaster(myInterceptorBroadcaster, myRequest);
169        }
170
171        protected RequestPartitionId getRequestPartitionId() {
172                return myRequestPartitionId;
173        }
174
175        /**
176         * This method is called by the server HTTP thread, and
177         * will block until at least one page of results have been
178         * fetched from the DB, and will never block after that.
179         */
180        public Integer awaitInitialSync() {
181                ourLog.trace("Awaiting initial sync");
182                do {
183                        ourLog.trace("Search {} aborted: {}", getSearch().getUuid(), !isNotAborted());
184                        if (AsyncUtil.awaitLatchAndThrowInternalErrorExceptionOnInterrupt(
185                                        getInitialCollectionLatch(), 250L, TimeUnit.MILLISECONDS)) {
186                                break;
187                        }
188                } while (getSearch().getStatus() == SearchStatusEnum.LOADING);
189                ourLog.trace("Initial sync completed");
190
191                return getSearch().getTotalCount();
192        }
193
194        public Search getSearch() {
195                return mySearch;
196        }
197
198        public CountDownLatch getInitialCollectionLatch() {
199                return myInitialCollectionLatch;
200        }
201
202        public void setPreviouslyAddedResourcePids(List<JpaPid> thePreviouslyAddedResourcePids) {
203                myPreviouslyAddedResourcePids = thePreviouslyAddedResourcePids;
204                myCountSavedTotal = myPreviouslyAddedResourcePids.size();
205        }
206
207        @SuppressWarnings("rawtypes")
208        private ISearchBuilder newSearchBuilder() {
209                Class<? extends IBaseResource> resourceTypeClass =
210                                myContext.getResourceDefinition(myResourceType).getImplementingClass();
211                return mySearchBuilderFactory.newSearchBuilder(myResourceType, resourceTypeClass);
212        }
213
214        @Nonnull
215        public List<JpaPid> getResourcePids(int theFromIndex, int theToIndex) {
216                ourLog.debug("Requesting search PIDs from {}-{}", theFromIndex, theToIndex);
217
218                boolean keepWaiting;
219                do {
220                        synchronized (mySyncedPids) {
221                                ourLog.trace("Search status is {}", mySearch.getStatus());
222                                boolean haveEnoughResults = mySyncedPids.size() >= theToIndex;
223                                if (!haveEnoughResults) {
224                                        switch (mySearch.getStatus()) {
225                                                case LOADING:
226                                                        keepWaiting = true;
227                                                        break;
228                                                case PASSCMPLET:
229                                                        /*
230                                                         * If we get here, it means that the user requested resources that crossed the
231                                                         * current pre-fetch boundary. For example, if the prefetch threshold is 50 and the
232                                                         * user has requested resources 0-60, then they would get 0-50 back but the search
233                                                         * coordinator would then stop searching.SearchCoordinatorSvcImplTest
234                                                         */
235                                                        keepWaiting = false;
236                                                        break;
237                                                case FAILED:
238                                                case FINISHED:
239                                                case GONE:
240                                                default:
241                                                        keepWaiting = false;
242                                                        break;
243                                        }
244                                } else {
245                                        keepWaiting = false;
246                                }
247                        }
248
249                        if (keepWaiting) {
250                                ourLog.info(
251                                                "Waiting as we only have {} results - Search status: {}",
252                                                mySyncedPids.size(),
253                                                mySearch.getStatus());
254                                AsyncUtil.sleep(500L);
255                        }
256                } while (keepWaiting);
257
258                ourLog.debug("Proceeding, as we have {} results", mySyncedPids.size());
259
260                ArrayList<JpaPid> retVal = new ArrayList<>();
261                synchronized (mySyncedPids) {
262                        QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(mySearch);
263
264                        int toIndex = theToIndex;
265                        if (mySyncedPids.size() < toIndex) {
266                                toIndex = mySyncedPids.size();
267                        }
268                        for (int i = theFromIndex; i < toIndex; i++) {
269                                retVal.add(mySyncedPids.get(i));
270                        }
271                }
272
273                ourLog.trace(
274                                "Done syncing results - Wanted {}-{} and returning {} of {}",
275                                theFromIndex,
276                                theToIndex,
277                                retVal.size(),
278                                mySyncedPids.size());
279
280                return retVal;
281        }
282
283        public void saveSearch() {
284                myTxService
285                                .withRequest(myRequest)
286                                .withRequestPartitionId(myRequestPartitionId)
287                                .withPropagation(Propagation.REQUIRES_NEW)
288                                .execute(this::doSaveSearch);
289        }
290
291        @SuppressWarnings("rawtypes")
292        private void saveUnsynced(final IResultIterator theResultIter) {
293                myTxService
294                                .withRequest(myRequest)
295                                .withRequestPartitionId(myRequestPartitionId)
296                                .execute(() -> {
297                                        if (mySearch.getId() == null) {
298                                                doSaveSearch();
299                                        }
300
301                                        ArrayList<JpaPid> unsyncedPids = myUnsyncedPids;
302                                        int countBlocked = 0;
303
304                                        // Interceptor call: STORAGE_PREACCESS_RESOURCES
305                                        // This can be used to remove results from the search result details before
306                                        // the user has a chance to know that they were in the results
307                                        if (mySearchRuntimeDetails.getRequestDetails() != null && !unsyncedPids.isEmpty()) {
308                                                JpaPreResourceAccessDetails accessDetails =
309                                                                new JpaPreResourceAccessDetails(unsyncedPids, this::newSearchBuilder);
310                                                HookParams params = new HookParams()
311                                                                .add(IPreResourceAccessDetails.class, accessDetails)
312                                                                .add(RequestDetails.class, mySearchRuntimeDetails.getRequestDetails())
313                                                                .addIfMatchesType(
314                                                                                ServletRequestDetails.class, mySearchRuntimeDetails.getRequestDetails());
315                                                myCompositeBroadcaster.callHooks(Pointcut.STORAGE_PREACCESS_RESOURCES, params);
316
317                                                for (int i = unsyncedPids.size() - 1; i >= 0; i--) {
318                                                        if (accessDetails.isDontReturnResourceAtIndex(i)) {
319                                                                unsyncedPids.remove(i);
320                                                                myCountBlockedThisPass++;
321                                                                myCountSavedTotal++;
322                                                                countBlocked++;
323                                                        }
324                                                }
325                                        }
326
327                                        // Actually store the results in the query cache storage
328                                        myCountSavedTotal += unsyncedPids.size();
329                                        myCountSavedThisPass += unsyncedPids.size();
330                                        mySearchResultCacheSvc.storeResults(
331                                                        mySearch, mySyncedPids, unsyncedPids, myRequest, getRequestPartitionId());
332
333                                        synchronized (mySyncedPids) {
334                                                int numSyncedThisPass = unsyncedPids.size();
335                                                ourLog.trace(
336                                                                "Syncing {} search results - Have more: {}",
337                                                                numSyncedThisPass,
338                                                                theResultIter.hasNext());
339                                                mySyncedPids.addAll(unsyncedPids);
340                                                unsyncedPids.clear();
341
342                                                if (!theResultIter.hasNext()) {
343                                                        int skippedCount = theResultIter.getSkippedCount();
344                                                        ourLog.trace(
345                                                                        "MaxToFetch[{}] SkippedCount[{}] CountSavedThisPass[{}] CountSavedThisTotal[{}] AdditionalPrefetchRemaining[{}]",
346                                                                        myMaxResultsToFetch,
347                                                                        skippedCount,
348                                                                        myCountSavedThisPass,
349                                                                        myCountSavedTotal,
350                                                                        myAdditionalPrefetchThresholdsRemaining);
351
352                                                        if (isFinished(theResultIter)) {
353                                                                // finished
354                                                                ourLog.trace("Setting search status to FINISHED");
355                                                                mySearch.setStatus(SearchStatusEnum.FINISHED);
356                                                                mySearch.setTotalCount(myCountSavedTotal - countBlocked);
357                                                        } else if (myAdditionalPrefetchThresholdsRemaining) {
358                                                                // pass complete
359                                                                ourLog.trace("Setting search status to PASSCMPLET");
360                                                                mySearch.setStatus(SearchStatusEnum.PASSCMPLET);
361                                                                mySearch.setSearchParameterMap(myParams);
362                                                        } else {
363                                                                // also finished
364                                                                ourLog.trace("Setting search status to FINISHED");
365                                                                mySearch.setStatus(SearchStatusEnum.FINISHED);
366                                                                mySearch.setTotalCount(myCountSavedTotal - countBlocked);
367                                                        }
368                                                }
369                                        }
370
371                                        mySearch.setNumFound(myCountSavedTotal);
372                                        mySearch.setNumBlocked(mySearch.getNumBlocked() + countBlocked);
373
374                                        int numSynced;
375                                        synchronized (mySyncedPids) {
376                                                numSynced = mySyncedPids.size();
377                                        }
378
379                                        if (myStorageSettings.getCountSearchResultsUpTo() == null
380                                                        || myStorageSettings.getCountSearchResultsUpTo() <= 0
381                                                        || myStorageSettings.getCountSearchResultsUpTo() <= numSynced) {
382                                                myInitialCollectionLatch.countDown();
383                                        }
384
385                                        doSaveSearch();
386
387                                        ourLog.trace("saveUnsynced() - pre-commit");
388                                });
389                ourLog.trace("saveUnsynced() - post-commit");
390        }
391
392        @SuppressWarnings("rawtypes")
393        private boolean isFinished(final IResultIterator theResultIter) {
394                int skippedCount = theResultIter.getSkippedCount();
395                int nonSkippedCount = theResultIter.getNonSkippedCount();
396                int totalFetched = skippedCount + myCountSavedThisPass + myCountBlockedThisPass;
397
398                if (myMaxResultsToFetch != null && totalFetched < myMaxResultsToFetch) {
399                        // total fetched < max results to fetch -> we've exhausted the search
400                        return true;
401                } else {
402                        if (nonSkippedCount == 0) {
403                                // no skipped resources in this query
404                                if (myParams.getCount() != null) {
405                                        // count supplied
406                                        // if the count is > what we've fetched -> we've exhausted the query
407                                        return myParams.getCount() > totalFetched;
408                                } else {
409                                        // legacy - we have no skipped resources - we are done
410                                        return true;
411                                }
412                        }
413                        // skipped resources means we have more to fetch
414                        return false;
415                }
416        }
417
418        public boolean isNotAborted() {
419                return !myAbortRequested;
420        }
421
422        public void markComplete() {
423                myCompletionLatch.countDown();
424        }
425
426        public CountDownLatch getCompletionLatch() {
427                return myCompletionLatch;
428        }
429
430        /**
431         * Request that the task abort as soon as possible
432         */
433        public void requestImmediateAbort() {
434                myAbortRequested = true;
435        }
436
437        /**
438         * This is the method which actually performs the search.
439         * It is called automatically by the thread pool.
440         */
441        @Override
442        public Void call() {
443                StopWatch sw = new StopWatch();
444                Span span = myParentTransaction.startSpan("db", "query", "search");
445                span.setName("FHIR Database Search");
446                try {
447                        // Create an initial search in the DB and give it an ID
448                        saveSearch();
449
450                        myTxService
451                                        .withRequest(myRequest)
452                                        .withRequestPartitionId(myRequestPartitionId)
453                                        .execute(this::doSearch);
454
455                        mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus());
456                        if (mySearch.getStatus() == SearchStatusEnum.FINISHED) {
457                                HookParams params = new HookParams()
458                                                .add(RequestDetails.class, myRequest)
459                                                .addIfMatchesType(ServletRequestDetails.class, myRequest)
460                                                .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
461                                myCompositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_COMPLETE, params);
462                        } else {
463                                HookParams params = new HookParams()
464                                                .add(RequestDetails.class, myRequest)
465                                                .addIfMatchesType(ServletRequestDetails.class, myRequest)
466                                                .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
467                                myCompositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_PASS_COMPLETE, params);
468                        }
469
470                        ourLog.trace(
471                                        "Have completed search for [{}{}] and found {} resources in {}ms - Status is {}",
472                                        mySearch.getResourceType(),
473                                        mySearch.getSearchQueryString(),
474                                        mySyncedPids.size(),
475                                        sw.getMillis(),
476                                        mySearch.getStatus());
477
478                } catch (Throwable t) {
479
480                        /*
481                         * Don't print a stack trace for client errors (i.e. requests that
482                         * aren't valid because the client screwed up).. that's just noise
483                         * in the logs and who needs that.
484                         */
485                        boolean logged = false;
486                        if (t instanceof BaseServerResponseException) {
487                                BaseServerResponseException exception = (BaseServerResponseException) t;
488                                if (exception.getStatusCode() >= 400 && exception.getStatusCode() < 500) {
489                                        logged = true;
490                                        ourLog.warn("Failed during search due to invalid request: {}", t.toString());
491                                }
492                        }
493
494                        if (!logged) {
495                                ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t);
496                        }
497                        myUnsyncedPids.clear();
498
499                        markSearchAsFailedWithExceptionDetails(t);
500
501                        mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus());
502                        HookParams params = new HookParams()
503                                        .add(RequestDetails.class, myRequest)
504                                        .addIfMatchesType(ServletRequestDetails.class, myRequest)
505                                        .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
506                        myCompositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_FAILED, params);
507
508                        saveSearch();
509                        span.captureException(t);
510                } finally {
511                        myOnRemove.accept(mySearch.getUuid());
512
513                        myInitialCollectionLatch.countDown();
514                        markComplete();
515                        span.end();
516                }
517                return null;
518        }
519
520        /**
521         * Updates the search entity with failure information based on the exception.
522         * Determines the appropriate HTTP status code based on exception type:
523         * - DataFormatException -> 400 Bad Request (client error)
524         * - BaseServerResponseException -> Use exception's status code
525         * - Other exceptions -> 500 Internal Server Error
526         *
527         * Optionally appends stack trace if unit test capture is enabled.
528         *
529         * @param theThrowable The exception that caused the search to fail
530         */
531        protected void markSearchAsFailedWithExceptionDetails(Throwable theThrowable) {
532                Throwable rootCause = ExceptionUtils.getRootCause(theThrowable);
533                rootCause = defaultIfNull(rootCause, theThrowable);
534
535                String failureMessage = rootCause.getMessage();
536
537                int failureCode;
538                if (rootCause instanceof DataFormatException || theThrowable instanceof DataFormatException) {
539                        // DataFormatException indicates invalid client input
540                        // and should return HTTP 400 Bad Request, not 500 Internal Server Error.
541                        failureCode = InvalidRequestException.STATUS_CODE;
542                } else if (theThrowable instanceof BaseServerResponseException baseServerResponseException) {
543                        failureCode = baseServerResponseException.getStatusCode();
544                } else {
545                        failureCode = InternalErrorException.STATUS_CODE;
546                }
547
548                if (HapiSystemProperties.isUnitTestCaptureStackEnabled()) {
549                        failureMessage += "\nStack\n" + ExceptionUtils.getStackTrace(rootCause);
550                }
551
552                mySearch.setFailureMessage(failureMessage);
553                mySearch.setFailureCode(failureCode);
554                mySearch.setStatus(SearchStatusEnum.FAILED);
555        }
556
557        private void doSaveSearch() {
558                Search newSearch = mySearchCacheSvc.save(mySearch, myRequestPartitionId);
559
560                // mySearchDao.save is not supposed to return null, but in unit tests
561                // it can if the mock search dao isn't set up to handle that
562                if (newSearch != null) {
563                        mySearch = newSearch;
564                }
565        }
566
567        /**
568         * This method actually creates the database query to perform the
569         * search, and starts it.
570         */
571        @SuppressWarnings({"rawtypes", "unchecked"})
572        private void doSearch() {
573                /*
574                 * If the user has explicitly requested a _count, perform a
575                 *
576                 * SELECT COUNT(*) ....
577                 *
578                 * before doing anything else.
579                 */
580                boolean myParamWantOnlyCount = isWantOnlyCount(myParams);
581                boolean myParamOrDefaultWantCount = nonNull(myParams.getSearchTotalMode())
582                                ? isWantCount(myParams)
583                                : SearchParameterMapCalculator.isWantCount(myStorageSettings.getDefaultTotalMode());
584
585                if (myParamWantOnlyCount || myParamOrDefaultWantCount) {
586                        doCountOnlyQuery(myParamWantOnlyCount);
587                        if (myParamWantOnlyCount) {
588                                return;
589                        }
590                }
591
592                ourLog.trace("Done count");
593                ISearchBuilder sb = newSearchBuilder();
594
595                /*
596                 * Figure out how many results we're actually going to fetch from the
597                 * database in this pass. This calculation takes into consideration the
598                 * "pre-fetch thresholds" specified in StorageSettings#getSearchPreFetchThresholds()
599                 * as well as the value of the _count parameter.
600                 */
601                int currentlyLoaded = defaultIfNull(mySearch.getNumFound(), 0);
602                int minWanted = 0;
603
604                // if no count is provided,
605                // we only use the values in SearchPreFetchThresholds
606                // but if there is a count...
607                if (myParams.getCount() != null) {
608                        minWanted = Math.min(myParams.getCount(), myPagingProvider.getMaximumPageSize());
609                        minWanted += currentlyLoaded;
610                }
611
612                // iterate through the search thresholds
613                for (Iterator<Integer> iter =
614                                                myStorageSettings.getSearchPreFetchThresholds().iterator();
615                                iter.hasNext(); ) {
616                        int next = iter.next();
617                        if (next != -1 && next <= currentlyLoaded) {
618                                continue;
619                        }
620
621                        if (next == -1) {
622                                sb.setMaxResultsToFetch(null);
623                                /*
624                                 * If we're past the last prefetch threshold then
625                                 * we're potentially fetching unlimited amounts of data.
626                                 * We'll move responsibility for deduplication to the database in this case
627                                 * so that we don't run the risk of blowing out the memory
628                                 * in the app server
629                                 */
630                                sb.setDeduplicateInDatabase(true);
631                        } else {
632                                // we want at least 1 more than our requested amount
633                                // so we know that there are other results
634                                // (in case we get the exact amount back)
635                                myMaxResultsToFetch = Math.max(next, minWanted) + 1;
636                                sb.setMaxResultsToFetch(myMaxResultsToFetch);
637                        }
638
639                        if (iter.hasNext()) {
640                                myAdditionalPrefetchThresholdsRemaining = true;
641                        }
642
643                        // If we get here's we've found an appropriate threshold
644                        break;
645                }
646
647                /*
648                 * Provide any PID we loaded in previous search passes to the
649                 * SearchBuilder so that we don't get duplicates coming from running
650                 * the same query again.
651                 *
652                 * We could possibly accomplish this in a different way by using sorted
653                 * results in our SQL query and specifying an offset. I don't actually
654                 * know if that would be faster or not. At some point should test this
655                 * idea.
656                 */
657                if (myPreviouslyAddedResourcePids != null) {
658                        sb.setPreviouslyAddedResourcePids(myPreviouslyAddedResourcePids);
659                        mySyncedPids.addAll(myPreviouslyAddedResourcePids);
660                }
661
662                /*
663                 * createQuery
664                 * Construct the SQL query we'll be sending to the database
665                 *
666                 * NB: (See createCountQuery above)
667                 * We will pass the original myParams here (not a copy)
668                 * because we actually _want_ the mutation of the myParams to happen.
669                 * Specifically because SearchBuilder itself will _expect_
670                 * not to have these parameters when dumping back
671                 * to our DB.
672                 *
673                 * This is an odd implementation behaviour, but the change
674                 * for this will require a lot more handling at higher levels
675                 */
676                try (IResultIterator<JpaPid> resultIterator =
677                                sb.createQuery(myParams, mySearchRuntimeDetails, myRequest, myRequestPartitionId)) {
678                        // resultIterator is SearchBuilder.QueryIterator
679                        assert (resultIterator != null);
680
681                        /*
682                         * The following loop actually loads the PIDs of the resources
683                         * matching the search off of the disk and into memory. After
684                         * every X results, we commit to the HFJ_SEARCH table.
685                         */
686                        int syncSize = mySyncSize;
687                        while (resultIterator.hasNext()) {
688                                myUnsyncedPids.add(resultIterator.next());
689
690                                boolean shouldSync = myUnsyncedPids.size() >= syncSize;
691
692                                if (myStorageSettings.getCountSearchResultsUpTo() != null
693                                                && myStorageSettings.getCountSearchResultsUpTo() > 0
694                                                && myStorageSettings.getCountSearchResultsUpTo() < myUnsyncedPids.size()) {
695                                        shouldSync = false;
696                                }
697
698                                if (myUnsyncedPids.size() > 50000) {
699                                        shouldSync = true;
700                                }
701
702                                // If no abort was requested, bail out
703                                Validate.isTrue(isNotAborted(), "Abort has been requested");
704
705                                if (shouldSync) {
706                                        saveUnsynced(resultIterator);
707                                }
708
709                                if (myLoadingThrottleForUnitTests != null) {
710                                        AsyncUtil.sleep(myLoadingThrottleForUnitTests);
711                                }
712                        }
713
714                        // If no abort was requested, bail out
715                        Validate.isTrue(isNotAborted(), "Abort has been requested");
716
717                        saveUnsynced(resultIterator);
718
719                } catch (IOException e) {
720                        ourLog.error("IO failure during database access", e);
721                        throw new InternalErrorException(Msg.code(1166) + e);
722                }
723        }
724
        /**
         * Runs the count query for this search and stores the resulting total
         * on the search.
         *
         * @param theParamWantOnlyCount true if only the count was requested, in which
         *                              case the search is also marked as FINISHED
         */
729        private void doCountOnlyQuery(boolean theParamWantOnlyCount) {
730                ourLog.trace("Performing count");
731                @SuppressWarnings("rawtypes")
732                ISearchBuilder sb = newSearchBuilder();
733
734                /*
735                 * createCountQuery
736                 * NB: (see createQuery below)
737                 * Because FulltextSearchSvcImpl will (internally)
738                 * mutate the myParams (searchmap),
739                 * (specifically removing the _content and _text filters)
740                 * we will have to clone those parameters here so that
741                 * the "correct" params are used in createQuery below
742                 */
743                Long count = sb.createCountQuery(myParams.clone(), mySearch.getUuid(), myRequest, myRequestPartitionId);
744
745                ourLog.trace("Got count {}", count);
746
747                myTxService
748                                .withRequest(myRequest)
749                                .withRequestPartitionId(myRequestPartitionId)
750                                .execute(() -> {
751                                        mySearch.setTotalCount(count.intValue());
752                                        if (theParamWantOnlyCount) {
753                                                mySearch.setStatus(SearchStatusEnum.FINISHED);
754                                        }
755                                        doSaveSearch();
756                                });
757        }
758}