
001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.search.builder.tasks; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.i18n.Msg; 024import ca.uhn.fhir.interceptor.api.HookParams; 025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 026import ca.uhn.fhir.interceptor.api.Pointcut; 027import ca.uhn.fhir.interceptor.model.RequestPartitionId; 028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 029import ca.uhn.fhir.jpa.dao.IResultIterator; 030import ca.uhn.fhir.jpa.dao.ISearchBuilder; 031import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 032import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 033import ca.uhn.fhir.jpa.entity.Search; 034import ca.uhn.fhir.jpa.interceptor.JpaPreResourceAccessDetails; 035import ca.uhn.fhir.jpa.model.dao.JpaPid; 036import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; 037import ca.uhn.fhir.jpa.model.search.SearchStatusEnum; 038import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc; 039import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc; 040import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 041import ca.uhn.fhir.jpa.util.QueryParameterUtils; 042import ca.uhn.fhir.jpa.util.SearchParameterMapCalculator; 043import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails; 044import ca.uhn.fhir.rest.api.server.RequestDetails; 045import ca.uhn.fhir.rest.server.IPagingProvider; 046import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; 047import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 048import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; 049import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; 050import ca.uhn.fhir.system.HapiSystemProperties; 051import ca.uhn.fhir.util.AsyncUtil; 052import ca.uhn.fhir.util.StopWatch; 053import co.elastic.apm.api.ElasticApm; 054import co.elastic.apm.api.Span; 055import co.elastic.apm.api.Transaction; 056import jakarta.annotation.Nonnull; 057import org.apache.commons.lang3.Validate; 058import org.apache.commons.lang3.exception.ExceptionUtils; 059import org.hl7.fhir.instance.model.api.IBaseResource; 060import org.springframework.transaction.annotation.Propagation; 061 062import java.io.IOException; 063import java.util.ArrayList; 064import java.util.Iterator; 065import java.util.List; 066import java.util.concurrent.Callable; 067import java.util.concurrent.CountDownLatch; 068import java.util.concurrent.TimeUnit; 069import java.util.function.Consumer; 070 071import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantCount; 072import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantOnlyCount; 073import static java.util.Objects.nonNull; 074import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; 075 076/** 077 * A search task is a Callable task that runs in 078 * a thread pool to handle an individual search. One instance 079 * is created for any requested search and runs from the 080 * beginning to the end of the search. 081 * <p> 082 * Understand: 083 * This class executes in its own thread separate from the 084 * web server client thread that made the request. We do that 085 * so that we can return to the client as soon as possible, 086 * but keep the search going in the background (and have 087 * the next page of results ready to go when the client asks). 088 */ 089public class SearchTask implements Callable<Void> { 090 091 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchTask.class); 092 // injected beans 093 protected final HapiTransactionService myTxService; 094 protected final FhirContext myContext; 095 protected final ISearchResultCacheSvc mySearchResultCacheSvc; 096 private final SearchParameterMap myParams; 097 private final String myResourceType; 098 private final ArrayList<JpaPid> mySyncedPids = new ArrayList<>(); 099 private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1); 100 private final CountDownLatch myCompletionLatch; 101 private final ArrayList<JpaPid> myUnsyncedPids = new ArrayList<>(); 102 private final RequestDetails myRequest; 103 private final RequestPartitionId myRequestPartitionId; 104 private final SearchRuntimeDetails mySearchRuntimeDetails; 105 private final Transaction myParentTransaction; 106 private final Consumer<String> myOnRemove; 107 private final int mySyncSize; 108 private final Integer myLoadingThrottleForUnitTests; 109 private final IInterceptorBroadcaster myInterceptorBroadcaster; 110 private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory; 111 private final JpaStorageSettings myStorageSettings; 112 private final ISearchCacheSvc mySearchCacheSvc; 113 private final IPagingProvider myPagingProvider; 114 private final IInterceptorBroadcaster myCompositeBroadcaster; 115 private Search mySearch; 116 private boolean myAbortRequested; 117 private int myCountSavedTotal = 0; 118 private int myCountSavedThisPass = 0; 119 private int myCountBlockedThisPass = 0; 120 private boolean myAdditionalPrefetchThresholdsRemaining; 121 private List<JpaPid> myPreviouslyAddedResourcePids; 122 // The max number of results that we request from the search 123 // Note that this is set using the configured pre-fetch thresholds, maximum page size, and/or the client provided 124 // _count parameter 125 private Integer myMaxResultsToFetch; 126 127 /** 128 * Constructor 129 */ 130 @SuppressWarnings({"unchecked", "rawtypes"}) 131 public SearchTask( 132 SearchTaskParameters theCreationParams, 133 HapiTransactionService theManagedTxManager, 134 FhirContext theContext, 135 IInterceptorBroadcaster theInterceptorBroadcaster, 136 SearchBuilderFactory theSearchBuilderFactory, 137 ISearchResultCacheSvc theSearchResultCacheSvc, 138 JpaStorageSettings theStorageSettings, 139 ISearchCacheSvc theSearchCacheSvc, 140 IPagingProvider thePagingProvider) { 141 // beans 142 myTxService = theManagedTxManager; 143 myContext = theContext; 144 myInterceptorBroadcaster = theInterceptorBroadcaster; 145 mySearchBuilderFactory = theSearchBuilderFactory; 146 mySearchResultCacheSvc = theSearchResultCacheSvc; 147 myStorageSettings = theStorageSettings; 148 mySearchCacheSvc = theSearchCacheSvc; 149 myPagingProvider = thePagingProvider; 150 151 // values 152 myOnRemove = theCreationParams.OnRemove; 153 mySearch = theCreationParams.Search; 154 myParams = theCreationParams.Params; 155 myResourceType = theCreationParams.ResourceType; 156 myRequest = theCreationParams.Request; 157 myCompletionLatch = new CountDownLatch(1); 158 mySyncSize = theCreationParams.SyncSize; 159 myLoadingThrottleForUnitTests = theCreationParams.getLoadingThrottleForUnitTests(); 160 161 mySearchRuntimeDetails = new SearchRuntimeDetails(myRequest, mySearch.getUuid()); 162 mySearchRuntimeDetails.setQueryString(myParams.toNormalizedQueryString(myContext)); 163 myRequestPartitionId = theCreationParams.RequestPartitionId; 164 myParentTransaction = ElasticApm.currentTransaction(); 165 myCompositeBroadcaster = 166 CompositeInterceptorBroadcaster.newCompositeBroadcaster(myInterceptorBroadcaster, myRequest); 167 } 168 169 protected RequestPartitionId getRequestPartitionId() { 170 return myRequestPartitionId; 171 } 172 173 /** 174 * This method is called by the server HTTP thread, and 175 * will block until at least one page of results have been 176 * fetched from the DB, and will never block after that. 177 */ 178 public Integer awaitInitialSync() { 179 ourLog.trace("Awaiting initial sync"); 180 do { 181 ourLog.trace("Search {} aborted: {}", getSearch().getUuid(), !isNotAborted()); 182 if (AsyncUtil.awaitLatchAndThrowInternalErrorExceptionOnInterrupt( 183 getInitialCollectionLatch(), 250L, TimeUnit.MILLISECONDS)) { 184 break; 185 } 186 } while (getSearch().getStatus() == SearchStatusEnum.LOADING); 187 ourLog.trace("Initial sync completed"); 188 189 return getSearch().getTotalCount(); 190 } 191 192 public Search getSearch() { 193 return mySearch; 194 } 195 196 public CountDownLatch getInitialCollectionLatch() { 197 return myInitialCollectionLatch; 198 } 199 200 public void setPreviouslyAddedResourcePids(List<JpaPid> thePreviouslyAddedResourcePids) { 201 myPreviouslyAddedResourcePids = thePreviouslyAddedResourcePids; 202 myCountSavedTotal = myPreviouslyAddedResourcePids.size(); 203 } 204 205 @SuppressWarnings("rawtypes") 206 private ISearchBuilder newSearchBuilder() { 207 Class<? extends IBaseResource> resourceTypeClass = 208 myContext.getResourceDefinition(myResourceType).getImplementingClass(); 209 return mySearchBuilderFactory.newSearchBuilder(myResourceType, resourceTypeClass); 210 } 211 212 @Nonnull 213 public List<JpaPid> getResourcePids(int theFromIndex, int theToIndex) { 214 ourLog.debug("Requesting search PIDs from {}-{}", theFromIndex, theToIndex); 215 216 boolean keepWaiting; 217 do { 218 synchronized (mySyncedPids) { 219 ourLog.trace("Search status is {}", mySearch.getStatus()); 220 boolean haveEnoughResults = mySyncedPids.size() >= theToIndex; 221 if (!haveEnoughResults) { 222 switch (mySearch.getStatus()) { 223 case LOADING: 224 keepWaiting = true; 225 break; 226 case PASSCMPLET: 227 /* 228 * If we get here, it means that the user requested resources that crossed the 229 * current pre-fetch boundary. For example, if the prefetch threshold is 50 and the 230 * user has requested resources 0-60, then they would get 0-50 back but the search 231 * coordinator would then stop searching.SearchCoordinatorSvcImplTest 232 */ 233 keepWaiting = false; 234 break; 235 case FAILED: 236 case FINISHED: 237 case GONE: 238 default: 239 keepWaiting = false; 240 break; 241 } 242 } else { 243 keepWaiting = false; 244 } 245 } 246 247 if (keepWaiting) { 248 ourLog.info( 249 "Waiting as we only have {} results - Search status: {}", 250 mySyncedPids.size(), 251 mySearch.getStatus()); 252 AsyncUtil.sleep(500L); 253 } 254 } while (keepWaiting); 255 256 ourLog.debug("Proceeding, as we have {} results", mySyncedPids.size()); 257 258 ArrayList<JpaPid> retVal = new ArrayList<>(); 259 synchronized (mySyncedPids) { 260 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(mySearch); 261 262 int toIndex = theToIndex; 263 if (mySyncedPids.size() < toIndex) { 264 toIndex = mySyncedPids.size(); 265 } 266 for (int i = theFromIndex; i < toIndex; i++) { 267 retVal.add(mySyncedPids.get(i)); 268 } 269 } 270 271 ourLog.trace( 272 "Done syncing results - Wanted {}-{} and returning {} of {}", 273 theFromIndex, 274 theToIndex, 275 retVal.size(), 276 mySyncedPids.size()); 277 278 return retVal; 279 } 280 281 public void saveSearch() { 282 myTxService 283 .withRequest(myRequest) 284 .withRequestPartitionId(myRequestPartitionId) 285 .withPropagation(Propagation.REQUIRES_NEW) 286 .execute(this::doSaveSearch); 287 } 288 289 @SuppressWarnings("rawtypes") 290 private void saveUnsynced(final IResultIterator theResultIter) { 291 myTxService 292 .withRequest(myRequest) 293 .withRequestPartitionId(myRequestPartitionId) 294 .execute(() -> { 295 if (mySearch.getId() == null) { 296 doSaveSearch(); 297 } 298 299 ArrayList<JpaPid> unsyncedPids = myUnsyncedPids; 300 int countBlocked = 0; 301 302 // Interceptor call: STORAGE_PREACCESS_RESOURCES 303 // This can be used to remove results from the search result details before 304 // the user has a chance to know that they were in the results 305 if (mySearchRuntimeDetails.getRequestDetails() != null && !unsyncedPids.isEmpty()) { 306 JpaPreResourceAccessDetails accessDetails = 307 new JpaPreResourceAccessDetails(unsyncedPids, this::newSearchBuilder); 308 HookParams params = new HookParams() 309 .add(IPreResourceAccessDetails.class, accessDetails) 310 .add(RequestDetails.class, mySearchRuntimeDetails.getRequestDetails()) 311 .addIfMatchesType( 312 ServletRequestDetails.class, mySearchRuntimeDetails.getRequestDetails()); 313 myCompositeBroadcaster.callHooks(Pointcut.STORAGE_PREACCESS_RESOURCES, params); 314 315 for (int i = unsyncedPids.size() - 1; i >= 0; i--) { 316 if (accessDetails.isDontReturnResourceAtIndex(i)) { 317 unsyncedPids.remove(i); 318 myCountBlockedThisPass++; 319 myCountSavedTotal++; 320 countBlocked++; 321 } 322 } 323 } 324 325 // Actually store the results in the query cache storage 326 myCountSavedTotal += unsyncedPids.size(); 327 myCountSavedThisPass += unsyncedPids.size(); 328 mySearchResultCacheSvc.storeResults( 329 mySearch, mySyncedPids, unsyncedPids, myRequest, getRequestPartitionId()); 330 331 synchronized (mySyncedPids) { 332 int numSyncedThisPass = unsyncedPids.size(); 333 ourLog.trace( 334 "Syncing {} search results - Have more: {}", 335 numSyncedThisPass, 336 theResultIter.hasNext()); 337 mySyncedPids.addAll(unsyncedPids); 338 unsyncedPids.clear(); 339 340 if (!theResultIter.hasNext()) { 341 int skippedCount = theResultIter.getSkippedCount(); 342 ourLog.trace( 343 "MaxToFetch[{}] SkippedCount[{}] CountSavedThisPass[{}] CountSavedThisTotal[{}] AdditionalPrefetchRemaining[{}]", 344 myMaxResultsToFetch, 345 skippedCount, 346 myCountSavedThisPass, 347 myCountSavedTotal, 348 myAdditionalPrefetchThresholdsRemaining); 349 350 if (isFinished(theResultIter)) { 351 // finished 352 ourLog.trace("Setting search status to FINISHED"); 353 mySearch.setStatus(SearchStatusEnum.FINISHED); 354 mySearch.setTotalCount(myCountSavedTotal - countBlocked); 355 } else if (myAdditionalPrefetchThresholdsRemaining) { 356 // pass complete 357 ourLog.trace("Setting search status to PASSCMPLET"); 358 mySearch.setStatus(SearchStatusEnum.PASSCMPLET); 359 mySearch.setSearchParameterMap(myParams); 360 } else { 361 // also finished 362 ourLog.trace("Setting search status to FINISHED"); 363 mySearch.setStatus(SearchStatusEnum.FINISHED); 364 mySearch.setTotalCount(myCountSavedTotal - countBlocked); 365 } 366 } 367 } 368 369 mySearch.setNumFound(myCountSavedTotal); 370 mySearch.setNumBlocked(mySearch.getNumBlocked() + countBlocked); 371 372 int numSynced; 373 synchronized (mySyncedPids) { 374 numSynced = mySyncedPids.size(); 375 } 376 377 if (myStorageSettings.getCountSearchResultsUpTo() == null 378 || myStorageSettings.getCountSearchResultsUpTo() <= 0 379 || myStorageSettings.getCountSearchResultsUpTo() <= numSynced) { 380 myInitialCollectionLatch.countDown(); 381 } 382 383 doSaveSearch(); 384 385 ourLog.trace("saveUnsynced() - pre-commit"); 386 }); 387 ourLog.trace("saveUnsynced() - post-commit"); 388 } 389 390 @SuppressWarnings("rawtypes") 391 private boolean isFinished(final IResultIterator theResultIter) { 392 int skippedCount = theResultIter.getSkippedCount(); 393 int nonSkippedCount = theResultIter.getNonSkippedCount(); 394 int totalFetched = skippedCount + myCountSavedThisPass + myCountBlockedThisPass; 395 396 if (myMaxResultsToFetch != null && totalFetched < myMaxResultsToFetch) { 397 // total fetched < max results to fetch -> we've exhausted the search 398 return true; 399 } else { 400 if (nonSkippedCount == 0) { 401 // no skipped resources in this query 402 if (myParams.getCount() != null) { 403 // count supplied 404 // if the count is > what we've fetched -> we've exhausted the query 405 return myParams.getCount() > totalFetched; 406 } else { 407 // legacy - we have no skipped resources - we are done 408 return true; 409 } 410 } 411 // skipped resources means we have more to fetch 412 return false; 413 } 414 } 415 416 public boolean isNotAborted() { 417 return !myAbortRequested; 418 } 419 420 public void markComplete() { 421 myCompletionLatch.countDown(); 422 } 423 424 public CountDownLatch getCompletionLatch() { 425 return myCompletionLatch; 426 } 427 428 /** 429 * Request that the task abort as soon as possible 430 */ 431 public void requestImmediateAbort() { 432 myAbortRequested = true; 433 } 434 435 /** 436 * This is the method which actually performs the search. 437 * It is called automatically by the thread pool. 438 */ 439 @Override 440 public Void call() { 441 StopWatch sw = new StopWatch(); 442 Span span = myParentTransaction.startSpan("db", "query", "search"); 443 span.setName("FHIR Database Search"); 444 try { 445 // Create an initial search in the DB and give it an ID 446 saveSearch(); 447 448 myTxService 449 .withRequest(myRequest) 450 .withRequestPartitionId(myRequestPartitionId) 451 .execute(this::doSearch); 452 453 mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus()); 454 if (mySearch.getStatus() == SearchStatusEnum.FINISHED) { 455 HookParams params = new HookParams() 456 .add(RequestDetails.class, myRequest) 457 .addIfMatchesType(ServletRequestDetails.class, myRequest) 458 .add(SearchRuntimeDetails.class, mySearchRuntimeDetails); 459 myCompositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_COMPLETE, params); 460 } else { 461 HookParams params = new HookParams() 462 .add(RequestDetails.class, myRequest) 463 .addIfMatchesType(ServletRequestDetails.class, myRequest) 464 .add(SearchRuntimeDetails.class, mySearchRuntimeDetails); 465 myCompositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_PASS_COMPLETE, params); 466 } 467 468 ourLog.trace( 469 "Have completed search for [{}{}] and found {} resources in {}ms - Status is {}", 470 mySearch.getResourceType(), 471 mySearch.getSearchQueryString(), 472 mySyncedPids.size(), 473 sw.getMillis(), 474 mySearch.getStatus()); 475 476 } catch (Throwable t) { 477 478 /* 479 * Don't print a stack trace for client errors (i.e. requests that 480 * aren't valid because the client screwed up).. that's just noise 481 * in the logs and who needs that. 482 */ 483 boolean logged = false; 484 if (t instanceof BaseServerResponseException) { 485 BaseServerResponseException exception = (BaseServerResponseException) t; 486 if (exception.getStatusCode() >= 400 && exception.getStatusCode() < 500) { 487 logged = true; 488 ourLog.warn("Failed during search due to invalid request: {}", t.toString()); 489 } 490 } 491 492 if (!logged) { 493 ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t); 494 } 495 myUnsyncedPids.clear(); 496 Throwable rootCause = ExceptionUtils.getRootCause(t); 497 rootCause = defaultIfNull(rootCause, t); 498 499 String failureMessage = rootCause.getMessage(); 500 501 int failureCode = InternalErrorException.STATUS_CODE; 502 if (t instanceof BaseServerResponseException) { 503 failureCode = ((BaseServerResponseException) t).getStatusCode(); 504 } 505 506 if (HapiSystemProperties.isUnitTestCaptureStackEnabled()) { 507 failureMessage += "\nStack\n" + ExceptionUtils.getStackTrace(rootCause); 508 } 509 510 mySearch.setFailureMessage(failureMessage); 511 mySearch.setFailureCode(failureCode); 512 mySearch.setStatus(SearchStatusEnum.FAILED); 513 514 mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus()); 515 HookParams params = new HookParams() 516 .add(RequestDetails.class, myRequest) 517 .addIfMatchesType(ServletRequestDetails.class, myRequest) 518 .add(SearchRuntimeDetails.class, mySearchRuntimeDetails); 519 myCompositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_FAILED, params); 520 521 saveSearch(); 522 span.captureException(t); 523 } finally { 524 myOnRemove.accept(mySearch.getUuid()); 525 526 myInitialCollectionLatch.countDown(); 527 markComplete(); 528 span.end(); 529 } 530 return null; 531 } 532 533 private void doSaveSearch() { 534 Search newSearch = mySearchCacheSvc.save(mySearch, myRequestPartitionId); 535 536 // mySearchDao.save is not supposed to return null, but in unit tests 537 // it can if the mock search dao isn't set up to handle that 538 if (newSearch != null) { 539 mySearch = newSearch; 540 } 541 } 542 543 /** 544 * This method actually creates the database query to perform the 545 * search, and starts it. 546 */ 547 @SuppressWarnings({"rawtypes", "unchecked"}) 548 private void doSearch() { 549 /* 550 * If the user has explicitly requested a _count, perform a 551 * 552 * SELECT COUNT(*) .... 553 * 554 * before doing anything else. 555 */ 556 boolean myParamWantOnlyCount = isWantOnlyCount(myParams); 557 boolean myParamOrDefaultWantCount = nonNull(myParams.getSearchTotalMode()) 558 ? isWantCount(myParams) 559 : SearchParameterMapCalculator.isWantCount(myStorageSettings.getDefaultTotalMode()); 560 561 if (myParamWantOnlyCount || myParamOrDefaultWantCount) { 562 doCountOnlyQuery(myParamWantOnlyCount); 563 if (myParamWantOnlyCount) { 564 return; 565 } 566 } 567 568 ourLog.trace("Done count"); 569 ISearchBuilder sb = newSearchBuilder(); 570 571 /* 572 * Figure out how many results we're actually going to fetch from the 573 * database in this pass. This calculation takes into consideration the 574 * "pre-fetch thresholds" specified in StorageSettings#getSearchPreFetchThresholds() 575 * as well as the value of the _count parameter. 576 */ 577 int currentlyLoaded = defaultIfNull(mySearch.getNumFound(), 0); 578 int minWanted = 0; 579 580 // if no count is provided, 581 // we only use the values in SearchPreFetchThresholds 582 // but if there is a count... 583 if (myParams.getCount() != null) { 584 minWanted = Math.min(myParams.getCount(), myPagingProvider.getMaximumPageSize()); 585 minWanted += currentlyLoaded; 586 } 587 588 // iterate through the search thresholds 589 for (Iterator<Integer> iter = 590 myStorageSettings.getSearchPreFetchThresholds().iterator(); 591 iter.hasNext(); ) { 592 int next = iter.next(); 593 if (next != -1 && next <= currentlyLoaded) { 594 continue; 595 } 596 597 if (next == -1) { 598 sb.setMaxResultsToFetch(null); 599 /* 600 * If we're past the last prefetch threshold then 601 * we're potentially fetching unlimited amounts of data. 602 * We'll move responsibility for deduplication to the database in this case 603 * so that we don't run the risk of blowing out the memory 604 * in the app server 605 */ 606 sb.setDeduplicateInDatabase(true); 607 } else { 608 // we want at least 1 more than our requested amount 609 // so we know that there are other results 610 // (in case we get the exact amount back) 611 myMaxResultsToFetch = Math.max(next, minWanted) + 1; 612 sb.setMaxResultsToFetch(myMaxResultsToFetch); 613 } 614 615 if (iter.hasNext()) { 616 myAdditionalPrefetchThresholdsRemaining = true; 617 } 618 619 // If we get here's we've found an appropriate threshold 620 break; 621 } 622 623 /* 624 * Provide any PID we loaded in previous search passes to the 625 * SearchBuilder so that we don't get duplicates coming from running 626 * the same query again. 627 * 628 * We could possibly accomplish this in a different way by using sorted 629 * results in our SQL query and specifying an offset. I don't actually 630 * know if that would be faster or not. At some point should test this 631 * idea. 632 */ 633 if (myPreviouslyAddedResourcePids != null) { 634 sb.setPreviouslyAddedResourcePids(myPreviouslyAddedResourcePids); 635 mySyncedPids.addAll(myPreviouslyAddedResourcePids); 636 } 637 638 /* 639 * createQuery 640 * Construct the SQL query we'll be sending to the database 641 * 642 * NB: (See createCountQuery above) 643 * We will pass the original myParams here (not a copy) 644 * because we actually _want_ the mutation of the myParams to happen. 645 * Specifically because SearchBuilder itself will _expect_ 646 * not to have these parameters when dumping back 647 * to our DB. 648 * 649 * This is an odd implementation behaviour, but the change 650 * for this will require a lot more handling at higher levels 651 */ 652 try (IResultIterator<JpaPid> resultIterator = 653 sb.createQuery(myParams, mySearchRuntimeDetails, myRequest, myRequestPartitionId)) { 654 // resultIterator is SearchBuilder.QueryIterator 655 assert (resultIterator != null); 656 657 /* 658 * The following loop actually loads the PIDs of the resources 659 * matching the search off of the disk and into memory. After 660 * every X results, we commit to the HFJ_SEARCH table. 661 */ 662 int syncSize = mySyncSize; 663 while (resultIterator.hasNext()) { 664 myUnsyncedPids.add(resultIterator.next()); 665 666 boolean shouldSync = myUnsyncedPids.size() >= syncSize; 667 668 if (myStorageSettings.getCountSearchResultsUpTo() != null 669 && myStorageSettings.getCountSearchResultsUpTo() > 0 670 && myStorageSettings.getCountSearchResultsUpTo() < myUnsyncedPids.size()) { 671 shouldSync = false; 672 } 673 674 if (myUnsyncedPids.size() > 50000) { 675 shouldSync = true; 676 } 677 678 // If no abort was requested, bail out 679 Validate.isTrue(isNotAborted(), "Abort has been requested"); 680 681 if (shouldSync) { 682 saveUnsynced(resultIterator); 683 } 684 685 if (myLoadingThrottleForUnitTests != null) { 686 AsyncUtil.sleep(myLoadingThrottleForUnitTests); 687 } 688 } 689 690 // If no abort was requested, bail out 691 Validate.isTrue(isNotAborted(), "Abort has been requested"); 692 693 saveUnsynced(resultIterator); 694 695 } catch (IOException e) { 696 ourLog.error("IO failure during database access", e); 697 throw new InternalErrorException(Msg.code(1166) + e); 698 } 699 } 700 701 /** 702 * Does the query but only for the count. 703 * @param theParamWantOnlyCount - if count query is wanted only 704 */ 705 private void doCountOnlyQuery(boolean theParamWantOnlyCount) { 706 ourLog.trace("Performing count"); 707 @SuppressWarnings("rawtypes") 708 ISearchBuilder sb = newSearchBuilder(); 709 710 /* 711 * createCountQuery 712 * NB: (see createQuery below) 713 * Because FulltextSearchSvcImpl will (internally) 714 * mutate the myParams (searchmap), 715 * (specifically removing the _content and _text filters) 716 * we will have to clone those parameters here so that 717 * the "correct" params are used in createQuery below 718 */ 719 Long count = sb.createCountQuery(myParams.clone(), mySearch.getUuid(), myRequest, myRequestPartitionId); 720 721 ourLog.trace("Got count {}", count); 722 723 myTxService 724 .withRequest(myRequest) 725 .withRequestPartitionId(myRequestPartitionId) 726 .execute(() -> { 727 mySearch.setTotalCount(count.intValue()); 728 if (theParamWantOnlyCount) { 729 mySearch.setStatus(SearchStatusEnum.FINISHED); 730 } 731 doSaveSearch(); 732 }); 733 } 734}