001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L%family 019 */ 020package ca.uhn.fhir.jpa.search; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.i18n.Msg; 024import ca.uhn.fhir.interceptor.api.HookParams; 025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 026import ca.uhn.fhir.interceptor.api.Pointcut; 027import ca.uhn.fhir.interceptor.model.RequestPartitionId; 028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 029import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 030import ca.uhn.fhir.jpa.api.dao.IDao; 031import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 032import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; 033import ca.uhn.fhir.jpa.config.SearchConfig; 034import ca.uhn.fhir.jpa.dao.BaseStorageDao; 035import ca.uhn.fhir.jpa.dao.ISearchBuilder; 036import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 037import ca.uhn.fhir.jpa.dao.search.ResourceNotFoundInIndexException; 038import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 039import ca.uhn.fhir.jpa.entity.Search; 040import ca.uhn.fhir.jpa.model.dao.JpaPid; 041import ca.uhn.fhir.jpa.model.search.SearchStatusEnum; 042import ca.uhn.fhir.jpa.search.builder.StorageInterceptorHooksFacade; 043import ca.uhn.fhir.jpa.search.builder.tasks.SearchContinuationTask; 044import ca.uhn.fhir.jpa.search.builder.tasks.SearchTask; 045import ca.uhn.fhir.jpa.search.builder.tasks.SearchTaskParameters; 046import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc; 047import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc; 048import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum; 049import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 050import ca.uhn.fhir.jpa.util.QueryParameterUtils; 051import ca.uhn.fhir.model.api.Include; 052import ca.uhn.fhir.rest.api.CacheControlDirective; 053import ca.uhn.fhir.rest.api.Constants; 054import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum; 055import ca.uhn.fhir.rest.api.SearchTotalModeEnum; 056import ca.uhn.fhir.rest.api.server.IBundleProvider; 057import ca.uhn.fhir.rest.api.server.RequestDetails; 058import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 059import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 060import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; 061import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; 062import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; 063import ca.uhn.fhir.util.AsyncUtil; 064import ca.uhn.fhir.util.StopWatch; 065import ca.uhn.fhir.util.UrlUtil; 066import com.google.common.annotations.VisibleForTesting; 067import jakarta.annotation.Nonnull; 068import jakarta.annotation.Nullable; 069import org.apache.commons.lang3.time.DateUtils; 070import org.hl7.fhir.instance.model.api.IBaseResource; 071import org.springframework.beans.factory.BeanFactory; 072import org.springframework.data.domain.PageRequest; 073import org.springframework.data.domain.Pageable; 074import org.springframework.data.domain.Sort; 075import org.springframework.stereotype.Component; 076import org.springframework.transaction.support.TransactionSynchronizationManager; 077 078import java.time.Instant; 079import java.time.temporal.ChronoUnit; 080import java.util.List; 081import java.util.Optional; 082import java.util.Set; 083import java.util.UUID; 084import java.util.concurrent.Callable; 085import java.util.concurrent.ConcurrentHashMap; 086import java.util.concurrent.TimeUnit; 087import java.util.function.Consumer; 088import java.util.stream.Collectors; 089 090import static ca.uhn.fhir.jpa.util.QueryParameterUtils.DEFAULT_SYNC_SIZE; 091import static org.apache.commons.lang3.StringUtils.isBlank; 092import static org.apache.commons.lang3.StringUtils.isNotBlank; 093 094@Component("mySearchCoordinatorSvc") 095public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc<JpaPid> { 096 097 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class); 098 099 private final FhirContext myContext; 100 private final JpaStorageSettings myStorageSettings; 101 private final IInterceptorBroadcaster myInterceptorBroadcaster; 102 private final HapiTransactionService myTxService; 103 private final ISearchCacheSvc mySearchCacheSvc; 104 private final ISearchResultCacheSvc mySearchResultCacheSvc; 105 private final DaoRegistry myDaoRegistry; 106 private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory; 107 private final ISynchronousSearchSvc mySynchronousSearchSvc; 108 private final PersistedJpaBundleProviderFactory myPersistedJpaBundleProviderFactory; 109 private final ISearchParamRegistry mySearchParamRegistry; 110 private final SearchStrategyFactory mySearchStrategyFactory; 111 private final ExceptionService myExceptionSvc; 112 private final BeanFactory myBeanFactory; 113 private ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>(); 114 115 private final Consumer<String> myOnRemoveSearchTask = myIdToSearchTask::remove; 116 117 private final StorageInterceptorHooksFacade myStorageInterceptorHooks; 118 private Integer myLoadingThrottleForUnitTests = null; 119 private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE; 120 private boolean myNeverUseLocalSearchForUnitTests; 121 private int mySyncSize = DEFAULT_SYNC_SIZE; 122 123 /** 124 * Constructor 125 */ 126 public SearchCoordinatorSvcImpl( 127 FhirContext theContext, 128 JpaStorageSettings theStorageSettings, 129 IInterceptorBroadcaster theInterceptorBroadcaster, 130 HapiTransactionService theTxService, 131 ISearchCacheSvc theSearchCacheSvc, 132 ISearchResultCacheSvc theSearchResultCacheSvc, 133 DaoRegistry theDaoRegistry, 134 SearchBuilderFactory<JpaPid> theSearchBuilderFactory, 135 ISynchronousSearchSvc theSynchronousSearchSvc, 136 PersistedJpaBundleProviderFactory thePersistedJpaBundleProviderFactory, 137 ISearchParamRegistry theSearchParamRegistry, 138 SearchStrategyFactory theSearchStrategyFactory, 139 ExceptionService theExceptionSvc, 140 BeanFactory theBeanFactory) { 141 super(); 142 myContext = theContext; 143 myStorageSettings = theStorageSettings; 144 myInterceptorBroadcaster = theInterceptorBroadcaster; 145 myTxService = theTxService; 146 mySearchCacheSvc = theSearchCacheSvc; 147 mySearchResultCacheSvc = theSearchResultCacheSvc; 148 myDaoRegistry = theDaoRegistry; 149 mySearchBuilderFactory = theSearchBuilderFactory; 150 mySynchronousSearchSvc = theSynchronousSearchSvc; 151 myPersistedJpaBundleProviderFactory = thePersistedJpaBundleProviderFactory; 152 mySearchParamRegistry = theSearchParamRegistry; 153 mySearchStrategyFactory = theSearchStrategyFactory; 154 myExceptionSvc = theExceptionSvc; 155 myBeanFactory = theBeanFactory; 156 157 myStorageInterceptorHooks = new StorageInterceptorHooksFacade(myInterceptorBroadcaster); 158 } 159 160 @VisibleForTesting 161 Set<String> getActiveSearchIds() { 162 return myIdToSearchTask.keySet(); 163 } 164 165 @VisibleForTesting 166 public void setIdToSearchTaskMapForUnitTests(ConcurrentHashMap<String, SearchTask> theIdToSearchTaskMap) { 167 myIdToSearchTask = theIdToSearchTaskMap; 168 } 169 170 @VisibleForTesting 171 public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) { 172 myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests; 173 } 174 175 @VisibleForTesting 176 public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) { 177 myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests; 178 } 179 180 @VisibleForTesting 181 public void setSyncSizeForUnitTests(int theSyncSize) { 182 mySyncSize = theSyncSize; 183 } 184 185 @Override 186 public void cancelAllActiveSearches() { 187 for (SearchTask next : myIdToSearchTask.values()) { 188 ourLog.info( 189 "Requesting immediate abort of search: {}", next.getSearch().getUuid()); 190 next.requestImmediateAbort(); 191 AsyncUtil.awaitLatchAndIgnoreInterrupt(next.getCompletionLatch(), 30, TimeUnit.SECONDS); 192 } 193 } 194 195 @SuppressWarnings("SameParameterValue") 196 @VisibleForTesting 197 void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) { 198 myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults; 199 } 200 201 /** 202 * This method is called by the HTTP client processing thread in order to 203 * fetch resources. 204 * <p> 205 * This method must not be called from inside a transaction. The rationale is that 206 * the {@link Search} entity is treated as a piece of shared state across client threads 207 * accessing the same thread, so we need to be able to update that table in a transaction 208 * and commit it right away in order for that to work. Examples of needing to do this 209 * include if two different clients request the same search and are both paging at the 210 * same time, but also includes clients that are hacking the paging links to 211 * fetch multiple pages of a search result in parallel. In both cases we need to only 212 * let one of them actually activate the search, or we will have conficts. The other thread 213 * just needs to wait until the first one actually fetches more results. 214 */ 215 @Override 216 public List<JpaPid> getResources( 217 final String theUuid, 218 int theFrom, 219 int theTo, 220 @Nullable RequestDetails theRequestDetails, 221 RequestPartitionId theRequestPartitionId) { 222 assert !TransactionSynchronizationManager.isActualTransactionActive(); 223 224 // If we're actively searching right now, don't try to do anything until at least one batch has been 225 // persisted in the DB 226 SearchTask searchTask = myIdToSearchTask.get(theUuid); 227 if (searchTask != null) { 228 searchTask.awaitInitialSync(); 229 } 230 231 ourLog.trace("About to start looking for resources {}-{}", theFrom, theTo); 232 233 Search search; 234 StopWatch sw = new StopWatch(); 235 while (true) { 236 237 if (myNeverUseLocalSearchForUnitTests == false) { 238 if (searchTask != null) { 239 ourLog.trace("Local search found"); 240 List<JpaPid> resourcePids = searchTask.getResourcePids(theFrom, theTo); 241 ourLog.trace( 242 "Local search returned {} pids, wanted {}-{} - Search: {}", 243 resourcePids.size(), 244 theFrom, 245 theTo, 246 searchTask.getSearch()); 247 248 /* 249 * Generally, if a search task is open, the fastest possible thing is to just return its results. This 250 * will work most of the time, but can fail if the task hit a search threshold and the client is requesting 251 * results beyond that threashold. In that case, we'll keep going below, since that will trigger another 252 * task. 253 */ 254 if ((searchTask.getSearch().getNumFound() 255 - searchTask.getSearch().getNumBlocked()) 256 >= theTo 257 || resourcePids.size() == (theTo - theFrom)) { 258 return resourcePids; 259 } 260 } 261 } 262 263 Callable<Search> searchCallback = () -> mySearchCacheSvc 264 .fetchByUuid(theUuid, theRequestPartitionId) 265 .orElseThrow(() -> myExceptionSvc.newUnknownSearchException(theUuid)); 266 search = myTxService 267 .withRequest(theRequestDetails) 268 .withRequestPartitionId(theRequestPartitionId) 269 .execute(searchCallback); 270 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search); 271 272 if (search.getStatus() == SearchStatusEnum.FINISHED) { 273 ourLog.trace("Search entity marked as finished with {} results", search.getNumFound()); 274 break; 275 } 276 if ((search.getNumFound() - search.getNumBlocked()) >= theTo) { 277 ourLog.trace("Search entity has {} results so far", search.getNumFound()); 278 break; 279 } 280 281 if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) { 282 ourLog.error( 283 "Search {} of type {} for {}{} timed out after {}ms", 284 search.getId(), 285 search.getSearchType(), 286 search.getResourceType(), 287 search.getSearchQueryString(), 288 sw.getMillis()); 289 throw new InternalErrorException(Msg.code(1163) + "Request timed out after " + sw.getMillis() + "ms"); 290 } 291 292 // If the search was saved in "pass complete mode" it's probably time to 293 // start a new pass 294 if (search.getStatus() == SearchStatusEnum.PASSCMPLET) { 295 ourLog.trace("Going to try to start next search"); 296 Optional<Search> newSearch = 297 mySearchCacheSvc.tryToMarkSearchAsInProgress(search, theRequestPartitionId); 298 if (newSearch.isPresent()) { 299 ourLog.trace("Launching new search"); 300 search = newSearch.get(); 301 String resourceType = search.getResourceType(); 302 SearchParameterMap params = search.getSearchParameterMap() 303 .orElseThrow(() -> new IllegalStateException("No map in PASSCOMPLET search")); 304 IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType); 305 306 SearchTaskParameters parameters = new SearchTaskParameters( 307 search, 308 resourceDao, 309 params, 310 resourceType, 311 theRequestDetails, 312 theRequestPartitionId, 313 myOnRemoveSearchTask, 314 mySyncSize); 315 parameters.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 316 SearchContinuationTask task = 317 (SearchContinuationTask) myBeanFactory.getBean(SearchConfig.CONTINUE_TASK, parameters); 318 myIdToSearchTask.put(search.getUuid(), task); 319 task.call(); 320 } 321 } 322 323 if (!search.getStatus().isDone()) { 324 AsyncUtil.sleep(500); 325 } 326 } 327 328 ourLog.trace("Finished looping"); 329 330 List<JpaPid> pids = fetchResultPids(theUuid, theFrom, theTo, theRequestDetails, search, theRequestPartitionId); 331 332 ourLog.trace("Fetched {} results", pids.size()); 333 334 return pids; 335 } 336 337 @Nonnull 338 private List<JpaPid> fetchResultPids( 339 String theUuid, 340 int theFrom, 341 int theTo, 342 @Nullable RequestDetails theRequestDetails, 343 Search theSearch, 344 RequestPartitionId theRequestPartitionId) { 345 List<JpaPid> pids = mySearchResultCacheSvc.fetchResultPids( 346 theSearch, theFrom, theTo, theRequestDetails, theRequestPartitionId); 347 if (pids == null) { 348 throw myExceptionSvc.newUnknownSearchException(theUuid); 349 } 350 return pids; 351 } 352 353 @Override 354 public IBundleProvider registerSearch( 355 final IFhirResourceDao<?> theCallingDao, 356 final SearchParameterMap theParams, 357 String theResourceType, 358 CacheControlDirective theCacheControlDirective, 359 RequestDetails theRequestDetails, 360 RequestPartitionId theRequestPartitionId) { 361 final String searchUuid = UUID.randomUUID().toString(); 362 363 final String queryString = theParams.toNormalizedQueryString(myContext); 364 ourLog.debug("Registering new search {}", searchUuid); 365 366 Search search = new Search(); 367 QueryParameterUtils.populateSearchEntity( 368 theParams, theResourceType, searchUuid, queryString, search, theRequestPartitionId); 369 370 myStorageInterceptorHooks.callStoragePresearchRegistered( 371 theRequestDetails, theParams, search, theRequestPartitionId); 372 373 validateSearch(theParams); 374 375 Class<? extends IBaseResource> resourceTypeClass = 376 myContext.getResourceDefinition(theResourceType).getImplementingClass(); 377 final ISearchBuilder<JpaPid> sb = 378 mySearchBuilderFactory.newSearchBuilder(theCallingDao, theResourceType, resourceTypeClass); 379 sb.setFetchSize(mySyncSize); 380 381 final Integer loadSynchronousUpTo = getLoadSynchronousUpToOrNull(theCacheControlDirective); 382 boolean isOffsetQuery = theParams.isOffsetQuery(); 383 384 // todo someday - not today. 385 // SearchStrategyFactory.ISearchStrategy searchStrategy = mySearchStrategyFactory.pickStrategy(theResourceType, 386 // theParams, theRequestDetails); 387 // return searchStrategy.get(); 388 389 if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null || isOffsetQuery) { 390 if (mySearchStrategyFactory.isSupportsHSearchDirect(theResourceType, theParams, theRequestDetails)) { 391 ourLog.info("Search {} is using direct load strategy", searchUuid); 392 SearchStrategyFactory.ISearchStrategy direct = mySearchStrategyFactory.makeDirectStrategy( 393 searchUuid, theResourceType, theParams, theRequestDetails); 394 395 try { 396 return direct.get(); 397 398 } catch (ResourceNotFoundInIndexException theE) { 399 // some resources were not found in index, so we will inform this and resort to JPA search 400 ourLog.warn( 401 "Some resources were not found in index. Make sure all resources were indexed. Resorting to database search."); 402 } 403 } 404 405 ourLog.debug("Search {} is loading in synchronous mode", searchUuid); 406 return mySynchronousSearchSvc.executeQuery( 407 theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, theRequestPartitionId); 408 } 409 410 /* 411 * See if there are any cached searches whose results we can return 412 * instead 413 */ 414 SearchCacheStatusEnum cacheStatus = SearchCacheStatusEnum.MISS; 415 if (theCacheControlDirective != null && theCacheControlDirective.isNoCache()) { 416 cacheStatus = SearchCacheStatusEnum.NOT_TRIED; 417 } 418 419 if (cacheStatus != SearchCacheStatusEnum.NOT_TRIED) { 420 if (theParams.getEverythingMode() == null) { 421 if (myStorageSettings.getReuseCachedSearchResultsForMillis() != null) { 422 PersistedJpaBundleProvider foundSearchProvider = findCachedQuery( 423 theParams, theResourceType, theRequestDetails, queryString, theRequestPartitionId); 424 if (foundSearchProvider != null) { 425 foundSearchProvider.setCacheStatus(SearchCacheStatusEnum.HIT); 426 return foundSearchProvider; 427 } 428 } 429 } 430 } 431 432 PersistedJpaSearchFirstPageBundleProvider retVal = submitSearch( 433 theCallingDao, theParams, theResourceType, theRequestDetails, sb, theRequestPartitionId, search); 434 retVal.setCacheStatus(cacheStatus); 435 return retVal; 436 } 437 438 private void validateSearch(SearchParameterMap theParams) { 439 validateIncludes(theParams.getIncludes(), Constants.PARAM_INCLUDE); 440 validateIncludes(theParams.getRevIncludes(), Constants.PARAM_REVINCLUDE); 441 } 442 443 private void validateIncludes(Set<Include> includes, String name) { 444 for (Include next : includes) { 445 String value = next.getValue(); 446 if (value.equals(Constants.INCLUDE_STAR) || isBlank(value)) { 447 continue; 448 } 449 450 String paramType = next.getParamType(); 451 String paramName = next.getParamName(); 452 String paramTargetType = next.getParamTargetType(); 453 454 if (isBlank(paramType) || isBlank(paramName)) { 455 String msg = myContext 456 .getLocalizer() 457 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidInclude", name, value, ""); 458 throw new InvalidRequestException(Msg.code(2018) + msg); 459 } 460 461 if (!myDaoRegistry.isResourceTypeSupported(paramType)) { 462 String resourceTypeMsg = myContext 463 .getLocalizer() 464 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramType); 465 String msg = myContext 466 .getLocalizer() 467 .getMessage( 468 SearchCoordinatorSvcImpl.class, 469 "invalidInclude", 470 UrlUtil.sanitizeUrlPart(name), 471 UrlUtil.sanitizeUrlPart(value), 472 resourceTypeMsg); // last param is pre-sanitized 473 throw new InvalidRequestException(Msg.code(2017) + msg); 474 } 475 476 if (isNotBlank(paramTargetType) && !myDaoRegistry.isResourceTypeSupported(paramTargetType)) { 477 String resourceTypeMsg = myContext 478 .getLocalizer() 479 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramTargetType); 480 String msg = myContext 481 .getLocalizer() 482 .getMessage( 483 SearchCoordinatorSvcImpl.class, 484 "invalidInclude", 485 UrlUtil.sanitizeUrlPart(name), 486 UrlUtil.sanitizeUrlPart(value), 487 resourceTypeMsg); // last param is pre-sanitized 488 throw new InvalidRequestException(Msg.code(2016) + msg); 489 } 490 491 if (!Constants.INCLUDE_STAR.equals(paramName) 492 && mySearchParamRegistry.getActiveSearchParam( 493 paramType, paramName, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH) 494 == null) { 495 List<String> validNames = mySearchParamRegistry 496 .getActiveSearchParams(paramType, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH) 497 .values() 498 .stream() 499 .filter(t -> t.getParamType() == RestSearchParameterTypeEnum.REFERENCE) 500 .map(t -> UrlUtil.sanitizeUrlPart(t.getName())) 501 .sorted() 502 .collect(Collectors.toList()); 503 String searchParamMessage = myContext 504 .getLocalizer() 505 .getMessage( 506 BaseStorageDao.class, 507 "invalidSearchParameter", 508 UrlUtil.sanitizeUrlPart(paramName), 509 UrlUtil.sanitizeUrlPart(paramType), 510 validNames); 511 String msg = myContext 512 .getLocalizer() 513 .getMessage( 514 SearchCoordinatorSvcImpl.class, 515 "invalidInclude", 516 UrlUtil.sanitizeUrlPart(name), 517 UrlUtil.sanitizeUrlPart(value), 518 searchParamMessage); // last param is pre-sanitized 519 throw new InvalidRequestException(Msg.code(2015) + msg); 520 } 521 } 522 } 523 524 @Override 525 public Optional<Integer> getSearchTotal( 526 String theUuid, @Nullable RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) { 527 SearchTask task = myIdToSearchTask.get(theUuid); 528 if (task != null) { 529 return Optional.ofNullable(task.awaitInitialSync()); 530 } 531 532 /* 533 * In case there is no running search, if the total is listed as accurate we know one is coming 534 * so let's wait a bit for it to show up 535 */ 536 Optional<Search> search = myTxService 537 .withRequest(theRequestDetails) 538 .execute(() -> mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId)); 539 if (search.isPresent()) { 540 Optional<SearchParameterMap> searchParameterMap = search.get().getSearchParameterMap(); 541 if (searchParameterMap.isPresent() 542 && searchParameterMap.get().getSearchTotalMode() == SearchTotalModeEnum.ACCURATE) { 543 for (int i = 0; i < 10; i++) { 544 if (search.isPresent()) { 545 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search.get()); 546 if (search.get().getTotalCount() != null) { 547 return Optional.of(search.get().getTotalCount()); 548 } 549 } 550 search = mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId); 551 } 552 } 553 } 554 555 return Optional.empty(); 556 } 557 558 @Nonnull 559 private PersistedJpaSearchFirstPageBundleProvider submitSearch( 560 IDao theCallingDao, 561 SearchParameterMap theParams, 562 String theResourceType, 563 RequestDetails theRequestDetails, 564 ISearchBuilder<JpaPid> theSb, 565 RequestPartitionId theRequestPartitionId, 566 Search theSearch) { 567 StopWatch w = new StopWatch(); 568 569 SearchTaskParameters stp = new SearchTaskParameters( 570 theSearch, 571 theCallingDao, 572 theParams, 573 theResourceType, 574 theRequestDetails, 575 theRequestPartitionId, 576 myOnRemoveSearchTask, 577 mySyncSize); 578 stp.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 579 SearchTask task = (SearchTask) myBeanFactory.getBean(SearchConfig.SEARCH_TASK, stp); 580 myIdToSearchTask.put(theSearch.getUuid(), task); 581 task.call(); 582 583 PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage( 584 theRequestDetails, task, theSb, theRequestPartitionId); 585 586 ourLog.debug("Search initial phase completed in {}ms", w.getMillis()); 587 return retVal; 588 } 589 590 @Nullable 591 private PersistedJpaBundleProvider findCachedQuery( 592 SearchParameterMap theParams, 593 String theResourceType, 594 RequestDetails theRequestDetails, 595 String theQueryString, 596 RequestPartitionId theRequestPartitionId) { 597 // May be null 598 return myTxService 599 .withRequest(theRequestDetails) 600 .withRequestPartitionId(theRequestPartitionId) 601 .execute(() -> { 602 603 // Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH 604 HookParams params = new HookParams() 605 .add(SearchParameterMap.class, theParams) 606 .add(RequestDetails.class, theRequestDetails) 607 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 608 Object outcome = CompositeInterceptorBroadcaster.doCallHooksAndReturnObject( 609 myInterceptorBroadcaster, 610 theRequestDetails, 611 Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH, 612 params); 613 if (Boolean.FALSE.equals(outcome)) { 614 return null; 615 } 616 617 // Check for a search matching the given hash 618 Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType, theRequestPartitionId); 619 if (searchToUse == null) { 620 return null; 621 } 622 623 ourLog.debug("Reusing search {} from cache", searchToUse.getUuid()); 624 // Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED 625 params = new HookParams() 626 .add(SearchParameterMap.class, theParams) 627 .add(RequestDetails.class, theRequestDetails) 628 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 629 CompositeInterceptorBroadcaster.doCallHooks( 630 myInterceptorBroadcaster, 631 theRequestDetails, 632 Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, 633 params); 634 635 return myPersistedJpaBundleProviderFactory.newInstance(theRequestDetails, searchToUse.getUuid()); 636 }); 637 } 638 639 @Nullable 640 private Search findSearchToUseOrNull( 641 String theQueryString, String theResourceType, RequestPartitionId theRequestPartitionId) { 642 // createdCutoff is in recent past 643 final Instant createdCutoff = 644 Instant.now().minus(myStorageSettings.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS); 645 646 Optional<Search> candidate = mySearchCacheSvc.findCandidatesForReuse( 647 theResourceType, theQueryString, createdCutoff, theRequestPartitionId); 648 return candidate.orElse(null); 649 } 650 651 @Nullable 652 private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) { 653 final Integer loadSynchronousUpTo; 654 if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) { 655 if (theCacheControlDirective.getMaxResults() != null) { 656 loadSynchronousUpTo = theCacheControlDirective.getMaxResults(); 657 if (loadSynchronousUpTo > myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()) { 658 throw new InvalidRequestException(Msg.code(1165) + Constants.HEADER_CACHE_CONTROL + " header " 659 + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed " 660 + myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()); 661 } 662 } else { 663 loadSynchronousUpTo = 100; 664 } 665 } else { 666 loadSynchronousUpTo = null; 667 } 668 return loadSynchronousUpTo; 669 } 670 671 /** 672 * Creates a {@link Pageable} using a start and end index 673 */ 674 @SuppressWarnings("WeakerAccess") 675 @Nullable 676 public static Pageable toPage(final int theFromIndex, int theToIndex) { 677 int pageSize = theToIndex - theFromIndex; 678 if (pageSize < 1) { 679 return null; 680 } 681 682 int pageIndex = theFromIndex / pageSize; 683 684 return new PageRequest(pageIndex, pageSize, Sort.unsorted()) { 685 private static final long serialVersionUID = 1L; 686 687 @Override 688 public long getOffset() { 689 return theFromIndex; 690 } 691 }; 692 } 693}