001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L%family 019 */ 020package ca.uhn.fhir.jpa.search; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.i18n.Msg; 024import ca.uhn.fhir.interceptor.api.HookParams; 025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 026import ca.uhn.fhir.interceptor.api.Pointcut; 027import ca.uhn.fhir.interceptor.model.RequestPartitionId; 028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 029import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 030import ca.uhn.fhir.jpa.api.dao.IDao; 031import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 032import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; 033import ca.uhn.fhir.jpa.config.SearchConfig; 034import ca.uhn.fhir.jpa.dao.BaseStorageDao; 035import ca.uhn.fhir.jpa.dao.ISearchBuilder; 036import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 037import ca.uhn.fhir.jpa.dao.search.ResourceNotFoundInIndexException; 038import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 039import ca.uhn.fhir.jpa.entity.Search; 040import ca.uhn.fhir.jpa.model.dao.JpaPid; 041import ca.uhn.fhir.jpa.model.search.SearchStatusEnum; 042import ca.uhn.fhir.jpa.search.builder.StorageInterceptorHooksFacade; 043import ca.uhn.fhir.jpa.search.builder.tasks.SearchContinuationTask; 044import ca.uhn.fhir.jpa.search.builder.tasks.SearchTask; 045import ca.uhn.fhir.jpa.search.builder.tasks.SearchTaskParameters; 046import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc; 047import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc; 048import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum; 049import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 050import ca.uhn.fhir.jpa.util.QueryParameterUtils; 051import ca.uhn.fhir.model.api.Include; 052import ca.uhn.fhir.rest.api.CacheControlDirective; 053import ca.uhn.fhir.rest.api.Constants; 054import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum; 055import ca.uhn.fhir.rest.api.SearchTotalModeEnum; 056import ca.uhn.fhir.rest.api.server.IBundleProvider; 057import ca.uhn.fhir.rest.api.server.RequestDetails; 058import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 059import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 060import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; 061import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; 062import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; 063import ca.uhn.fhir.util.AsyncUtil; 064import ca.uhn.fhir.util.StopWatch; 065import ca.uhn.fhir.util.UrlUtil; 066import com.google.common.annotations.VisibleForTesting; 067import jakarta.annotation.Nonnull; 068import jakarta.annotation.Nullable; 069import org.apache.commons.lang3.time.DateUtils; 070import org.hl7.fhir.instance.model.api.IBaseResource; 071import org.springframework.beans.factory.BeanFactory; 072import org.springframework.data.domain.PageRequest; 073import org.springframework.data.domain.Pageable; 074import org.springframework.data.domain.Sort; 075import org.springframework.stereotype.Component; 076import org.springframework.transaction.support.TransactionSynchronizationManager; 077 078import java.time.Instant; 079import java.time.temporal.ChronoUnit; 080import java.util.List; 081import java.util.Optional; 082import java.util.Set; 083import java.util.UUID; 084import java.util.concurrent.Callable; 085import java.util.concurrent.ConcurrentHashMap; 086import java.util.concurrent.TimeUnit; 087import java.util.function.Consumer; 088import java.util.stream.Collectors; 089 090import static ca.uhn.fhir.jpa.util.QueryParameterUtils.DEFAULT_SYNC_SIZE; 091import static org.apache.commons.lang3.StringUtils.isBlank; 092import static org.apache.commons.lang3.StringUtils.isNotBlank; 093 094@Component("mySearchCoordinatorSvc") 095public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc<JpaPid> { 096 097 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class); 098 099 private final FhirContext myContext; 100 private final JpaStorageSettings myStorageSettings; 101 private final IInterceptorBroadcaster myInterceptorBroadcaster; 102 private final HapiTransactionService myTxService; 103 private final ISearchCacheSvc mySearchCacheSvc; 104 private final ISearchResultCacheSvc mySearchResultCacheSvc; 105 private final DaoRegistry myDaoRegistry; 106 private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory; 107 private final ISynchronousSearchSvc mySynchronousSearchSvc; 108 private final PersistedJpaBundleProviderFactory myPersistedJpaBundleProviderFactory; 109 private final ISearchParamRegistry mySearchParamRegistry; 110 private final SearchStrategyFactory mySearchStrategyFactory; 111 private final ExceptionService myExceptionSvc; 112 private final BeanFactory myBeanFactory; 113 private ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>(); 114 115 private final Consumer<String> myOnRemoveSearchTask = myIdToSearchTask::remove; 116 117 private final StorageInterceptorHooksFacade myStorageInterceptorHooks; 118 private Integer myLoadingThrottleForUnitTests = null; 119 private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE; 120 private boolean myNeverUseLocalSearchForUnitTests; 121 private int mySyncSize = DEFAULT_SYNC_SIZE; 122 123 /** 124 * Constructor 125 */ 126 public SearchCoordinatorSvcImpl( 127 FhirContext theContext, 128 JpaStorageSettings theStorageSettings, 129 IInterceptorBroadcaster theInterceptorBroadcaster, 130 HapiTransactionService theTxService, 131 ISearchCacheSvc theSearchCacheSvc, 132 ISearchResultCacheSvc theSearchResultCacheSvc, 133 DaoRegistry theDaoRegistry, 134 SearchBuilderFactory<JpaPid> theSearchBuilderFactory, 135 ISynchronousSearchSvc theSynchronousSearchSvc, 136 PersistedJpaBundleProviderFactory thePersistedJpaBundleProviderFactory, 137 ISearchParamRegistry theSearchParamRegistry, 138 SearchStrategyFactory theSearchStrategyFactory, 139 ExceptionService theExceptionSvc, 140 BeanFactory theBeanFactory) { 141 super(); 142 myContext = theContext; 143 myStorageSettings = theStorageSettings; 144 myInterceptorBroadcaster = theInterceptorBroadcaster; 145 myTxService = theTxService; 146 mySearchCacheSvc = theSearchCacheSvc; 147 mySearchResultCacheSvc = theSearchResultCacheSvc; 148 myDaoRegistry = theDaoRegistry; 149 mySearchBuilderFactory = theSearchBuilderFactory; 150 mySynchronousSearchSvc = theSynchronousSearchSvc; 151 myPersistedJpaBundleProviderFactory = thePersistedJpaBundleProviderFactory; 152 mySearchParamRegistry = theSearchParamRegistry; 153 mySearchStrategyFactory = theSearchStrategyFactory; 154 myExceptionSvc = theExceptionSvc; 155 myBeanFactory = theBeanFactory; 156 157 myStorageInterceptorHooks = new StorageInterceptorHooksFacade(myInterceptorBroadcaster); 158 } 159 160 @VisibleForTesting 161 Set<String> getActiveSearchIds() { 162 return myIdToSearchTask.keySet(); 163 } 164 165 @VisibleForTesting 166 public void setIdToSearchTaskMapForUnitTests(ConcurrentHashMap<String, SearchTask> theIdToSearchTaskMap) { 167 myIdToSearchTask = theIdToSearchTaskMap; 168 } 169 170 @VisibleForTesting 171 public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) { 172 myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests; 173 } 174 175 @VisibleForTesting 176 public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) { 177 myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests; 178 } 179 180 @VisibleForTesting 181 public void setSyncSizeForUnitTests(int theSyncSize) { 182 mySyncSize = theSyncSize; 183 } 184 185 @Override 186 public void cancelAllActiveSearches() { 187 for (SearchTask next : myIdToSearchTask.values()) { 188 ourLog.info( 189 "Requesting immediate abort of search: {}", next.getSearch().getUuid()); 190 next.requestImmediateAbort(); 191 AsyncUtil.awaitLatchAndIgnoreInterrupt(next.getCompletionLatch(), 30, TimeUnit.SECONDS); 192 } 193 } 194 195 @SuppressWarnings("SameParameterValue") 196 @VisibleForTesting 197 void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) { 198 myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults; 199 } 200 201 /** 202 * This method is called by the HTTP client processing thread in order to 203 * fetch resources. 204 * <p> 205 * This method must not be called from inside a transaction. The rationale is that 206 * the {@link Search} entity is treated as a piece of shared state across client threads 207 * accessing the same thread, so we need to be able to update that table in a transaction 208 * and commit it right away in order for that to work. Examples of needing to do this 209 * include if two different clients request the same search and are both paging at the 210 * same time, but also includes clients that are hacking the paging links to 211 * fetch multiple pages of a search result in parallel. In both cases we need to only 212 * let one of them actually activate the search, or we will have conficts. The other thread 213 * just needs to wait until the first one actually fetches more results. 214 */ 215 @Override 216 public List<JpaPid> getResources( 217 final String theUuid, 218 int theFrom, 219 int theTo, 220 @Nullable RequestDetails theRequestDetails, 221 RequestPartitionId theRequestPartitionId) { 222 assert !TransactionSynchronizationManager.isActualTransactionActive(); 223 224 // If we're actively searching right now, don't try to do anything until at least one batch has been 225 // persisted in the DB 226 SearchTask searchTask = myIdToSearchTask.get(theUuid); 227 if (searchTask != null) { 228 searchTask.awaitInitialSync(); 229 } 230 231 ourLog.trace("About to start looking for resources {}-{}", theFrom, theTo); 232 233 Search search; 234 StopWatch sw = new StopWatch(); 235 while (true) { 236 237 if (myNeverUseLocalSearchForUnitTests == false) { 238 if (searchTask != null) { 239 ourLog.trace("Local search found"); 240 List<JpaPid> resourcePids = searchTask.getResourcePids(theFrom, theTo); 241 ourLog.trace( 242 "Local search returned {} pids, wanted {}-{} - Search: {}", 243 resourcePids.size(), 244 theFrom, 245 theTo, 246 searchTask.getSearch()); 247 248 /* 249 * Generally, if a search task is open, the fastest possible thing is to just return its results. This 250 * will work most of the time, but can fail if the task hit a search threshold and the client is requesting 251 * results beyond that threashold. In that case, we'll keep going below, since that will trigger another 252 * task. 253 */ 254 if ((searchTask.getSearch().getNumFound() 255 - searchTask.getSearch().getNumBlocked()) 256 >= theTo 257 || resourcePids.size() == (theTo - theFrom)) { 258 return resourcePids; 259 } 260 } 261 } 262 263 Callable<Search> searchCallback = () -> mySearchCacheSvc 264 .fetchByUuid(theUuid, theRequestPartitionId) 265 .orElseThrow(() -> myExceptionSvc.newUnknownSearchException(theUuid)); 266 search = myTxService 267 .withRequest(theRequestDetails) 268 .withRequestPartitionId(theRequestPartitionId) 269 .execute(searchCallback); 270 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search); 271 272 if (search.getStatus() == SearchStatusEnum.FINISHED) { 273 ourLog.trace("Search entity marked as finished with {} results", search.getNumFound()); 274 break; 275 } 276 if ((search.getNumFound() - search.getNumBlocked()) >= theTo) { 277 ourLog.trace("Search entity has {} results so far", search.getNumFound()); 278 break; 279 } 280 281 if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) { 282 ourLog.error( 283 "Search {} of type {} for {}{} timed out after {}ms", 284 search.getId(), 285 search.getSearchType(), 286 search.getResourceType(), 287 search.getSearchQueryString(), 288 sw.getMillis()); 289 throw new InternalErrorException(Msg.code(1163) + "Request timed out after " + sw.getMillis() + "ms"); 290 } 291 292 // If the search was saved in "pass complete mode" it's probably time to 293 // start a new pass 294 if (search.getStatus() == SearchStatusEnum.PASSCMPLET) { 295 ourLog.trace("Going to try to start next search"); 296 Optional<Search> newSearch = 297 mySearchCacheSvc.tryToMarkSearchAsInProgress(search, theRequestPartitionId); 298 if (newSearch.isPresent()) { 299 ourLog.trace("Launching new search"); 300 search = newSearch.get(); 301 String resourceType = search.getResourceType(); 302 SearchParameterMap params = search.getSearchParameterMap() 303 .orElseThrow(() -> new IllegalStateException("No map in PASSCOMPLET search")); 304 IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType); 305 306 SearchTaskParameters parameters = new SearchTaskParameters( 307 search, 308 resourceDao, 309 params, 310 resourceType, 311 theRequestDetails, 312 theRequestPartitionId, 313 myOnRemoveSearchTask, 314 mySyncSize); 315 parameters.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 316 SearchContinuationTask task = 317 (SearchContinuationTask) myBeanFactory.getBean(SearchConfig.CONTINUE_TASK, parameters); 318 myIdToSearchTask.put(search.getUuid(), task); 319 task.call(); 320 } 321 } 322 323 if (!search.getStatus().isDone()) { 324 AsyncUtil.sleep(500); 325 } 326 } 327 328 ourLog.trace("Finished looping"); 329 330 List<JpaPid> pids = fetchResultPids(theUuid, theFrom, theTo, theRequestDetails, search, theRequestPartitionId); 331 332 ourLog.trace("Fetched {} results", pids.size()); 333 334 return pids; 335 } 336 337 @Nonnull 338 private List<JpaPid> fetchResultPids( 339 String theUuid, 340 int theFrom, 341 int theTo, 342 @Nullable RequestDetails theRequestDetails, 343 Search theSearch, 344 RequestPartitionId theRequestPartitionId) { 345 List<JpaPid> pids = mySearchResultCacheSvc.fetchResultPids( 346 theSearch, theFrom, theTo, theRequestDetails, theRequestPartitionId); 347 if (pids == null) { 348 throw myExceptionSvc.newUnknownSearchException(theUuid); 349 } 350 return pids; 351 } 352 353 @Override 354 public IBundleProvider registerSearch( 355 final IFhirResourceDao<?> theCallingDao, 356 final SearchParameterMap theParams, 357 String theResourceType, 358 CacheControlDirective theCacheControlDirective, 359 RequestDetails theRequestDetails, 360 RequestPartitionId theRequestPartitionId) { 361 final String searchUuid = UUID.randomUUID().toString(); 362 363 final String queryString = theParams.toNormalizedQueryString(myContext); 364 ourLog.debug("Registering new search {}", searchUuid); 365 366 Search search = new Search(); 367 QueryParameterUtils.populateSearchEntity( 368 theParams, theResourceType, searchUuid, queryString, search, theRequestPartitionId); 369 370 myStorageInterceptorHooks.callStoragePresearchRegistered( 371 theRequestDetails, theParams, search, theRequestPartitionId); 372 373 validateSearch(theParams); 374 375 Class<? extends IBaseResource> resourceTypeClass = 376 myContext.getResourceDefinition(theResourceType).getImplementingClass(); 377 final ISearchBuilder<JpaPid> sb = 378 mySearchBuilderFactory.newSearchBuilder(theCallingDao, theResourceType, resourceTypeClass); 379 sb.setFetchSize(mySyncSize); 380 381 final Integer loadSynchronousUpTo = getLoadSynchronousUpToOrNull(theCacheControlDirective); 382 boolean isOffsetQuery = theParams.isOffsetQuery(); 383 384 // todo someday - not today. 385 // SearchStrategyFactory.ISearchStrategy searchStrategy = mySearchStrategyFactory.pickStrategy(theResourceType, 386 // theParams, theRequestDetails); 387 // return searchStrategy.get(); 388 389 if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null || isOffsetQuery) { 390 if (mySearchStrategyFactory.isSupportsHSearchDirect(theResourceType, theParams, theRequestDetails)) { 391 ourLog.info("Search {} is using direct load strategy", searchUuid); 392 SearchStrategyFactory.ISearchStrategy direct = mySearchStrategyFactory.makeDirectStrategy( 393 searchUuid, theResourceType, theParams, theRequestDetails); 394 395 try { 396 return direct.get(); 397 398 } catch (ResourceNotFoundInIndexException theE) { 399 // some resources were not found in index, so we will inform this and resort to JPA search 400 ourLog.warn( 401 "Some resources were not found in index. Make sure all resources were indexed. Resorting to database search."); 402 } 403 } 404 405 ourLog.debug("Search {} is loading in synchronous mode", searchUuid); 406 return mySynchronousSearchSvc.executeQuery( 407 theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, theRequestPartitionId); 408 } 409 410 /* 411 * See if there are any cached searches whose results we can return 412 * instead 413 */ 414 SearchCacheStatusEnum cacheStatus = SearchCacheStatusEnum.MISS; 415 if (theCacheControlDirective != null && theCacheControlDirective.isNoCache()) { 416 cacheStatus = SearchCacheStatusEnum.NOT_TRIED; 417 } 418 419 if (cacheStatus != SearchCacheStatusEnum.NOT_TRIED) { 420 if (theParams.getEverythingMode() == null) { 421 if (myStorageSettings.getReuseCachedSearchResultsForMillis() != null) { 422 PersistedJpaBundleProvider foundSearchProvider = findCachedQuery( 423 theParams, theResourceType, theRequestDetails, queryString, theRequestPartitionId); 424 if (foundSearchProvider != null) { 425 foundSearchProvider.setCacheStatus(SearchCacheStatusEnum.HIT); 426 return foundSearchProvider; 427 } 428 } 429 } 430 } 431 432 PersistedJpaSearchFirstPageBundleProvider retVal = submitSearch( 433 theCallingDao, theParams, theResourceType, theRequestDetails, sb, theRequestPartitionId, search); 434 retVal.setCacheStatus(cacheStatus); 435 return retVal; 436 } 437 438 private void validateSearch(SearchParameterMap theParams) { 439 validateIncludes(theParams.getIncludes(), Constants.PARAM_INCLUDE); 440 validateIncludes(theParams.getRevIncludes(), Constants.PARAM_REVINCLUDE); 441 } 442 443 private void validateIncludes(Set<Include> includes, String name) { 444 for (Include next : includes) { 445 String value = next.getValue(); 446 if (value.equals(Constants.INCLUDE_STAR) || isBlank(value)) { 447 continue; 448 } 449 450 String paramType = next.getParamType(); 451 String paramName = next.getParamName(); 452 String paramTargetType = next.getParamTargetType(); 453 454 if (isBlank(paramType) || isBlank(paramName)) { 455 String msg = myContext 456 .getLocalizer() 457 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidInclude", name, value, ""); 458 throw new InvalidRequestException(Msg.code(2018) + msg); 459 } 460 461 if (!myDaoRegistry.isResourceTypeSupported(paramType)) { 462 String resourceTypeMsg = myContext 463 .getLocalizer() 464 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramType); 465 String msg = myContext 466 .getLocalizer() 467 .getMessage( 468 SearchCoordinatorSvcImpl.class, 469 "invalidInclude", 470 UrlUtil.sanitizeUrlPart(name), 471 UrlUtil.sanitizeUrlPart(value), 472 resourceTypeMsg); // last param is pre-sanitized 473 throw new InvalidRequestException(Msg.code(2017) + msg); 474 } 475 476 if (isNotBlank(paramTargetType) && !myDaoRegistry.isResourceTypeSupported(paramTargetType)) { 477 String resourceTypeMsg = myContext 478 .getLocalizer() 479 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramTargetType); 480 String msg = myContext 481 .getLocalizer() 482 .getMessage( 483 SearchCoordinatorSvcImpl.class, 484 "invalidInclude", 485 UrlUtil.sanitizeUrlPart(name), 486 UrlUtil.sanitizeUrlPart(value), 487 resourceTypeMsg); // last param is pre-sanitized 488 throw new InvalidRequestException(Msg.code(2016) + msg); 489 } 490 491 if (!Constants.INCLUDE_STAR.equals(paramName) 492 && mySearchParamRegistry.getActiveSearchParam(paramType, paramName) == null) { 493 List<String> validNames = mySearchParamRegistry.getActiveSearchParams(paramType).values().stream() 494 .filter(t -> t.getParamType() == RestSearchParameterTypeEnum.REFERENCE) 495 .map(t -> UrlUtil.sanitizeUrlPart(t.getName())) 496 .sorted() 497 .collect(Collectors.toList()); 498 String searchParamMessage = myContext 499 .getLocalizer() 500 .getMessage( 501 BaseStorageDao.class, 502 "invalidSearchParameter", 503 UrlUtil.sanitizeUrlPart(paramName), 504 UrlUtil.sanitizeUrlPart(paramType), 505 validNames); 506 String msg = myContext 507 .getLocalizer() 508 .getMessage( 509 SearchCoordinatorSvcImpl.class, 510 "invalidInclude", 511 UrlUtil.sanitizeUrlPart(name), 512 UrlUtil.sanitizeUrlPart(value), 513 searchParamMessage); // last param is pre-sanitized 514 throw new InvalidRequestException(Msg.code(2015) + msg); 515 } 516 } 517 } 518 519 @Override 520 public Optional<Integer> getSearchTotal( 521 String theUuid, @Nullable RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) { 522 SearchTask task = myIdToSearchTask.get(theUuid); 523 if (task != null) { 524 return Optional.ofNullable(task.awaitInitialSync()); 525 } 526 527 /* 528 * In case there is no running search, if the total is listed as accurate we know one is coming 529 * so let's wait a bit for it to show up 530 */ 531 Optional<Search> search = myTxService 532 .withRequest(theRequestDetails) 533 .execute(() -> mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId)); 534 if (search.isPresent()) { 535 Optional<SearchParameterMap> searchParameterMap = search.get().getSearchParameterMap(); 536 if (searchParameterMap.isPresent() 537 && searchParameterMap.get().getSearchTotalMode() == SearchTotalModeEnum.ACCURATE) { 538 for (int i = 0; i < 10; i++) { 539 if (search.isPresent()) { 540 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search.get()); 541 if (search.get().getTotalCount() != null) { 542 return Optional.of(search.get().getTotalCount()); 543 } 544 } 545 search = mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId); 546 } 547 } 548 } 549 550 return Optional.empty(); 551 } 552 553 @Nonnull 554 private PersistedJpaSearchFirstPageBundleProvider submitSearch( 555 IDao theCallingDao, 556 SearchParameterMap theParams, 557 String theResourceType, 558 RequestDetails theRequestDetails, 559 ISearchBuilder<JpaPid> theSb, 560 RequestPartitionId theRequestPartitionId, 561 Search theSearch) { 562 StopWatch w = new StopWatch(); 563 564 SearchTaskParameters stp = new SearchTaskParameters( 565 theSearch, 566 theCallingDao, 567 theParams, 568 theResourceType, 569 theRequestDetails, 570 theRequestPartitionId, 571 myOnRemoveSearchTask, 572 mySyncSize); 573 stp.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 574 SearchTask task = (SearchTask) myBeanFactory.getBean(SearchConfig.SEARCH_TASK, stp); 575 myIdToSearchTask.put(theSearch.getUuid(), task); 576 task.call(); 577 578 PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage( 579 theRequestDetails, task, theSb, theRequestPartitionId); 580 581 ourLog.debug("Search initial phase completed in {}ms", w.getMillis()); 582 return retVal; 583 } 584 585 @Nullable 586 private PersistedJpaBundleProvider findCachedQuery( 587 SearchParameterMap theParams, 588 String theResourceType, 589 RequestDetails theRequestDetails, 590 String theQueryString, 591 RequestPartitionId theRequestPartitionId) { 592 // May be null 593 return myTxService 594 .withRequest(theRequestDetails) 595 .withRequestPartitionId(theRequestPartitionId) 596 .execute(() -> { 597 598 // Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH 599 HookParams params = new HookParams() 600 .add(SearchParameterMap.class, theParams) 601 .add(RequestDetails.class, theRequestDetails) 602 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 603 Object outcome = CompositeInterceptorBroadcaster.doCallHooksAndReturnObject( 604 myInterceptorBroadcaster, 605 theRequestDetails, 606 Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH, 607 params); 608 if (Boolean.FALSE.equals(outcome)) { 609 return null; 610 } 611 612 // Check for a search matching the given hash 613 Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType, theRequestPartitionId); 614 if (searchToUse == null) { 615 return null; 616 } 617 618 ourLog.debug("Reusing search {} from cache", searchToUse.getUuid()); 619 // Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED 620 params = new HookParams() 621 .add(SearchParameterMap.class, theParams) 622 .add(RequestDetails.class, theRequestDetails) 623 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 624 CompositeInterceptorBroadcaster.doCallHooks( 625 myInterceptorBroadcaster, 626 theRequestDetails, 627 Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, 628 params); 629 630 return myPersistedJpaBundleProviderFactory.newInstance(theRequestDetails, searchToUse.getUuid()); 631 }); 632 } 633 634 @Nullable 635 private Search findSearchToUseOrNull( 636 String theQueryString, String theResourceType, RequestPartitionId theRequestPartitionId) { 637 // createdCutoff is in recent past 638 final Instant createdCutoff = 639 Instant.now().minus(myStorageSettings.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS); 640 641 Optional<Search> candidate = mySearchCacheSvc.findCandidatesForReuse( 642 theResourceType, theQueryString, createdCutoff, theRequestPartitionId); 643 return candidate.orElse(null); 644 } 645 646 @Nullable 647 private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) { 648 final Integer loadSynchronousUpTo; 649 if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) { 650 if (theCacheControlDirective.getMaxResults() != null) { 651 loadSynchronousUpTo = theCacheControlDirective.getMaxResults(); 652 if (loadSynchronousUpTo > myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()) { 653 throw new InvalidRequestException(Msg.code(1165) + Constants.HEADER_CACHE_CONTROL + " header " 654 + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed " 655 + myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()); 656 } 657 } else { 658 loadSynchronousUpTo = 100; 659 } 660 } else { 661 loadSynchronousUpTo = null; 662 } 663 return loadSynchronousUpTo; 664 } 665 666 /** 667 * Creates a {@link Pageable} using a start and end index 668 */ 669 @SuppressWarnings("WeakerAccess") 670 @Nullable 671 public static Pageable toPage(final int theFromIndex, int theToIndex) { 672 int pageSize = theToIndex - theFromIndex; 673 if (pageSize < 1) { 674 return null; 675 } 676 677 int pageIndex = theFromIndex / pageSize; 678 679 return new PageRequest(pageIndex, pageSize, Sort.unsorted()) { 680 private static final long serialVersionUID = 1L; 681 682 @Override 683 public long getOffset() { 684 return theFromIndex; 685 } 686 }; 687 } 688}