001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L%family 019 */ 020package ca.uhn.fhir.jpa.search; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.i18n.Msg; 024import ca.uhn.fhir.interceptor.api.HookParams; 025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 026import ca.uhn.fhir.interceptor.api.Pointcut; 027import ca.uhn.fhir.interceptor.model.RequestPartitionId; 028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 029import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 030import ca.uhn.fhir.jpa.api.dao.IDao; 031import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 032import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; 033import ca.uhn.fhir.jpa.config.SearchConfig; 034import ca.uhn.fhir.jpa.dao.BaseStorageDao; 035import ca.uhn.fhir.jpa.dao.ISearchBuilder; 036import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 037import ca.uhn.fhir.jpa.dao.search.ResourceNotFoundInIndexException; 038import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 039import ca.uhn.fhir.jpa.entity.Search; 040import ca.uhn.fhir.jpa.model.dao.JpaPid; 041import ca.uhn.fhir.jpa.model.search.SearchStatusEnum; 042import ca.uhn.fhir.jpa.search.builder.StorageInterceptorHooksFacade; 043import ca.uhn.fhir.jpa.search.builder.tasks.SearchContinuationTask; 044import ca.uhn.fhir.jpa.search.builder.tasks.SearchTask; 045import ca.uhn.fhir.jpa.search.builder.tasks.SearchTaskParameters; 046import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc; 047import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc; 048import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum; 049import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 050import ca.uhn.fhir.jpa.util.QueryParameterUtils; 051import ca.uhn.fhir.model.api.Include; 052import ca.uhn.fhir.rest.api.CacheControlDirective; 053import ca.uhn.fhir.rest.api.Constants; 054import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum; 055import ca.uhn.fhir.rest.api.SearchTotalModeEnum; 056import ca.uhn.fhir.rest.api.server.IBundleProvider; 057import ca.uhn.fhir.rest.api.server.RequestDetails; 058import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 059import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 060import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; 061import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; 062import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; 063import ca.uhn.fhir.util.AsyncUtil; 064import ca.uhn.fhir.util.StopWatch; 065import ca.uhn.fhir.util.UrlUtil; 066import com.google.common.annotations.VisibleForTesting; 067import jakarta.annotation.Nonnull; 068import jakarta.annotation.Nullable; 069import org.apache.commons.lang3.time.DateUtils; 070import org.hl7.fhir.instance.model.api.IBaseResource; 071import org.springframework.beans.factory.BeanFactory; 072import org.springframework.data.domain.PageRequest; 073import org.springframework.data.domain.Pageable; 074import org.springframework.data.domain.Sort; 075import org.springframework.stereotype.Component; 076import org.springframework.transaction.support.TransactionSynchronizationManager; 077 078import java.time.Instant; 079import java.time.temporal.ChronoUnit; 080import java.util.List; 081import java.util.Optional; 082import java.util.Set; 083import java.util.UUID; 084import java.util.concurrent.Callable; 085import java.util.concurrent.ConcurrentHashMap; 086import java.util.concurrent.TimeUnit; 087import java.util.function.Consumer; 088import java.util.stream.Collectors; 089 090import static ca.uhn.fhir.jpa.util.QueryParameterUtils.DEFAULT_SYNC_SIZE; 091import static org.apache.commons.lang3.StringUtils.isBlank; 092import static org.apache.commons.lang3.StringUtils.isNotBlank; 093 094@Component("mySearchCoordinatorSvc") 095public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc<JpaPid> { 096 097 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class); 098 099 private final FhirContext myContext; 100 private final JpaStorageSettings myStorageSettings; 101 private final IInterceptorBroadcaster myInterceptorBroadcaster; 102 private final HapiTransactionService myTxService; 103 private final ISearchCacheSvc mySearchCacheSvc; 104 private final ISearchResultCacheSvc mySearchResultCacheSvc; 105 private final DaoRegistry myDaoRegistry; 106 private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory; 107 private final ISynchronousSearchSvc mySynchronousSearchSvc; 108 private final PersistedJpaBundleProviderFactory myPersistedJpaBundleProviderFactory; 109 private final ISearchParamRegistry mySearchParamRegistry; 110 private final SearchStrategyFactory mySearchStrategyFactory; 111 private final ExceptionService myExceptionSvc; 112 private final BeanFactory myBeanFactory; 113 private ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>(); 114 115 private final Consumer<String> myOnRemoveSearchTask = myIdToSearchTask::remove; 116 117 private final StorageInterceptorHooksFacade myStorageInterceptorHooks; 118 private Integer myLoadingThrottleForUnitTests = null; 119 private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE; 120 private boolean myNeverUseLocalSearchForUnitTests; 121 private int mySyncSize = DEFAULT_SYNC_SIZE; 122 123 /** 124 * Constructor 125 */ 126 public SearchCoordinatorSvcImpl( 127 FhirContext theContext, 128 JpaStorageSettings theStorageSettings, 129 IInterceptorBroadcaster theInterceptorBroadcaster, 130 HapiTransactionService theTxService, 131 ISearchCacheSvc theSearchCacheSvc, 132 ISearchResultCacheSvc theSearchResultCacheSvc, 133 DaoRegistry theDaoRegistry, 134 SearchBuilderFactory<JpaPid> theSearchBuilderFactory, 135 ISynchronousSearchSvc theSynchronousSearchSvc, 136 PersistedJpaBundleProviderFactory thePersistedJpaBundleProviderFactory, 137 ISearchParamRegistry theSearchParamRegistry, 138 SearchStrategyFactory theSearchStrategyFactory, 139 ExceptionService theExceptionSvc, 140 BeanFactory theBeanFactory) { 141 super(); 142 myContext = theContext; 143 myStorageSettings = theStorageSettings; 144 myInterceptorBroadcaster = theInterceptorBroadcaster; 145 myTxService = theTxService; 146 mySearchCacheSvc = theSearchCacheSvc; 147 mySearchResultCacheSvc = theSearchResultCacheSvc; 148 myDaoRegistry = theDaoRegistry; 149 mySearchBuilderFactory = theSearchBuilderFactory; 150 mySynchronousSearchSvc = theSynchronousSearchSvc; 151 myPersistedJpaBundleProviderFactory = thePersistedJpaBundleProviderFactory; 152 mySearchParamRegistry = theSearchParamRegistry; 153 mySearchStrategyFactory = theSearchStrategyFactory; 154 myExceptionSvc = theExceptionSvc; 155 myBeanFactory = theBeanFactory; 156 157 myStorageInterceptorHooks = new StorageInterceptorHooksFacade(myInterceptorBroadcaster); 158 } 159 160 @VisibleForTesting 161 Set<String> getActiveSearchIds() { 162 return myIdToSearchTask.keySet(); 163 } 164 165 @VisibleForTesting 166 public void setIdToSearchTaskMapForUnitTests(ConcurrentHashMap<String, SearchTask> theIdToSearchTaskMap) { 167 myIdToSearchTask = theIdToSearchTaskMap; 168 } 169 170 @VisibleForTesting 171 public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) { 172 myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests; 173 } 174 175 @VisibleForTesting 176 public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) { 177 myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests; 178 } 179 180 @VisibleForTesting 181 public void setSyncSizeForUnitTests(int theSyncSize) { 182 mySyncSize = theSyncSize; 183 } 184 185 @Override 186 public void cancelAllActiveSearches() { 187 for (SearchTask next : myIdToSearchTask.values()) { 188 ourLog.info( 189 "Requesting immediate abort of search: {}", next.getSearch().getUuid()); 190 next.requestImmediateAbort(); 191 AsyncUtil.awaitLatchAndIgnoreInterrupt(next.getCompletionLatch(), 30, TimeUnit.SECONDS); 192 } 193 } 194 195 @SuppressWarnings("SameParameterValue") 196 @VisibleForTesting 197 void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) { 198 myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults; 199 } 200 201 /** 202 * This method is called by the HTTP client processing thread in order to 203 * fetch resources. 204 * <p> 205 * This method must not be called from inside a transaction. The rationale is that 206 * the {@link Search} entity is treated as a piece of shared state across client threads 207 * accessing the same thread, so we need to be able to update that table in a transaction 208 * and commit it right away in order for that to work. Examples of needing to do this 209 * include if two different clients request the same search and are both paging at the 210 * same time, but also includes clients that are hacking the paging links to 211 * fetch multiple pages of a search result in parallel. In both cases we need to only 212 * let one of them actually activate the search, or we will have conficts. The other thread 213 * just needs to wait until the first one actually fetches more results. 214 */ 215 @Override 216 public List<JpaPid> getResources( 217 final String theUuid, 218 int theFrom, 219 int theTo, 220 @Nullable RequestDetails theRequestDetails, 221 RequestPartitionId theRequestPartitionId) { 222 assert !TransactionSynchronizationManager.isActualTransactionActive(); 223 224 // If we're actively searching right now, don't try to do anything until at least one batch has been 225 // persisted in the DB 226 SearchTask searchTask = myIdToSearchTask.get(theUuid); 227 if (searchTask != null) { 228 searchTask.awaitInitialSync(); 229 } 230 231 ourLog.trace("About to start looking for resources {}-{}", theFrom, theTo); 232 233 Search search; 234 StopWatch sw = new StopWatch(); 235 while (true) { 236 237 if (myNeverUseLocalSearchForUnitTests == false) { 238 if (searchTask != null) { 239 ourLog.trace("Local search found"); 240 List<JpaPid> resourcePids = searchTask.getResourcePids(theFrom, theTo); 241 ourLog.trace( 242 "Local search returned {} pids, wanted {}-{} - Search: {}", 243 resourcePids.size(), 244 theFrom, 245 theTo, 246 searchTask.getSearch()); 247 248 /* 249 * Generally, if a search task is open, the fastest possible thing is to just return its results. This 250 * will work most of the time, but can fail if the task hit a search threshold and the client is requesting 251 * results beyond that threashold. In that case, we'll keep going below, since that will trigger another 252 * task. 253 */ 254 if ((searchTask.getSearch().getNumFound() 255 - searchTask.getSearch().getNumBlocked()) 256 >= theTo 257 || resourcePids.size() == (theTo - theFrom)) { 258 return resourcePids; 259 } 260 } 261 } 262 263 Callable<Search> searchCallback = () -> mySearchCacheSvc 264 .fetchByUuid(theUuid, theRequestPartitionId) 265 .orElseThrow(() -> myExceptionSvc.newUnknownSearchException(theUuid)); 266 search = myTxService 267 .withRequest(theRequestDetails) 268 .withRequestPartitionId(theRequestPartitionId) 269 .execute(searchCallback); 270 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search); 271 272 if (search.getStatus() == SearchStatusEnum.FINISHED) { 273 ourLog.trace("Search entity marked as finished with {} results", search.getNumFound()); 274 break; 275 } 276 if ((search.getNumFound() - search.getNumBlocked()) >= theTo) { 277 ourLog.trace("Search entity has {} results so far", search.getNumFound()); 278 break; 279 } 280 281 if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) { 282 ourLog.error( 283 "Search {} of type {} for {}{} timed out after {}ms", 284 search.getId(), 285 search.getSearchType(), 286 search.getResourceType(), 287 search.getSearchQueryString(), 288 sw.getMillis()); 289 throw new InternalErrorException(Msg.code(1163) + "Request timed out after " + sw.getMillis() + "ms"); 290 } 291 292 // If the search was saved in "pass complete mode" it's probably time to 293 // start a new pass 294 if (search.getStatus() == SearchStatusEnum.PASSCMPLET) { 295 ourLog.trace("Going to try to start next search"); 296 Optional<Search> newSearch = 297 mySearchCacheSvc.tryToMarkSearchAsInProgress(search, theRequestPartitionId); 298 if (newSearch.isPresent()) { 299 ourLog.trace("Launching new search"); 300 search = newSearch.get(); 301 String resourceType = search.getResourceType(); 302 SearchParameterMap params = search.getSearchParameterMap() 303 .orElseThrow(() -> new IllegalStateException("No map in PASSCOMPLET search")); 304 IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType); 305 306 SearchTaskParameters parameters = new SearchTaskParameters( 307 search, 308 resourceDao, 309 params, 310 resourceType, 311 theRequestDetails, 312 theRequestPartitionId, 313 myOnRemoveSearchTask, 314 mySyncSize); 315 parameters.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 316 SearchContinuationTask task = 317 (SearchContinuationTask) myBeanFactory.getBean(SearchConfig.CONTINUE_TASK, parameters); 318 myIdToSearchTask.put(search.getUuid(), task); 319 task.call(); 320 } 321 } 322 323 if (!search.getStatus().isDone()) { 324 AsyncUtil.sleep(500); 325 } 326 } 327 328 ourLog.trace("Finished looping"); 329 330 List<JpaPid> pids = fetchResultPids(theUuid, theFrom, theTo, theRequestDetails, search, theRequestPartitionId); 331 332 ourLog.trace("Fetched {} results", pids.size()); 333 334 return pids; 335 } 336 337 @Nonnull 338 private List<JpaPid> fetchResultPids( 339 String theUuid, 340 int theFrom, 341 int theTo, 342 @Nullable RequestDetails theRequestDetails, 343 Search theSearch, 344 RequestPartitionId theRequestPartitionId) { 345 List<JpaPid> pids = mySearchResultCacheSvc.fetchResultPids( 346 theSearch, theFrom, theTo, theRequestDetails, theRequestPartitionId); 347 if (pids == null) { 348 throw myExceptionSvc.newUnknownSearchException(theUuid); 349 } 350 return pids; 351 } 352 353 @Override 354 public IBundleProvider registerSearch( 355 final IFhirResourceDao<?> theCallingDao, 356 final SearchParameterMap theParams, 357 String theResourceType, 358 CacheControlDirective theCacheControlDirective, 359 RequestDetails theRequestDetails, 360 RequestPartitionId theRequestPartitionId) { 361 final String searchUuid = UUID.randomUUID().toString(); 362 363 final String queryString = theParams.toNormalizedQueryString(myContext); 364 ourLog.debug("Registering new search {}", searchUuid); 365 366 Search search = new Search(); 367 QueryParameterUtils.populateSearchEntity( 368 theParams, theResourceType, searchUuid, queryString, search, theRequestPartitionId); 369 370 myStorageInterceptorHooks.callStoragePresearchRegistered( 371 theRequestDetails, theParams, search, theRequestPartitionId); 372 373 validateSearch(theParams); 374 375 Class<? extends IBaseResource> resourceTypeClass = 376 myContext.getResourceDefinition(theResourceType).getImplementingClass(); 377 final ISearchBuilder<JpaPid> sb = mySearchBuilderFactory.newSearchBuilder(theResourceType, resourceTypeClass); 378 sb.setFetchSize(mySyncSize); 379 380 final Integer loadSynchronousUpTo = getLoadSynchronousUpToOrNull(theCacheControlDirective); 381 boolean isOffsetQuery = theParams.isOffsetQuery(); 382 383 // todo someday - not today. 384 // SearchStrategyFactory.ISearchStrategy searchStrategy = mySearchStrategyFactory.pickStrategy(theResourceType, 385 // theParams, theRequestDetails); 386 // return searchStrategy.get(); 387 388 if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null || isOffsetQuery) { 389 if (mySearchStrategyFactory.isSupportsHSearchDirect(theResourceType, theParams, theRequestDetails)) { 390 ourLog.info("Search {} is using direct load strategy", searchUuid); 391 SearchStrategyFactory.ISearchStrategy direct = mySearchStrategyFactory.makeDirectStrategy( 392 searchUuid, theResourceType, theParams, theRequestDetails); 393 394 try { 395 return direct.get(); 396 397 } catch (ResourceNotFoundInIndexException theE) { 398 // some resources were not found in index, so we will inform this and resort to JPA search 399 ourLog.warn( 400 "Some resources were not found in index. Make sure all resources were indexed. Resorting to database search."); 401 } 402 } 403 404 ourLog.debug("Search {} is loading in synchronous mode", searchUuid); 405 return mySynchronousSearchSvc.executeQuery( 406 theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, theRequestPartitionId); 407 } 408 409 /* 410 * See if there are any cached searches whose results we can return 411 * instead 412 */ 413 SearchCacheStatusEnum cacheStatus = SearchCacheStatusEnum.MISS; 414 if (theCacheControlDirective != null && theCacheControlDirective.isNoCache()) { 415 cacheStatus = SearchCacheStatusEnum.NOT_TRIED; 416 } 417 418 if (cacheStatus != SearchCacheStatusEnum.NOT_TRIED) { 419 if (theParams.getEverythingMode() == null) { 420 if (myStorageSettings.getReuseCachedSearchResultsForMillis() != null) { 421 PersistedJpaBundleProvider foundSearchProvider = findCachedQuery( 422 theParams, theResourceType, theRequestDetails, queryString, theRequestPartitionId); 423 if (foundSearchProvider != null) { 424 foundSearchProvider.setCacheStatus(SearchCacheStatusEnum.HIT); 425 return foundSearchProvider; 426 } 427 } 428 } 429 } 430 431 PersistedJpaSearchFirstPageBundleProvider retVal = submitSearch( 432 theCallingDao, theParams, theResourceType, theRequestDetails, sb, theRequestPartitionId, search); 433 retVal.setCacheStatus(cacheStatus); 434 return retVal; 435 } 436 437 private void validateSearch(SearchParameterMap theParams) { 438 validateIncludes(theParams.getIncludes(), Constants.PARAM_INCLUDE); 439 validateIncludes(theParams.getRevIncludes(), Constants.PARAM_REVINCLUDE); 440 } 441 442 private void validateIncludes(Set<Include> includes, String name) { 443 for (Include next : includes) { 444 String value = next.getValue(); 445 if (value.equals(Constants.INCLUDE_STAR) || isBlank(value)) { 446 continue; 447 } 448 449 String paramType = next.getParamType(); 450 String paramName = next.getParamName(); 451 String paramTargetType = next.getParamTargetType(); 452 453 if (isBlank(paramType) || isBlank(paramName)) { 454 String msg = myContext 455 .getLocalizer() 456 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidInclude", name, value, ""); 457 throw new InvalidRequestException(Msg.code(2018) + msg); 458 } 459 460 if (!myDaoRegistry.isResourceTypeSupported(paramType)) { 461 String resourceTypeMsg = myContext 462 .getLocalizer() 463 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramType); 464 String msg = myContext 465 .getLocalizer() 466 .getMessage( 467 SearchCoordinatorSvcImpl.class, 468 "invalidInclude", 469 UrlUtil.sanitizeUrlPart(name), 470 UrlUtil.sanitizeUrlPart(value), 471 resourceTypeMsg); // last param is pre-sanitized 472 throw new InvalidRequestException(Msg.code(2017) + msg); 473 } 474 475 if (isNotBlank(paramTargetType) && !myDaoRegistry.isResourceTypeSupported(paramTargetType)) { 476 String resourceTypeMsg = myContext 477 .getLocalizer() 478 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramTargetType); 479 String msg = myContext 480 .getLocalizer() 481 .getMessage( 482 SearchCoordinatorSvcImpl.class, 483 "invalidInclude", 484 UrlUtil.sanitizeUrlPart(name), 485 UrlUtil.sanitizeUrlPart(value), 486 resourceTypeMsg); // last param is pre-sanitized 487 throw new InvalidRequestException(Msg.code(2016) + msg); 488 } 489 490 if (!Constants.INCLUDE_STAR.equals(paramName) 491 && mySearchParamRegistry.getActiveSearchParam( 492 paramType, paramName, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH) 493 == null) { 494 List<String> validNames = mySearchParamRegistry 495 .getActiveSearchParams(paramType, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH) 496 .values() 497 .stream() 498 .filter(t -> t.getParamType() == RestSearchParameterTypeEnum.REFERENCE) 499 .map(t -> UrlUtil.sanitizeUrlPart(t.getName())) 500 .sorted() 501 .collect(Collectors.toList()); 502 String searchParamMessage = myContext 503 .getLocalizer() 504 .getMessage( 505 BaseStorageDao.class, 506 "invalidSearchParameter", 507 UrlUtil.sanitizeUrlPart(paramName), 508 UrlUtil.sanitizeUrlPart(paramType), 509 validNames); 510 String msg = myContext 511 .getLocalizer() 512 .getMessage( 513 SearchCoordinatorSvcImpl.class, 514 "invalidInclude", 515 UrlUtil.sanitizeUrlPart(name), 516 UrlUtil.sanitizeUrlPart(value), 517 searchParamMessage); // last param is pre-sanitized 518 throw new InvalidRequestException(Msg.code(2015) + msg); 519 } 520 } 521 } 522 523 @Override 524 public Optional<Integer> getSearchTotal( 525 String theUuid, @Nullable RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) { 526 SearchTask task = myIdToSearchTask.get(theUuid); 527 if (task != null) { 528 return Optional.ofNullable(task.awaitInitialSync()); 529 } 530 531 /* 532 * In case there is no running search, if the total is listed as accurate we know one is coming 533 * so let's wait a bit for it to show up 534 */ 535 Optional<Search> search = myTxService 536 .withRequest(theRequestDetails) 537 .execute(() -> mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId)); 538 if (search.isPresent()) { 539 Optional<SearchParameterMap> searchParameterMap = search.get().getSearchParameterMap(); 540 if (searchParameterMap.isPresent() 541 && searchParameterMap.get().getSearchTotalMode() == SearchTotalModeEnum.ACCURATE) { 542 for (int i = 0; i < 10; i++) { 543 if (search.isPresent()) { 544 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search.get()); 545 if (search.get().getTotalCount() != null) { 546 return Optional.of(search.get().getTotalCount()); 547 } 548 } 549 search = mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId); 550 } 551 } 552 } 553 554 return Optional.empty(); 555 } 556 557 @Nonnull 558 private PersistedJpaSearchFirstPageBundleProvider submitSearch( 559 IDao theCallingDao, 560 SearchParameterMap theParams, 561 String theResourceType, 562 RequestDetails theRequestDetails, 563 ISearchBuilder<JpaPid> theSb, 564 RequestPartitionId theRequestPartitionId, 565 Search theSearch) { 566 StopWatch w = new StopWatch(); 567 568 SearchTaskParameters stp = new SearchTaskParameters( 569 theSearch, 570 theCallingDao, 571 theParams, 572 theResourceType, 573 theRequestDetails, 574 theRequestPartitionId, 575 myOnRemoveSearchTask, 576 mySyncSize); 577 stp.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 578 SearchTask task = (SearchTask) myBeanFactory.getBean(SearchConfig.SEARCH_TASK, stp); 579 myIdToSearchTask.put(theSearch.getUuid(), task); 580 task.call(); 581 582 PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage( 583 theRequestDetails, task, theSb, theRequestPartitionId); 584 585 ourLog.debug("Search initial phase completed in {}ms", w.getMillis()); 586 return retVal; 587 } 588 589 @Nullable 590 private PersistedJpaBundleProvider findCachedQuery( 591 SearchParameterMap theParams, 592 String theResourceType, 593 RequestDetails theRequestDetails, 594 String theQueryString, 595 RequestPartitionId theRequestPartitionId) { 596 // May be null 597 return myTxService 598 .withRequest(theRequestDetails) 599 .withRequestPartitionId(theRequestPartitionId) 600 .execute(() -> { 601 IInterceptorBroadcaster compositeBroadcaster = 602 CompositeInterceptorBroadcaster.newCompositeBroadcaster( 603 myInterceptorBroadcaster, theRequestDetails); 604 605 // Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH 606 607 HookParams params = new HookParams() 608 .add(SearchParameterMap.class, theParams) 609 .add(RequestDetails.class, theRequestDetails) 610 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 611 boolean canUseCache = 612 compositeBroadcaster.callHooks(Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH, params); 613 if (!canUseCache) { 614 return null; 615 } 616 617 // Check for a search matching the given hash 618 Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType, theRequestPartitionId); 619 if (searchToUse == null) { 620 return null; 621 } 622 623 ourLog.debug("Reusing search {} from cache", searchToUse.getUuid()); 624 // Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED 625 params = new HookParams() 626 .add(SearchParameterMap.class, theParams) 627 .add(RequestDetails.class, theRequestDetails) 628 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 629 compositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, params); 630 631 return myPersistedJpaBundleProviderFactory.newInstance(theRequestDetails, searchToUse.getUuid()); 632 }); 633 } 634 635 @Nullable 636 private Search findSearchToUseOrNull( 637 String theQueryString, String theResourceType, RequestPartitionId theRequestPartitionId) { 638 // createdCutoff is in recent past 639 final Instant createdCutoff = 640 Instant.now().minus(myStorageSettings.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS); 641 642 Optional<Search> candidate = mySearchCacheSvc.findCandidatesForReuse( 643 theResourceType, theQueryString, createdCutoff, theRequestPartitionId); 644 return candidate.orElse(null); 645 } 646 647 @Nullable 648 private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) { 649 final Integer loadSynchronousUpTo; 650 if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) { 651 if (theCacheControlDirective.getMaxResults() != null) { 652 loadSynchronousUpTo = theCacheControlDirective.getMaxResults(); 653 if (loadSynchronousUpTo > myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()) { 654 throw new InvalidRequestException(Msg.code(1165) + Constants.HEADER_CACHE_CONTROL + " header " 655 + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed " 656 + myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()); 657 } 658 } else { 659 loadSynchronousUpTo = 100; 660 } 661 } else { 662 loadSynchronousUpTo = null; 663 } 664 return loadSynchronousUpTo; 665 } 666 667 /** 668 * Creates a {@link Pageable} using a start and end index 669 */ 670 @SuppressWarnings("WeakerAccess") 671 @Nullable 672 public static Pageable toPage(final int theFromIndex, int theToIndex) { 673 int pageSize = theToIndex - theFromIndex; 674 if (pageSize < 1) { 675 return null; 676 } 677 678 int pageIndex = theFromIndex / pageSize; 679 680 return new PageRequest(pageIndex, pageSize, Sort.unsorted()) { 681 private static final long serialVersionUID = 1L; 682 683 @Override 684 public long getOffset() { 685 return theFromIndex; 686 } 687 }; 688 } 689}