001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L%family 019 */ 020package ca.uhn.fhir.jpa.search; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.i18n.Msg; 024import ca.uhn.fhir.interceptor.api.HookParams; 025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 026import ca.uhn.fhir.interceptor.api.Pointcut; 027import ca.uhn.fhir.interceptor.model.RequestPartitionId; 028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 029import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 030import ca.uhn.fhir.jpa.api.dao.IDao; 031import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 032import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; 033import ca.uhn.fhir.jpa.config.SearchConfig; 034import ca.uhn.fhir.jpa.dao.BaseStorageDao; 035import ca.uhn.fhir.jpa.dao.ISearchBuilder; 036import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 037import ca.uhn.fhir.jpa.dao.search.ResourceNotFoundInIndexException; 038import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 039import ca.uhn.fhir.jpa.entity.Search; 040import ca.uhn.fhir.jpa.model.dao.JpaPid; 041import ca.uhn.fhir.jpa.model.search.SearchStatusEnum; 042import ca.uhn.fhir.jpa.search.builder.StorageInterceptorHooksFacade; 043import ca.uhn.fhir.jpa.search.builder.tasks.SearchContinuationTask; 044import ca.uhn.fhir.jpa.search.builder.tasks.SearchTask; 045import ca.uhn.fhir.jpa.search.builder.tasks.SearchTaskParameters; 046import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc; 047import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc; 048import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum; 049import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 050import ca.uhn.fhir.jpa.util.QueryParameterUtils; 051import ca.uhn.fhir.model.api.Include; 052import ca.uhn.fhir.rest.api.CacheControlDirective; 053import ca.uhn.fhir.rest.api.Constants; 054import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum; 055import ca.uhn.fhir.rest.api.SearchTotalModeEnum; 056import ca.uhn.fhir.rest.api.server.IBundleProvider; 057import ca.uhn.fhir.rest.api.server.RequestDetails; 058import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 059import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 060import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; 061import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; 062import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; 063import ca.uhn.fhir.util.AsyncUtil; 064import ca.uhn.fhir.util.StopWatch; 065import ca.uhn.fhir.util.UrlUtil; 066import com.google.common.annotations.VisibleForTesting; 067import jakarta.annotation.Nonnull; 068import jakarta.annotation.Nullable; 069import org.apache.commons.lang3.time.DateUtils; 070import org.hl7.fhir.instance.model.api.IBaseResource; 071import org.springframework.beans.factory.BeanFactory; 072import org.springframework.data.domain.PageRequest; 073import org.springframework.data.domain.Pageable; 074import org.springframework.data.domain.Sort; 075import org.springframework.stereotype.Component; 076import org.springframework.transaction.support.TransactionSynchronizationManager; 077 078import java.time.Instant; 079import java.time.temporal.ChronoUnit; 080import java.util.List; 081import java.util.Optional; 082import java.util.Set; 083import java.util.UUID; 084import java.util.concurrent.Callable; 085import java.util.concurrent.ConcurrentHashMap; 086import java.util.concurrent.TimeUnit; 087import java.util.function.Consumer; 088import java.util.stream.Collectors; 089 090import static ca.uhn.fhir.jpa.util.QueryParameterUtils.DEFAULT_SYNC_SIZE; 091import static org.apache.commons.lang3.StringUtils.isBlank; 092import static org.apache.commons.lang3.StringUtils.isNotBlank; 093 094@Component("mySearchCoordinatorSvc") 095public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc<JpaPid> { 096 097 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class); 098 099 private final FhirContext myContext; 100 private final JpaStorageSettings myStorageSettings; 101 private final IInterceptorBroadcaster myInterceptorBroadcaster; 102 private final HapiTransactionService myTxService; 103 private final ISearchCacheSvc mySearchCacheSvc; 104 private final ISearchResultCacheSvc mySearchResultCacheSvc; 105 private final DaoRegistry myDaoRegistry; 106 private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory; 107 private final ISynchronousSearchSvc mySynchronousSearchSvc; 108 private final PersistedJpaBundleProviderFactory myPersistedJpaBundleProviderFactory; 109 private final ISearchParamRegistry mySearchParamRegistry; 110 private final SearchStrategyFactory mySearchStrategyFactory; 111 private final ExceptionService myExceptionSvc; 112 private final BeanFactory myBeanFactory; 113 private ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>(); 114 115 private final Consumer<String> myOnRemoveSearchTask = myIdToSearchTask::remove; 116 117 private final StorageInterceptorHooksFacade myStorageInterceptorHooks; 118 private Integer myLoadingThrottleForUnitTests = null; 119 private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE; 120 private boolean myNeverUseLocalSearchForUnitTests; 121 private int mySyncSize = DEFAULT_SYNC_SIZE; 122 123 /** 124 * Constructor 125 */ 126 public SearchCoordinatorSvcImpl( 127 FhirContext theContext, 128 JpaStorageSettings theStorageSettings, 129 IInterceptorBroadcaster theInterceptorBroadcaster, 130 HapiTransactionService theTxService, 131 ISearchCacheSvc theSearchCacheSvc, 132 ISearchResultCacheSvc theSearchResultCacheSvc, 133 DaoRegistry theDaoRegistry, 134 SearchBuilderFactory<JpaPid> theSearchBuilderFactory, 135 ISynchronousSearchSvc theSynchronousSearchSvc, 136 PersistedJpaBundleProviderFactory thePersistedJpaBundleProviderFactory, 137 ISearchParamRegistry theSearchParamRegistry, 138 SearchStrategyFactory theSearchStrategyFactory, 139 ExceptionService theExceptionSvc, 140 BeanFactory theBeanFactory) { 141 super(); 142 myContext = theContext; 143 myStorageSettings = theStorageSettings; 144 myInterceptorBroadcaster = theInterceptorBroadcaster; 145 myTxService = theTxService; 146 mySearchCacheSvc = theSearchCacheSvc; 147 mySearchResultCacheSvc = theSearchResultCacheSvc; 148 myDaoRegistry = theDaoRegistry; 149 mySearchBuilderFactory = theSearchBuilderFactory; 150 mySynchronousSearchSvc = theSynchronousSearchSvc; 151 myPersistedJpaBundleProviderFactory = thePersistedJpaBundleProviderFactory; 152 mySearchParamRegistry = theSearchParamRegistry; 153 mySearchStrategyFactory = theSearchStrategyFactory; 154 myExceptionSvc = theExceptionSvc; 155 myBeanFactory = theBeanFactory; 156 157 myStorageInterceptorHooks = new StorageInterceptorHooksFacade(myInterceptorBroadcaster); 158 } 159 160 @VisibleForTesting 161 Set<String> getActiveSearchIds() { 162 return myIdToSearchTask.keySet(); 163 } 164 165 @VisibleForTesting 166 public void setIdToSearchTaskMapForUnitTests(ConcurrentHashMap<String, SearchTask> theIdToSearchTaskMap) { 167 myIdToSearchTask = theIdToSearchTaskMap; 168 } 169 170 @VisibleForTesting 171 public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) { 172 myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests; 173 } 174 175 @VisibleForTesting 176 public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) { 177 myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests; 178 } 179 180 @VisibleForTesting 181 public void setSyncSizeForUnitTests(int theSyncSize) { 182 mySyncSize = theSyncSize; 183 } 184 185 @Override 186 public void cancelAllActiveSearches() { 187 for (SearchTask next : myIdToSearchTask.values()) { 188 ourLog.info( 189 "Requesting immediate abort of search: {}", next.getSearch().getUuid()); 190 next.requestImmediateAbort(); 191 AsyncUtil.awaitLatchAndIgnoreInterrupt(next.getCompletionLatch(), 30, TimeUnit.SECONDS); 192 } 193 } 194 195 @SuppressWarnings("SameParameterValue") 196 @VisibleForTesting 197 void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) { 198 myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults; 199 } 200 201 /** 202 * This method is called by the HTTP client processing thread in order to 203 * fetch resources. 204 * <p> 205 * This method must not be called from inside a transaction. The rationale is that 206 * the {@link Search} entity is treated as a piece of shared state across client threads 207 * accessing the same thread, so we need to be able to update that table in a transaction 208 * and commit it right away in order for that to work. Examples of needing to do this 209 * include if two different clients request the same search and are both paging at the 210 * same time, but also includes clients that are hacking the paging links to 211 * fetch multiple pages of a search result in parallel. In both cases we need to only 212 * let one of them actually activate the search, or we will have conficts. The other thread 213 * just needs to wait until the first one actually fetches more results. 214 */ 215 @Override 216 public List<JpaPid> getResources( 217 final String theUuid, 218 int theFrom, 219 int theTo, 220 @Nullable RequestDetails theRequestDetails, 221 RequestPartitionId theRequestPartitionId) { 222 assert !TransactionSynchronizationManager.isActualTransactionActive(); 223 224 // If we're actively searching right now, don't try to do anything until at least one batch has been 225 // persisted in the DB 226 SearchTask searchTask = myIdToSearchTask.get(theUuid); 227 if (searchTask != null) { 228 searchTask.awaitInitialSync(); 229 } 230 231 ourLog.trace("About to start looking for resources {}-{}", theFrom, theTo); 232 233 Search search; 234 StopWatch sw = new StopWatch(); 235 while (true) { 236 237 if (myNeverUseLocalSearchForUnitTests == false) { 238 if (searchTask != null) { 239 ourLog.trace("Local search found"); 240 List<JpaPid> resourcePids = searchTask.getResourcePids(theFrom, theTo); 241 ourLog.trace( 242 "Local search returned {} pids, wanted {}-{} - Search: {}", 243 resourcePids.size(), 244 theFrom, 245 theTo, 246 searchTask.getSearch()); 247 248 /* 249 * Generally, if a search task is open, the fastest possible thing is to just return its results. This 250 * will work most of the time, but can fail if the task hit a search threshold and the client is requesting 251 * results beyond that threashold. In that case, we'll keep going below, since that will trigger another 252 * task. 253 */ 254 if ((searchTask.getSearch().getNumFound() 255 - searchTask.getSearch().getNumBlocked()) 256 >= theTo 257 || resourcePids.size() == (theTo - theFrom)) { 258 return resourcePids; 259 } 260 } 261 } 262 263 Callable<Search> searchCallback = () -> mySearchCacheSvc 264 .fetchByUuid(theUuid, theRequestPartitionId) 265 .orElseThrow(() -> myExceptionSvc.newUnknownSearchException(theUuid)); 266 search = myTxService 267 .withRequest(theRequestDetails) 268 .withRequestPartitionId(theRequestPartitionId) 269 .execute(searchCallback); 270 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search); 271 272 if (search.getStatus() == SearchStatusEnum.FINISHED) { 273 ourLog.trace("Search entity marked as finished with {} results", search.getNumFound()); 274 break; 275 } 276 if ((search.getNumFound() - search.getNumBlocked()) >= theTo) { 277 ourLog.trace("Search entity has {} results so far", search.getNumFound()); 278 break; 279 } 280 281 if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) { 282 ourLog.error( 283 "Search {} of type {} for {}{} timed out after {}ms", 284 search.getId(), 285 search.getSearchType(), 286 search.getResourceType(), 287 search.getSearchQueryString(), 288 sw.getMillis()); 289 throw new InternalErrorException(Msg.code(1163) + "Request timed out after " + sw.getMillis() + "ms"); 290 } 291 292 // If the search was saved in "pass complete mode" it's probably time to 293 // start a new pass 294 if (search.getStatus() == SearchStatusEnum.PASSCMPLET) { 295 ourLog.trace("Going to try to start next search"); 296 Optional<Search> newSearch = 297 mySearchCacheSvc.tryToMarkSearchAsInProgress(search, theRequestPartitionId); 298 if (newSearch.isPresent()) { 299 ourLog.trace("Launching new search"); 300 search = newSearch.get(); 301 String resourceType = search.getResourceType(); 302 SearchParameterMap params = search.getSearchParameterMap() 303 .orElseThrow(() -> new IllegalStateException("No map in PASSCOMPLET search")); 304 IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType); 305 306 SearchTaskParameters parameters = new SearchTaskParameters( 307 search, 308 resourceDao, 309 params, 310 resourceType, 311 theRequestDetails, 312 theRequestPartitionId, 313 myOnRemoveSearchTask, 314 mySyncSize); 315 parameters.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 316 SearchContinuationTask task = 317 (SearchContinuationTask) myBeanFactory.getBean(SearchConfig.CONTINUE_TASK, parameters); 318 myIdToSearchTask.put(search.getUuid(), task); 319 task.call(); 320 } 321 } 322 323 if (!search.getStatus().isDone()) { 324 AsyncUtil.sleep(500); 325 } 326 } 327 328 ourLog.trace("Finished looping"); 329 330 List<JpaPid> pids = fetchResultPids(theUuid, theFrom, theTo, theRequestDetails, search, theRequestPartitionId); 331 332 ourLog.trace("Fetched {} results", pids.size()); 333 334 return pids; 335 } 336 337 @Nonnull 338 private List<JpaPid> fetchResultPids( 339 String theUuid, 340 int theFrom, 341 int theTo, 342 @Nullable RequestDetails theRequestDetails, 343 Search theSearch, 344 RequestPartitionId theRequestPartitionId) { 345 List<JpaPid> pids = mySearchResultCacheSvc.fetchResultPids( 346 theSearch, theFrom, theTo, theRequestDetails, theRequestPartitionId); 347 if (pids == null) { 348 throw myExceptionSvc.newUnknownSearchException(theUuid); 349 } 350 return pids; 351 } 352 353 @Override 354 public IBundleProvider registerSearch( 355 final IFhirResourceDao<?> theCallingDao, 356 final SearchParameterMap theParams, 357 String theResourceType, 358 CacheControlDirective theCacheControlDirective, 359 RequestDetails theRequestDetails, 360 RequestPartitionId theRequestPartitionId) { 361 final String searchUuid = UUID.randomUUID().toString(); 362 363 final String queryString = theParams.toNormalizedQueryString(myContext); 364 ourLog.debug("Registering new search {}", searchUuid); 365 366 Search search = new Search(); 367 QueryParameterUtils.populateSearchEntity( 368 theParams, theResourceType, searchUuid, queryString, search, theRequestPartitionId); 369 370 myStorageInterceptorHooks.callStoragePresearchRegistered( 371 theRequestDetails, theParams, search, theRequestPartitionId); 372 373 validateSearch(theParams); 374 375 Class<? extends IBaseResource> resourceTypeClass = 376 myContext.getResourceDefinition(theResourceType).getImplementingClass(); 377 final ISearchBuilder<JpaPid> sb = mySearchBuilderFactory.newSearchBuilder(theResourceType, resourceTypeClass); 378 sb.setFetchSize(mySyncSize); 379 sb.setRequireTotal(theParams.getCount() != null); 380 381 final Integer loadSynchronousUpTo = getLoadSynchronousUpToOrNull(theCacheControlDirective); 382 boolean isOffsetQuery = theParams.isOffsetQuery(); 383 384 // todo someday - not today. 385 // SearchStrategyFactory.ISearchStrategy searchStrategy = mySearchStrategyFactory.pickStrategy(theResourceType, 386 // theParams, theRequestDetails); 387 // return searchStrategy.get(); 388 389 if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null || isOffsetQuery) { 390 if (mySearchStrategyFactory.isSupportsHSearchDirect(theResourceType, theParams, theRequestDetails)) { 391 ourLog.info("Search {} is using direct load strategy", searchUuid); 392 SearchStrategyFactory.ISearchStrategy direct = mySearchStrategyFactory.makeDirectStrategy( 393 searchUuid, theResourceType, theParams, theRequestDetails); 394 395 try { 396 return direct.get(); 397 } catch (ResourceNotFoundInIndexException theE) { 398 // some resources were not found in index, so we will inform this and resort to JPA search 399 ourLog.warn( 400 "Some resources were not found in index. Make sure all resources were indexed. Resorting to database search."); 401 } 402 } 403 404 // we need a max to fetch for synchronous searches; 405 // otherwise we'll explode memory. 406 Integer maxToLoad = getSynchronousMaxResultsToFetch(theParams, loadSynchronousUpTo); 407 ourLog.debug("Setting a max fetch value of {} for synchronous search", maxToLoad); 408 sb.setMaxResultsToFetch(maxToLoad); 409 410 ourLog.debug("Search {} is loading in synchronous mode", searchUuid); 411 return mySynchronousSearchSvc.executeQuery( 412 theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, theRequestPartitionId); 413 } 414 415 /* 416 * See if there are any cached searches whose results we can return 417 * instead 418 */ 419 SearchCacheStatusEnum cacheStatus = SearchCacheStatusEnum.MISS; 420 if (theCacheControlDirective != null && theCacheControlDirective.isNoCache()) { 421 cacheStatus = SearchCacheStatusEnum.NOT_TRIED; 422 } 423 424 if (cacheStatus != SearchCacheStatusEnum.NOT_TRIED) { 425 if (theParams.getEverythingMode() == null) { 426 if (myStorageSettings.getReuseCachedSearchResultsForMillis() != null) { 427 PersistedJpaBundleProvider foundSearchProvider = findCachedQuery( 428 theParams, theResourceType, theRequestDetails, queryString, theRequestPartitionId); 429 if (foundSearchProvider != null) { 430 foundSearchProvider.setCacheStatus(SearchCacheStatusEnum.HIT); 431 return foundSearchProvider; 432 } 433 } 434 } 435 } 436 437 PersistedJpaSearchFirstPageBundleProvider retVal = submitSearch( 438 theCallingDao, theParams, theResourceType, theRequestDetails, sb, theRequestPartitionId, search); 439 retVal.setCacheStatus(cacheStatus); 440 return retVal; 441 } 442 443 /** 444 * The max results to return if this is a synchronous search. 445 * 446 * We'll look in this order: 447 * * load synchronous up to (on params) 448 * * param count (+ offset) 449 * * StorageSettings fetch size default max 450 * * 451 */ 452 private Integer getSynchronousMaxResultsToFetch(SearchParameterMap theParams, Integer theLoadSynchronousUpTo) { 453 if (theLoadSynchronousUpTo != null) { 454 return theLoadSynchronousUpTo; 455 } 456 457 if (theParams.getCount() != null) { 458 int valToReturn = theParams.getCount() + 1; 459 if (theParams.getOffset() != null) { 460 valToReturn += theParams.getOffset(); 461 } 462 return valToReturn; 463 } 464 465 if (myStorageSettings.getFetchSizeDefaultMaximum() != null) { 466 return myStorageSettings.getFetchSizeDefaultMaximum(); 467 } 468 469 return myStorageSettings.getInternalSynchronousSearchSize(); 470 } 471 472 private void validateSearch(SearchParameterMap theParams) { 473 validateIncludes(theParams.getIncludes(), Constants.PARAM_INCLUDE); 474 validateIncludes(theParams.getRevIncludes(), Constants.PARAM_REVINCLUDE); 475 } 476 477 private void validateIncludes(Set<Include> includes, String name) { 478 for (Include next : includes) { 479 String value = next.getValue(); 480 if (value.equals(Constants.INCLUDE_STAR) || isBlank(value)) { 481 continue; 482 } 483 484 String paramType = next.getParamType(); 485 String paramName = next.getParamName(); 486 String paramTargetType = next.getParamTargetType(); 487 488 if (isBlank(paramType) || isBlank(paramName)) { 489 String msg = myContext 490 .getLocalizer() 491 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidInclude", name, value, ""); 492 throw new InvalidRequestException(Msg.code(2018) + msg); 493 } 494 495 if (!myDaoRegistry.isResourceTypeSupported(paramType)) { 496 String resourceTypeMsg = myContext 497 .getLocalizer() 498 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramType); 499 String msg = myContext 500 .getLocalizer() 501 .getMessage( 502 SearchCoordinatorSvcImpl.class, 503 "invalidInclude", 504 UrlUtil.sanitizeUrlPart(name), 505 UrlUtil.sanitizeUrlPart(value), 506 resourceTypeMsg); // last param is pre-sanitized 507 throw new InvalidRequestException(Msg.code(2017) + msg); 508 } 509 510 if (isNotBlank(paramTargetType) && !myDaoRegistry.isResourceTypeSupported(paramTargetType)) { 511 String resourceTypeMsg = myContext 512 .getLocalizer() 513 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramTargetType); 514 String msg = myContext 515 .getLocalizer() 516 .getMessage( 517 SearchCoordinatorSvcImpl.class, 518 "invalidInclude", 519 UrlUtil.sanitizeUrlPart(name), 520 UrlUtil.sanitizeUrlPart(value), 521 resourceTypeMsg); // last param is pre-sanitized 522 throw new InvalidRequestException(Msg.code(2016) + msg); 523 } 524 525 if (!Constants.INCLUDE_STAR.equals(paramName) 526 && mySearchParamRegistry.getActiveSearchParam( 527 paramType, paramName, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH) 528 == null) { 529 List<String> validNames = mySearchParamRegistry 530 .getActiveSearchParams(paramType, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH) 531 .values() 532 .stream() 533 .filter(t -> t.getParamType() == RestSearchParameterTypeEnum.REFERENCE) 534 .map(t -> UrlUtil.sanitizeUrlPart(t.getName())) 535 .sorted() 536 .collect(Collectors.toList()); 537 String searchParamMessage = myContext 538 .getLocalizer() 539 .getMessage( 540 BaseStorageDao.class, 541 "invalidSearchParameter", 542 UrlUtil.sanitizeUrlPart(paramName), 543 UrlUtil.sanitizeUrlPart(paramType), 544 validNames); 545 String msg = myContext 546 .getLocalizer() 547 .getMessage( 548 SearchCoordinatorSvcImpl.class, 549 "invalidInclude", 550 UrlUtil.sanitizeUrlPart(name), 551 UrlUtil.sanitizeUrlPart(value), 552 searchParamMessage); // last param is pre-sanitized 553 throw new InvalidRequestException(Msg.code(2015) + msg); 554 } 555 } 556 } 557 558 @Override 559 public Optional<Integer> getSearchTotal( 560 String theUuid, @Nullable RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) { 561 SearchTask task = myIdToSearchTask.get(theUuid); 562 if (task != null) { 563 return Optional.ofNullable(task.awaitInitialSync()); 564 } 565 566 /* 567 * In case there is no running search, if the total is listed as accurate we know one is coming 568 * so let's wait a bit for it to show up 569 */ 570 Optional<Search> search = myTxService 571 .withRequest(theRequestDetails) 572 .execute(() -> mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId)); 573 if (search.isPresent()) { 574 Optional<SearchParameterMap> searchParameterMap = search.get().getSearchParameterMap(); 575 if (searchParameterMap.isPresent() 576 && searchParameterMap.get().getSearchTotalMode() == SearchTotalModeEnum.ACCURATE) { 577 for (int i = 0; i < 10; i++) { 578 if (search.isPresent()) { 579 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search.get()); 580 if (search.get().getTotalCount() != null) { 581 return Optional.of(search.get().getTotalCount()); 582 } 583 } 584 search = mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId); 585 } 586 } 587 } 588 589 return Optional.empty(); 590 } 591 592 @Nonnull 593 private PersistedJpaSearchFirstPageBundleProvider submitSearch( 594 IDao theCallingDao, 595 SearchParameterMap theParams, 596 String theResourceType, 597 RequestDetails theRequestDetails, 598 ISearchBuilder<JpaPid> theSb, 599 RequestPartitionId theRequestPartitionId, 600 Search theSearch) { 601 StopWatch w = new StopWatch(); 602 603 SearchTaskParameters stp = new SearchTaskParameters( 604 theSearch, 605 theCallingDao, 606 theParams, 607 theResourceType, 608 theRequestDetails, 609 theRequestPartitionId, 610 myOnRemoveSearchTask, 611 mySyncSize); 612 stp.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 613 SearchTask task = (SearchTask) myBeanFactory.getBean(SearchConfig.SEARCH_TASK, stp); 614 myIdToSearchTask.put(theSearch.getUuid(), task); 615 task.call(); 616 617 PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage( 618 theRequestDetails, task, theSb, theRequestPartitionId); 619 620 ourLog.debug("Search initial phase completed in {}ms", w.getMillis()); 621 return retVal; 622 } 623 624 @Nullable 625 private PersistedJpaBundleProvider findCachedQuery( 626 SearchParameterMap theParams, 627 String theResourceType, 628 RequestDetails theRequestDetails, 629 String theQueryString, 630 RequestPartitionId theRequestPartitionId) { 631 // May be null 632 return myTxService 633 .withRequest(theRequestDetails) 634 .withRequestPartitionId(theRequestPartitionId) 635 .execute(() -> { 636 IInterceptorBroadcaster compositeBroadcaster = 637 CompositeInterceptorBroadcaster.newCompositeBroadcaster( 638 myInterceptorBroadcaster, theRequestDetails); 639 640 // Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH 641 642 HookParams params = new HookParams() 643 .add(SearchParameterMap.class, theParams) 644 .add(RequestDetails.class, theRequestDetails) 645 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 646 boolean canUseCache = 647 compositeBroadcaster.callHooks(Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH, params); 648 if (!canUseCache) { 649 return null; 650 } 651 652 // Check for a search matching the given hash 653 Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType, theRequestPartitionId); 654 if (searchToUse == null) { 655 return null; 656 } 657 658 ourLog.debug("Reusing search {} from cache", searchToUse.getUuid()); 659 // Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED 660 params = new HookParams() 661 .add(SearchParameterMap.class, theParams) 662 .add(RequestDetails.class, theRequestDetails) 663 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 664 compositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, params); 665 666 return myPersistedJpaBundleProviderFactory.newInstance(theRequestDetails, searchToUse.getUuid()); 667 }); 668 } 669 670 @Nullable 671 private Search findSearchToUseOrNull( 672 String theQueryString, String theResourceType, RequestPartitionId theRequestPartitionId) { 673 // createdCutoff is in recent past 674 final Instant createdCutoff = 675 Instant.now().minus(myStorageSettings.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS); 676 677 Optional<Search> candidate = mySearchCacheSvc.findCandidatesForReuse( 678 theResourceType, theQueryString, createdCutoff, theRequestPartitionId); 679 return candidate.orElse(null); 680 } 681 682 @Nullable 683 private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) { 684 final Integer loadSynchronousUpTo; 685 if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) { 686 if (theCacheControlDirective.getMaxResults() != null) { 687 loadSynchronousUpTo = theCacheControlDirective.getMaxResults(); 688 if (loadSynchronousUpTo > myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()) { 689 throw new InvalidRequestException(Msg.code(1165) + Constants.HEADER_CACHE_CONTROL + " header " 690 + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed " 691 + myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()); 692 } 693 } else { 694 loadSynchronousUpTo = 100; 695 } 696 } else { 697 loadSynchronousUpTo = null; 698 } 699 return loadSynchronousUpTo; 700 } 701 702 /** 703 * Creates a {@link Pageable} using a start and end index 704 */ 705 @SuppressWarnings("WeakerAccess") 706 @Nullable 707 public static Pageable toPage(final int theFromIndex, int theToIndex) { 708 int pageSize = theToIndex - theFromIndex; 709 if (pageSize < 1) { 710 return null; 711 } 712 713 int pageIndex = theFromIndex / pageSize; 714 715 return new PageRequest(pageIndex, pageSize, Sort.unsorted()) { 716 private static final long serialVersionUID = 1L; 717 718 @Override 719 public long getOffset() { 720 return theFromIndex; 721 } 722 }; 723 } 724}