001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.search.builder.tasks;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.i18n.Msg;
024import ca.uhn.fhir.interceptor.api.HookParams;
025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
026import ca.uhn.fhir.interceptor.api.Pointcut;
027import ca.uhn.fhir.interceptor.model.RequestPartitionId;
028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
029import ca.uhn.fhir.jpa.dao.IResultIterator;
030import ca.uhn.fhir.jpa.dao.ISearchBuilder;
031import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
032import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
033import ca.uhn.fhir.jpa.entity.Search;
034import ca.uhn.fhir.jpa.interceptor.JpaPreResourceAccessDetails;
035import ca.uhn.fhir.jpa.model.dao.JpaPid;
036import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
037import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
038import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc;
039import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc;
040import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
041import ca.uhn.fhir.jpa.util.QueryParameterUtils;
042import ca.uhn.fhir.jpa.util.SearchParameterMapCalculator;
043import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails;
044import ca.uhn.fhir.rest.api.server.RequestDetails;
045import ca.uhn.fhir.rest.server.IPagingProvider;
046import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
047import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
048import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
049import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
050import ca.uhn.fhir.system.HapiSystemProperties;
051import ca.uhn.fhir.util.AsyncUtil;
052import ca.uhn.fhir.util.StopWatch;
053import co.elastic.apm.api.ElasticApm;
054import co.elastic.apm.api.Span;
055import co.elastic.apm.api.Transaction;
056import jakarta.annotation.Nonnull;
057import org.apache.commons.lang3.Validate;
058import org.apache.commons.lang3.exception.ExceptionUtils;
059import org.hl7.fhir.instance.model.api.IBaseResource;
060import org.springframework.transaction.annotation.Propagation;
061
062import java.io.IOException;
063import java.util.ArrayList;
064import java.util.Iterator;
065import java.util.List;
066import java.util.concurrent.Callable;
067import java.util.concurrent.CountDownLatch;
068import java.util.concurrent.TimeUnit;
069import java.util.function.Consumer;
070
071import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantCount;
072import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantOnlyCount;
073import static java.util.Objects.nonNull;
074import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
075
076/**
077 * A search task is a Callable task that runs in
078 * a thread pool to handle an individual search. One instance
079 * is created for any requested search and runs from the
080 * beginning to the end of the search.
081 * <p>
082 * Understand:
083 * This class executes in its own thread separate from the
084 * web server client thread that made the request. We do that
085 * so that we can return to the client as soon as possible,
086 * but keep the search going in the background (and have
087 * the next page of results ready to go when the client asks).
088 */
089public class SearchTask implements Callable<Void> {
090
091        private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchTask.class);
092        // injected beans
093        protected final HapiTransactionService myTxService;
094        protected final FhirContext myContext;
095        protected final ISearchResultCacheSvc mySearchResultCacheSvc;
096        private final SearchParameterMap myParams;
097        private final String myResourceType;
098        private final ArrayList<JpaPid> mySyncedPids = new ArrayList<>();
099        private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1);
100        private final CountDownLatch myCompletionLatch;
101        private final ArrayList<JpaPid> myUnsyncedPids = new ArrayList<>();
102        private final RequestDetails myRequest;
103        private final RequestPartitionId myRequestPartitionId;
104        private final SearchRuntimeDetails mySearchRuntimeDetails;
105        private final Transaction myParentTransaction;
106        private final Consumer<String> myOnRemove;
107        private final int mySyncSize;
108        private final Integer myLoadingThrottleForUnitTests;
109        private final IInterceptorBroadcaster myInterceptorBroadcaster;
110        private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory;
111        private final JpaStorageSettings myStorageSettings;
112        private final ISearchCacheSvc mySearchCacheSvc;
113        private final IPagingProvider myPagingProvider;
114        private final IInterceptorBroadcaster myCompositeBroadcaster;
115        private Search mySearch;
116        private boolean myAbortRequested;
117        private int myCountSavedTotal = 0;
118        private int myCountSavedThisPass = 0;
119        private int myCountBlockedThisPass = 0;
120        private boolean myAdditionalPrefetchThresholdsRemaining;
121        private List<JpaPid> myPreviouslyAddedResourcePids;
122        private Integer myMaxResultsToFetch;
123
124        /**
125         * Constructor
126         */
127        @SuppressWarnings({"unchecked", "rawtypes"})
128        public SearchTask(
129                        SearchTaskParameters theCreationParams,
130                        HapiTransactionService theManagedTxManager,
131                        FhirContext theContext,
132                        IInterceptorBroadcaster theInterceptorBroadcaster,
133                        SearchBuilderFactory theSearchBuilderFactory,
134                        ISearchResultCacheSvc theSearchResultCacheSvc,
135                        JpaStorageSettings theStorageSettings,
136                        ISearchCacheSvc theSearchCacheSvc,
137                        IPagingProvider thePagingProvider) {
138                // beans
139                myTxService = theManagedTxManager;
140                myContext = theContext;
141                myInterceptorBroadcaster = theInterceptorBroadcaster;
142                mySearchBuilderFactory = theSearchBuilderFactory;
143                mySearchResultCacheSvc = theSearchResultCacheSvc;
144                myStorageSettings = theStorageSettings;
145                mySearchCacheSvc = theSearchCacheSvc;
146                myPagingProvider = thePagingProvider;
147
148                // values
149                myOnRemove = theCreationParams.OnRemove;
150                mySearch = theCreationParams.Search;
151                myParams = theCreationParams.Params;
152                myResourceType = theCreationParams.ResourceType;
153                myRequest = theCreationParams.Request;
154                myCompletionLatch = new CountDownLatch(1);
155                mySyncSize = theCreationParams.SyncSize;
156                myLoadingThrottleForUnitTests = theCreationParams.getLoadingThrottleForUnitTests();
157
158                mySearchRuntimeDetails = new SearchRuntimeDetails(myRequest, mySearch.getUuid());
159                mySearchRuntimeDetails.setQueryString(myParams.toNormalizedQueryString(myContext));
160                myRequestPartitionId = theCreationParams.RequestPartitionId;
161                myParentTransaction = ElasticApm.currentTransaction();
162                myCompositeBroadcaster =
163                                CompositeInterceptorBroadcaster.newCompositeBroadcaster(myInterceptorBroadcaster, myRequest);
164        }
165
166        protected RequestPartitionId getRequestPartitionId() {
167                return myRequestPartitionId;
168        }
169
170        /**
171         * This method is called by the server HTTP thread, and
172         * will block until at least one page of results have been
173         * fetched from the DB, and will never block after that.
174         */
175        public Integer awaitInitialSync() {
176                ourLog.trace("Awaiting initial sync");
177                do {
178                        ourLog.trace("Search {} aborted: {}", getSearch().getUuid(), !isNotAborted());
179                        if (AsyncUtil.awaitLatchAndThrowInternalErrorExceptionOnInterrupt(
180                                        getInitialCollectionLatch(), 250L, TimeUnit.MILLISECONDS)) {
181                                break;
182                        }
183                } while (getSearch().getStatus() == SearchStatusEnum.LOADING);
184                ourLog.trace("Initial sync completed");
185
186                return getSearch().getTotalCount();
187        }
188
189        public Search getSearch() {
190                return mySearch;
191        }
192
193        public CountDownLatch getInitialCollectionLatch() {
194                return myInitialCollectionLatch;
195        }
196
197        public void setPreviouslyAddedResourcePids(List<JpaPid> thePreviouslyAddedResourcePids) {
198                myPreviouslyAddedResourcePids = thePreviouslyAddedResourcePids;
199                myCountSavedTotal = myPreviouslyAddedResourcePids.size();
200        }
201
202        @SuppressWarnings("rawtypes")
203        private ISearchBuilder newSearchBuilder() {
204                Class<? extends IBaseResource> resourceTypeClass =
205                                myContext.getResourceDefinition(myResourceType).getImplementingClass();
206                return mySearchBuilderFactory.newSearchBuilder(myResourceType, resourceTypeClass);
207        }
208
209        @Nonnull
210        public List<JpaPid> getResourcePids(int theFromIndex, int theToIndex) {
211                ourLog.debug("Requesting search PIDs from {}-{}", theFromIndex, theToIndex);
212
213                boolean keepWaiting;
214                do {
215                        synchronized (mySyncedPids) {
216                                ourLog.trace("Search status is {}", mySearch.getStatus());
217                                boolean haveEnoughResults = mySyncedPids.size() >= theToIndex;
218                                if (!haveEnoughResults) {
219                                        switch (mySearch.getStatus()) {
220                                                case LOADING:
221                                                        keepWaiting = true;
222                                                        break;
223                                                case PASSCMPLET:
224                                                        /*
225                                                         * If we get here, it means that the user requested resources that crossed the
226                                                         * current pre-fetch boundary. For example, if the prefetch threshold is 50 and the
227                                                         * user has requested resources 0-60, then they would get 0-50 back but the search
228                                                         * coordinator would then stop searching.SearchCoordinatorSvcImplTest
229                                                         */
230                                                        keepWaiting = false;
231                                                        break;
232                                                case FAILED:
233                                                case FINISHED:
234                                                case GONE:
235                                                default:
236                                                        keepWaiting = false;
237                                                        break;
238                                        }
239                                } else {
240                                        keepWaiting = false;
241                                }
242                        }
243
244                        if (keepWaiting) {
245                                ourLog.info(
246                                                "Waiting as we only have {} results - Search status: {}",
247                                                mySyncedPids.size(),
248                                                mySearch.getStatus());
249                                AsyncUtil.sleep(500L);
250                        }
251                } while (keepWaiting);
252
253                ourLog.debug("Proceeding, as we have {} results", mySyncedPids.size());
254
255                ArrayList<JpaPid> retVal = new ArrayList<>();
256                synchronized (mySyncedPids) {
257                        QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(mySearch);
258
259                        int toIndex = theToIndex;
260                        if (mySyncedPids.size() < toIndex) {
261                                toIndex = mySyncedPids.size();
262                        }
263                        for (int i = theFromIndex; i < toIndex; i++) {
264                                retVal.add(mySyncedPids.get(i));
265                        }
266                }
267
268                ourLog.trace(
269                                "Done syncing results - Wanted {}-{} and returning {} of {}",
270                                theFromIndex,
271                                theToIndex,
272                                retVal.size(),
273                                mySyncedPids.size());
274
275                return retVal;
276        }
277
278        public void saveSearch() {
279                myTxService
280                                .withRequest(myRequest)
281                                .withRequestPartitionId(myRequestPartitionId)
282                                .withPropagation(Propagation.REQUIRES_NEW)
283                                .execute(this::doSaveSearch);
284        }
285
286        @SuppressWarnings("rawtypes")
287        private void saveUnsynced(final IResultIterator theResultIter) {
288                myTxService
289                                .withRequest(myRequest)
290                                .withRequestPartitionId(myRequestPartitionId)
291                                .execute(() -> {
292                                        if (mySearch.getId() == null) {
293                                                doSaveSearch();
294                                        }
295
296                                        ArrayList<JpaPid> unsyncedPids = myUnsyncedPids;
297                                        int countBlocked = 0;
298
299                                        // Interceptor call: STORAGE_PREACCESS_RESOURCES
300                                        // This can be used to remove results from the search result details before
301                                        // the user has a chance to know that they were in the results
302                                        if (mySearchRuntimeDetails.getRequestDetails() != null && !unsyncedPids.isEmpty()) {
303                                                JpaPreResourceAccessDetails accessDetails =
304                                                                new JpaPreResourceAccessDetails(unsyncedPids, this::newSearchBuilder);
305                                                HookParams params = new HookParams()
306                                                                .add(IPreResourceAccessDetails.class, accessDetails)
307                                                                .add(RequestDetails.class, mySearchRuntimeDetails.getRequestDetails())
308                                                                .addIfMatchesType(
309                                                                                ServletRequestDetails.class, mySearchRuntimeDetails.getRequestDetails());
310                                                myCompositeBroadcaster.callHooks(Pointcut.STORAGE_PREACCESS_RESOURCES, params);
311
312                                                for (int i = unsyncedPids.size() - 1; i >= 0; i--) {
313                                                        if (accessDetails.isDontReturnResourceAtIndex(i)) {
314                                                                unsyncedPids.remove(i);
315                                                                myCountBlockedThisPass++;
316                                                                myCountSavedTotal++;
317                                                                countBlocked++;
318                                                        }
319                                                }
320                                        }
321
322                                        // Actually store the results in the query cache storage
323                                        myCountSavedTotal += unsyncedPids.size();
324                                        myCountSavedThisPass += unsyncedPids.size();
325                                        mySearchResultCacheSvc.storeResults(
326                                                        mySearch, mySyncedPids, unsyncedPids, myRequest, getRequestPartitionId());
327
328                                        synchronized (mySyncedPids) {
329                                                int numSyncedThisPass = unsyncedPids.size();
330                                                ourLog.trace(
331                                                                "Syncing {} search results - Have more: {}",
332                                                                numSyncedThisPass,
333                                                                theResultIter.hasNext());
334                                                mySyncedPids.addAll(unsyncedPids);
335                                                unsyncedPids.clear();
336
337                                                if (!theResultIter.hasNext()) {
338                                                        int skippedCount = theResultIter.getSkippedCount();
339                                                        ourLog.trace(
340                                                                        "MaxToFetch[{}] SkippedCount[{}] CountSavedThisPass[{}] CountSavedThisTotal[{}] AdditionalPrefetchRemaining[{}]",
341                                                                        myMaxResultsToFetch,
342                                                                        skippedCount,
343                                                                        myCountSavedThisPass,
344                                                                        myCountSavedTotal,
345                                                                        myAdditionalPrefetchThresholdsRemaining);
346
347                                                        if (isFinished(theResultIter)) {
348                                                                // finished
349                                                                ourLog.trace("Setting search status to FINISHED");
350                                                                mySearch.setStatus(SearchStatusEnum.FINISHED);
351                                                                mySearch.setTotalCount(myCountSavedTotal - countBlocked);
352                                                        } else if (myAdditionalPrefetchThresholdsRemaining) {
353                                                                // pass complete
354                                                                ourLog.trace("Setting search status to PASSCMPLET");
355                                                                mySearch.setStatus(SearchStatusEnum.PASSCMPLET);
356                                                                mySearch.setSearchParameterMap(myParams);
357                                                        } else {
358                                                                // also finished
359                                                                ourLog.trace("Setting search status to FINISHED");
360                                                                mySearch.setStatus(SearchStatusEnum.FINISHED);
361                                                                mySearch.setTotalCount(myCountSavedTotal - countBlocked);
362                                                        }
363                                                }
364                                        }
365
366                                        mySearch.setNumFound(myCountSavedTotal);
367                                        mySearch.setNumBlocked(mySearch.getNumBlocked() + countBlocked);
368
369                                        int numSynced;
370                                        synchronized (mySyncedPids) {
371                                                numSynced = mySyncedPids.size();
372                                        }
373
374                                        if (myStorageSettings.getCountSearchResultsUpTo() == null
375                                                        || myStorageSettings.getCountSearchResultsUpTo() <= 0
376                                                        || myStorageSettings.getCountSearchResultsUpTo() <= numSynced) {
377                                                myInitialCollectionLatch.countDown();
378                                        }
379
380                                        doSaveSearch();
381
382                                        ourLog.trace("saveUnsynced() - pre-commit");
383                                });
384                ourLog.trace("saveUnsynced() - post-commit");
385        }
386
387        @SuppressWarnings("rawtypes")
388        private boolean isFinished(final IResultIterator theResultIter) {
389                int skippedCount = theResultIter.getSkippedCount();
390                int nonSkippedCount = theResultIter.getNonSkippedCount();
391                int totalFetched = skippedCount + myCountSavedThisPass + myCountBlockedThisPass;
392
393                if (myMaxResultsToFetch != null && totalFetched < myMaxResultsToFetch) {
394                        // total fetched < max results to fetch -> we've exhausted the search
395                        return true;
396                } else {
397                        if (nonSkippedCount == 0) {
398                                // no skipped resources in this query
399                                if (myParams.getCount() != null) {
400                                        // count supplied
401                                        // if the count is > what we've fetched -> we've exhausted the query
402                                        return myParams.getCount() > totalFetched;
403                                } else {
404                                        // legacy - we have no skipped resources - we are done
405                                        return true;
406                                }
407                        }
408                        // skipped resources means we have more to fetch
409                        return false;
410                }
411        }
412
413        public boolean isNotAborted() {
414                return !myAbortRequested;
415        }
416
417        public void markComplete() {
418                myCompletionLatch.countDown();
419        }
420
421        public CountDownLatch getCompletionLatch() {
422                return myCompletionLatch;
423        }
424
425        /**
426         * Request that the task abort as soon as possible
427         */
428        public void requestImmediateAbort() {
429                myAbortRequested = true;
430        }
431
432        /**
433         * This is the method which actually performs the search.
434         * It is called automatically by the thread pool.
435         */
436        @Override
437        public Void call() {
438                StopWatch sw = new StopWatch();
439                Span span = myParentTransaction.startSpan("db", "query", "search");
440                span.setName("FHIR Database Search");
441                try {
442                        // Create an initial search in the DB and give it an ID
443                        saveSearch();
444
445                        myTxService
446                                        .withRequest(myRequest)
447                                        .withRequestPartitionId(myRequestPartitionId)
448                                        .execute(this::doSearch);
449
450                        mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus());
451                        if (mySearch.getStatus() == SearchStatusEnum.FINISHED) {
452                                HookParams params = new HookParams()
453                                                .add(RequestDetails.class, myRequest)
454                                                .addIfMatchesType(ServletRequestDetails.class, myRequest)
455                                                .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
456                                myCompositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_COMPLETE, params);
457                        } else {
458                                HookParams params = new HookParams()
459                                                .add(RequestDetails.class, myRequest)
460                                                .addIfMatchesType(ServletRequestDetails.class, myRequest)
461                                                .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
462                                myCompositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_PASS_COMPLETE, params);
463                        }
464
465                        ourLog.trace(
466                                        "Have completed search for [{}{}] and found {} resources in {}ms - Status is {}",
467                                        mySearch.getResourceType(),
468                                        mySearch.getSearchQueryString(),
469                                        mySyncedPids.size(),
470                                        sw.getMillis(),
471                                        mySearch.getStatus());
472
473                } catch (Throwable t) {
474
475                        /*
476                         * Don't print a stack trace for client errors (i.e. requests that
477                         * aren't valid because the client screwed up).. that's just noise
478                         * in the logs and who needs that.
479                         */
480                        boolean logged = false;
481                        if (t instanceof BaseServerResponseException) {
482                                BaseServerResponseException exception = (BaseServerResponseException) t;
483                                if (exception.getStatusCode() >= 400 && exception.getStatusCode() < 500) {
484                                        logged = true;
485                                        ourLog.warn("Failed during search due to invalid request: {}", t.toString());
486                                }
487                        }
488
489                        if (!logged) {
490                                ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t);
491                        }
492                        myUnsyncedPids.clear();
493                        Throwable rootCause = ExceptionUtils.getRootCause(t);
494                        rootCause = defaultIfNull(rootCause, t);
495
496                        String failureMessage = rootCause.getMessage();
497
498                        int failureCode = InternalErrorException.STATUS_CODE;
499                        if (t instanceof BaseServerResponseException) {
500                                failureCode = ((BaseServerResponseException) t).getStatusCode();
501                        }
502
503                        if (HapiSystemProperties.isUnitTestCaptureStackEnabled()) {
504                                failureMessage += "\nStack\n" + ExceptionUtils.getStackTrace(rootCause);
505                        }
506
507                        mySearch.setFailureMessage(failureMessage);
508                        mySearch.setFailureCode(failureCode);
509                        mySearch.setStatus(SearchStatusEnum.FAILED);
510
511                        mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus());
512                        HookParams params = new HookParams()
513                                        .add(RequestDetails.class, myRequest)
514                                        .addIfMatchesType(ServletRequestDetails.class, myRequest)
515                                        .add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
516                        myCompositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_FAILED, params);
517
518                        saveSearch();
519                        span.captureException(t);
520                } finally {
521                        myOnRemove.accept(mySearch.getUuid());
522
523                        myInitialCollectionLatch.countDown();
524                        markComplete();
525                        span.end();
526                }
527                return null;
528        }
529
530        private void doSaveSearch() {
531                Search newSearch = mySearchCacheSvc.save(mySearch, myRequestPartitionId);
532
533                // mySearchDao.save is not supposed to return null, but in unit tests
534                // it can if the mock search dao isn't set up to handle that
535                if (newSearch != null) {
536                        mySearch = newSearch;
537                }
538        }
539
540        /**
541         * This method actually creates the database query to perform the
542         * search, and starts it.
543         */
544        @SuppressWarnings({"rawtypes", "unchecked"})
545        private void doSearch() {
546                /*
547                 * If the user has explicitly requested a _count, perform a
548                 *
549                 * SELECT COUNT(*) ....
550                 *
551                 * before doing anything else.
552                 */
553                boolean myParamWantOnlyCount = isWantOnlyCount(myParams);
554                boolean myParamOrDefaultWantCount = nonNull(myParams.getSearchTotalMode())
555                                ? isWantCount(myParams)
556                                : SearchParameterMapCalculator.isWantCount(myStorageSettings.getDefaultTotalMode());
557
558                if (myParamWantOnlyCount || myParamOrDefaultWantCount) {
559                        doCountOnlyQuery(myParamWantOnlyCount);
560                        if (myParamWantOnlyCount) {
561                                return;
562                        }
563                }
564
565                ourLog.trace("Done count");
566                ISearchBuilder sb = newSearchBuilder();
567
568                /*
569                 * Figure out how many results we're actually going to fetch from the
570                 * database in this pass. This calculation takes into consideration the
571                 * "pre-fetch thresholds" specified in StorageSettings#getSearchPreFetchThresholds()
572                 * as well as the value of the _count parameter.
573                 */
574                int currentlyLoaded = defaultIfNull(mySearch.getNumFound(), 0);
575                int minWanted = 0;
576
577                // if no count is provided,
578                // we only use the values in SearchPreFetchThresholds
579                // but if there is a count...
580                if (myParams.getCount() != null) {
581                        minWanted = Math.min(myParams.getCount(), myPagingProvider.getMaximumPageSize());
582                        minWanted += currentlyLoaded;
583                }
584
585                // iterate through the search thresholds
586                for (Iterator<Integer> iter =
587                                                myStorageSettings.getSearchPreFetchThresholds().iterator();
588                                iter.hasNext(); ) {
589                        int next = iter.next();
590                        if (next != -1 && next <= currentlyLoaded) {
591                                continue;
592                        }
593
594                        if (next == -1) {
595                                sb.setMaxResultsToFetch(null);
596                        } else {
597                                // we want at least 1 more than our requested amount
598                                // so we know that there are other results
599                                // (in case we get the exact amount back)
600                                myMaxResultsToFetch = Math.max(next, minWanted);
601                                sb.setMaxResultsToFetch(myMaxResultsToFetch + 1);
602                        }
603
604                        if (iter.hasNext()) {
605                                myAdditionalPrefetchThresholdsRemaining = true;
606                        }
607
608                        // If we get here's we've found an appropriate threshold
609                        break;
610                }
611
612                /*
613                 * Provide any PID we loaded in previous search passes to the
614                 * SearchBuilder so that we don't get duplicates coming from running
615                 * the same query again.
616                 *
617                 * We could possibly accomplish this in a different way by using sorted
618                 * results in our SQL query and specifying an offset. I don't actually
619                 * know if that would be faster or not. At some point should test this
620                 * idea.
621                 */
622                if (myPreviouslyAddedResourcePids != null) {
623                        sb.setPreviouslyAddedResourcePids(myPreviouslyAddedResourcePids);
624                        mySyncedPids.addAll(myPreviouslyAddedResourcePids);
625                }
626
627                /*
628                 * createQuery
629                 * Construct the SQL query we'll be sending to the database
630                 *
631                 * NB: (See createCountQuery above)
632                 * We will pass the original myParams here (not a copy)
633                 * because we actually _want_ the mutation of the myParams to happen.
634                 * Specifically because SearchBuilder itself will _expect_
635                 * not to have these parameters when dumping back
636                 * to our DB.
637                 *
638                 * This is an odd implementation behaviour, but the change
639                 * for this will require a lot more handling at higher levels
640                 */
641                try (IResultIterator<JpaPid> resultIterator =
642                                sb.createQuery(myParams, mySearchRuntimeDetails, myRequest, myRequestPartitionId)) {
643                        // resultIterator is SearchBuilder.QueryIterator
644                        assert (resultIterator != null);
645
646                        /*
647                         * The following loop actually loads the PIDs of the resources
648                         * matching the search off of the disk and into memory. After
649                         * every X results, we commit to the HFJ_SEARCH table.
650                         */
651                        int syncSize = mySyncSize;
652                        while (resultIterator.hasNext()) {
653                                myUnsyncedPids.add(resultIterator.next());
654
655                                boolean shouldSync = myUnsyncedPids.size() >= syncSize;
656
657                                if (myStorageSettings.getCountSearchResultsUpTo() != null
658                                                && myStorageSettings.getCountSearchResultsUpTo() > 0
659                                                && myStorageSettings.getCountSearchResultsUpTo() < myUnsyncedPids.size()) {
660                                        shouldSync = false;
661                                }
662
663                                if (myUnsyncedPids.size() > 50000) {
664                                        shouldSync = true;
665                                }
666
667                                // If no abort was requested, bail out
668                                Validate.isTrue(isNotAborted(), "Abort has been requested");
669
670                                if (shouldSync) {
671                                        saveUnsynced(resultIterator);
672                                }
673
674                                if (myLoadingThrottleForUnitTests != null) {
675                                        AsyncUtil.sleep(myLoadingThrottleForUnitTests);
676                                }
677                        }
678
679                        // If no abort was requested, bail out
680                        Validate.isTrue(isNotAborted(), "Abort has been requested");
681
682                        saveUnsynced(resultIterator);
683
684                } catch (IOException e) {
685                        ourLog.error("IO failure during database access", e);
686                        throw new InternalErrorException(Msg.code(1166) + e);
687                }
688        }
689
690        /**
691         * Does the query but only for the count.
692         * @param theParamWantOnlyCount - if count query is wanted only
693         */
694        private void doCountOnlyQuery(boolean theParamWantOnlyCount) {
695                ourLog.trace("Performing count");
696                @SuppressWarnings("rawtypes")
697                ISearchBuilder sb = newSearchBuilder();
698
699                /*
700                 * createCountQuery
701                 * NB: (see createQuery below)
702                 * Because FulltextSearchSvcImpl will (internally)
703                 * mutate the myParams (searchmap),
704                 * (specifically removing the _content and _text filters)
705                 * we will have to clone those parameters here so that
706                 * the "correct" params are used in createQuery below
707                 */
708                Long count = sb.createCountQuery(myParams.clone(), mySearch.getUuid(), myRequest, myRequestPartitionId);
709
710                ourLog.trace("Got count {}", count);
711
712                myTxService
713                                .withRequest(myRequest)
714                                .withRequestPartitionId(myRequestPartitionId)
715                                .execute(() -> {
716                                        mySearch.setTotalCount(count.intValue());
717                                        if (theParamWantOnlyCount) {
718                                                mySearch.setStatus(SearchStatusEnum.FINISHED);
719                                        }
720                                        doSaveSearch();
721                                });
722        }
723}