001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.search.builder.tasks;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.i18n.Msg;
024import ca.uhn.fhir.interceptor.api.HookParams;
025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
026import ca.uhn.fhir.interceptor.api.Pointcut;
027import ca.uhn.fhir.interceptor.model.RequestPartitionId;
028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
029import ca.uhn.fhir.jpa.api.dao.IDao;
030import ca.uhn.fhir.jpa.dao.IResultIterator;
031import ca.uhn.fhir.jpa.dao.ISearchBuilder;
032import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
033import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
034import ca.uhn.fhir.jpa.entity.Search;
035import ca.uhn.fhir.jpa.interceptor.JpaPreResourceAccessDetails;
036import ca.uhn.fhir.jpa.model.dao.JpaPid;
037import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
038import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
039import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
040import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
041import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
042import ca.uhn.fhir.jpa.util.QueryParameterUtils;
043import ca.uhn.fhir.jpa.util.SearchParameterMapCalculator;
044import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails;
045import ca.uhn.fhir.rest.api.server.RequestDetails;
046import ca.uhn.fhir.rest.server.IPagingProvider;
047import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
048import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
049import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
050import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
051import ca.uhn.fhir.system.HapiSystemProperties;
052import ca.uhn.fhir.util.AsyncUtil;
053import ca.uhn.fhir.util.StopWatch;
054import co.elastic.apm.api.ElasticApm;
055import co.elastic.apm.api.Span;
056import co.elastic.apm.api.Transaction;
057import jakarta.annotation.Nonnull;
058import org.apache.commons.lang3.Validate;
059import org.apache.commons.lang3.exception.ExceptionUtils;
060import org.hl7.fhir.instance.model.api.IBaseResource;
061import org.springframework.transaction.annotation.Propagation;
062
063import java.io.IOException;
064import java.util.ArrayList;
065import java.util.Iterator;
066import java.util.List;
067import java.util.concurrent.Callable;
068import java.util.concurrent.CountDownLatch;
069import java.util.concurrent.TimeUnit;
070import java.util.function.Consumer;
071
072import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantCount;
073import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantOnlyCount;
074import static java.util.Objects.nonNull;
075import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
076
077/**
078 * A search task is a Callable task that runs in
079 * a thread pool to handle an individual search. One instance
080 * is created for any requested search and runs from the
081 * beginning to the end of the search.
082 * <p>
083 * Understand:
084 * This class executes in its own thread separate from the
085 * web server client thread that made the request. We do that
086 * so that we can return to the client as soon as possible,
087 * but keep the search going in the background (and have
088 * the next page of results ready to go when the client asks).
089 */
090public class SearchTask implements Callable<Void> {
091
092        private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchTask.class);
093        // injected beans
094        protected final HapiTransactionService myTxService;
095        protected final FhirContext myContext;
096        protected final ISearchResultCacheSvc mySearchResultCacheSvc;
097        private final SearchParameterMap myParams;
098        private final IDao myCallingDao;
099        private final String myResourceType;
100        private final ArrayList<JpaPid> mySyncedPids = new ArrayList<>();
101        private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1);
102        private final CountDownLatch myCompletionLatch;
103        private final ArrayList<JpaPid> myUnsyncedPids = new ArrayList<>();
104        private final RequestDetails myRequest;
105        private final RequestPartitionId myRequestPartitionId;
106        private final SearchRuntimeDetails mySearchRuntimeDetails;
107        private final Transaction myParentTransaction;
108        private final Consumer<String> myOnRemove;
109        private final int mySyncSize;
110        private final Integer myLoadingThrottleForUnitTests;
111        private final IInterceptorBroadcaster myInterceptorBroadcaster;
112        private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory;
113        private final JpaStorageSettings myStorageSettings;
114        private final ISearchCacheSvc mySearchCacheSvc;
115        private final IPagingProvider myPagingProvider;
116        private Search mySearch;
117        private boolean myAbortRequested;
118        private int myCountSavedTotal = 0;
119        private int myCountSavedThisPass = 0;
120        private int myCountBlockedThisPass = 0;
121        private boolean myAdditionalPrefetchThresholdsRemaining;
122        private List<JpaPid> myPreviouslyAddedResourcePids;
123        private Integer myMaxResultsToFetch;
124
125        /**
126         * Constructor
127         */
128        @SuppressWarnings({"unchecked", "rawtypes"})
129        public SearchTask(
130                        SearchTaskParameters theCreationParams,
131                        HapiTransactionService theManagedTxManager,
132                        FhirContext theContext,
133                        IInterceptorBroadcaster theInterceptorBroadcaster,
134                        SearchBuilderFactory theSearchBuilderFactory,
135                        ISearchResultCacheSvc theSearchResultCacheSvc,
136                        JpaStorageSettings theStorageSettings,
137                        ISearchCacheSvc theSearchCacheSvc,
138                        IPagingProvider thePagingProvider) {
139                // beans
140                myTxService = theManagedTxManager;
141                myContext = theContext;
142                myInterceptorBroadcaster = theInterceptorBroadcaster;
143                mySearchBuilderFactory = theSearchBuilderFactory;
144                mySearchResultCacheSvc = theSearchResultCacheSvc;
145                myStorageSettings = theStorageSettings;
146                mySearchCacheSvc = theSearchCacheSvc;
147                myPagingProvider = thePagingProvider;
148
149                // values
150                myOnRemove = theCreationParams.OnRemove;
151                mySearch = theCreationParams.Search;
152                myCallingDao = theCreationParams.CallingDao;
153                myParams = theCreationParams.Params;
154                myResourceType = theCreationParams.ResourceType;
155                myRequest = theCreationParams.Request;
156                myCompletionLatch = new CountDownLatch(1);
157                mySyncSize = theCreationParams.SyncSize;
158                myLoadingThrottleForUnitTests = theCreationParams.getLoadingThrottleForUnitTests();
159
160                mySearchRuntimeDetails = new SearchRuntimeDetails(myRequest, mySearch.getUuid());
161                mySearchRuntimeDetails.setQueryString(myParams.toNormalizedQueryString(myCallingDao.getContext()));
162                myRequestPartitionId = theCreationParams.RequestPartitionId;
163                myParentTransaction = ElasticApm.currentTransaction();
164        }
165
166        protected RequestPartitionId getRequestPartitionId() {
167                return myRequestPartitionId;
168        }
169
170        /**
171         * This method is called by the server HTTP thread, and
172         * will block until at least one page of results have been
173         * fetched from the DB, and will never block after that.
174         */
175        public Integer awaitInitialSync() {
176                ourLog.trace("Awaiting initial sync");
177                do {
178                        ourLog.trace("Search {} aborted: {}", getSearch().getUuid(), !isNotAborted());
179                        if (AsyncUtil.awaitLatchAndThrowInternalErrorExceptionOnInterrupt(
180                                        getInitialCollectionLatch(), 250L, TimeUnit.MILLISECONDS)) {
181                                break;
182                        }
183                } while (getSearch().getStatus() == SearchStatusEnum.LOADING);
184                ourLog.trace("Initial sync completed");
185
186                return getSearch().getTotalCount();
187        }
188
189        public Search getSearch() {
190                return mySearch;
191        }
192
193        public CountDownLatch getInitialCollectionLatch() {
194                return myInitialCollectionLatch;
195        }
196
197        public void setPreviouslyAddedResourcePids(List<JpaPid> thePreviouslyAddedResourcePids) {
198                myPreviouslyAddedResourcePids = thePreviouslyAddedResourcePids;
199                myCountSavedTotal = myPreviouslyAddedResourcePids.size();
200        }
201
202        @SuppressWarnings("rawtypes")
203        private ISearchBuilder newSearchBuilder() {
204                Class<? extends IBaseResource> resourceTypeClass =
205                                myContext.getResourceDefinition(myResourceType).getImplementingClass();
206                return mySearchBuilderFactory.newSearchBuilder(myCallingDao, myResourceType, resourceTypeClass);
207        }
208
209        @Nonnull
210        public List<JpaPid> getResourcePids(int theFromIndex, int theToIndex) {
211                ourLog.debug("Requesting search PIDs from {}-{}", theFromIndex, theToIndex);
212
213                boolean keepWaiting;
214                do {
215                        synchronized (mySyncedPids) {
216                                ourLog.trace("Search status is {}", mySearch.getStatus());
217                                boolean haveEnoughResults = mySyncedPids.size() >= theToIndex;
218                                if (!haveEnoughResults) {
219                                        switch (mySearch.getStatus()) {
220                                                case LOADING:
221                                                        keepWaiting = true;
222                                                        break;
223                                                case PASSCMPLET:
224                                                        /*
225                                                         * If we get here, it means that the user requested resources that crossed the
226                                                         * current pre-fetch boundary. For example, if the prefetch threshold is 50 and the
227                                                         * user has requested resources 0-60, then they would get 0-50 back but the search
228                                                         * coordinator would then stop searching.SearchCoordinatorSvcImplTest
229                                                         */
230                                                        keepWaiting = false;
231                                                        break;
232                                                case FAILED:
233                                                case FINISHED:
234                                                case GONE:
235                                                default:
236                                                        keepWaiting = false;
237                                                        break;
238                                        }
239                                } else {
240                                        keepWaiting = false;
241                                }
242                        }
243
244                        if (keepWaiting) {
245                                ourLog.info(
246                                                "Waiting as we only have {} results - Search status: {}",
247                                                mySyncedPids.size(),
248                                                mySearch.getStatus());
249                                AsyncUtil.sleep(500L);
250                        }
251                } while (keepWaiting);
252
253                ourLog.debug("Proceeding, as we have {} results", mySyncedPids.size());
254
255                ArrayList<JpaPid> retVal = new ArrayList<>();
256                synchronized (mySyncedPids) {
257                        QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(mySearch);
258
259                        int toIndex = theToIndex;
260                        if (mySyncedPids.size() < toIndex) {
261                                toIndex = mySyncedPids.size();
262                        }
263                        for (int i = theFromIndex; i < toIndex; i++) {
264                                retVal.add(mySyncedPids.get(i));
265                        }
266                }
267
268                ourLog.trace(
269                                "Done syncing results - Wanted {}-{} and returning {} of {}",
270                                theFromIndex,
271                                theToIndex,
272                                retVal.size(),
273                                mySyncedPids.size());
274
275                return retVal;
276        }
277
278        public void saveSearch() {
279                myTxService
280                                .withRequest(myRequest)
281                                .withRequestPartitionId(myRequestPartitionId)
282                                .withPropagation(Propagation.REQUIRES_NEW)
283                                .execute(() -> doSaveSearch());
284        }
285
286        @SuppressWarnings("rawtypes")
287        private void saveUnsynced(final IResultIterator theResultIter) {
288                myTxService
289                                .withRequest(myRequest)
290                                .withRequestPartitionId(myRequestPartitionId)
291                                .execute(() -> {
292                                        if (mySearch.getId() == null) {
293                                                doSaveSearch();
294                                        }
295
296                                        ArrayList<JpaPid> unsyncedPids = myUnsyncedPids;
297                                        int countBlocked = 0;
298
299                                        // Interceptor call: STORAGE_PREACCESS_RESOURCES
300                                        // This can be used to remove results from the search result details before
301                                        // the user has a chance to know that they were in the results
302                                        if (mySearchRuntimeDetails.getRequestDetails() != null && !unsyncedPids.isEmpty()) {
303                                                JpaPreResourceAccessDetails accessDetails =
304                                                                new JpaPreResourceAccessDetails(unsyncedPids, this::newSearchBuilder);
305                                                HookParams params = new HookParams()
306                                                                .add(IPreResourceAccessDetails.class, accessDetails)
307                                                                .add(RequestDetails.class, mySearchRuntimeDetails.getRequestDetails())
308                                                                .addIfMatchesType(
309                                                                                ServletRequestDetails.class, mySearchRuntimeDetails.getRequestDetails());
310                                                CompositeInterceptorBroadcaster.doCallHooks(
311                                                                myInterceptorBroadcaster, myRequest, Pointcut.STORAGE_PREACCESS_RESOURCES, params);
312
313                                                for (int i = unsyncedPids.size() - 1; i >= 0; i--) {
314                                                        if (accessDetails.isDontReturnResourceAtIndex(i)) {
315                                                                unsyncedPids.remove(i);
316                                                                myCountBlockedThisPass++;
317                                                                myCountSavedTotal++;
318                                                                countBlocked++;
319                                                        }
320                                                }
321                                        }
322
323                                        // Actually store the results in the query cache storage
324                                        myCountSavedTotal += unsyncedPids.size();
325                                        myCountSavedThisPass += unsyncedPids.size();
326                                        mySearchResultCacheSvc.storeResults(
327                                                        mySearch, mySyncedPids, unsyncedPids, myRequest, getRequestPartitionId());
328
329                                        synchronized (mySyncedPids) {
330                                                int numSyncedThisPass = unsyncedPids.size();
331                                                ourLog.trace(
332                                                                "Syncing {} search results - Have more: {}",
333                                                                numSyncedThisPass,
334                                                                theResultIter.hasNext());
335                                                mySyncedPids.addAll(unsyncedPids);
336                                                unsyncedPids.clear();
337
338                                                if (!theResultIter.hasNext()) {
339                                                        int skippedCount = theResultIter.getSkippedCount();
340                                                        ourLog.trace(
341                                                                        "MaxToFetch[{}] SkippedCount[{}] CountSavedThisPass[{}] CountSavedThisTotal[{}] AdditionalPrefetchRemaining[{}]",
342                                                                        myMaxResultsToFetch,
343                                                                        skippedCount,
344                                                                        myCountSavedThisPass,
345                                                                        myCountSavedTotal,
346                                                                        myAdditionalPrefetchThresholdsRemaining);
347
348                                                        if (isFinished(theResultIter)) {
349                                                                // finished
350                                                                ourLog.trace("Setting search status to FINISHED");
351                                                                mySearch.setStatus(SearchStatusEnum.FINISHED);
352                                                                mySearch.setTotalCount(myCountSavedTotal - countBlocked);
353                                                        } else if (myAdditionalPrefetchThresholdsRemaining) {
354                                                                // pass complete
355                                                                ourLog.trace("Setting search status to PASSCMPLET");
356                                                                mySearch.setStatus(SearchStatusEnum.PASSCMPLET);
357                                                                mySearch.setSearchParameterMap(myParams);
358                                                        } else {
359                                                                // also finished
360                                                                ourLog.trace("Setting search status to FINISHED");
361                                                                mySearch.setStatus(SearchStatusEnum.FINISHED);
362                                                                mySearch.setTotalCount(myCountSavedTotal - countBlocked);
363                                                        }
364                                                }
365                                        }
366
367                                        mySearch.setNumFound(myCountSavedTotal);
368                                        mySearch.setNumBlocked(mySearch.getNumBlocked() + countBlocked);
369
370                                        int numSynced;
371                                        synchronized (mySyncedPids) {
372                                                numSynced = mySyncedPids.size();
373                                        }
374
375                                        if (myStorageSettings.getCountSearchResultsUpTo() == null
376                                                        || myStorageSettings.getCountSearchResultsUpTo() <= 0
377                                                        || myStorageSettings.getCountSearchResultsUpTo() <= numSynced) {
378                                                myInitialCollectionLatch.countDown();
379                                        }
380
381                                        doSaveSearch();
382
383                                        ourLog.trace("saveUnsynced() - pre-commit");
384                                });
385                ourLog.trace("saveUnsynced() - post-commit");
386        }
387
388        @SuppressWarnings("rawtypes")
389        private boolean isFinished(final IResultIterator theResultIter) {
390                int skippedCount = theResultIter.getSkippedCount();
391                int nonSkippedCount = theResultIter.getNonSkippedCount();
392                int totalFetched = skippedCount + myCountSavedThisPass + myCountBlockedThisPass;
393
394                if (myMaxResultsToFetch != null && totalFetched < myMaxResultsToFetch) {
395                        // total fetched < max results to fetch -> we've exhausted the search
396                        return true;
397                } else {
398                        if (nonSkippedCount == 0) {
399                                // no skipped resources in this query
400                                if (myParams.getCount() != null) {
401                                        // count supplied
402                                        // if the count is > what we've fetched -> we've exhausted the query
403                                        return myParams.getCount() > totalFetched;
404                                } else {
405                                        // legacy - we have no skipped resources - we are done
406                                        return true;
407                                }
408                        }
409                        // skipped resources means we have more to fetch
410                        return false;
411                }
412        }
413
414        public boolean isNotAborted() {
415                return !myAbortRequested;
416        }
417
418        public void markComplete() {
419                myCompletionLatch.countDown();
420        }
421
422        public CountDownLatch getCompletionLatch() {
423                return myCompletionLatch;
424        }
425
426        /**
427         * Request that the task abort as soon as possible
428         */
429        public void requestImmediateAbort() {
430                myAbortRequested = true;
431        }
432
433        /**
434         * This is the method which actually performs the search.
435         * It is called automatically by the thread pool.
436         */
437        @Override
438        public Void call() {
439                StopWatch sw = new StopWatch();
440                Span span = myParentTransaction.startSpan("db", "query", "search");
441                span.setName("FHIR Database Search");
442                try {
443                        // Create an initial search in the DB and give it an ID
444                        saveSearch();
445
446                        myTxService
447                                        .withRequest(myRequest)
448                                        .withRequestPartitionId(myRequestPartitionId)
449                                        .execute(this::doSearch);
450
451                        mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus());
452                        if (mySearch.getStatus() == SearchStatusEnum.FINISHED) {
453                                HookParams params = new HookParams()
454                                                .add(RequestDetails.class, myRequest)
455                                                .addIfMatchesType(ServletRequestDetails.class, myRequest)
456                                                .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
457                                CompositeInterceptorBroadcaster.doCallHooks(
458                                                myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_COMPLETE, params);
459                        } else {
460                                HookParams params = new HookParams()
461                                                .add(RequestDetails.class, myRequest)
462                                                .addIfMatchesType(ServletRequestDetails.class, myRequest)
463                                                .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
464                                CompositeInterceptorBroadcaster.doCallHooks(
465                                                myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_PASS_COMPLETE, params);
466                        }
467
468                        ourLog.trace(
469                                        "Have completed search for [{}{}] and found {} resources in {}ms - Status is {}",
470                                        mySearch.getResourceType(),
471                                        mySearch.getSearchQueryString(),
472                                        mySyncedPids.size(),
473                                        sw.getMillis(),
474                                        mySearch.getStatus());
475
476                } catch (Throwable t) {
477
478                        /*
479                         * Don't print a stack trace for client errors (i.e. requests that
480                         * aren't valid because the client screwed up).. that's just noise
481                         * in the logs and who needs that.
482                         */
483                        boolean logged = false;
484                        if (t instanceof BaseServerResponseException) {
485                                BaseServerResponseException exception = (BaseServerResponseException) t;
486                                if (exception.getStatusCode() >= 400 && exception.getStatusCode() < 500) {
487                                        logged = true;
488                                        ourLog.warn("Failed during search due to invalid request: {}", t.toString());
489                                }
490                        }
491
492                        if (!logged) {
493                                ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t);
494                        }
495                        myUnsyncedPids.clear();
496                        Throwable rootCause = ExceptionUtils.getRootCause(t);
497                        rootCause = defaultIfNull(rootCause, t);
498
499                        String failureMessage = rootCause.getMessage();
500
501                        int failureCode = InternalErrorException.STATUS_CODE;
502                        if (t instanceof BaseServerResponseException) {
503                                failureCode = ((BaseServerResponseException) t).getStatusCode();
504                        }
505
506                        if (HapiSystemProperties.isUnitTestCaptureStackEnabled()) {
507                                failureMessage += "\nStack\n" + ExceptionUtils.getStackTrace(rootCause);
508                        }
509
510                        mySearch.setFailureMessage(failureMessage);
511                        mySearch.setFailureCode(failureCode);
512                        mySearch.setStatus(SearchStatusEnum.FAILED);
513
514                        mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus());
515                        HookParams params = new HookParams()
516                                        .add(RequestDetails.class, myRequest)
517                                        .addIfMatchesType(ServletRequestDetails.class, myRequest)
518                                        .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
519                        CompositeInterceptorBroadcaster.doCallHooks(
520                                        myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_FAILED, params);
521
522                        saveSearch();
523                        span.captureException(t);
524                } finally {
525                        myOnRemove.accept(mySearch.getUuid());
526
527                        myInitialCollectionLatch.countDown();
528                        markComplete();
529                        span.end();
530                }
531                return null;
532        }
533
534        private void doSaveSearch() {
535                Search newSearch = mySearchCacheSvc.save(mySearch, myRequestPartitionId);
536
537                // mySearchDao.save is not supposed to return null, but in unit tests
538                // it can if the mock search dao isn't set up to handle that
539                if (newSearch != null) {
540                        mySearch = newSearch;
541                }
542        }
543
544        /**
545         * This method actually creates the database query to perform the
546         * search, and starts it.
547         */
548        @SuppressWarnings({"rawtypes", "unchecked"})
549        private void doSearch() {
550                /*
551                 * If the user has explicitly requested a _count, perform a
552                 *
553                 * SELECT COUNT(*) ....
554                 *
555                 * before doing anything else.
556                 */
557                boolean myParamWantOnlyCount = isWantOnlyCount(myParams);
558                boolean myParamOrDefaultWantCount = nonNull(myParams.getSearchTotalMode())
559                                ? isWantCount(myParams)
560                                : SearchParameterMapCalculator.isWantCount(myStorageSettings.getDefaultTotalMode());
561
562                if (myParamWantOnlyCount || myParamOrDefaultWantCount) {
563                        doCountOnlyQuery(myParamWantOnlyCount);
564                        if (myParamWantOnlyCount) {
565                                return;
566                        }
567                }
568
569                ourLog.trace("Done count");
570                ISearchBuilder sb = newSearchBuilder();
571
572                /*
573                 * Figure out how many results we're actually going to fetch from the
574                 * database in this pass. This calculation takes into consideration the
575                 * "pre-fetch thresholds" specified in StorageSettings#getSearchPreFetchThresholds()
576                 * as well as the value of the _count parameter.
577                 */
578                int currentlyLoaded = defaultIfNull(mySearch.getNumFound(), 0);
579                int minWanted = 0;
580
581                // if no count is provided,
582                // we only use the values in SearchPreFetchThresholds
583                // but if there is a count...
584                if (myParams.getCount() != null) {
585                        minWanted = Math.min(myParams.getCount(), myPagingProvider.getMaximumPageSize());
586                        minWanted += currentlyLoaded;
587                }
588
589                // iterate through the search thresholds
590                for (Iterator<Integer> iter =
591                                                myStorageSettings.getSearchPreFetchThresholds().iterator();
592                                iter.hasNext(); ) {
593                        int next = iter.next();
594                        if (next != -1 && next <= currentlyLoaded) {
595                                continue;
596                        }
597
598                        if (next == -1) {
599                                sb.setMaxResultsToFetch(null);
600                        } else {
601                                // we want at least 1 more than our requested amount
602                                // so we know that there are other results
603                                // (in case we get the exact amount back)
604                                myMaxResultsToFetch = Math.max(next, minWanted);
605                                sb.setMaxResultsToFetch(myMaxResultsToFetch + 1);
606                        }
607
608                        if (iter.hasNext()) {
609                                myAdditionalPrefetchThresholdsRemaining = true;
610                        }
611
612                        // If we get here's we've found an appropriate threshold
613                        break;
614                }
615
616                /*
617                 * Provide any PID we loaded in previous search passes to the
618                 * SearchBuilder so that we don't get duplicates coming from running
619                 * the same query again.
620                 *
621                 * We could possibly accomplish this in a different way by using sorted
622                 * results in our SQL query and specifying an offset. I don't actually
623                 * know if that would be faster or not. At some point should test this
624                 * idea.
625                 */
626                if (myPreviouslyAddedResourcePids != null) {
627                        sb.setPreviouslyAddedResourcePids(myPreviouslyAddedResourcePids);
628                        mySyncedPids.addAll(myPreviouslyAddedResourcePids);
629                }
630
631                /*
632                 * createQuery
633                 * Construct the SQL query we'll be sending to the database
634                 *
635                 * NB: (See createCountQuery above)
636                 * We will pass the original myParams here (not a copy)
637                 * because we actually _want_ the mutation of the myParams to happen.
638                 * Specifically because SearchBuilder itself will _expect_
639                 * not to have these parameters when dumping back
640                 * to our DB.
641                 *
642                 * This is an odd implementation behaviour, but the change
643                 * for this will require a lot more handling at higher levels
644                 */
645                try (IResultIterator<JpaPid> resultIterator =
646                                sb.createQuery(myParams, mySearchRuntimeDetails, myRequest, myRequestPartitionId)) {
647                        // resultIterator is SearchBuilder.QueryIterator
648                        assert (resultIterator != null);
649
650                        /*
651                         * The following loop actually loads the PIDs of the resources
652                         * matching the search off of the disk and into memory. After
653                         * every X results, we commit to the HFJ_SEARCH table.
654                         */
655                        int syncSize = mySyncSize;
656                        while (resultIterator.hasNext()) {
657                                myUnsyncedPids.add(resultIterator.next());
658
659                                boolean shouldSync = myUnsyncedPids.size() >= syncSize;
660
661                                if (myStorageSettings.getCountSearchResultsUpTo() != null
662                                                && myStorageSettings.getCountSearchResultsUpTo() > 0
663                                                && myStorageSettings.getCountSearchResultsUpTo() < myUnsyncedPids.size()) {
664                                        shouldSync = false;
665                                }
666
667                                if (myUnsyncedPids.size() > 50000) {
668                                        shouldSync = true;
669                                }
670
671                                // If no abort was requested, bail out
672                                Validate.isTrue(isNotAborted(), "Abort has been requested");
673
674                                if (shouldSync) {
675                                        saveUnsynced(resultIterator);
676                                }
677
678                                if (myLoadingThrottleForUnitTests != null) {
679                                        AsyncUtil.sleep(myLoadingThrottleForUnitTests);
680                                }
681                        }
682
683                        // If no abort was requested, bail out
684                        Validate.isTrue(isNotAborted(), "Abort has been requested");
685
686                        saveUnsynced(resultIterator);
687
688                } catch (IOException e) {
689                        ourLog.error("IO failure during database access", e);
690                        throw new InternalErrorException(Msg.code(1166) + e);
691                }
692        }
693
694        /**
695         * Does the query but only for the count.
696         * @param theParamWantOnlyCount - if count query is wanted only
697         */
698        private void doCountOnlyQuery(boolean theParamWantOnlyCount) {
699                ourLog.trace("Performing count");
700                @SuppressWarnings("rawtypes")
701                ISearchBuilder sb = newSearchBuilder();
702
703                /*
704                 * createCountQuery
705                 * NB: (see createQuery below)
706                 * Because FulltextSearchSvcImpl will (internally)
707                 * mutate the myParams (searchmap),
708                 * (specifically removing the _content and _text filters)
709                 * we will have to clone those parameters here so that
710                 * the "correct" params are used in createQuery below
711                 */
712                Long count = sb.createCountQuery(myParams.clone(), mySearch.getUuid(), myRequest, myRequestPartitionId);
713
714                ourLog.trace("Got count {}", count);
715
716                myTxService
717                                .withRequest(myRequest)
718                                .withRequestPartitionId(myRequestPartitionId)
719                                .execute(() -> {
720                                        mySearch.setTotalCount(count.intValue());
721                                        if (theParamWantOnlyCount) {
722                                                mySearch.setStatus(SearchStatusEnum.FINISHED);
723                                        }
724                                        doSaveSearch();
725                                });
726        }
727}