
001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.search.builder.tasks; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.i18n.Msg; 024import ca.uhn.fhir.interceptor.api.HookParams; 025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 026import ca.uhn.fhir.interceptor.api.Pointcut; 027import ca.uhn.fhir.interceptor.model.RequestPartitionId; 028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 029import ca.uhn.fhir.jpa.dao.IResultIterator; 030import ca.uhn.fhir.jpa.dao.ISearchBuilder; 031import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 032import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 033import ca.uhn.fhir.jpa.entity.Search; 034import ca.uhn.fhir.jpa.interceptor.JpaPreResourceAccessDetails; 035import ca.uhn.fhir.jpa.model.dao.JpaPid; 036import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; 037import ca.uhn.fhir.jpa.model.search.SearchStatusEnum; 038import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc; 039import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc; 040import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 041import ca.uhn.fhir.jpa.util.QueryParameterUtils; 042import ca.uhn.fhir.jpa.util.SearchParameterMapCalculator; 043import ca.uhn.fhir.parser.DataFormatException; 044import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails; 045import ca.uhn.fhir.rest.api.server.RequestDetails; 046import ca.uhn.fhir.rest.server.IPagingProvider; 047import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; 048import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 049import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 050import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; 051import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; 052import ca.uhn.fhir.system.HapiSystemProperties; 053import ca.uhn.fhir.util.AsyncUtil; 054import ca.uhn.fhir.util.StopWatch; 055import co.elastic.apm.api.ElasticApm; 056import co.elastic.apm.api.Span; 057import co.elastic.apm.api.Transaction; 058import jakarta.annotation.Nonnull; 059import org.apache.commons.lang3.Validate; 060import org.apache.commons.lang3.exception.ExceptionUtils; 061import org.hl7.fhir.instance.model.api.IBaseResource; 062import org.springframework.transaction.annotation.Propagation; 063 064import java.io.IOException; 065import java.util.ArrayList; 066import java.util.Iterator; 067import java.util.List; 068import java.util.concurrent.Callable; 069import java.util.concurrent.CountDownLatch; 070import java.util.concurrent.TimeUnit; 071import java.util.function.Consumer; 072 073import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantCount; 074import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantOnlyCount; 075import static java.util.Objects.nonNull; 076import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; 077 078/** 079 * A search task is a Callable task that runs in 080 * a thread pool to handle an individual search. One instance 081 * is created for any requested search and runs from the 082 * beginning to the end of the search. 083 * <p> 084 * Understand: 085 * This class executes in its own thread separate from the 086 * web server client thread that made the request. We do that 087 * so that we can return to the client as soon as possible, 088 * but keep the search going in the background (and have 089 * the next page of results ready to go when the client asks). 090 */ 091public class SearchTask implements Callable<Void> { 092 093 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchTask.class); 094 // injected beans 095 protected final HapiTransactionService myTxService; 096 protected final FhirContext myContext; 097 protected final ISearchResultCacheSvc mySearchResultCacheSvc; 098 private final SearchParameterMap myParams; 099 private final String myResourceType; 100 private final ArrayList<JpaPid> mySyncedPids = new ArrayList<>(); 101 private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1); 102 private final CountDownLatch myCompletionLatch; 103 private final ArrayList<JpaPid> myUnsyncedPids = new ArrayList<>(); 104 private final RequestDetails myRequest; 105 private final RequestPartitionId myRequestPartitionId; 106 private final SearchRuntimeDetails mySearchRuntimeDetails; 107 private final Transaction myParentTransaction; 108 private final Consumer<String> myOnRemove; 109 private final int mySyncSize; 110 private final Integer myLoadingThrottleForUnitTests; 111 private final IInterceptorBroadcaster myInterceptorBroadcaster; 112 private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory; 113 private final JpaStorageSettings myStorageSettings; 114 private final ISearchCacheSvc mySearchCacheSvc; 115 private final IPagingProvider myPagingProvider; 116 private final IInterceptorBroadcaster myCompositeBroadcaster; 117 private Search mySearch; 118 private boolean myAbortRequested; 119 private int myCountSavedTotal = 0; 120 private int myCountSavedThisPass = 0; 121 private int myCountBlockedThisPass = 0; 122 private boolean myAdditionalPrefetchThresholdsRemaining; 123 private List<JpaPid> myPreviouslyAddedResourcePids; 124 // The max number of results that we request from the search 125 // Note that this is set using the configured pre-fetch thresholds, maximum page size, and/or the client provided 126 // _count parameter 127 private Integer myMaxResultsToFetch; 128 129 /** 130 * Constructor 131 */ 132 @SuppressWarnings({"unchecked", "rawtypes"}) 133 public SearchTask( 134 SearchTaskParameters theCreationParams, 135 HapiTransactionService theManagedTxManager, 136 FhirContext theContext, 137 IInterceptorBroadcaster theInterceptorBroadcaster, 138 SearchBuilderFactory theSearchBuilderFactory, 139 ISearchResultCacheSvc theSearchResultCacheSvc, 140 JpaStorageSettings theStorageSettings, 141 ISearchCacheSvc theSearchCacheSvc, 142 IPagingProvider thePagingProvider) { 143 // beans 144 myTxService = theManagedTxManager; 145 myContext = theContext; 146 myInterceptorBroadcaster = theInterceptorBroadcaster; 147 mySearchBuilderFactory = theSearchBuilderFactory; 148 mySearchResultCacheSvc = theSearchResultCacheSvc; 149 myStorageSettings = theStorageSettings; 150 mySearchCacheSvc = theSearchCacheSvc; 151 myPagingProvider = thePagingProvider; 152 153 // values 154 myOnRemove = theCreationParams.OnRemove; 155 mySearch = theCreationParams.Search; 156 myParams = theCreationParams.Params; 157 myResourceType = theCreationParams.ResourceType; 158 myRequest = theCreationParams.Request; 159 myCompletionLatch = new CountDownLatch(1); 160 mySyncSize = theCreationParams.SyncSize; 161 myLoadingThrottleForUnitTests = theCreationParams.getLoadingThrottleForUnitTests(); 162 163 mySearchRuntimeDetails = new SearchRuntimeDetails(myRequest, mySearch.getUuid()); 164 mySearchRuntimeDetails.setQueryString(myParams.toNormalizedQueryString(myContext)); 165 myRequestPartitionId = theCreationParams.RequestPartitionId; 166 myParentTransaction = ElasticApm.currentTransaction(); 167 myCompositeBroadcaster = 168 CompositeInterceptorBroadcaster.newCompositeBroadcaster(myInterceptorBroadcaster, myRequest); 169 } 170 171 protected RequestPartitionId getRequestPartitionId() { 172 return myRequestPartitionId; 173 } 174 175 /** 176 * This method is called by the server HTTP thread, and 177 * will block until at least one page of results have been 178 * fetched from the DB, and will never block after that. 179 */ 180 public Integer awaitInitialSync() { 181 ourLog.trace("Awaiting initial sync"); 182 do { 183 ourLog.trace("Search {} aborted: {}", getSearch().getUuid(), !isNotAborted()); 184 if (AsyncUtil.awaitLatchAndThrowInternalErrorExceptionOnInterrupt( 185 getInitialCollectionLatch(), 250L, TimeUnit.MILLISECONDS)) { 186 break; 187 } 188 } while (getSearch().getStatus() == SearchStatusEnum.LOADING); 189 ourLog.trace("Initial sync completed"); 190 191 return getSearch().getTotalCount(); 192 } 193 194 public Search getSearch() { 195 return mySearch; 196 } 197 198 public CountDownLatch getInitialCollectionLatch() { 199 return myInitialCollectionLatch; 200 } 201 202 public void setPreviouslyAddedResourcePids(List<JpaPid> thePreviouslyAddedResourcePids) { 203 myPreviouslyAddedResourcePids = thePreviouslyAddedResourcePids; 204 myCountSavedTotal = myPreviouslyAddedResourcePids.size(); 205 } 206 207 @SuppressWarnings("rawtypes") 208 private ISearchBuilder newSearchBuilder() { 209 Class<? extends IBaseResource> resourceTypeClass = 210 myContext.getResourceDefinition(myResourceType).getImplementingClass(); 211 return mySearchBuilderFactory.newSearchBuilder(myResourceType, resourceTypeClass); 212 } 213 214 @Nonnull 215 public List<JpaPid> getResourcePids(int theFromIndex, int theToIndex) { 216 ourLog.debug("Requesting search PIDs from {}-{}", theFromIndex, theToIndex); 217 218 boolean keepWaiting; 219 do { 220 synchronized (mySyncedPids) { 221 ourLog.trace("Search status is {}", mySearch.getStatus()); 222 boolean haveEnoughResults = mySyncedPids.size() >= theToIndex; 223 if (!haveEnoughResults) { 224 switch (mySearch.getStatus()) { 225 case LOADING: 226 keepWaiting = true; 227 break; 228 case PASSCMPLET: 229 /* 230 * If we get here, it means that the user requested resources that crossed the 231 * current pre-fetch boundary. For example, if the prefetch threshold is 50 and the 232 * user has requested resources 0-60, then they would get 0-50 back but the search 233 * coordinator would then stop searching.SearchCoordinatorSvcImplTest 234 */ 235 keepWaiting = false; 236 break; 237 case FAILED: 238 case FINISHED: 239 case GONE: 240 default: 241 keepWaiting = false; 242 break; 243 } 244 } else { 245 keepWaiting = false; 246 } 247 } 248 249 if (keepWaiting) { 250 ourLog.info( 251 "Waiting as we only have {} results - Search status: {}", 252 mySyncedPids.size(), 253 mySearch.getStatus()); 254 AsyncUtil.sleep(500L); 255 } 256 } while (keepWaiting); 257 258 ourLog.debug("Proceeding, as we have {} results", mySyncedPids.size()); 259 260 ArrayList<JpaPid> retVal = new ArrayList<>(); 261 synchronized (mySyncedPids) { 262 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(mySearch); 263 264 int toIndex = theToIndex; 265 if (mySyncedPids.size() < toIndex) { 266 toIndex = mySyncedPids.size(); 267 } 268 for (int i = theFromIndex; i < toIndex; i++) { 269 retVal.add(mySyncedPids.get(i)); 270 } 271 } 272 273 ourLog.trace( 274 "Done syncing results - Wanted {}-{} and returning {} of {}", 275 theFromIndex, 276 theToIndex, 277 retVal.size(), 278 mySyncedPids.size()); 279 280 return retVal; 281 } 282 283 public void saveSearch() { 284 myTxService 285 .withRequest(myRequest) 286 .withRequestPartitionId(myRequestPartitionId) 287 .withPropagation(Propagation.REQUIRES_NEW) 288 .execute(this::doSaveSearch); 289 } 290 291 @SuppressWarnings("rawtypes") 292 private void saveUnsynced(final IResultIterator theResultIter) { 293 myTxService 294 .withRequest(myRequest) 295 .withRequestPartitionId(myRequestPartitionId) 296 .execute(() -> { 297 if (mySearch.getId() == null) { 298 doSaveSearch(); 299 } 300 301 ArrayList<JpaPid> unsyncedPids = myUnsyncedPids; 302 int countBlocked = 0; 303 304 // Interceptor call: STORAGE_PREACCESS_RESOURCES 305 // This can be used to remove results from the search result details before 306 // the user has a chance to know that they were in the results 307 if (mySearchRuntimeDetails.getRequestDetails() != null && !unsyncedPids.isEmpty()) { 308 JpaPreResourceAccessDetails accessDetails = 309 new JpaPreResourceAccessDetails(unsyncedPids, this::newSearchBuilder); 310 HookParams params = new HookParams() 311 .add(IPreResourceAccessDetails.class, accessDetails) 312 .add(RequestDetails.class, mySearchRuntimeDetails.getRequestDetails()) 313 .addIfMatchesType( 314 ServletRequestDetails.class, mySearchRuntimeDetails.getRequestDetails()); 315 myCompositeBroadcaster.callHooks(Pointcut.STORAGE_PREACCESS_RESOURCES, params); 316 317 for (int i = unsyncedPids.size() - 1; i >= 0; i--) { 318 if (accessDetails.isDontReturnResourceAtIndex(i)) { 319 unsyncedPids.remove(i); 320 myCountBlockedThisPass++; 321 myCountSavedTotal++; 322 countBlocked++; 323 } 324 } 325 } 326 327 // Actually store the results in the query cache storage 328 myCountSavedTotal += unsyncedPids.size(); 329 myCountSavedThisPass += unsyncedPids.size(); 330 mySearchResultCacheSvc.storeResults( 331 mySearch, mySyncedPids, unsyncedPids, myRequest, getRequestPartitionId()); 332 333 synchronized (mySyncedPids) { 334 int numSyncedThisPass = unsyncedPids.size(); 335 ourLog.trace( 336 "Syncing {} search results - Have more: {}", 337 numSyncedThisPass, 338 theResultIter.hasNext()); 339 mySyncedPids.addAll(unsyncedPids); 340 unsyncedPids.clear(); 341 342 if (!theResultIter.hasNext()) { 343 int skippedCount = theResultIter.getSkippedCount(); 344 ourLog.trace( 345 "MaxToFetch[{}] SkippedCount[{}] CountSavedThisPass[{}] CountSavedThisTotal[{}] AdditionalPrefetchRemaining[{}]", 346 myMaxResultsToFetch, 347 skippedCount, 348 myCountSavedThisPass, 349 myCountSavedTotal, 350 myAdditionalPrefetchThresholdsRemaining); 351 352 if (isFinished(theResultIter)) { 353 // finished 354 ourLog.trace("Setting search status to FINISHED"); 355 mySearch.setStatus(SearchStatusEnum.FINISHED); 356 mySearch.setTotalCount(myCountSavedTotal - countBlocked); 357 } else if (myAdditionalPrefetchThresholdsRemaining) { 358 // pass complete 359 ourLog.trace("Setting search status to PASSCMPLET"); 360 mySearch.setStatus(SearchStatusEnum.PASSCMPLET); 361 mySearch.setSearchParameterMap(myParams); 362 } else { 363 // also finished 364 ourLog.trace("Setting search status to FINISHED"); 365 mySearch.setStatus(SearchStatusEnum.FINISHED); 366 mySearch.setTotalCount(myCountSavedTotal - countBlocked); 367 } 368 } 369 } 370 371 mySearch.setNumFound(myCountSavedTotal); 372 mySearch.setNumBlocked(mySearch.getNumBlocked() + countBlocked); 373 374 int numSynced; 375 synchronized (mySyncedPids) { 376 numSynced = mySyncedPids.size(); 377 } 378 379 if (myStorageSettings.getCountSearchResultsUpTo() == null 380 || myStorageSettings.getCountSearchResultsUpTo() <= 0 381 || myStorageSettings.getCountSearchResultsUpTo() <= numSynced) { 382 myInitialCollectionLatch.countDown(); 383 } 384 385 doSaveSearch(); 386 387 ourLog.trace("saveUnsynced() - pre-commit"); 388 }); 389 ourLog.trace("saveUnsynced() - post-commit"); 390 } 391 392 @SuppressWarnings("rawtypes") 393 private boolean isFinished(final IResultIterator theResultIter) { 394 int skippedCount = theResultIter.getSkippedCount(); 395 int nonSkippedCount = theResultIter.getNonSkippedCount(); 396 int totalFetched = skippedCount + myCountSavedThisPass + myCountBlockedThisPass; 397 398 if (myMaxResultsToFetch != null && totalFetched < myMaxResultsToFetch) { 399 // total fetched < max results to fetch -> we've exhausted the search 400 return true; 401 } else { 402 if (nonSkippedCount == 0) { 403 // no skipped resources in this query 404 if (myParams.getCount() != null) { 405 // count supplied 406 // if the count is > what we've fetched -> we've exhausted the query 407 return myParams.getCount() > totalFetched; 408 } else { 409 // legacy - we have no skipped resources - we are done 410 return true; 411 } 412 } 413 // skipped resources means we have more to fetch 414 return false; 415 } 416 } 417 418 public boolean isNotAborted() { 419 return !myAbortRequested; 420 } 421 422 public void markComplete() { 423 myCompletionLatch.countDown(); 424 } 425 426 public CountDownLatch getCompletionLatch() { 427 return myCompletionLatch; 428 } 429 430 /** 431 * Request that the task abort as soon as possible 432 */ 433 public void requestImmediateAbort() { 434 myAbortRequested = true; 435 } 436 437 /** 438 * This is the method which actually performs the search. 439 * It is called automatically by the thread pool. 440 */ 441 @Override 442 public Void call() { 443 StopWatch sw = new StopWatch(); 444 Span span = myParentTransaction.startSpan("db", "query", "search"); 445 span.setName("FHIR Database Search"); 446 try { 447 // Create an initial search in the DB and give it an ID 448 saveSearch(); 449 450 myTxService 451 .withRequest(myRequest) 452 .withRequestPartitionId(myRequestPartitionId) 453 .execute(this::doSearch); 454 455 mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus()); 456 if (mySearch.getStatus() == SearchStatusEnum.FINISHED) { 457 HookParams params = new HookParams() 458 .add(RequestDetails.class, myRequest) 459 .addIfMatchesType(ServletRequestDetails.class, myRequest) 460 .add(SearchRuntimeDetails.class, mySearchRuntimeDetails); 461 myCompositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_COMPLETE, params); 462 } else { 463 HookParams params = new HookParams() 464 .add(RequestDetails.class, myRequest) 465 .addIfMatchesType(ServletRequestDetails.class, myRequest) 466 .add(SearchRuntimeDetails.class, mySearchRuntimeDetails); 467 myCompositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_PASS_COMPLETE, params); 468 } 469 470 ourLog.trace( 471 "Have completed search for [{}{}] and found {} resources in {}ms - Status is {}", 472 mySearch.getResourceType(), 473 mySearch.getSearchQueryString(), 474 mySyncedPids.size(), 475 sw.getMillis(), 476 mySearch.getStatus()); 477 478 } catch (Throwable t) { 479 480 /* 481 * Don't print a stack trace for client errors (i.e. requests that 482 * aren't valid because the client screwed up).. that's just noise 483 * in the logs and who needs that. 484 */ 485 boolean logged = false; 486 if (t instanceof BaseServerResponseException) { 487 BaseServerResponseException exception = (BaseServerResponseException) t; 488 if (exception.getStatusCode() >= 400 && exception.getStatusCode() < 500) { 489 logged = true; 490 ourLog.warn("Failed during search due to invalid request: {}", t.toString()); 491 } 492 } 493 494 if (!logged) { 495 ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t); 496 } 497 myUnsyncedPids.clear(); 498 499 markSearchAsFailedWithExceptionDetails(t); 500 501 mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus()); 502 HookParams params = new HookParams() 503 .add(RequestDetails.class, myRequest) 504 .addIfMatchesType(ServletRequestDetails.class, myRequest) 505 .add(SearchRuntimeDetails.class, mySearchRuntimeDetails); 506 myCompositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_FAILED, params); 507 508 saveSearch(); 509 span.captureException(t); 510 } finally { 511 myOnRemove.accept(mySearch.getUuid()); 512 513 myInitialCollectionLatch.countDown(); 514 markComplete(); 515 span.end(); 516 } 517 return null; 518 } 519 520 /** 521 * Updates the search entity with failure information based on the exception. 522 * Determines the appropriate HTTP status code based on exception type: 523 * - DataFormatException -> 400 Bad Request (client error) 524 * - BaseServerResponseException -> Use exception's status code 525 * - Other exceptions -> 500 Internal Server Error 526 * 527 * Optionally appends stack trace if unit test capture is enabled. 528 * 529 * @param theThrowable The exception that caused the search to fail 530 */ 531 protected void markSearchAsFailedWithExceptionDetails(Throwable theThrowable) { 532 Throwable rootCause = ExceptionUtils.getRootCause(theThrowable); 533 rootCause = defaultIfNull(rootCause, theThrowable); 534 535 String failureMessage = rootCause.getMessage(); 536 537 int failureCode; 538 if (rootCause instanceof DataFormatException || theThrowable instanceof DataFormatException) { 539 // DataFormatException indicates invalid client input 540 // and should return HTTP 400 Bad Request, not 500 Internal Server Error. 541 failureCode = InvalidRequestException.STATUS_CODE; 542 } else if (theThrowable instanceof BaseServerResponseException baseServerResponseException) { 543 failureCode = baseServerResponseException.getStatusCode(); 544 } else { 545 failureCode = InternalErrorException.STATUS_CODE; 546 } 547 548 if (HapiSystemProperties.isUnitTestCaptureStackEnabled()) { 549 failureMessage += "\nStack\n" + ExceptionUtils.getStackTrace(rootCause); 550 } 551 552 mySearch.setFailureMessage(failureMessage); 553 mySearch.setFailureCode(failureCode); 554 mySearch.setStatus(SearchStatusEnum.FAILED); 555 } 556 557 private void doSaveSearch() { 558 Search newSearch = mySearchCacheSvc.save(mySearch, myRequestPartitionId); 559 560 // mySearchDao.save is not supposed to return null, but in unit tests 561 // it can if the mock search dao isn't set up to handle that 562 if (newSearch != null) { 563 mySearch = newSearch; 564 } 565 } 566 567 /** 568 * This method actually creates the database query to perform the 569 * search, and starts it. 570 */ 571 @SuppressWarnings({"rawtypes", "unchecked"}) 572 private void doSearch() { 573 /* 574 * If the user has explicitly requested a _count, perform a 575 * 576 * SELECT COUNT(*) .... 577 * 578 * before doing anything else. 579 */ 580 boolean myParamWantOnlyCount = isWantOnlyCount(myParams); 581 boolean myParamOrDefaultWantCount = nonNull(myParams.getSearchTotalMode()) 582 ? isWantCount(myParams) 583 : SearchParameterMapCalculator.isWantCount(myStorageSettings.getDefaultTotalMode()); 584 585 if (myParamWantOnlyCount || myParamOrDefaultWantCount) { 586 doCountOnlyQuery(myParamWantOnlyCount); 587 if (myParamWantOnlyCount) { 588 return; 589 } 590 } 591 592 ourLog.trace("Done count"); 593 ISearchBuilder sb = newSearchBuilder(); 594 595 /* 596 * Figure out how many results we're actually going to fetch from the 597 * database in this pass. This calculation takes into consideration the 598 * "pre-fetch thresholds" specified in StorageSettings#getSearchPreFetchThresholds() 599 * as well as the value of the _count parameter. 600 */ 601 int currentlyLoaded = defaultIfNull(mySearch.getNumFound(), 0); 602 int minWanted = 0; 603 604 // if no count is provided, 605 // we only use the values in SearchPreFetchThresholds 606 // but if there is a count... 607 if (myParams.getCount() != null) { 608 minWanted = Math.min(myParams.getCount(), myPagingProvider.getMaximumPageSize()); 609 minWanted += currentlyLoaded; 610 } 611 612 // iterate through the search thresholds 613 for (Iterator<Integer> iter = 614 myStorageSettings.getSearchPreFetchThresholds().iterator(); 615 iter.hasNext(); ) { 616 int next = iter.next(); 617 if (next != -1 && next <= currentlyLoaded) { 618 continue; 619 } 620 621 if (next == -1) { 622 sb.setMaxResultsToFetch(null); 623 /* 624 * If we're past the last prefetch threshold then 625 * we're potentially fetching unlimited amounts of data. 626 * We'll move responsibility for deduplication to the database in this case 627 * so that we don't run the risk of blowing out the memory 628 * in the app server 629 */ 630 sb.setDeduplicateInDatabase(true); 631 } else { 632 // we want at least 1 more than our requested amount 633 // so we know that there are other results 634 // (in case we get the exact amount back) 635 myMaxResultsToFetch = Math.max(next, minWanted) + 1; 636 sb.setMaxResultsToFetch(myMaxResultsToFetch); 637 } 638 639 if (iter.hasNext()) { 640 myAdditionalPrefetchThresholdsRemaining = true; 641 } 642 643 // If we get here's we've found an appropriate threshold 644 break; 645 } 646 647 /* 648 * Provide any PID we loaded in previous search passes to the 649 * SearchBuilder so that we don't get duplicates coming from running 650 * the same query again. 651 * 652 * We could possibly accomplish this in a different way by using sorted 653 * results in our SQL query and specifying an offset. I don't actually 654 * know if that would be faster or not. At some point should test this 655 * idea. 656 */ 657 if (myPreviouslyAddedResourcePids != null) { 658 sb.setPreviouslyAddedResourcePids(myPreviouslyAddedResourcePids); 659 mySyncedPids.addAll(myPreviouslyAddedResourcePids); 660 } 661 662 /* 663 * createQuery 664 * Construct the SQL query we'll be sending to the database 665 * 666 * NB: (See createCountQuery above) 667 * We will pass the original myParams here (not a copy) 668 * because we actually _want_ the mutation of the myParams to happen. 669 * Specifically because SearchBuilder itself will _expect_ 670 * not to have these parameters when dumping back 671 * to our DB. 672 * 673 * This is an odd implementation behaviour, but the change 674 * for this will require a lot more handling at higher levels 675 */ 676 try (IResultIterator<JpaPid> resultIterator = 677 sb.createQuery(myParams, mySearchRuntimeDetails, myRequest, myRequestPartitionId)) { 678 // resultIterator is SearchBuilder.QueryIterator 679 assert (resultIterator != null); 680 681 /* 682 * The following loop actually loads the PIDs of the resources 683 * matching the search off of the disk and into memory. After 684 * every X results, we commit to the HFJ_SEARCH table. 685 */ 686 int syncSize = mySyncSize; 687 while (resultIterator.hasNext()) { 688 myUnsyncedPids.add(resultIterator.next()); 689 690 boolean shouldSync = myUnsyncedPids.size() >= syncSize; 691 692 if (myStorageSettings.getCountSearchResultsUpTo() != null 693 && myStorageSettings.getCountSearchResultsUpTo() > 0 694 && myStorageSettings.getCountSearchResultsUpTo() < myUnsyncedPids.size()) { 695 shouldSync = false; 696 } 697 698 if (myUnsyncedPids.size() > 50000) { 699 shouldSync = true; 700 } 701 702 // If no abort was requested, bail out 703 Validate.isTrue(isNotAborted(), "Abort has been requested"); 704 705 if (shouldSync) { 706 saveUnsynced(resultIterator); 707 } 708 709 if (myLoadingThrottleForUnitTests != null) { 710 AsyncUtil.sleep(myLoadingThrottleForUnitTests); 711 } 712 } 713 714 // If no abort was requested, bail out 715 Validate.isTrue(isNotAborted(), "Abort has been requested"); 716 717 saveUnsynced(resultIterator); 718 719 } catch (IOException e) { 720 ourLog.error("IO failure during database access", e); 721 throw new InternalErrorException(Msg.code(1166) + e); 722 } 723 } 724 725 /** 726 * Does the query but only for the count. 727 * @param theParamWantOnlyCount - if count query is wanted only 728 */ 729 private void doCountOnlyQuery(boolean theParamWantOnlyCount) { 730 ourLog.trace("Performing count"); 731 @SuppressWarnings("rawtypes") 732 ISearchBuilder sb = newSearchBuilder(); 733 734 /* 735 * createCountQuery 736 * NB: (see createQuery below) 737 * Because FulltextSearchSvcImpl will (internally) 738 * mutate the myParams (searchmap), 739 * (specifically removing the _content and _text filters) 740 * we will have to clone those parameters here so that 741 * the "correct" params are used in createQuery below 742 */ 743 Long count = sb.createCountQuery(myParams.clone(), mySearch.getUuid(), myRequest, myRequestPartitionId); 744 745 ourLog.trace("Got count {}", count); 746 747 myTxService 748 .withRequest(myRequest) 749 .withRequestPartitionId(myRequestPartitionId) 750 .execute(() -> { 751 mySearch.setTotalCount(count.intValue()); 752 if (theParamWantOnlyCount) { 753 mySearch.setStatus(SearchStatusEnum.FINISHED); 754 } 755 doSaveSearch(); 756 }); 757 } 758}