001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.search.builder.tasks; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.i18n.Msg; 024import ca.uhn.fhir.interceptor.api.HookParams; 025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 026import ca.uhn.fhir.interceptor.api.Pointcut; 027import ca.uhn.fhir.interceptor.model.RequestPartitionId; 028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 029import ca.uhn.fhir.jpa.api.dao.IDao; 030import ca.uhn.fhir.jpa.dao.IResultIterator; 031import ca.uhn.fhir.jpa.dao.ISearchBuilder; 032import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 033import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 034import ca.uhn.fhir.jpa.entity.Search; 035import ca.uhn.fhir.jpa.interceptor.JpaPreResourceAccessDetails; 036import ca.uhn.fhir.jpa.model.dao.JpaPid; 037import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; 038import ca.uhn.fhir.jpa.model.search.SearchStatusEnum; 039import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc; 040import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc; 041import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 042import ca.uhn.fhir.jpa.util.QueryParameterUtils; 043import ca.uhn.fhir.jpa.util.SearchParameterMapCalculator; 044import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails; 045import ca.uhn.fhir.rest.api.server.RequestDetails; 046import ca.uhn.fhir.rest.server.IPagingProvider; 047import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; 048import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 049import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; 050import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; 051import ca.uhn.fhir.system.HapiSystemProperties; 052import ca.uhn.fhir.util.AsyncUtil; 053import ca.uhn.fhir.util.StopWatch; 054import co.elastic.apm.api.ElasticApm; 055import co.elastic.apm.api.Span; 056import co.elastic.apm.api.Transaction; 057import jakarta.annotation.Nonnull; 058import org.apache.commons.lang3.Validate; 059import org.apache.commons.lang3.exception.ExceptionUtils; 060import org.hl7.fhir.instance.model.api.IBaseResource; 061import org.springframework.transaction.annotation.Propagation; 062 063import java.io.IOException; 064import java.util.ArrayList; 065import java.util.Iterator; 066import java.util.List; 067import java.util.concurrent.Callable; 068import java.util.concurrent.CountDownLatch; 069import java.util.concurrent.TimeUnit; 070import java.util.function.Consumer; 071 072import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantCount; 073import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantOnlyCount; 074import static java.util.Objects.nonNull; 075import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; 076 077/** 078 * A search task is a Callable task that runs in 079 * a thread pool to handle an individual search. One instance 080 * is created for any requested search and runs from the 081 * beginning to the end of the search. 082 * <p> 083 * Understand: 084 * This class executes in its own thread separate from the 085 * web server client thread that made the request. We do that 086 * so that we can return to the client as soon as possible, 087 * but keep the search going in the background (and have 088 * the next page of results ready to go when the client asks). 089 */ 090public class SearchTask implements Callable<Void> { 091 092 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchTask.class); 093 // injected beans 094 protected final HapiTransactionService myTxService; 095 protected final FhirContext myContext; 096 protected final ISearchResultCacheSvc mySearchResultCacheSvc; 097 private final SearchParameterMap myParams; 098 private final IDao myCallingDao; 099 private final String myResourceType; 100 private final ArrayList<JpaPid> mySyncedPids = new ArrayList<>(); 101 private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1); 102 private final CountDownLatch myCompletionLatch; 103 private final ArrayList<JpaPid> myUnsyncedPids = new ArrayList<>(); 104 private final RequestDetails myRequest; 105 private final RequestPartitionId myRequestPartitionId; 106 private final SearchRuntimeDetails mySearchRuntimeDetails; 107 private final Transaction myParentTransaction; 108 private final Consumer<String> myOnRemove; 109 private final int mySyncSize; 110 private final Integer myLoadingThrottleForUnitTests; 111 private final IInterceptorBroadcaster myInterceptorBroadcaster; 112 private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory; 113 private final JpaStorageSettings myStorageSettings; 114 private final ISearchCacheSvc mySearchCacheSvc; 115 private final IPagingProvider myPagingProvider; 116 private Search mySearch; 117 private boolean myAbortRequested; 118 private int myCountSavedTotal = 0; 119 private int myCountSavedThisPass = 0; 120 private int myCountBlockedThisPass = 0; 121 private boolean myAdditionalPrefetchThresholdsRemaining; 122 private List<JpaPid> myPreviouslyAddedResourcePids; 123 private Integer myMaxResultsToFetch; 124 125 /** 126 * Constructor 127 */ 128 @SuppressWarnings({"unchecked", "rawtypes"}) 129 public SearchTask( 130 SearchTaskParameters theCreationParams, 131 HapiTransactionService theManagedTxManager, 132 FhirContext theContext, 133 IInterceptorBroadcaster theInterceptorBroadcaster, 134 SearchBuilderFactory theSearchBuilderFactory, 135 ISearchResultCacheSvc theSearchResultCacheSvc, 136 JpaStorageSettings theStorageSettings, 137 ISearchCacheSvc theSearchCacheSvc, 138 IPagingProvider thePagingProvider) { 139 // beans 140 myTxService = theManagedTxManager; 141 myContext = theContext; 142 myInterceptorBroadcaster = theInterceptorBroadcaster; 143 mySearchBuilderFactory = theSearchBuilderFactory; 144 mySearchResultCacheSvc = theSearchResultCacheSvc; 145 myStorageSettings = theStorageSettings; 146 mySearchCacheSvc = theSearchCacheSvc; 147 myPagingProvider = thePagingProvider; 148 149 // values 150 myOnRemove = theCreationParams.OnRemove; 151 mySearch = theCreationParams.Search; 152 myCallingDao = theCreationParams.CallingDao; 153 myParams = theCreationParams.Params; 154 myResourceType = theCreationParams.ResourceType; 155 myRequest = theCreationParams.Request; 156 myCompletionLatch = new CountDownLatch(1); 157 mySyncSize = theCreationParams.SyncSize; 158 myLoadingThrottleForUnitTests = theCreationParams.getLoadingThrottleForUnitTests(); 159 160 mySearchRuntimeDetails = new SearchRuntimeDetails(myRequest, mySearch.getUuid()); 161 mySearchRuntimeDetails.setQueryString(myParams.toNormalizedQueryString(myCallingDao.getContext())); 162 myRequestPartitionId = theCreationParams.RequestPartitionId; 163 myParentTransaction = ElasticApm.currentTransaction(); 164 } 165 166 protected RequestPartitionId getRequestPartitionId() { 167 return myRequestPartitionId; 168 } 169 170 /** 171 * This method is called by the server HTTP thread, and 172 * will block until at least one page of results have been 173 * fetched from the DB, and will never block after that. 174 */ 175 public Integer awaitInitialSync() { 176 ourLog.trace("Awaiting initial sync"); 177 do { 178 ourLog.trace("Search {} aborted: {}", getSearch().getUuid(), !isNotAborted()); 179 if (AsyncUtil.awaitLatchAndThrowInternalErrorExceptionOnInterrupt( 180 getInitialCollectionLatch(), 250L, TimeUnit.MILLISECONDS)) { 181 break; 182 } 183 } while (getSearch().getStatus() == SearchStatusEnum.LOADING); 184 ourLog.trace("Initial sync completed"); 185 186 return getSearch().getTotalCount(); 187 } 188 189 public Search getSearch() { 190 return mySearch; 191 } 192 193 public CountDownLatch getInitialCollectionLatch() { 194 return myInitialCollectionLatch; 195 } 196 197 public void setPreviouslyAddedResourcePids(List<JpaPid> thePreviouslyAddedResourcePids) { 198 myPreviouslyAddedResourcePids = thePreviouslyAddedResourcePids; 199 myCountSavedTotal = myPreviouslyAddedResourcePids.size(); 200 } 201 202 @SuppressWarnings("rawtypes") 203 private ISearchBuilder newSearchBuilder() { 204 Class<? extends IBaseResource> resourceTypeClass = 205 myContext.getResourceDefinition(myResourceType).getImplementingClass(); 206 return mySearchBuilderFactory.newSearchBuilder(myCallingDao, myResourceType, resourceTypeClass); 207 } 208 209 @Nonnull 210 public List<JpaPid> getResourcePids(int theFromIndex, int theToIndex) { 211 ourLog.debug("Requesting search PIDs from {}-{}", theFromIndex, theToIndex); 212 213 boolean keepWaiting; 214 do { 215 synchronized (mySyncedPids) { 216 ourLog.trace("Search status is {}", mySearch.getStatus()); 217 boolean haveEnoughResults = mySyncedPids.size() >= theToIndex; 218 if (!haveEnoughResults) { 219 switch (mySearch.getStatus()) { 220 case LOADING: 221 keepWaiting = true; 222 break; 223 case PASSCMPLET: 224 /* 225 * If we get here, it means that the user requested resources that crossed the 226 * current pre-fetch boundary. For example, if the prefetch threshold is 50 and the 227 * user has requested resources 0-60, then they would get 0-50 back but the search 228 * coordinator would then stop searching.SearchCoordinatorSvcImplTest 229 */ 230 keepWaiting = false; 231 break; 232 case FAILED: 233 case FINISHED: 234 case GONE: 235 default: 236 keepWaiting = false; 237 break; 238 } 239 } else { 240 keepWaiting = false; 241 } 242 } 243 244 if (keepWaiting) { 245 ourLog.info( 246 "Waiting as we only have {} results - Search status: {}", 247 mySyncedPids.size(), 248 mySearch.getStatus()); 249 AsyncUtil.sleep(500L); 250 } 251 } while (keepWaiting); 252 253 ourLog.debug("Proceeding, as we have {} results", mySyncedPids.size()); 254 255 ArrayList<JpaPid> retVal = new ArrayList<>(); 256 synchronized (mySyncedPids) { 257 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(mySearch); 258 259 int toIndex = theToIndex; 260 if (mySyncedPids.size() < toIndex) { 261 toIndex = mySyncedPids.size(); 262 } 263 for (int i = theFromIndex; i < toIndex; i++) { 264 retVal.add(mySyncedPids.get(i)); 265 } 266 } 267 268 ourLog.trace( 269 "Done syncing results - Wanted {}-{} and returning {} of {}", 270 theFromIndex, 271 theToIndex, 272 retVal.size(), 273 mySyncedPids.size()); 274 275 return retVal; 276 } 277 278 public void saveSearch() { 279 myTxService 280 .withRequest(myRequest) 281 .withRequestPartitionId(myRequestPartitionId) 282 .withPropagation(Propagation.REQUIRES_NEW) 283 .execute(() -> doSaveSearch()); 284 } 285 286 @SuppressWarnings("rawtypes") 287 private void saveUnsynced(final IResultIterator theResultIter) { 288 myTxService 289 .withRequest(myRequest) 290 .withRequestPartitionId(myRequestPartitionId) 291 .execute(() -> { 292 if (mySearch.getId() == null) { 293 doSaveSearch(); 294 } 295 296 ArrayList<JpaPid> unsyncedPids = myUnsyncedPids; 297 int countBlocked = 0; 298 299 // Interceptor call: STORAGE_PREACCESS_RESOURCES 300 // This can be used to remove results from the search result details before 301 // the user has a chance to know that they were in the results 302 if (mySearchRuntimeDetails.getRequestDetails() != null && !unsyncedPids.isEmpty()) { 303 JpaPreResourceAccessDetails accessDetails = 304 new JpaPreResourceAccessDetails(unsyncedPids, this::newSearchBuilder); 305 HookParams params = new HookParams() 306 .add(IPreResourceAccessDetails.class, accessDetails) 307 .add(RequestDetails.class, mySearchRuntimeDetails.getRequestDetails()) 308 .addIfMatchesType( 309 ServletRequestDetails.class, mySearchRuntimeDetails.getRequestDetails()); 310 CompositeInterceptorBroadcaster.doCallHooks( 311 myInterceptorBroadcaster, myRequest, Pointcut.STORAGE_PREACCESS_RESOURCES, params); 312 313 for (int i = unsyncedPids.size() - 1; i >= 0; i--) { 314 if (accessDetails.isDontReturnResourceAtIndex(i)) { 315 unsyncedPids.remove(i); 316 myCountBlockedThisPass++; 317 myCountSavedTotal++; 318 countBlocked++; 319 } 320 } 321 } 322 323 // Actually store the results in the query cache storage 324 myCountSavedTotal += unsyncedPids.size(); 325 myCountSavedThisPass += unsyncedPids.size(); 326 mySearchResultCacheSvc.storeResults( 327 mySearch, mySyncedPids, unsyncedPids, myRequest, getRequestPartitionId()); 328 329 synchronized (mySyncedPids) { 330 int numSyncedThisPass = unsyncedPids.size(); 331 ourLog.trace( 332 "Syncing {} search results - Have more: {}", 333 numSyncedThisPass, 334 theResultIter.hasNext()); 335 mySyncedPids.addAll(unsyncedPids); 336 unsyncedPids.clear(); 337 338 if (!theResultIter.hasNext()) { 339 int skippedCount = theResultIter.getSkippedCount(); 340 ourLog.trace( 341 "MaxToFetch[{}] SkippedCount[{}] CountSavedThisPass[{}] CountSavedThisTotal[{}] AdditionalPrefetchRemaining[{}]", 342 myMaxResultsToFetch, 343 skippedCount, 344 myCountSavedThisPass, 345 myCountSavedTotal, 346 myAdditionalPrefetchThresholdsRemaining); 347 348 if (isFinished(theResultIter)) { 349 // finished 350 ourLog.trace("Setting search status to FINISHED"); 351 mySearch.setStatus(SearchStatusEnum.FINISHED); 352 mySearch.setTotalCount(myCountSavedTotal - countBlocked); 353 } else if (myAdditionalPrefetchThresholdsRemaining) { 354 // pass complete 355 ourLog.trace("Setting search status to PASSCMPLET"); 356 mySearch.setStatus(SearchStatusEnum.PASSCMPLET); 357 mySearch.setSearchParameterMap(myParams); 358 } else { 359 // also finished 360 ourLog.trace("Setting search status to FINISHED"); 361 mySearch.setStatus(SearchStatusEnum.FINISHED); 362 mySearch.setTotalCount(myCountSavedTotal - countBlocked); 363 } 364 } 365 } 366 367 mySearch.setNumFound(myCountSavedTotal); 368 mySearch.setNumBlocked(mySearch.getNumBlocked() + countBlocked); 369 370 int numSynced; 371 synchronized (mySyncedPids) { 372 numSynced = mySyncedPids.size(); 373 } 374 375 if (myStorageSettings.getCountSearchResultsUpTo() == null 376 || myStorageSettings.getCountSearchResultsUpTo() <= 0 377 || myStorageSettings.getCountSearchResultsUpTo() <= numSynced) { 378 myInitialCollectionLatch.countDown(); 379 } 380 381 doSaveSearch(); 382 383 ourLog.trace("saveUnsynced() - pre-commit"); 384 }); 385 ourLog.trace("saveUnsynced() - post-commit"); 386 } 387 388 @SuppressWarnings("rawtypes") 389 private boolean isFinished(final IResultIterator theResultIter) { 390 int skippedCount = theResultIter.getSkippedCount(); 391 int nonSkippedCount = theResultIter.getNonSkippedCount(); 392 int totalFetched = skippedCount + myCountSavedThisPass + myCountBlockedThisPass; 393 394 if (myMaxResultsToFetch != null && totalFetched < myMaxResultsToFetch) { 395 // total fetched < max results to fetch -> we've exhausted the search 396 return true; 397 } else { 398 if (nonSkippedCount == 0) { 399 // no skipped resources in this query 400 if (myParams.getCount() != null) { 401 // count supplied 402 // if the count is > what we've fetched -> we've exhausted the query 403 return myParams.getCount() > totalFetched; 404 } else { 405 // legacy - we have no skipped resources - we are done 406 return true; 407 } 408 } 409 // skipped resources means we have more to fetch 410 return false; 411 } 412 } 413 414 public boolean isNotAborted() { 415 return !myAbortRequested; 416 } 417 418 public void markComplete() { 419 myCompletionLatch.countDown(); 420 } 421 422 public CountDownLatch getCompletionLatch() { 423 return myCompletionLatch; 424 } 425 426 /** 427 * Request that the task abort as soon as possible 428 */ 429 public void requestImmediateAbort() { 430 myAbortRequested = true; 431 } 432 433 /** 434 * This is the method which actually performs the search. 435 * It is called automatically by the thread pool. 436 */ 437 @Override 438 public Void call() { 439 StopWatch sw = new StopWatch(); 440 Span span = myParentTransaction.startSpan("db", "query", "search"); 441 span.setName("FHIR Database Search"); 442 try { 443 // Create an initial search in the DB and give it an ID 444 saveSearch(); 445 446 myTxService 447 .withRequest(myRequest) 448 .withRequestPartitionId(myRequestPartitionId) 449 .execute(this::doSearch); 450 451 mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus()); 452 if (mySearch.getStatus() == SearchStatusEnum.FINISHED) { 453 HookParams params = new HookParams() 454 .add(RequestDetails.class, myRequest) 455 .addIfMatchesType(ServletRequestDetails.class, myRequest) 456 .add(SearchRuntimeDetails.class, mySearchRuntimeDetails); 457 CompositeInterceptorBroadcaster.doCallHooks( 458 myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_COMPLETE, params); 459 } else { 460 HookParams params = new HookParams() 461 .add(RequestDetails.class, myRequest) 462 .addIfMatchesType(ServletRequestDetails.class, myRequest) 463 .add(SearchRuntimeDetails.class, mySearchRuntimeDetails); 464 CompositeInterceptorBroadcaster.doCallHooks( 465 myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_PASS_COMPLETE, params); 466 } 467 468 ourLog.trace( 469 "Have completed search for [{}{}] and found {} resources in {}ms - Status is {}", 470 mySearch.getResourceType(), 471 mySearch.getSearchQueryString(), 472 mySyncedPids.size(), 473 sw.getMillis(), 474 mySearch.getStatus()); 475 476 } catch (Throwable t) { 477 478 /* 479 * Don't print a stack trace for client errors (i.e. requests that 480 * aren't valid because the client screwed up).. that's just noise 481 * in the logs and who needs that. 482 */ 483 boolean logged = false; 484 if (t instanceof BaseServerResponseException) { 485 BaseServerResponseException exception = (BaseServerResponseException) t; 486 if (exception.getStatusCode() >= 400 && exception.getStatusCode() < 500) { 487 logged = true; 488 ourLog.warn("Failed during search due to invalid request: {}", t.toString()); 489 } 490 } 491 492 if (!logged) { 493 ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t); 494 } 495 myUnsyncedPids.clear(); 496 Throwable rootCause = ExceptionUtils.getRootCause(t); 497 rootCause = defaultIfNull(rootCause, t); 498 499 String failureMessage = rootCause.getMessage(); 500 501 int failureCode = InternalErrorException.STATUS_CODE; 502 if (t instanceof BaseServerResponseException) { 503 failureCode = ((BaseServerResponseException) t).getStatusCode(); 504 } 505 506 if (HapiSystemProperties.isUnitTestCaptureStackEnabled()) { 507 failureMessage += "\nStack\n" + ExceptionUtils.getStackTrace(rootCause); 508 } 509 510 mySearch.setFailureMessage(failureMessage); 511 mySearch.setFailureCode(failureCode); 512 mySearch.setStatus(SearchStatusEnum.FAILED); 513 514 mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus()); 515 HookParams params = new HookParams() 516 .add(RequestDetails.class, myRequest) 517 .addIfMatchesType(ServletRequestDetails.class, myRequest) 518 .add(SearchRuntimeDetails.class, mySearchRuntimeDetails); 519 CompositeInterceptorBroadcaster.doCallHooks( 520 myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_FAILED, params); 521 522 saveSearch(); 523 span.captureException(t); 524 } finally { 525 myOnRemove.accept(mySearch.getUuid()); 526 527 myInitialCollectionLatch.countDown(); 528 markComplete(); 529 span.end(); 530 } 531 return null; 532 } 533 534 private void doSaveSearch() { 535 Search newSearch = mySearchCacheSvc.save(mySearch, myRequestPartitionId); 536 537 // mySearchDao.save is not supposed to return null, but in unit tests 538 // it can if the mock search dao isn't set up to handle that 539 if (newSearch != null) { 540 mySearch = newSearch; 541 } 542 } 543 544 /** 545 * This method actually creates the database query to perform the 546 * search, and starts it. 547 */ 548 @SuppressWarnings({"rawtypes", "unchecked"}) 549 private void doSearch() { 550 /* 551 * If the user has explicitly requested a _count, perform a 552 * 553 * SELECT COUNT(*) .... 554 * 555 * before doing anything else. 556 */ 557 boolean myParamWantOnlyCount = isWantOnlyCount(myParams); 558 boolean myParamOrDefaultWantCount = nonNull(myParams.getSearchTotalMode()) 559 ? isWantCount(myParams) 560 : SearchParameterMapCalculator.isWantCount(myStorageSettings.getDefaultTotalMode()); 561 562 if (myParamWantOnlyCount || myParamOrDefaultWantCount) { 563 doCountOnlyQuery(myParamWantOnlyCount); 564 if (myParamWantOnlyCount) { 565 return; 566 } 567 } 568 569 ourLog.trace("Done count"); 570 ISearchBuilder sb = newSearchBuilder(); 571 572 /* 573 * Figure out how many results we're actually going to fetch from the 574 * database in this pass. This calculation takes into consideration the 575 * "pre-fetch thresholds" specified in StorageSettings#getSearchPreFetchThresholds() 576 * as well as the value of the _count parameter. 577 */ 578 int currentlyLoaded = defaultIfNull(mySearch.getNumFound(), 0); 579 int minWanted = 0; 580 581 // if no count is provided, 582 // we only use the values in SearchPreFetchThresholds 583 // but if there is a count... 584 if (myParams.getCount() != null) { 585 minWanted = Math.min(myParams.getCount(), myPagingProvider.getMaximumPageSize()); 586 minWanted += currentlyLoaded; 587 } 588 589 // iterate through the search thresholds 590 for (Iterator<Integer> iter = 591 myStorageSettings.getSearchPreFetchThresholds().iterator(); 592 iter.hasNext(); ) { 593 int next = iter.next(); 594 if (next != -1 && next <= currentlyLoaded) { 595 continue; 596 } 597 598 if (next == -1) { 599 sb.setMaxResultsToFetch(null); 600 } else { 601 // we want at least 1 more than our requested amount 602 // so we know that there are other results 603 // (in case we get the exact amount back) 604 myMaxResultsToFetch = Math.max(next, minWanted); 605 sb.setMaxResultsToFetch(myMaxResultsToFetch + 1); 606 } 607 608 if (iter.hasNext()) { 609 myAdditionalPrefetchThresholdsRemaining = true; 610 } 611 612 // If we get here's we've found an appropriate threshold 613 break; 614 } 615 616 /* 617 * Provide any PID we loaded in previous search passes to the 618 * SearchBuilder so that we don't get duplicates coming from running 619 * the same query again. 620 * 621 * We could possibly accomplish this in a different way by using sorted 622 * results in our SQL query and specifying an offset. I don't actually 623 * know if that would be faster or not. At some point should test this 624 * idea. 625 */ 626 if (myPreviouslyAddedResourcePids != null) { 627 sb.setPreviouslyAddedResourcePids(myPreviouslyAddedResourcePids); 628 mySyncedPids.addAll(myPreviouslyAddedResourcePids); 629 } 630 631 /* 632 * createQuery 633 * Construct the SQL query we'll be sending to the database 634 * 635 * NB: (See createCountQuery above) 636 * We will pass the original myParams here (not a copy) 637 * because we actually _want_ the mutation of the myParams to happen. 638 * Specifically because SearchBuilder itself will _expect_ 639 * not to have these parameters when dumping back 640 * to our DB. 641 * 642 * This is an odd implementation behaviour, but the change 643 * for this will require a lot more handling at higher levels 644 */ 645 try (IResultIterator<JpaPid> resultIterator = 646 sb.createQuery(myParams, mySearchRuntimeDetails, myRequest, myRequestPartitionId)) { 647 // resultIterator is SearchBuilder.QueryIterator 648 assert (resultIterator != null); 649 650 /* 651 * The following loop actually loads the PIDs of the resources 652 * matching the search off of the disk and into memory. After 653 * every X results, we commit to the HFJ_SEARCH table. 654 */ 655 int syncSize = mySyncSize; 656 while (resultIterator.hasNext()) { 657 myUnsyncedPids.add(resultIterator.next()); 658 659 boolean shouldSync = myUnsyncedPids.size() >= syncSize; 660 661 if (myStorageSettings.getCountSearchResultsUpTo() != null 662 && myStorageSettings.getCountSearchResultsUpTo() > 0 663 && myStorageSettings.getCountSearchResultsUpTo() < myUnsyncedPids.size()) { 664 shouldSync = false; 665 } 666 667 if (myUnsyncedPids.size() > 50000) { 668 shouldSync = true; 669 } 670 671 // If no abort was requested, bail out 672 Validate.isTrue(isNotAborted(), "Abort has been requested"); 673 674 if (shouldSync) { 675 saveUnsynced(resultIterator); 676 } 677 678 if (myLoadingThrottleForUnitTests != null) { 679 AsyncUtil.sleep(myLoadingThrottleForUnitTests); 680 } 681 } 682 683 // If no abort was requested, bail out 684 Validate.isTrue(isNotAborted(), "Abort has been requested"); 685 686 saveUnsynced(resultIterator); 687 688 } catch (IOException e) { 689 ourLog.error("IO failure during database access", e); 690 throw new InternalErrorException(Msg.code(1166) + e); 691 } 692 } 693 694 /** 695 * Does the query but only for the count. 696 * @param theParamWantOnlyCount - if count query is wanted only 697 */ 698 private void doCountOnlyQuery(boolean theParamWantOnlyCount) { 699 ourLog.trace("Performing count"); 700 @SuppressWarnings("rawtypes") 701 ISearchBuilder sb = newSearchBuilder(); 702 703 /* 704 * createCountQuery 705 * NB: (see createQuery below) 706 * Because FulltextSearchSvcImpl will (internally) 707 * mutate the myParams (searchmap), 708 * (specifically removing the _content and _text filters) 709 * we will have to clone those parameters here so that 710 * the "correct" params are used in createQuery below 711 */ 712 Long count = sb.createCountQuery(myParams.clone(), mySearch.getUuid(), myRequest, myRequestPartitionId); 713 714 ourLog.trace("Got count {}", count); 715 716 myTxService 717 .withRequest(myRequest) 718 .withRequestPartitionId(myRequestPartitionId) 719 .execute(() -> { 720 mySearch.setTotalCount(count.intValue()); 721 if (theParamWantOnlyCount) { 722 mySearch.setStatus(SearchStatusEnum.FINISHED); 723 } 724 doSaveSearch(); 725 }); 726 } 727}