
001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L%family 019 */ 020package ca.uhn.fhir.jpa.search; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.i18n.Msg; 024import ca.uhn.fhir.interceptor.api.HookParams; 025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 026import ca.uhn.fhir.interceptor.api.Pointcut; 027import ca.uhn.fhir.interceptor.model.RequestPartitionId; 028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 029import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 030import ca.uhn.fhir.jpa.api.dao.IDao; 031import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 032import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; 033import ca.uhn.fhir.jpa.config.SearchConfig; 034import ca.uhn.fhir.jpa.dao.BaseStorageDao; 035import ca.uhn.fhir.jpa.dao.ISearchBuilder; 036import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 037import ca.uhn.fhir.jpa.dao.search.ResourceNotFoundInIndexException; 038import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 039import ca.uhn.fhir.jpa.entity.Search; 040import ca.uhn.fhir.jpa.model.dao.JpaPid; 041import ca.uhn.fhir.jpa.model.search.SearchStatusEnum; 042import ca.uhn.fhir.jpa.search.builder.StorageInterceptorHooksFacade; 043import ca.uhn.fhir.jpa.search.builder.tasks.SearchContinuationTask; 044import ca.uhn.fhir.jpa.search.builder.tasks.SearchTask; 045import ca.uhn.fhir.jpa.search.builder.tasks.SearchTaskParameters; 046import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc; 047import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc; 048import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum; 049import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 050import ca.uhn.fhir.jpa.util.QueryParameterUtils; 051import ca.uhn.fhir.model.api.Include; 052import ca.uhn.fhir.rest.api.CacheControlDirective; 053import ca.uhn.fhir.rest.api.Constants; 054import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum; 055import ca.uhn.fhir.rest.api.SearchTotalModeEnum; 056import ca.uhn.fhir.rest.api.server.IBundleProvider; 057import ca.uhn.fhir.rest.api.server.RequestDetails; 058import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 059import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 060import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; 061import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; 062import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; 063import ca.uhn.fhir.util.AsyncUtil; 064import ca.uhn.fhir.util.StopWatch; 065import ca.uhn.fhir.util.UrlUtil; 066import com.google.common.annotations.VisibleForTesting; 067import jakarta.annotation.Nonnull; 068import jakarta.annotation.Nullable; 069import org.apache.commons.lang3.time.DateUtils; 070import org.hl7.fhir.instance.model.api.IBaseResource; 071import org.springframework.beans.factory.BeanFactory; 072import org.springframework.data.domain.PageRequest; 073import org.springframework.data.domain.Pageable; 074import org.springframework.data.domain.Sort; 075import org.springframework.stereotype.Component; 076import org.springframework.transaction.support.TransactionSynchronizationManager; 077 078import java.time.Instant; 079import java.time.temporal.ChronoUnit; 080import java.util.List; 081import java.util.Optional; 082import java.util.Set; 083import java.util.UUID; 084import java.util.concurrent.Callable; 085import java.util.concurrent.ConcurrentHashMap; 086import java.util.concurrent.TimeUnit; 087import java.util.function.Consumer; 088import java.util.stream.Collectors; 089 090import static ca.uhn.fhir.jpa.util.QueryParameterUtils.DEFAULT_SYNC_SIZE; 091import static org.apache.commons.lang3.StringUtils.isBlank; 092import static org.apache.commons.lang3.StringUtils.isNotBlank; 093 094@Component("mySearchCoordinatorSvc") 095public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc<JpaPid> { 096 097 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class); 098 099 private final FhirContext myContext; 100 private final JpaStorageSettings myStorageSettings; 101 private final IInterceptorBroadcaster myInterceptorBroadcaster; 102 private final HapiTransactionService myTxService; 103 private final ISearchCacheSvc mySearchCacheSvc; 104 private final ISearchResultCacheSvc mySearchResultCacheSvc; 105 private final DaoRegistry myDaoRegistry; 106 private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory; 107 private final ISynchronousSearchSvc mySynchronousSearchSvc; 108 private final PersistedJpaBundleProviderFactory myPersistedJpaBundleProviderFactory; 109 private final ISearchParamRegistry mySearchParamRegistry; 110 private final SearchStrategyFactory mySearchStrategyFactory; 111 private final ExceptionService myExceptionSvc; 112 private final BeanFactory myBeanFactory; 113 private ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>(); 114 115 private final Consumer<String> myOnRemoveSearchTask = myIdToSearchTask::remove; 116 117 private final StorageInterceptorHooksFacade myStorageInterceptorHooks; 118 private Integer myLoadingThrottleForUnitTests = null; 119 private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE; 120 private boolean myNeverUseLocalSearchForUnitTests; 121 private int mySyncSize = DEFAULT_SYNC_SIZE; 122 123 /** 124 * Constructor 125 */ 126 public SearchCoordinatorSvcImpl( 127 FhirContext theContext, 128 JpaStorageSettings theStorageSettings, 129 IInterceptorBroadcaster theInterceptorBroadcaster, 130 HapiTransactionService theTxService, 131 ISearchCacheSvc theSearchCacheSvc, 132 ISearchResultCacheSvc theSearchResultCacheSvc, 133 DaoRegistry theDaoRegistry, 134 SearchBuilderFactory<JpaPid> theSearchBuilderFactory, 135 ISynchronousSearchSvc theSynchronousSearchSvc, 136 PersistedJpaBundleProviderFactory thePersistedJpaBundleProviderFactory, 137 ISearchParamRegistry theSearchParamRegistry, 138 SearchStrategyFactory theSearchStrategyFactory, 139 ExceptionService theExceptionSvc, 140 BeanFactory theBeanFactory) { 141 super(); 142 myContext = theContext; 143 myStorageSettings = theStorageSettings; 144 myInterceptorBroadcaster = theInterceptorBroadcaster; 145 myTxService = theTxService; 146 mySearchCacheSvc = theSearchCacheSvc; 147 mySearchResultCacheSvc = theSearchResultCacheSvc; 148 myDaoRegistry = theDaoRegistry; 149 mySearchBuilderFactory = theSearchBuilderFactory; 150 mySynchronousSearchSvc = theSynchronousSearchSvc; 151 myPersistedJpaBundleProviderFactory = thePersistedJpaBundleProviderFactory; 152 mySearchParamRegistry = theSearchParamRegistry; 153 mySearchStrategyFactory = theSearchStrategyFactory; 154 myExceptionSvc = theExceptionSvc; 155 myBeanFactory = theBeanFactory; 156 157 myStorageInterceptorHooks = new StorageInterceptorHooksFacade(myInterceptorBroadcaster); 158 } 159 160 @VisibleForTesting 161 Set<String> getActiveSearchIds() { 162 return myIdToSearchTask.keySet(); 163 } 164 165 @VisibleForTesting 166 public void setIdToSearchTaskMapForUnitTests(ConcurrentHashMap<String, SearchTask> theIdToSearchTaskMap) { 167 myIdToSearchTask = theIdToSearchTaskMap; 168 } 169 170 @VisibleForTesting 171 public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) { 172 myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests; 173 } 174 175 @VisibleForTesting 176 public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) { 177 myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests; 178 } 179 180 @VisibleForTesting 181 public void setSyncSizeForUnitTests(int theSyncSize) { 182 mySyncSize = theSyncSize; 183 } 184 185 @Override 186 public void cancelAllActiveSearches() { 187 for (SearchTask next : myIdToSearchTask.values()) { 188 ourLog.info( 189 "Requesting immediate abort of search: {}", next.getSearch().getUuid()); 190 next.requestImmediateAbort(); 191 AsyncUtil.awaitLatchAndIgnoreInterrupt(next.getCompletionLatch(), 30, TimeUnit.SECONDS); 192 } 193 } 194 195 @SuppressWarnings("SameParameterValue") 196 @VisibleForTesting 197 public void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) { 198 myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults; 199 } 200 201 /** 202 * This method is called by the HTTP client processing thread in order to 203 * fetch resources. 204 * <p> 205 * This method must not be called from inside a transaction. The rationale is that 206 * the {@link Search} entity is treated as a piece of shared state across client threads 207 * accessing the same thread, so we need to be able to update that table in a transaction 208 * and commit it right away in order for that to work. Examples of needing to do this 209 * include if two different clients request the same search and are both paging at the 210 * same time, but also includes clients that are hacking the paging links to 211 * fetch multiple pages of a search result in parallel. In both cases we need to only 212 * let one of them actually activate the search, or we will have conflicts. The other thread 213 * just needs to wait until the first one actually fetches more results. 214 */ 215 @Override 216 public List<JpaPid> getResources( 217 final String theUuid, 218 int theFrom, 219 int theTo, 220 @Nullable RequestDetails theRequestDetails, 221 RequestPartitionId theRequestPartitionId) { 222 assert !TransactionSynchronizationManager.isActualTransactionActive(); 223 224 // If we're actively searching right now, don't try to do anything until at least one batch has been 225 // persisted in the DB 226 SearchTask searchTask = myIdToSearchTask.get(theUuid); 227 if (searchTask != null) { 228 searchTask.awaitInitialSync(); 229 } 230 231 ourLog.trace("About to start looking for resources {}-{}", theFrom, theTo); 232 233 Search search; 234 StopWatch sw = new StopWatch(); 235 while (true) { 236 237 if (myNeverUseLocalSearchForUnitTests == false) { 238 if (searchTask != null) { 239 ourLog.trace("Local search found"); 240 List<JpaPid> resourcePids = searchTask.getResourcePids(theFrom, theTo); 241 ourLog.trace( 242 "Local search returned {} pids, wanted {}-{} - Search: {}", 243 resourcePids.size(), 244 theFrom, 245 theTo, 246 searchTask.getSearch()); 247 248 /* 249 * Generally, if a search task is open, the fastest possible thing is to just return its results. This 250 * will work most of the time, but can fail if the task hit a search threshold and the client is requesting 251 * results beyond that threashold. In that case, we'll keep going below, since that will trigger another 252 * task. 253 */ 254 if ((searchTask.getSearch().getNumFound() 255 - searchTask.getSearch().getNumBlocked()) 256 >= theTo 257 || resourcePids.size() == (theTo - theFrom)) { 258 return resourcePids; 259 } 260 } 261 } 262 263 Callable<Search> searchCallback = () -> mySearchCacheSvc 264 .fetchByUuid(theUuid, theRequestPartitionId) 265 .orElseThrow(() -> myExceptionSvc.newUnknownSearchException(theUuid)); 266 search = myTxService 267 .withRequest(theRequestDetails) 268 .withRequestPartitionId(theRequestPartitionId) 269 .execute(searchCallback); 270 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search); 271 272 if (search.getStatus() == SearchStatusEnum.FINISHED) { 273 ourLog.trace("Search entity marked as finished with {} results", search.getNumFound()); 274 break; 275 } 276 if ((search.getNumFound() - search.getNumBlocked()) >= theTo) { 277 ourLog.trace("Search entity has {} results so far", search.getNumFound()); 278 break; 279 } 280 281 if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) { 282 ourLog.error( 283 "Search {} of type {} for {}{} timed out after {}ms with status {}.", 284 search.getId(), 285 search.getSearchType(), 286 search.getResourceType(), 287 search.getSearchQueryString(), 288 sw.getMillis(), 289 search.getStatus()); 290 throw new InternalErrorException(Msg.code(1163) + "Request timed out after " + sw.getMillis() + "ms"); 291 } 292 293 // If the search was saved in "pass complete mode" it's probably time to 294 // start a new pass 295 if (search.getStatus() == SearchStatusEnum.PASSCMPLET) { 296 ourLog.trace("Going to try to start next search"); 297 Optional<Search> newSearch = 298 mySearchCacheSvc.tryToMarkSearchAsInProgress(search, theRequestPartitionId); 299 if (newSearch.isPresent()) { 300 ourLog.trace("Launching new search"); 301 search = newSearch.get(); 302 String resourceType = search.getResourceType(); 303 SearchParameterMap params = search.getSearchParameterMap() 304 .orElseThrow(() -> new IllegalStateException("No map in PASSCOMPLET search")); 305 IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType); 306 307 SearchTaskParameters parameters = new SearchTaskParameters( 308 search, 309 resourceDao, 310 params, 311 resourceType, 312 theRequestDetails, 313 theRequestPartitionId, 314 myOnRemoveSearchTask, 315 mySyncSize); 316 parameters.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 317 SearchContinuationTask task = 318 (SearchContinuationTask) myBeanFactory.getBean(SearchConfig.CONTINUE_TASK, parameters); 319 myIdToSearchTask.put(search.getUuid(), task); 320 task.call(); 321 } 322 } 323 324 if (!search.getStatus().isDone()) { 325 AsyncUtil.sleep(500); 326 } 327 } 328 329 ourLog.trace("Finished looping"); 330 331 List<JpaPid> pids = fetchResultPids(theUuid, theFrom, theTo, theRequestDetails, search, theRequestPartitionId); 332 333 ourLog.trace("Fetched {} results", pids.size()); 334 335 return pids; 336 } 337 338 @Nonnull 339 private List<JpaPid> fetchResultPids( 340 String theUuid, 341 int theFrom, 342 int theTo, 343 @Nullable RequestDetails theRequestDetails, 344 Search theSearch, 345 RequestPartitionId theRequestPartitionId) { 346 List<JpaPid> pids = mySearchResultCacheSvc.fetchResultPids( 347 theSearch, theFrom, theTo, theRequestDetails, theRequestPartitionId); 348 if (pids == null) { 349 throw myExceptionSvc.newUnknownSearchException(theUuid); 350 } 351 return pids; 352 } 353 354 @Override 355 public IBundleProvider registerSearch( 356 final IFhirResourceDao<?> theCallingDao, 357 final SearchParameterMap theParams, 358 String theResourceType, 359 CacheControlDirective theCacheControlDirective, 360 RequestDetails theRequestDetails, 361 RequestPartitionId theRequestPartitionId) { 362 final String searchUuid = UUID.randomUUID().toString(); 363 364 final String queryString = theParams.toNormalizedQueryString(myContext); 365 ourLog.debug("Registering new search {}", searchUuid); 366 367 Search search = new Search(); 368 QueryParameterUtils.populateSearchEntity( 369 theParams, theResourceType, searchUuid, queryString, search, theRequestPartitionId); 370 371 myStorageInterceptorHooks.callStoragePresearchRegistered( 372 theRequestDetails, theParams, search, theRequestPartitionId); 373 374 validateSearch(theParams); 375 376 Class<? extends IBaseResource> resourceTypeClass = 377 myContext.getResourceDefinition(theResourceType).getImplementingClass(); 378 final ISearchBuilder<JpaPid> sb = mySearchBuilderFactory.newSearchBuilder(theResourceType, resourceTypeClass); 379 sb.setFetchSize(mySyncSize); 380 sb.setRequireTotal(theParams.getCount() != null); 381 382 final Integer loadSynchronousUpTo = getLoadSynchronousUpToOrNull(theCacheControlDirective); 383 boolean isOffsetQuery = theParams.isOffsetQuery(); 384 385 // todo someday - not today. 386 // SearchStrategyFactory.ISearchStrategy searchStrategy = mySearchStrategyFactory.pickStrategy(theResourceType, 387 // theParams, theRequestDetails); 388 // return searchStrategy.get(); 389 390 if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null || isOffsetQuery) { 391 if (mySearchStrategyFactory.isSupportsHSearchDirect(theResourceType, theParams, theRequestDetails)) { 392 ourLog.info("Search {} is using direct load strategy", searchUuid); 393 SearchStrategyFactory.ISearchStrategy direct = mySearchStrategyFactory.makeDirectStrategy( 394 searchUuid, theResourceType, theParams, theRequestDetails); 395 396 try { 397 return direct.get(); 398 } catch (ResourceNotFoundInIndexException theE) { 399 // some resources were not found in index, so we will inform this and resort to JPA search 400 ourLog.warn( 401 "Some resources were not found in index. Make sure all resources were indexed. Resorting to database search."); 402 } 403 } 404 405 // we need a max to fetch for synchronous searches; 406 // otherwise we'll explode memory. 407 Integer maxToLoad = getSynchronousMaxResultsToFetch(theParams, loadSynchronousUpTo); 408 ourLog.debug("Setting a max fetch value of {} for synchronous search", maxToLoad); 409 sb.setMaxResultsToFetch(maxToLoad); 410 411 ourLog.debug("Search {} is loading in synchronous mode", searchUuid); 412 return mySynchronousSearchSvc.executeQuery( 413 theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, theRequestPartitionId); 414 } 415 416 /* 417 * See if there are any cached searches whose results we can return 418 * instead 419 */ 420 SearchCacheStatusEnum cacheStatus = SearchCacheStatusEnum.MISS; 421 if (theCacheControlDirective != null && theCacheControlDirective.isNoCache()) { 422 cacheStatus = SearchCacheStatusEnum.NOT_TRIED; 423 } 424 425 if (cacheStatus != SearchCacheStatusEnum.NOT_TRIED) { 426 if (theParams.getEverythingMode() == null) { 427 if (myStorageSettings.getReuseCachedSearchResultsForMillis() != null) { 428 PersistedJpaBundleProvider foundSearchProvider = findCachedQuery( 429 theParams, theResourceType, theRequestDetails, queryString, theRequestPartitionId); 430 if (foundSearchProvider != null) { 431 foundSearchProvider.setCacheStatus(SearchCacheStatusEnum.HIT); 432 return foundSearchProvider; 433 } 434 } 435 } 436 } 437 438 PersistedJpaSearchFirstPageBundleProvider retVal = submitSearch( 439 theCallingDao, theParams, theResourceType, theRequestDetails, sb, theRequestPartitionId, search); 440 retVal.setCacheStatus(cacheStatus); 441 return retVal; 442 } 443 444 /** 445 * The max results to return if this is a synchronous search. 446 * 447 * We'll look in this order: 448 * * load synchronous up to (on params) 449 * * param count (+ offset) 450 * * StorageSettings fetch size default max 451 * * 452 */ 453 private Integer getSynchronousMaxResultsToFetch(SearchParameterMap theParams, Integer theLoadSynchronousUpTo) { 454 if (theLoadSynchronousUpTo != null) { 455 return theLoadSynchronousUpTo; 456 } 457 458 if (theParams.getCount() != null) { 459 int valToReturn = theParams.getCount() + 1; 460 if (theParams.getOffset() != null) { 461 valToReturn += theParams.getOffset(); 462 } 463 return valToReturn; 464 } 465 466 if (myStorageSettings.getFetchSizeDefaultMaximum() != null) { 467 return myStorageSettings.getFetchSizeDefaultMaximum(); 468 } 469 470 return myStorageSettings.getInternalSynchronousSearchSize(); 471 } 472 473 private void validateSearch(SearchParameterMap theParams) { 474 validateIncludes(theParams.getIncludes(), Constants.PARAM_INCLUDE); 475 validateIncludes(theParams.getRevIncludes(), Constants.PARAM_REVINCLUDE); 476 } 477 478 private void validateIncludes(Set<Include> includes, String name) { 479 for (Include next : includes) { 480 String value = next.getValue(); 481 if (value.equals(Constants.INCLUDE_STAR) || isBlank(value)) { 482 continue; 483 } 484 485 String paramType = next.getParamType(); 486 String paramName = next.getParamName(); 487 String paramTargetType = next.getParamTargetType(); 488 489 if (isBlank(paramType) || isBlank(paramName)) { 490 String msg = myContext 491 .getLocalizer() 492 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidInclude", name, value, ""); 493 throw new InvalidRequestException(Msg.code(2018) + msg); 494 } 495 496 if (!myDaoRegistry.isResourceTypeSupported(paramType)) { 497 String resourceTypeMsg = myContext 498 .getLocalizer() 499 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramType); 500 String msg = myContext 501 .getLocalizer() 502 .getMessage( 503 SearchCoordinatorSvcImpl.class, 504 "invalidInclude", 505 UrlUtil.sanitizeUrlPart(name), 506 UrlUtil.sanitizeUrlPart(value), 507 resourceTypeMsg); // last param is pre-sanitized 508 throw new InvalidRequestException(Msg.code(2017) + msg); 509 } 510 511 if (isNotBlank(paramTargetType) && !myDaoRegistry.isResourceTypeSupported(paramTargetType)) { 512 String resourceTypeMsg = myContext 513 .getLocalizer() 514 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramTargetType); 515 String msg = myContext 516 .getLocalizer() 517 .getMessage( 518 SearchCoordinatorSvcImpl.class, 519 "invalidInclude", 520 UrlUtil.sanitizeUrlPart(name), 521 UrlUtil.sanitizeUrlPart(value), 522 resourceTypeMsg); // last param is pre-sanitized 523 throw new InvalidRequestException(Msg.code(2016) + msg); 524 } 525 526 if (!Constants.INCLUDE_STAR.equals(paramName) 527 && mySearchParamRegistry.getActiveSearchParam( 528 paramType, paramName, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH) 529 == null) { 530 List<String> validNames = mySearchParamRegistry 531 .getActiveSearchParams(paramType, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH) 532 .values() 533 .stream() 534 .filter(t -> t.getParamType() == RestSearchParameterTypeEnum.REFERENCE) 535 .map(t -> UrlUtil.sanitizeUrlPart(t.getName())) 536 .sorted() 537 .collect(Collectors.toList()); 538 String searchParamMessage = myContext 539 .getLocalizer() 540 .getMessage( 541 BaseStorageDao.class, 542 "invalidSearchParameter", 543 UrlUtil.sanitizeUrlPart(paramName), 544 UrlUtil.sanitizeUrlPart(paramType), 545 validNames); 546 String msg = myContext 547 .getLocalizer() 548 .getMessage( 549 SearchCoordinatorSvcImpl.class, 550 "invalidInclude", 551 UrlUtil.sanitizeUrlPart(name), 552 UrlUtil.sanitizeUrlPart(value), 553 searchParamMessage); // last param is pre-sanitized 554 throw new InvalidRequestException(Msg.code(2015) + msg); 555 } 556 } 557 } 558 559 @Override 560 public Optional<Integer> getSearchTotal( 561 String theUuid, @Nullable RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) { 562 SearchTask task = myIdToSearchTask.get(theUuid); 563 if (task != null) { 564 return Optional.ofNullable(task.awaitInitialSync()); 565 } 566 567 /* 568 * In case there is no running search, if the total is listed as accurate we know one is coming 569 * so let's wait a bit for it to show up 570 */ 571 Optional<Search> search = myTxService 572 .withRequest(theRequestDetails) 573 .execute(() -> mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId)); 574 if (search.isPresent()) { 575 Optional<SearchParameterMap> searchParameterMap = search.get().getSearchParameterMap(); 576 if (searchParameterMap.isPresent() 577 && searchParameterMap.get().getSearchTotalMode() == SearchTotalModeEnum.ACCURATE) { 578 for (int i = 0; i < 10; i++) { 579 if (search.isPresent()) { 580 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search.get()); 581 if (search.get().getTotalCount() != null) { 582 return Optional.of(search.get().getTotalCount()); 583 } 584 } 585 search = mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId); 586 } 587 } 588 } 589 590 return Optional.empty(); 591 } 592 593 @Nonnull 594 private PersistedJpaSearchFirstPageBundleProvider submitSearch( 595 IDao theCallingDao, 596 SearchParameterMap theParams, 597 String theResourceType, 598 RequestDetails theRequestDetails, 599 ISearchBuilder<JpaPid> theSb, 600 RequestPartitionId theRequestPartitionId, 601 Search theSearch) { 602 StopWatch w = new StopWatch(); 603 604 SearchTaskParameters stp = new SearchTaskParameters( 605 theSearch, 606 theCallingDao, 607 theParams, 608 theResourceType, 609 theRequestDetails, 610 theRequestPartitionId, 611 myOnRemoveSearchTask, 612 mySyncSize); 613 stp.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 614 SearchTask task = (SearchTask) myBeanFactory.getBean(SearchConfig.SEARCH_TASK, stp); 615 myIdToSearchTask.put(theSearch.getUuid(), task); 616 task.call(); 617 618 PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage( 619 theRequestDetails, task, theSb, theRequestPartitionId); 620 621 ourLog.debug("Search initial phase completed in {}ms", w.getMillis()); 622 return retVal; 623 } 624 625 @Nullable 626 private PersistedJpaBundleProvider findCachedQuery( 627 SearchParameterMap theParams, 628 String theResourceType, 629 RequestDetails theRequestDetails, 630 String theQueryString, 631 RequestPartitionId theRequestPartitionId) { 632 // May be null 633 return myTxService 634 .withRequest(theRequestDetails) 635 .withRequestPartitionId(theRequestPartitionId) 636 .execute(() -> { 637 IInterceptorBroadcaster compositeBroadcaster = 638 CompositeInterceptorBroadcaster.newCompositeBroadcaster( 639 myInterceptorBroadcaster, theRequestDetails); 640 641 // Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH 642 643 HookParams params = new HookParams() 644 .add(SearchParameterMap.class, theParams) 645 .add(RequestDetails.class, theRequestDetails) 646 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 647 boolean canUseCache = 648 compositeBroadcaster.callHooks(Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH, params); 649 if (!canUseCache) { 650 return null; 651 } 652 653 // Check for a search matching the given hash 654 Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType, theRequestPartitionId); 655 if (searchToUse == null) { 656 return null; 657 } 658 659 ourLog.debug("Reusing search {} from cache", searchToUse.getUuid()); 660 // Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED 661 params = new HookParams() 662 .add(SearchParameterMap.class, theParams) 663 .add(RequestDetails.class, theRequestDetails) 664 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 665 compositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, params); 666 667 return myPersistedJpaBundleProviderFactory.newInstance(theRequestDetails, searchToUse.getUuid()); 668 }); 669 } 670 671 @Nullable 672 private Search findSearchToUseOrNull( 673 String theQueryString, String theResourceType, RequestPartitionId theRequestPartitionId) { 674 // createdCutoff is in recent past 675 final Instant createdCutoff = 676 Instant.now().minus(myStorageSettings.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS); 677 678 Optional<Search> candidate = mySearchCacheSvc.findCandidatesForReuse( 679 theResourceType, theQueryString, createdCutoff, theRequestPartitionId); 680 return candidate.orElse(null); 681 } 682 683 @Nullable 684 private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) { 685 final Integer loadSynchronousUpTo; 686 if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) { 687 if (theCacheControlDirective.getMaxResults() != null) { 688 loadSynchronousUpTo = theCacheControlDirective.getMaxResults(); 689 if (loadSynchronousUpTo > myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()) { 690 throw new InvalidRequestException(Msg.code(1165) + Constants.HEADER_CACHE_CONTROL + " header " 691 + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed " 692 + myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()); 693 } 694 } else { 695 loadSynchronousUpTo = 100; 696 } 697 } else { 698 loadSynchronousUpTo = null; 699 } 700 return loadSynchronousUpTo; 701 } 702 703 /** 704 * Creates a {@link Pageable} using a start and end index 705 */ 706 @SuppressWarnings("WeakerAccess") 707 @Nullable 708 public static Pageable toPage(final int theFromIndex, int theToIndex) { 709 int pageSize = theToIndex - theFromIndex; 710 if (pageSize < 1) { 711 return null; 712 } 713 714 int pageIndex = theFromIndex / pageSize; 715 716 return new PageRequest(pageIndex, pageSize, Sort.unsorted()) { 717 private static final long serialVersionUID = 1L; 718 719 @Override 720 public long getOffset() { 721 return theFromIndex; 722 } 723 }; 724 } 725}