001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.search.builder.tasks; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.i18n.Msg; 024import ca.uhn.fhir.interceptor.api.HookParams; 025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 026import ca.uhn.fhir.interceptor.api.Pointcut; 027import ca.uhn.fhir.interceptor.model.RequestPartitionId; 028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 029import ca.uhn.fhir.jpa.dao.IResultIterator; 030import ca.uhn.fhir.jpa.dao.ISearchBuilder; 031import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 032import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 033import ca.uhn.fhir.jpa.entity.Search; 034import ca.uhn.fhir.jpa.interceptor.JpaPreResourceAccessDetails; 035import ca.uhn.fhir.jpa.model.dao.JpaPid; 036import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; 037import ca.uhn.fhir.jpa.model.search.SearchStatusEnum; 038import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc; 039import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc; 040import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 041import ca.uhn.fhir.jpa.util.QueryParameterUtils; 042import ca.uhn.fhir.jpa.util.SearchParameterMapCalculator; 043import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails; 044import ca.uhn.fhir.rest.api.server.RequestDetails; 045import ca.uhn.fhir.rest.server.IPagingProvider; 046import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; 047import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 048import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; 049import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; 050import ca.uhn.fhir.system.HapiSystemProperties; 051import ca.uhn.fhir.util.AsyncUtil; 052import ca.uhn.fhir.util.StopWatch; 053import co.elastic.apm.api.ElasticApm; 054import co.elastic.apm.api.Span; 055import co.elastic.apm.api.Transaction; 056import jakarta.annotation.Nonnull; 057import org.apache.commons.lang3.Validate; 058import org.apache.commons.lang3.exception.ExceptionUtils; 059import org.hl7.fhir.instance.model.api.IBaseResource; 060import org.springframework.transaction.annotation.Propagation; 061 062import java.io.IOException; 063import java.util.ArrayList; 064import java.util.Iterator; 065import java.util.List; 066import java.util.concurrent.Callable; 067import java.util.concurrent.CountDownLatch; 068import java.util.concurrent.TimeUnit; 069import java.util.function.Consumer; 070 071import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantCount; 072import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantOnlyCount; 073import static java.util.Objects.nonNull; 074import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; 075 076/** 077 * A search task is a Callable task that runs in 078 * a thread pool to handle an individual search. One instance 079 * is created for any requested search and runs from the 080 * beginning to the end of the search. 081 * <p> 082 * Understand: 083 * This class executes in its own thread separate from the 084 * web server client thread that made the request. We do that 085 * so that we can return to the client as soon as possible, 086 * but keep the search going in the background (and have 087 * the next page of results ready to go when the client asks). 088 */ 089public class SearchTask implements Callable<Void> { 090 091 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchTask.class); 092 // injected beans 093 protected final HapiTransactionService myTxService; 094 protected final FhirContext myContext; 095 protected final ISearchResultCacheSvc mySearchResultCacheSvc; 096 private final SearchParameterMap myParams; 097 private final String myResourceType; 098 private final ArrayList<JpaPid> mySyncedPids = new ArrayList<>(); 099 private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1); 100 private final CountDownLatch myCompletionLatch; 101 private final ArrayList<JpaPid> myUnsyncedPids = new ArrayList<>(); 102 private final RequestDetails myRequest; 103 private final RequestPartitionId myRequestPartitionId; 104 private final SearchRuntimeDetails mySearchRuntimeDetails; 105 private final Transaction myParentTransaction; 106 private final Consumer<String> myOnRemove; 107 private final int mySyncSize; 108 private final Integer myLoadingThrottleForUnitTests; 109 private final IInterceptorBroadcaster myInterceptorBroadcaster; 110 private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory; 111 private final JpaStorageSettings myStorageSettings; 112 private final ISearchCacheSvc mySearchCacheSvc; 113 private final IPagingProvider myPagingProvider; 114 private final IInterceptorBroadcaster myCompositeBroadcaster; 115 private Search mySearch; 116 private boolean myAbortRequested; 117 private int myCountSavedTotal = 0; 118 private int myCountSavedThisPass = 0; 119 private int myCountBlockedThisPass = 0; 120 private boolean myAdditionalPrefetchThresholdsRemaining; 121 private List<JpaPid> myPreviouslyAddedResourcePids; 122 private Integer myMaxResultsToFetch; 123 124 /** 125 * Constructor 126 */ 127 @SuppressWarnings({"unchecked", "rawtypes"}) 128 public SearchTask( 129 SearchTaskParameters theCreationParams, 130 HapiTransactionService theManagedTxManager, 131 FhirContext theContext, 132 IInterceptorBroadcaster theInterceptorBroadcaster, 133 SearchBuilderFactory theSearchBuilderFactory, 134 ISearchResultCacheSvc theSearchResultCacheSvc, 135 JpaStorageSettings theStorageSettings, 136 ISearchCacheSvc theSearchCacheSvc, 137 IPagingProvider thePagingProvider) { 138 // beans 139 myTxService = theManagedTxManager; 140 myContext = theContext; 141 myInterceptorBroadcaster = theInterceptorBroadcaster; 142 mySearchBuilderFactory = theSearchBuilderFactory; 143 mySearchResultCacheSvc = theSearchResultCacheSvc; 144 myStorageSettings = theStorageSettings; 145 mySearchCacheSvc = theSearchCacheSvc; 146 myPagingProvider = thePagingProvider; 147 148 // values 149 myOnRemove = theCreationParams.OnRemove; 150 mySearch = theCreationParams.Search; 151 myParams = theCreationParams.Params; 152 myResourceType = theCreationParams.ResourceType; 153 myRequest = theCreationParams.Request; 154 myCompletionLatch = new CountDownLatch(1); 155 mySyncSize = theCreationParams.SyncSize; 156 myLoadingThrottleForUnitTests = theCreationParams.getLoadingThrottleForUnitTests(); 157 158 mySearchRuntimeDetails = new SearchRuntimeDetails(myRequest, mySearch.getUuid()); 159 mySearchRuntimeDetails.setQueryString(myParams.toNormalizedQueryString(myContext)); 160 myRequestPartitionId = theCreationParams.RequestPartitionId; 161 myParentTransaction = ElasticApm.currentTransaction(); 162 myCompositeBroadcaster = 163 CompositeInterceptorBroadcaster.newCompositeBroadcaster(myInterceptorBroadcaster, myRequest); 164 } 165 166 protected RequestPartitionId getRequestPartitionId() { 167 return myRequestPartitionId; 168 } 169 170 /** 171 * This method is called by the server HTTP thread, and 172 * will block until at least one page of results have been 173 * fetched from the DB, and will never block after that. 174 */ 175 public Integer awaitInitialSync() { 176 ourLog.trace("Awaiting initial sync"); 177 do { 178 ourLog.trace("Search {} aborted: {}", getSearch().getUuid(), !isNotAborted()); 179 if (AsyncUtil.awaitLatchAndThrowInternalErrorExceptionOnInterrupt( 180 getInitialCollectionLatch(), 250L, TimeUnit.MILLISECONDS)) { 181 break; 182 } 183 } while (getSearch().getStatus() == SearchStatusEnum.LOADING); 184 ourLog.trace("Initial sync completed"); 185 186 return getSearch().getTotalCount(); 187 } 188 189 public Search getSearch() { 190 return mySearch; 191 } 192 193 public CountDownLatch getInitialCollectionLatch() { 194 return myInitialCollectionLatch; 195 } 196 197 public void setPreviouslyAddedResourcePids(List<JpaPid> thePreviouslyAddedResourcePids) { 198 myPreviouslyAddedResourcePids = thePreviouslyAddedResourcePids; 199 myCountSavedTotal = myPreviouslyAddedResourcePids.size(); 200 } 201 202 @SuppressWarnings("rawtypes") 203 private ISearchBuilder newSearchBuilder() { 204 Class<? extends IBaseResource> resourceTypeClass = 205 myContext.getResourceDefinition(myResourceType).getImplementingClass(); 206 return mySearchBuilderFactory.newSearchBuilder(myResourceType, resourceTypeClass); 207 } 208 209 @Nonnull 210 public List<JpaPid> getResourcePids(int theFromIndex, int theToIndex) { 211 ourLog.debug("Requesting search PIDs from {}-{}", theFromIndex, theToIndex); 212 213 boolean keepWaiting; 214 do { 215 synchronized (mySyncedPids) { 216 ourLog.trace("Search status is {}", mySearch.getStatus()); 217 boolean haveEnoughResults = mySyncedPids.size() >= theToIndex; 218 if (!haveEnoughResults) { 219 switch (mySearch.getStatus()) { 220 case LOADING: 221 keepWaiting = true; 222 break; 223 case PASSCMPLET: 224 /* 225 * If we get here, it means that the user requested resources that crossed the 226 * current pre-fetch boundary. For example, if the prefetch threshold is 50 and the 227 * user has requested resources 0-60, then they would get 0-50 back but the search 228 * coordinator would then stop searching.SearchCoordinatorSvcImplTest 229 */ 230 keepWaiting = false; 231 break; 232 case FAILED: 233 case FINISHED: 234 case GONE: 235 default: 236 keepWaiting = false; 237 break; 238 } 239 } else { 240 keepWaiting = false; 241 } 242 } 243 244 if (keepWaiting) { 245 ourLog.info( 246 "Waiting as we only have {} results - Search status: {}", 247 mySyncedPids.size(), 248 mySearch.getStatus()); 249 AsyncUtil.sleep(500L); 250 } 251 } while (keepWaiting); 252 253 ourLog.debug("Proceeding, as we have {} results", mySyncedPids.size()); 254 255 ArrayList<JpaPid> retVal = new ArrayList<>(); 256 synchronized (mySyncedPids) { 257 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(mySearch); 258 259 int toIndex = theToIndex; 260 if (mySyncedPids.size() < toIndex) { 261 toIndex = mySyncedPids.size(); 262 } 263 for (int i = theFromIndex; i < toIndex; i++) { 264 retVal.add(mySyncedPids.get(i)); 265 } 266 } 267 268 ourLog.trace( 269 "Done syncing results - Wanted {}-{} and returning {} of {}", 270 theFromIndex, 271 theToIndex, 272 retVal.size(), 273 mySyncedPids.size()); 274 275 return retVal; 276 } 277 278 public void saveSearch() { 279 myTxService 280 .withRequest(myRequest) 281 .withRequestPartitionId(myRequestPartitionId) 282 .withPropagation(Propagation.REQUIRES_NEW) 283 .execute(this::doSaveSearch); 284 } 285 286 @SuppressWarnings("rawtypes") 287 private void saveUnsynced(final IResultIterator theResultIter) { 288 myTxService 289 .withRequest(myRequest) 290 .withRequestPartitionId(myRequestPartitionId) 291 .execute(() -> { 292 if (mySearch.getId() == null) { 293 doSaveSearch(); 294 } 295 296 ArrayList<JpaPid> unsyncedPids = myUnsyncedPids; 297 int countBlocked = 0; 298 299 // Interceptor call: STORAGE_PREACCESS_RESOURCES 300 // This can be used to remove results from the search result details before 301 // the user has a chance to know that they were in the results 302 if (mySearchRuntimeDetails.getRequestDetails() != null && !unsyncedPids.isEmpty()) { 303 JpaPreResourceAccessDetails accessDetails = 304 new JpaPreResourceAccessDetails(unsyncedPids, this::newSearchBuilder); 305 HookParams params = new HookParams() 306 .add(IPreResourceAccessDetails.class, accessDetails) 307 .add(RequestDetails.class, mySearchRuntimeDetails.getRequestDetails()) 308 .addIfMatchesType( 309 ServletRequestDetails.class, mySearchRuntimeDetails.getRequestDetails()); 310 myCompositeBroadcaster.callHooks(Pointcut.STORAGE_PREACCESS_RESOURCES, params); 311 312 for (int i = unsyncedPids.size() - 1; i >= 0; i--) { 313 if (accessDetails.isDontReturnResourceAtIndex(i)) { 314 unsyncedPids.remove(i); 315 myCountBlockedThisPass++; 316 myCountSavedTotal++; 317 countBlocked++; 318 } 319 } 320 } 321 322 // Actually store the results in the query cache storage 323 myCountSavedTotal += unsyncedPids.size(); 324 myCountSavedThisPass += unsyncedPids.size(); 325 mySearchResultCacheSvc.storeResults( 326 mySearch, mySyncedPids, unsyncedPids, myRequest, getRequestPartitionId()); 327 328 synchronized (mySyncedPids) { 329 int numSyncedThisPass = unsyncedPids.size(); 330 ourLog.trace( 331 "Syncing {} search results - Have more: {}", 332 numSyncedThisPass, 333 theResultIter.hasNext()); 334 mySyncedPids.addAll(unsyncedPids); 335 unsyncedPids.clear(); 336 337 if (!theResultIter.hasNext()) { 338 int skippedCount = theResultIter.getSkippedCount(); 339 ourLog.trace( 340 "MaxToFetch[{}] SkippedCount[{}] CountSavedThisPass[{}] CountSavedThisTotal[{}] AdditionalPrefetchRemaining[{}]", 341 myMaxResultsToFetch, 342 skippedCount, 343 myCountSavedThisPass, 344 myCountSavedTotal, 345 myAdditionalPrefetchThresholdsRemaining); 346 347 if (isFinished(theResultIter)) { 348 // finished 349 ourLog.trace("Setting search status to FINISHED"); 350 mySearch.setStatus(SearchStatusEnum.FINISHED); 351 mySearch.setTotalCount(myCountSavedTotal - countBlocked); 352 } else if (myAdditionalPrefetchThresholdsRemaining) { 353 // pass complete 354 ourLog.trace("Setting search status to PASSCMPLET"); 355 mySearch.setStatus(SearchStatusEnum.PASSCMPLET); 356 mySearch.setSearchParameterMap(myParams); 357 } else { 358 // also finished 359 ourLog.trace("Setting search status to FINISHED"); 360 mySearch.setStatus(SearchStatusEnum.FINISHED); 361 mySearch.setTotalCount(myCountSavedTotal - countBlocked); 362 } 363 } 364 } 365 366 mySearch.setNumFound(myCountSavedTotal); 367 mySearch.setNumBlocked(mySearch.getNumBlocked() + countBlocked); 368 369 int numSynced; 370 synchronized (mySyncedPids) { 371 numSynced = mySyncedPids.size(); 372 } 373 374 if (myStorageSettings.getCountSearchResultsUpTo() == null 375 || myStorageSettings.getCountSearchResultsUpTo() <= 0 376 || myStorageSettings.getCountSearchResultsUpTo() <= numSynced) { 377 myInitialCollectionLatch.countDown(); 378 } 379 380 doSaveSearch(); 381 382 ourLog.trace("saveUnsynced() - pre-commit"); 383 }); 384 ourLog.trace("saveUnsynced() - post-commit"); 385 } 386 387 @SuppressWarnings("rawtypes") 388 private boolean isFinished(final IResultIterator theResultIter) { 389 int skippedCount = theResultIter.getSkippedCount(); 390 int nonSkippedCount = theResultIter.getNonSkippedCount(); 391 int totalFetched = skippedCount + myCountSavedThisPass + myCountBlockedThisPass; 392 393 if (myMaxResultsToFetch != null && totalFetched < myMaxResultsToFetch) { 394 // total fetched < max results to fetch -> we've exhausted the search 395 return true; 396 } else { 397 if (nonSkippedCount == 0) { 398 // no skipped resources in this query 399 if (myParams.getCount() != null) { 400 // count supplied 401 // if the count is > what we've fetched -> we've exhausted the query 402 return myParams.getCount() > totalFetched; 403 } else { 404 // legacy - we have no skipped resources - we are done 405 return true; 406 } 407 } 408 // skipped resources means we have more to fetch 409 return false; 410 } 411 } 412 413 public boolean isNotAborted() { 414 return !myAbortRequested; 415 } 416 417 public void markComplete() { 418 myCompletionLatch.countDown(); 419 } 420 421 public CountDownLatch getCompletionLatch() { 422 return myCompletionLatch; 423 } 424 425 /** 426 * Request that the task abort as soon as possible 427 */ 428 public void requestImmediateAbort() { 429 myAbortRequested = true; 430 } 431 432 /** 433 * This is the method which actually performs the search. 434 * It is called automatically by the thread pool. 435 */ 436 @Override 437 public Void call() { 438 StopWatch sw = new StopWatch(); 439 Span span = myParentTransaction.startSpan("db", "query", "search"); 440 span.setName("FHIR Database Search"); 441 try { 442 // Create an initial search in the DB and give it an ID 443 saveSearch(); 444 445 myTxService 446 .withRequest(myRequest) 447 .withRequestPartitionId(myRequestPartitionId) 448 .execute(this::doSearch); 449 450 mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus()); 451 if (mySearch.getStatus() == SearchStatusEnum.FINISHED) { 452 HookParams params = new HookParams() 453 .add(RequestDetails.class, myRequest) 454 .addIfMatchesType(ServletRequestDetails.class, myRequest) 455 .add(SearchRuntimeDetails.class, mySearchRuntimeDetails); 456 myCompositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_COMPLETE, params); 457 } else { 458 HookParams params = new HookParams() 459 .add(RequestDetails.class, myRequest) 460 .addIfMatchesType(ServletRequestDetails.class, myRequest) 461 .add(SearchRuntimeDetails.class, mySearchRuntimeDetails); 462 myCompositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_PASS_COMPLETE, params); 463 } 464 465 ourLog.trace( 466 "Have completed search for [{}{}] and found {} resources in {}ms - Status is {}", 467 mySearch.getResourceType(), 468 mySearch.getSearchQueryString(), 469 mySyncedPids.size(), 470 sw.getMillis(), 471 mySearch.getStatus()); 472 473 } catch (Throwable t) { 474 475 /* 476 * Don't print a stack trace for client errors (i.e. requests that 477 * aren't valid because the client screwed up).. that's just noise 478 * in the logs and who needs that. 479 */ 480 boolean logged = false; 481 if (t instanceof BaseServerResponseException) { 482 BaseServerResponseException exception = (BaseServerResponseException) t; 483 if (exception.getStatusCode() >= 400 && exception.getStatusCode() < 500) { 484 logged = true; 485 ourLog.warn("Failed during search due to invalid request: {}", t.toString()); 486 } 487 } 488 489 if (!logged) { 490 ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t); 491 } 492 myUnsyncedPids.clear(); 493 Throwable rootCause = ExceptionUtils.getRootCause(t); 494 rootCause = defaultIfNull(rootCause, t); 495 496 String failureMessage = rootCause.getMessage(); 497 498 int failureCode = InternalErrorException.STATUS_CODE; 499 if (t instanceof BaseServerResponseException) { 500 failureCode = ((BaseServerResponseException) t).getStatusCode(); 501 } 502 503 if (HapiSystemProperties.isUnitTestCaptureStackEnabled()) { 504 failureMessage += "\nStack\n" + ExceptionUtils.getStackTrace(rootCause); 505 } 506 507 mySearch.setFailureMessage(failureMessage); 508 mySearch.setFailureCode(failureCode); 509 mySearch.setStatus(SearchStatusEnum.FAILED); 510 511 mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus()); 512 HookParams params = new HookParams() 513 .add(RequestDetails.class, myRequest) 514 .addIfMatchesType(ServletRequestDetails.class, myRequest) 515 .add(SearchRuntimeDetails.class, mySearchRuntimeDetails); 516 myCompositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_FAILED, params); 517 518 saveSearch(); 519 span.captureException(t); 520 } finally { 521 myOnRemove.accept(mySearch.getUuid()); 522 523 myInitialCollectionLatch.countDown(); 524 markComplete(); 525 span.end(); 526 } 527 return null; 528 } 529 530 private void doSaveSearch() { 531 Search newSearch = mySearchCacheSvc.save(mySearch, myRequestPartitionId); 532 533 // mySearchDao.save is not supposed to return null, but in unit tests 534 // it can if the mock search dao isn't set up to handle that 535 if (newSearch != null) { 536 mySearch = newSearch; 537 } 538 } 539 540 /** 541 * This method actually creates the database query to perform the 542 * search, and starts it. 543 */ 544 @SuppressWarnings({"rawtypes", "unchecked"}) 545 private void doSearch() { 546 /* 547 * If the user has explicitly requested a _count, perform a 548 * 549 * SELECT COUNT(*) .... 550 * 551 * before doing anything else. 552 */ 553 boolean myParamWantOnlyCount = isWantOnlyCount(myParams); 554 boolean myParamOrDefaultWantCount = nonNull(myParams.getSearchTotalMode()) 555 ? isWantCount(myParams) 556 : SearchParameterMapCalculator.isWantCount(myStorageSettings.getDefaultTotalMode()); 557 558 if (myParamWantOnlyCount || myParamOrDefaultWantCount) { 559 doCountOnlyQuery(myParamWantOnlyCount); 560 if (myParamWantOnlyCount) { 561 return; 562 } 563 } 564 565 ourLog.trace("Done count"); 566 ISearchBuilder sb = newSearchBuilder(); 567 568 /* 569 * Figure out how many results we're actually going to fetch from the 570 * database in this pass. This calculation takes into consideration the 571 * "pre-fetch thresholds" specified in StorageSettings#getSearchPreFetchThresholds() 572 * as well as the value of the _count parameter. 573 */ 574 int currentlyLoaded = defaultIfNull(mySearch.getNumFound(), 0); 575 int minWanted = 0; 576 577 // if no count is provided, 578 // we only use the values in SearchPreFetchThresholds 579 // but if there is a count... 580 if (myParams.getCount() != null) { 581 minWanted = Math.min(myParams.getCount(), myPagingProvider.getMaximumPageSize()); 582 minWanted += currentlyLoaded; 583 } 584 585 // iterate through the search thresholds 586 for (Iterator<Integer> iter = 587 myStorageSettings.getSearchPreFetchThresholds().iterator(); 588 iter.hasNext(); ) { 589 int next = iter.next(); 590 if (next != -1 && next <= currentlyLoaded) { 591 continue; 592 } 593 594 if (next == -1) { 595 sb.setMaxResultsToFetch(null); 596 } else { 597 // we want at least 1 more than our requested amount 598 // so we know that there are other results 599 // (in case we get the exact amount back) 600 myMaxResultsToFetch = Math.max(next, minWanted); 601 sb.setMaxResultsToFetch(myMaxResultsToFetch + 1); 602 } 603 604 if (iter.hasNext()) { 605 myAdditionalPrefetchThresholdsRemaining = true; 606 } 607 608 // If we get here's we've found an appropriate threshold 609 break; 610 } 611 612 /* 613 * Provide any PID we loaded in previous search passes to the 614 * SearchBuilder so that we don't get duplicates coming from running 615 * the same query again. 616 * 617 * We could possibly accomplish this in a different way by using sorted 618 * results in our SQL query and specifying an offset. I don't actually 619 * know if that would be faster or not. At some point should test this 620 * idea. 621 */ 622 if (myPreviouslyAddedResourcePids != null) { 623 sb.setPreviouslyAddedResourcePids(myPreviouslyAddedResourcePids); 624 mySyncedPids.addAll(myPreviouslyAddedResourcePids); 625 } 626 627 /* 628 * createQuery 629 * Construct the SQL query we'll be sending to the database 630 * 631 * NB: (See createCountQuery above) 632 * We will pass the original myParams here (not a copy) 633 * because we actually _want_ the mutation of the myParams to happen. 634 * Specifically because SearchBuilder itself will _expect_ 635 * not to have these parameters when dumping back 636 * to our DB. 637 * 638 * This is an odd implementation behaviour, but the change 639 * for this will require a lot more handling at higher levels 640 */ 641 try (IResultIterator<JpaPid> resultIterator = 642 sb.createQuery(myParams, mySearchRuntimeDetails, myRequest, myRequestPartitionId)) { 643 // resultIterator is SearchBuilder.QueryIterator 644 assert (resultIterator != null); 645 646 /* 647 * The following loop actually loads the PIDs of the resources 648 * matching the search off of the disk and into memory. After 649 * every X results, we commit to the HFJ_SEARCH table. 650 */ 651 int syncSize = mySyncSize; 652 while (resultIterator.hasNext()) { 653 myUnsyncedPids.add(resultIterator.next()); 654 655 boolean shouldSync = myUnsyncedPids.size() >= syncSize; 656 657 if (myStorageSettings.getCountSearchResultsUpTo() != null 658 && myStorageSettings.getCountSearchResultsUpTo() > 0 659 && myStorageSettings.getCountSearchResultsUpTo() < myUnsyncedPids.size()) { 660 shouldSync = false; 661 } 662 663 if (myUnsyncedPids.size() > 50000) { 664 shouldSync = true; 665 } 666 667 // If no abort was requested, bail out 668 Validate.isTrue(isNotAborted(), "Abort has been requested"); 669 670 if (shouldSync) { 671 saveUnsynced(resultIterator); 672 } 673 674 if (myLoadingThrottleForUnitTests != null) { 675 AsyncUtil.sleep(myLoadingThrottleForUnitTests); 676 } 677 } 678 679 // If no abort was requested, bail out 680 Validate.isTrue(isNotAborted(), "Abort has been requested"); 681 682 saveUnsynced(resultIterator); 683 684 } catch (IOException e) { 685 ourLog.error("IO failure during database access", e); 686 throw new InternalErrorException(Msg.code(1166) + e); 687 } 688 } 689 690 /** 691 * Does the query but only for the count. 692 * @param theParamWantOnlyCount - if count query is wanted only 693 */ 694 private void doCountOnlyQuery(boolean theParamWantOnlyCount) { 695 ourLog.trace("Performing count"); 696 @SuppressWarnings("rawtypes") 697 ISearchBuilder sb = newSearchBuilder(); 698 699 /* 700 * createCountQuery 701 * NB: (see createQuery below) 702 * Because FulltextSearchSvcImpl will (internally) 703 * mutate the myParams (searchmap), 704 * (specifically removing the _content and _text filters) 705 * we will have to clone those parameters here so that 706 * the "correct" params are used in createQuery below 707 */ 708 Long count = sb.createCountQuery(myParams.clone(), mySearch.getUuid(), myRequest, myRequestPartitionId); 709 710 ourLog.trace("Got count {}", count); 711 712 myTxService 713 .withRequest(myRequest) 714 .withRequestPartitionId(myRequestPartitionId) 715 .execute(() -> { 716 mySearch.setTotalCount(count.intValue()); 717 if (theParamWantOnlyCount) { 718 mySearch.setStatus(SearchStatusEnum.FINISHED); 719 } 720 doSaveSearch(); 721 }); 722 } 723}