001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.search.builder.tasks;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.i18n.Msg;
024import ca.uhn.fhir.interceptor.api.HookParams;
025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
026import ca.uhn.fhir.interceptor.api.Pointcut;
027import ca.uhn.fhir.interceptor.model.RequestPartitionId;
028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
029import ca.uhn.fhir.jpa.dao.IResultIterator;
030import ca.uhn.fhir.jpa.dao.ISearchBuilder;
031import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
032import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
033import ca.uhn.fhir.jpa.entity.Search;
034import ca.uhn.fhir.jpa.interceptor.JpaPreResourceAccessDetails;
035import ca.uhn.fhir.jpa.model.dao.JpaPid;
036import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
037import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
038import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
039import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
040import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
041import ca.uhn.fhir.jpa.util.QueryParameterUtils;
042import ca.uhn.fhir.jpa.util.SearchParameterMapCalculator;
043import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails;
044import ca.uhn.fhir.rest.api.server.RequestDetails;
045import ca.uhn.fhir.rest.server.IPagingProvider;
046import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
047import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
048import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
049import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
050import ca.uhn.fhir.system.HapiSystemProperties;
051import ca.uhn.fhir.util.AsyncUtil;
052import ca.uhn.fhir.util.StopWatch;
053import co.elastic.apm.api.ElasticApm;
054import co.elastic.apm.api.Span;
055import co.elastic.apm.api.Transaction;
056import jakarta.annotation.Nonnull;
057import org.apache.commons.lang3.Validate;
058import org.apache.commons.lang3.exception.ExceptionUtils;
059import org.hl7.fhir.instance.model.api.IBaseResource;
060import org.springframework.transaction.annotation.Propagation;
061
062import java.io.IOException;
063import java.util.ArrayList;
064import java.util.Iterator;
065import java.util.List;
066import java.util.concurrent.Callable;
067import java.util.concurrent.CountDownLatch;
068import java.util.concurrent.TimeUnit;
069import java.util.function.Consumer;
070
071import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantCount;
072import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantOnlyCount;
073import static java.util.Objects.nonNull;
074import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
075
076/**
077 * A search task is a Callable task that runs in
078 * a thread pool to handle an individual search. One instance
079 * is created for any requested search and runs from the
080 * beginning to the end of the search.
081 * <p>
082 * Understand:
083 * This class executes in its own thread separate from the
084 * web server client thread that made the request. We do that
085 * so that we can return to the client as soon as possible,
086 * but keep the search going in the background (and have
087 * the next page of results ready to go when the client asks).
088 */
089public class SearchTask implements Callable<Void> {
090
091        private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchTask.class);
092        // injected beans
093        protected final HapiTransactionService myTxService;
094        protected final FhirContext myContext;
095        protected final ISearchResultCacheSvc mySearchResultCacheSvc;
096        private final SearchParameterMap myParams;
097        private final String myResourceType;
098        private final ArrayList<JpaPid> mySyncedPids = new ArrayList<>();
099        private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1);
100        private final CountDownLatch myCompletionLatch;
101        private final ArrayList<JpaPid> myUnsyncedPids = new ArrayList<>();
102        private final RequestDetails myRequest;
103        private final RequestPartitionId myRequestPartitionId;
104        private final SearchRuntimeDetails mySearchRuntimeDetails;
105        private final Transaction myParentTransaction;
106        private final Consumer<String> myOnRemove;
107        private final int mySyncSize;
108        private final Integer myLoadingThrottleForUnitTests;
109        private final IInterceptorBroadcaster myInterceptorBroadcaster;
110        private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory;
111        private final JpaStorageSettings myStorageSettings;
112        private final ISearchCacheSvc mySearchCacheSvc;
113        private final IPagingProvider myPagingProvider;
114        private final IInterceptorBroadcaster myCompositeBroadcaster;
115        private Search mySearch;
116        private boolean myAbortRequested;
117        private int myCountSavedTotal = 0;
118        private int myCountSavedThisPass = 0;
119        private int myCountBlockedThisPass = 0;
120        private boolean myAdditionalPrefetchThresholdsRemaining;
121        private List<JpaPid> myPreviouslyAddedResourcePids;
122        // The max number of results that we request from the search
123        // Note that this is set using the configured pre-fetch thresholds, maximum page size, and/or the client provided
124        // _count parameter
125        private Integer myMaxResultsToFetch;
126
127        /**
128         * Constructor
129         */
130        @SuppressWarnings({"unchecked", "rawtypes"})
131        public SearchTask(
132                        SearchTaskParameters theCreationParams,
133                        HapiTransactionService theManagedTxManager,
134                        FhirContext theContext,
135                        IInterceptorBroadcaster theInterceptorBroadcaster,
136                        SearchBuilderFactory theSearchBuilderFactory,
137                        ISearchResultCacheSvc theSearchResultCacheSvc,
138                        JpaStorageSettings theStorageSettings,
139                        ISearchCacheSvc theSearchCacheSvc,
140                        IPagingProvider thePagingProvider) {
141                // beans
142                myTxService = theManagedTxManager;
143                myContext = theContext;
144                myInterceptorBroadcaster = theInterceptorBroadcaster;
145                mySearchBuilderFactory = theSearchBuilderFactory;
146                mySearchResultCacheSvc = theSearchResultCacheSvc;
147                myStorageSettings = theStorageSettings;
148                mySearchCacheSvc = theSearchCacheSvc;
149                myPagingProvider = thePagingProvider;
150
151                // values
152                myOnRemove = theCreationParams.OnRemove;
153                mySearch = theCreationParams.Search;
154                myParams = theCreationParams.Params;
155                myResourceType = theCreationParams.ResourceType;
156                myRequest = theCreationParams.Request;
157                myCompletionLatch = new CountDownLatch(1);
158                mySyncSize = theCreationParams.SyncSize;
159                myLoadingThrottleForUnitTests = theCreationParams.getLoadingThrottleForUnitTests();
160
161                mySearchRuntimeDetails = new SearchRuntimeDetails(myRequest, mySearch.getUuid());
162                mySearchRuntimeDetails.setQueryString(myParams.toNormalizedQueryString(myContext));
163                myRequestPartitionId = theCreationParams.RequestPartitionId;
164                myParentTransaction = ElasticApm.currentTransaction();
165                myCompositeBroadcaster =
166                                CompositeInterceptorBroadcaster.newCompositeBroadcaster(myInterceptorBroadcaster, myRequest);
167        }
168
169        protected RequestPartitionId getRequestPartitionId() {
170                return myRequestPartitionId;
171        }
172
173        /**
174         * This method is called by the server HTTP thread, and
175         * will block until at least one page of results have been
176         * fetched from the DB, and will never block after that.
177         */
178        public Integer awaitInitialSync() {
179                ourLog.trace("Awaiting initial sync");
180                do {
181                        ourLog.trace("Search {} aborted: {}", getSearch().getUuid(), !isNotAborted());
182                        if (AsyncUtil.awaitLatchAndThrowInternalErrorExceptionOnInterrupt(
183                                        getInitialCollectionLatch(), 250L, TimeUnit.MILLISECONDS)) {
184                                break;
185                        }
186                } while (getSearch().getStatus() == SearchStatusEnum.LOADING);
187                ourLog.trace("Initial sync completed");
188
189                return getSearch().getTotalCount();
190        }
191
192        public Search getSearch() {
193                return mySearch;
194        }
195
196        public CountDownLatch getInitialCollectionLatch() {
197                return myInitialCollectionLatch;
198        }
199
200        public void setPreviouslyAddedResourcePids(List<JpaPid> thePreviouslyAddedResourcePids) {
201                myPreviouslyAddedResourcePids = thePreviouslyAddedResourcePids;
202                myCountSavedTotal = myPreviouslyAddedResourcePids.size();
203        }
204
205        @SuppressWarnings("rawtypes")
206        private ISearchBuilder newSearchBuilder() {
207                Class<? extends IBaseResource> resourceTypeClass =
208                                myContext.getResourceDefinition(myResourceType).getImplementingClass();
209                return mySearchBuilderFactory.newSearchBuilder(myResourceType, resourceTypeClass);
210        }
211
212        @Nonnull
213        public List<JpaPid> getResourcePids(int theFromIndex, int theToIndex) {
214                ourLog.debug("Requesting search PIDs from {}-{}", theFromIndex, theToIndex);
215
216                boolean keepWaiting;
217                do {
218                        synchronized (mySyncedPids) {
219                                ourLog.trace("Search status is {}", mySearch.getStatus());
220                                boolean haveEnoughResults = mySyncedPids.size() >= theToIndex;
221                                if (!haveEnoughResults) {
222                                        switch (mySearch.getStatus()) {
223                                                case LOADING:
224                                                        keepWaiting = true;
225                                                        break;
226                                                case PASSCMPLET:
227                                                        /*
228                                                         * If we get here, it means that the user requested resources that crossed the
229                                                         * current pre-fetch boundary. For example, if the prefetch threshold is 50 and the
230                                                         * user has requested resources 0-60, then they would get 0-50 back but the search
231                                                         * coordinator would then stop searching.SearchCoordinatorSvcImplTest
232                                                         */
233                                                        keepWaiting = false;
234                                                        break;
235                                                case FAILED:
236                                                case FINISHED:
237                                                case GONE:
238                                                default:
239                                                        keepWaiting = false;
240                                                        break;
241                                        }
242                                } else {
243                                        keepWaiting = false;
244                                }
245                        }
246
247                        if (keepWaiting) {
248                                ourLog.info(
249                                                "Waiting as we only have {} results - Search status: {}",
250                                                mySyncedPids.size(),
251                                                mySearch.getStatus());
252                                AsyncUtil.sleep(500L);
253                        }
254                } while (keepWaiting);
255
256                ourLog.debug("Proceeding, as we have {} results", mySyncedPids.size());
257
258                ArrayList<JpaPid> retVal = new ArrayList<>();
259                synchronized (mySyncedPids) {
260                        QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(mySearch);
261
262                        int toIndex = theToIndex;
263                        if (mySyncedPids.size() < toIndex) {
264                                toIndex = mySyncedPids.size();
265                        }
266                        for (int i = theFromIndex; i < toIndex; i++) {
267                                retVal.add(mySyncedPids.get(i));
268                        }
269                }
270
271                ourLog.trace(
272                                "Done syncing results - Wanted {}-{} and returning {} of {}",
273                                theFromIndex,
274                                theToIndex,
275                                retVal.size(),
276                                mySyncedPids.size());
277
278                return retVal;
279        }
280
281        public void saveSearch() {
282                myTxService
283                                .withRequest(myRequest)
284                                .withRequestPartitionId(myRequestPartitionId)
285                                .withPropagation(Propagation.REQUIRES_NEW)
286                                .execute(this::doSaveSearch);
287        }
288
289        @SuppressWarnings("rawtypes")
290        private void saveUnsynced(final IResultIterator theResultIter) {
291                myTxService
292                                .withRequest(myRequest)
293                                .withRequestPartitionId(myRequestPartitionId)
294                                .execute(() -> {
295                                        if (mySearch.getId() == null) {
296                                                doSaveSearch();
297                                        }
298
299                                        ArrayList<JpaPid> unsyncedPids = myUnsyncedPids;
300                                        int countBlocked = 0;
301
302                                        // Interceptor call: STORAGE_PREACCESS_RESOURCES
303                                        // This can be used to remove results from the search result details before
304                                        // the user has a chance to know that they were in the results
305                                        if (mySearchRuntimeDetails.getRequestDetails() != null && !unsyncedPids.isEmpty()) {
306                                                JpaPreResourceAccessDetails accessDetails =
307                                                                new JpaPreResourceAccessDetails(unsyncedPids, this::newSearchBuilder);
308                                                HookParams params = new HookParams()
309                                                                .add(IPreResourceAccessDetails.class, accessDetails)
310                                                                .add(RequestDetails.class, mySearchRuntimeDetails.getRequestDetails())
311                                                                .addIfMatchesType(
312                                                                                ServletRequestDetails.class, mySearchRuntimeDetails.getRequestDetails());
313                                                myCompositeBroadcaster.callHooks(Pointcut.STORAGE_PREACCESS_RESOURCES, params);
314
315                                                for (int i = unsyncedPids.size() - 1; i >= 0; i--) {
316                                                        if (accessDetails.isDontReturnResourceAtIndex(i)) {
317                                                                unsyncedPids.remove(i);
318                                                                myCountBlockedThisPass++;
319                                                                myCountSavedTotal++;
320                                                                countBlocked++;
321                                                        }
322                                                }
323                                        }
324
325                                        // Actually store the results in the query cache storage
326                                        myCountSavedTotal += unsyncedPids.size();
327                                        myCountSavedThisPass += unsyncedPids.size();
328                                        mySearchResultCacheSvc.storeResults(
329                                                        mySearch, mySyncedPids, unsyncedPids, myRequest, getRequestPartitionId());
330
331                                        synchronized (mySyncedPids) {
332                                                int numSyncedThisPass = unsyncedPids.size();
333                                                ourLog.trace(
334                                                                "Syncing {} search results - Have more: {}",
335                                                                numSyncedThisPass,
336                                                                theResultIter.hasNext());
337                                                mySyncedPids.addAll(unsyncedPids);
338                                                unsyncedPids.clear();
339
340                                                if (!theResultIter.hasNext()) {
341                                                        int skippedCount = theResultIter.getSkippedCount();
342                                                        ourLog.trace(
343                                                                        "MaxToFetch[{}] SkippedCount[{}] CountSavedThisPass[{}] CountSavedThisTotal[{}] AdditionalPrefetchRemaining[{}]",
344                                                                        myMaxResultsToFetch,
345                                                                        skippedCount,
346                                                                        myCountSavedThisPass,
347                                                                        myCountSavedTotal,
348                                                                        myAdditionalPrefetchThresholdsRemaining);
349
350                                                        if (isFinished(theResultIter)) {
351                                                                // finished
352                                                                ourLog.trace("Setting search status to FINISHED");
353                                                                mySearch.setStatus(SearchStatusEnum.FINISHED);
354                                                                mySearch.setTotalCount(myCountSavedTotal - countBlocked);
355                                                        } else if (myAdditionalPrefetchThresholdsRemaining) {
356                                                                // pass complete
357                                                                ourLog.trace("Setting search status to PASSCMPLET");
358                                                                mySearch.setStatus(SearchStatusEnum.PASSCMPLET);
359                                                                mySearch.setSearchParameterMap(myParams);
360                                                        } else {
361                                                                // also finished
362                                                                ourLog.trace("Setting search status to FINISHED");
363                                                                mySearch.setStatus(SearchStatusEnum.FINISHED);
364                                                                mySearch.setTotalCount(myCountSavedTotal - countBlocked);
365                                                        }
366                                                }
367                                        }
368
369                                        mySearch.setNumFound(myCountSavedTotal);
370                                        mySearch.setNumBlocked(mySearch.getNumBlocked() + countBlocked);
371
372                                        int numSynced;
373                                        synchronized (mySyncedPids) {
374                                                numSynced = mySyncedPids.size();
375                                        }
376
377                                        if (myStorageSettings.getCountSearchResultsUpTo() == null
378                                                        || myStorageSettings.getCountSearchResultsUpTo() <= 0
379                                                        || myStorageSettings.getCountSearchResultsUpTo() <= numSynced) {
380                                                myInitialCollectionLatch.countDown();
381                                        }
382
383                                        doSaveSearch();
384
385                                        ourLog.trace("saveUnsynced() - pre-commit");
386                                });
387                ourLog.trace("saveUnsynced() - post-commit");
388        }
389
390        @SuppressWarnings("rawtypes")
391        private boolean isFinished(final IResultIterator theResultIter) {
392                int skippedCount = theResultIter.getSkippedCount();
393                int nonSkippedCount = theResultIter.getNonSkippedCount();
394                int totalFetched = skippedCount + myCountSavedThisPass + myCountBlockedThisPass;
395
396                if (myMaxResultsToFetch != null && totalFetched < myMaxResultsToFetch) {
397                        // total fetched < max results to fetch -> we've exhausted the search
398                        return true;
399                } else {
400                        if (nonSkippedCount == 0) {
401                                // no skipped resources in this query
402                                if (myParams.getCount() != null) {
403                                        // count supplied
404                                        // if the count is > what we've fetched -> we've exhausted the query
405                                        return myParams.getCount() > totalFetched;
406                                } else {
407                                        // legacy - we have no skipped resources - we are done
408                                        return true;
409                                }
410                        }
411                        // skipped resources means we have more to fetch
412                        return false;
413                }
414        }
415
416        public boolean isNotAborted() {
417                return !myAbortRequested;
418        }
419
420        public void markComplete() {
421                myCompletionLatch.countDown();
422        }
423
424        public CountDownLatch getCompletionLatch() {
425                return myCompletionLatch;
426        }
427
428        /**
429         * Request that the task abort as soon as possible
430         */
431        public void requestImmediateAbort() {
432                myAbortRequested = true;
433        }
434
435        /**
436         * This is the method which actually performs the search.
437         * It is called automatically by the thread pool.
438         */
439        @Override
440        public Void call() {
441                StopWatch sw = new StopWatch();
442                Span span = myParentTransaction.startSpan("db", "query", "search");
443                span.setName("FHIR Database Search");
444                try {
445                        // Create an initial search in the DB and give it an ID
446                        saveSearch();
447
448                        myTxService
449                                        .withRequest(myRequest)
450                                        .withRequestPartitionId(myRequestPartitionId)
451                                        .execute(this::doSearch);
452
453                        mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus());
454                        if (mySearch.getStatus() == SearchStatusEnum.FINISHED) {
455                                HookParams params = new HookParams()
456                                                .add(RequestDetails.class, myRequest)
457                                                .addIfMatchesType(ServletRequestDetails.class, myRequest)
458                                                .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
459                                myCompositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_COMPLETE, params);
460                        } else {
461                                HookParams params = new HookParams()
462                                                .add(RequestDetails.class, myRequest)
463                                                .addIfMatchesType(ServletRequestDetails.class, myRequest)
464                                                .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
465                                myCompositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_PASS_COMPLETE, params);
466                        }
467
468                        ourLog.trace(
469                                        "Have completed search for [{}{}] and found {} resources in {}ms - Status is {}",
470                                        mySearch.getResourceType(),
471                                        mySearch.getSearchQueryString(),
472                                        mySyncedPids.size(),
473                                        sw.getMillis(),
474                                        mySearch.getStatus());
475
476                } catch (Throwable t) {
477
478                        /*
479                         * Don't print a stack trace for client errors (i.e. requests that
480                         * aren't valid because the client screwed up).. that's just noise
481                         * in the logs and who needs that.
482                         */
483                        boolean logged = false;
484                        if (t instanceof BaseServerResponseException) {
485                                BaseServerResponseException exception = (BaseServerResponseException) t;
486                                if (exception.getStatusCode() >= 400 && exception.getStatusCode() < 500) {
487                                        logged = true;
488                                        ourLog.warn("Failed during search due to invalid request: {}", t.toString());
489                                }
490                        }
491
492                        if (!logged) {
493                                ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t);
494                        }
495                        myUnsyncedPids.clear();
496                        Throwable rootCause = ExceptionUtils.getRootCause(t);
497                        rootCause = defaultIfNull(rootCause, t);
498
499                        String failureMessage = rootCause.getMessage();
500
501                        int failureCode = InternalErrorException.STATUS_CODE;
502                        if (t instanceof BaseServerResponseException) {
503                                failureCode = ((BaseServerResponseException) t).getStatusCode();
504                        }
505
506                        if (HapiSystemProperties.isUnitTestCaptureStackEnabled()) {
507                                failureMessage += "\nStack\n" + ExceptionUtils.getStackTrace(rootCause);
508                        }
509
510                        mySearch.setFailureMessage(failureMessage);
511                        mySearch.setFailureCode(failureCode);
512                        mySearch.setStatus(SearchStatusEnum.FAILED);
513
514                        mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus());
515                        HookParams params = new HookParams()
516                                        .add(RequestDetails.class, myRequest)
517                                        .addIfMatchesType(ServletRequestDetails.class, myRequest)
518                                        .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
519                        myCompositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_FAILED, params);
520
521                        saveSearch();
522                        span.captureException(t);
523                } finally {
524                        myOnRemove.accept(mySearch.getUuid());
525
526                        myInitialCollectionLatch.countDown();
527                        markComplete();
528                        span.end();
529                }
530                return null;
531        }
532
533        private void doSaveSearch() {
534                Search newSearch = mySearchCacheSvc.save(mySearch, myRequestPartitionId);
535
536                // mySearchDao.save is not supposed to return null, but in unit tests
537                // it can if the mock search dao isn't set up to handle that
538                if (newSearch != null) {
539                        mySearch = newSearch;
540                }
541        }
542
543        /**
544         * This method actually creates the database query to perform the
545         * search, and starts it.
546         */
547        @SuppressWarnings({"rawtypes", "unchecked"})
548        private void doSearch() {
549                /*
550                 * If the user has explicitly requested a _count, perform a
551                 *
552                 * SELECT COUNT(*) ....
553                 *
554                 * before doing anything else.
555                 */
556                boolean myParamWantOnlyCount = isWantOnlyCount(myParams);
557                boolean myParamOrDefaultWantCount = nonNull(myParams.getSearchTotalMode())
558                                ? isWantCount(myParams)
559                                : SearchParameterMapCalculator.isWantCount(myStorageSettings.getDefaultTotalMode());
560
561                if (myParamWantOnlyCount || myParamOrDefaultWantCount) {
562                        doCountOnlyQuery(myParamWantOnlyCount);
563                        if (myParamWantOnlyCount) {
564                                return;
565                        }
566                }
567
568                ourLog.trace("Done count");
569                ISearchBuilder sb = newSearchBuilder();
570
571                /*
572                 * Figure out how many results we're actually going to fetch from the
573                 * database in this pass. This calculation takes into consideration the
574                 * "pre-fetch thresholds" specified in StorageSettings#getSearchPreFetchThresholds()
575                 * as well as the value of the _count parameter.
576                 */
577                int currentlyLoaded = defaultIfNull(mySearch.getNumFound(), 0);
578                int minWanted = 0;
579
580                // if no count is provided,
581                // we only use the values in SearchPreFetchThresholds
582                // but if there is a count...
583                if (myParams.getCount() != null) {
584                        minWanted = Math.min(myParams.getCount(), myPagingProvider.getMaximumPageSize());
585                        minWanted += currentlyLoaded;
586                }
587
588                // iterate through the search thresholds
589                for (Iterator<Integer> iter =
590                                                myStorageSettings.getSearchPreFetchThresholds().iterator();
591                                iter.hasNext(); ) {
592                        int next = iter.next();
593                        if (next != -1 && next <= currentlyLoaded) {
594                                continue;
595                        }
596
597                        if (next == -1) {
598                                sb.setMaxResultsToFetch(null);
599                                /*
600                                 * If we're past the last prefetch threshold then
601                                 * we're potentially fetching unlimited amounts of data.
602                                 * We'll move responsibility for deduplication to the database in this case
603                                 * so that we don't run the risk of blowing out the memory
604                                 * in the app server
605                                 */
606                                sb.setDeduplicateInDatabase(true);
607                        } else {
608                                // we want at least 1 more than our requested amount
609                                // so we know that there are other results
610                                // (in case we get the exact amount back)
611                                myMaxResultsToFetch = Math.max(next, minWanted) + 1;
612                                sb.setMaxResultsToFetch(myMaxResultsToFetch);
613                        }
614
615                        if (iter.hasNext()) {
616                                myAdditionalPrefetchThresholdsRemaining = true;
617                        }
618
619                        // If we get here's we've found an appropriate threshold
620                        break;
621                }
622
623                /*
624                 * Provide any PID we loaded in previous search passes to the
625                 * SearchBuilder so that we don't get duplicates coming from running
626                 * the same query again.
627                 *
628                 * We could possibly accomplish this in a different way by using sorted
629                 * results in our SQL query and specifying an offset. I don't actually
630                 * know if that would be faster or not. At some point should test this
631                 * idea.
632                 */
633                if (myPreviouslyAddedResourcePids != null) {
634                        sb.setPreviouslyAddedResourcePids(myPreviouslyAddedResourcePids);
635                        mySyncedPids.addAll(myPreviouslyAddedResourcePids);
636                }
637
638                /*
639                 * createQuery
640                 * Construct the SQL query we'll be sending to the database
641                 *
642                 * NB: (See createCountQuery above)
643                 * We will pass the original myParams here (not a copy)
644                 * because we actually _want_ the mutation of the myParams to happen.
645                 * Specifically because SearchBuilder itself will _expect_
646                 * not to have these parameters when dumping back
647                 * to our DB.
648                 *
649                 * This is an odd implementation behaviour, but the change
650                 * for this will require a lot more handling at higher levels
651                 */
652                try (IResultIterator<JpaPid> resultIterator =
653                                sb.createQuery(myParams, mySearchRuntimeDetails, myRequest, myRequestPartitionId)) {
654                        // resultIterator is SearchBuilder.QueryIterator
655                        assert (resultIterator != null);
656
657                        /*
658                         * The following loop actually loads the PIDs of the resources
659                         * matching the search off of the disk and into memory. After
660                         * every X results, we commit to the HFJ_SEARCH table.
661                         */
662                        int syncSize = mySyncSize;
663                        while (resultIterator.hasNext()) {
664                                myUnsyncedPids.add(resultIterator.next());
665
666                                boolean shouldSync = myUnsyncedPids.size() >= syncSize;
667
668                                if (myStorageSettings.getCountSearchResultsUpTo() != null
669                                                && myStorageSettings.getCountSearchResultsUpTo() > 0
670                                                && myStorageSettings.getCountSearchResultsUpTo() < myUnsyncedPids.size()) {
671                                        shouldSync = false;
672                                }
673
674                                if (myUnsyncedPids.size() > 50000) {
675                                        shouldSync = true;
676                                }
677
678                                // If no abort was requested, bail out
679                                Validate.isTrue(isNotAborted(), "Abort has been requested");
680
681                                if (shouldSync) {
682                                        saveUnsynced(resultIterator);
683                                }
684
685                                if (myLoadingThrottleForUnitTests != null) {
686                                        AsyncUtil.sleep(myLoadingThrottleForUnitTests);
687                                }
688                        }
689
690                        // If no abort was requested, bail out
691                        Validate.isTrue(isNotAborted(), "Abort has been requested");
692
693                        saveUnsynced(resultIterator);
694
695                } catch (IOException e) {
696                        ourLog.error("IO failure during database access", e);
697                        throw new InternalErrorException(Msg.code(1166) + e);
698                }
699        }
700
701        /**
702         * Does the query but only for the count.
703         * @param theParamWantOnlyCount - if count query is wanted only
704         */
705        private void doCountOnlyQuery(boolean theParamWantOnlyCount) {
706                ourLog.trace("Performing count");
707                @SuppressWarnings("rawtypes")
708                ISearchBuilder sb = newSearchBuilder();
709
710                /*
711                 * createCountQuery
712                 * NB: (see createQuery below)
713                 * Because FulltextSearchSvcImpl will (internally)
714                 * mutate the myParams (searchmap),
715                 * (specifically removing the _content and _text filters)
716                 * we will have to clone those parameters here so that
717                 * the "correct" params are used in createQuery below
718                 */
719                Long count = sb.createCountQuery(myParams.clone(), mySearch.getUuid(), myRequest, myRequestPartitionId);
720
721                ourLog.trace("Got count {}", count);
722
723                myTxService
724                                .withRequest(myRequest)
725                                .withRequestPartitionId(myRequestPartitionId)
726                                .execute(() -> {
727                                        mySearch.setTotalCount(count.intValue());
728                                        if (theParamWantOnlyCount) {
729                                                mySearch.setStatus(SearchStatusEnum.FINISHED);
730                                        }
731                                        doSaveSearch();
732                                });
733        }
734}