
001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L%family 019 */ 020package ca.uhn.fhir.jpa.search; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.i18n.Msg; 024import ca.uhn.fhir.interceptor.api.HookParams; 025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 026import ca.uhn.fhir.interceptor.api.Pointcut; 027import ca.uhn.fhir.interceptor.model.RequestPartitionId; 028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 029import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 030import ca.uhn.fhir.jpa.api.dao.IDao; 031import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 032import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; 033import ca.uhn.fhir.jpa.config.SearchConfig; 034import ca.uhn.fhir.jpa.dao.BaseStorageDao; 035import ca.uhn.fhir.jpa.dao.ISearchBuilder; 036import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 037import ca.uhn.fhir.jpa.dao.search.ResourceNotFoundInIndexException; 038import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 039import ca.uhn.fhir.jpa.entity.Search; 040import ca.uhn.fhir.jpa.model.dao.JpaPid; 041import ca.uhn.fhir.jpa.model.search.SearchStatusEnum; 042import ca.uhn.fhir.jpa.search.builder.StorageInterceptorHooksFacade; 043import ca.uhn.fhir.jpa.search.builder.tasks.SearchContinuationTask; 044import ca.uhn.fhir.jpa.search.builder.tasks.SearchTask; 045import ca.uhn.fhir.jpa.search.builder.tasks.SearchTaskParameters; 046import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc; 047import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc; 048import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum; 049import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 050import ca.uhn.fhir.jpa.util.QueryParameterUtils; 051import ca.uhn.fhir.model.api.IQueryParameterType; 052import ca.uhn.fhir.model.api.Include; 053import ca.uhn.fhir.rest.api.CacheControlDirective; 054import ca.uhn.fhir.rest.api.Constants; 055import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum; 056import ca.uhn.fhir.rest.api.SearchTotalModeEnum; 057import ca.uhn.fhir.rest.api.server.IBundleProvider; 058import ca.uhn.fhir.rest.api.server.RequestDetails; 059import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 060import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 061import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; 062import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; 063import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; 064import ca.uhn.fhir.util.AsyncUtil; 065import ca.uhn.fhir.util.StopWatch; 066import ca.uhn.fhir.util.UrlUtil; 067import com.google.common.annotations.VisibleForTesting; 068import jakarta.annotation.Nonnull; 069import jakarta.annotation.Nullable; 070import org.apache.commons.lang3.time.DateUtils; 071import org.hl7.fhir.instance.model.api.IBaseResource; 072import org.springframework.beans.factory.BeanFactory; 073import org.springframework.data.domain.PageRequest; 074import org.springframework.data.domain.Pageable; 075import org.springframework.data.domain.Sort; 076import org.springframework.stereotype.Component; 077import org.springframework.transaction.support.TransactionSynchronizationManager; 078 079import java.time.Instant; 080import java.time.temporal.ChronoUnit; 081import java.util.HashSet; 082import java.util.List; 083import java.util.Optional; 084import java.util.Set; 085import java.util.UUID; 086import java.util.concurrent.Callable; 087import java.util.concurrent.ConcurrentHashMap; 088import java.util.concurrent.TimeUnit; 089import java.util.function.Consumer; 090import java.util.stream.Collectors; 091 092import static ca.uhn.fhir.jpa.util.QueryParameterUtils.DEFAULT_SYNC_SIZE; 093import static org.apache.commons.lang3.StringUtils.isBlank; 094import static org.apache.commons.lang3.StringUtils.isNotBlank; 095 096@Component("mySearchCoordinatorSvc") 097public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc<JpaPid> { 098 099 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class); 100 101 private final FhirContext myContext; 102 private final JpaStorageSettings myStorageSettings; 103 private final IInterceptorBroadcaster myInterceptorBroadcaster; 104 private final HapiTransactionService myTxService; 105 private final ISearchCacheSvc mySearchCacheSvc; 106 private final ISearchResultCacheSvc mySearchResultCacheSvc; 107 private final DaoRegistry myDaoRegistry; 108 private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory; 109 private final ISynchronousSearchSvc mySynchronousSearchSvc; 110 private final PersistedJpaBundleProviderFactory myPersistedJpaBundleProviderFactory; 111 private final ISearchParamRegistry mySearchParamRegistry; 112 private final SearchStrategyFactory mySearchStrategyFactory; 113 private final ExceptionService myExceptionSvc; 114 private final BeanFactory myBeanFactory; 115 private ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>(); 116 117 private final Consumer<String> myOnRemoveSearchTask = myIdToSearchTask::remove; 118 119 private final StorageInterceptorHooksFacade myStorageInterceptorHooks; 120 private Integer myLoadingThrottleForUnitTests = null; 121 private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE; 122 private boolean myNeverUseLocalSearchForUnitTests; 123 private int mySyncSize = DEFAULT_SYNC_SIZE; 124 125 /** 126 * Constructor 127 */ 128 public SearchCoordinatorSvcImpl( 129 FhirContext theContext, 130 JpaStorageSettings theStorageSettings, 131 IInterceptorBroadcaster theInterceptorBroadcaster, 132 HapiTransactionService theTxService, 133 ISearchCacheSvc theSearchCacheSvc, 134 ISearchResultCacheSvc theSearchResultCacheSvc, 135 DaoRegistry theDaoRegistry, 136 SearchBuilderFactory<JpaPid> theSearchBuilderFactory, 137 ISynchronousSearchSvc theSynchronousSearchSvc, 138 PersistedJpaBundleProviderFactory thePersistedJpaBundleProviderFactory, 139 ISearchParamRegistry theSearchParamRegistry, 140 SearchStrategyFactory theSearchStrategyFactory, 141 ExceptionService theExceptionSvc, 142 BeanFactory theBeanFactory) { 143 super(); 144 myContext = theContext; 145 myStorageSettings = theStorageSettings; 146 myInterceptorBroadcaster = theInterceptorBroadcaster; 147 myTxService = theTxService; 148 mySearchCacheSvc = theSearchCacheSvc; 149 mySearchResultCacheSvc = theSearchResultCacheSvc; 150 myDaoRegistry = theDaoRegistry; 151 mySearchBuilderFactory = theSearchBuilderFactory; 152 mySynchronousSearchSvc = theSynchronousSearchSvc; 153 myPersistedJpaBundleProviderFactory = thePersistedJpaBundleProviderFactory; 154 mySearchParamRegistry = theSearchParamRegistry; 155 mySearchStrategyFactory = theSearchStrategyFactory; 156 myExceptionSvc = theExceptionSvc; 157 myBeanFactory = theBeanFactory; 158 159 myStorageInterceptorHooks = new StorageInterceptorHooksFacade(myInterceptorBroadcaster); 160 } 161 162 @VisibleForTesting 163 Set<String> getActiveSearchIds() { 164 return myIdToSearchTask.keySet(); 165 } 166 167 @VisibleForTesting 168 public void setIdToSearchTaskMapForUnitTests(ConcurrentHashMap<String, SearchTask> theIdToSearchTaskMap) { 169 myIdToSearchTask = theIdToSearchTaskMap; 170 } 171 172 @VisibleForTesting 173 public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) { 174 myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests; 175 } 176 177 @VisibleForTesting 178 public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) { 179 myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests; 180 } 181 182 @VisibleForTesting 183 public void setSyncSizeForUnitTests(int theSyncSize) { 184 mySyncSize = theSyncSize; 185 } 186 187 @Override 188 public void cancelAllActiveSearches() { 189 for (SearchTask next : myIdToSearchTask.values()) { 190 ourLog.info( 191 "Requesting immediate abort of search: {}", next.getSearch().getUuid()); 192 next.requestImmediateAbort(); 193 AsyncUtil.awaitLatchAndIgnoreInterrupt(next.getCompletionLatch(), 30, TimeUnit.SECONDS); 194 } 195 } 196 197 @SuppressWarnings("SameParameterValue") 198 @VisibleForTesting 199 public void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) { 200 myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults; 201 } 202 203 /** 204 * This method is called by the HTTP client processing thread in order to 205 * fetch resources. 206 * <p> 207 * This method must not be called from inside a transaction. The rationale is that 208 * the {@link Search} entity is treated as a piece of shared state across client threads 209 * accessing the same thread, so we need to be able to update that table in a transaction 210 * and commit it right away in order for that to work. Examples of needing to do this 211 * include if two different clients request the same search and are both paging at the 212 * same time, but also includes clients that are hacking the paging links to 213 * fetch multiple pages of a search result in parallel. In both cases we need to only 214 * let one of them actually activate the search, or we will have conflicts. The other thread 215 * just needs to wait until the first one actually fetches more results. 216 */ 217 @Override 218 public List<JpaPid> getResources( 219 final String theUuid, 220 int theFrom, 221 int theTo, 222 @Nullable RequestDetails theRequestDetails, 223 RequestPartitionId theRequestPartitionId) { 224 assert !TransactionSynchronizationManager.isActualTransactionActive(); 225 226 // If we're actively searching right now, don't try to do anything until at least one batch has been 227 // persisted in the DB 228 SearchTask searchTask = myIdToSearchTask.get(theUuid); 229 if (searchTask != null) { 230 searchTask.awaitInitialSync(); 231 } 232 233 ourLog.trace("About to start looking for resources {}-{}", theFrom, theTo); 234 235 Search search; 236 StopWatch sw = new StopWatch(); 237 while (true) { 238 239 if (myNeverUseLocalSearchForUnitTests == false) { 240 if (searchTask != null) { 241 ourLog.trace("Local search found"); 242 List<JpaPid> resourcePids = searchTask.getResourcePids(theFrom, theTo); 243 ourLog.trace( 244 "Local search returned {} pids, wanted {}-{} - Search: {}", 245 resourcePids.size(), 246 theFrom, 247 theTo, 248 searchTask.getSearch()); 249 250 /* 251 * Generally, if a search task is open, the fastest possible thing is to just return its results. This 252 * will work most of the time, but can fail if the task hit a search threshold and the client is requesting 253 * results beyond that threashold. In that case, we'll keep going below, since that will trigger another 254 * task. 255 */ 256 if ((searchTask.getSearch().getNumFound() 257 - searchTask.getSearch().getNumBlocked()) 258 >= theTo 259 || resourcePids.size() == (theTo - theFrom)) { 260 return resourcePids; 261 } 262 } 263 } 264 265 Callable<Search> searchCallback = () -> mySearchCacheSvc 266 .fetchByUuid(theUuid, theRequestPartitionId) 267 .orElseThrow(() -> myExceptionSvc.newUnknownSearchException(theUuid)); 268 search = myTxService 269 .withRequest(theRequestDetails) 270 .withRequestPartitionId(theRequestPartitionId) 271 .execute(searchCallback); 272 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search); 273 274 if (search.getStatus() == SearchStatusEnum.FINISHED) { 275 ourLog.trace("Search entity marked as finished with {} results", search.getNumFound()); 276 break; 277 } 278 if ((search.getNumFound() - search.getNumBlocked()) >= theTo) { 279 ourLog.trace("Search entity has {} results so far", search.getNumFound()); 280 break; 281 } 282 283 if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) { 284 ourLog.error( 285 "Search {} of type {} for {}{} timed out after {}ms with status {}.", 286 search.getId(), 287 search.getSearchType(), 288 search.getResourceType(), 289 search.getSearchQueryString(), 290 sw.getMillis(), 291 search.getStatus()); 292 throw new InternalErrorException(Msg.code(1163) + "Request timed out after " + sw.getMillis() + "ms"); 293 } 294 295 // If the search was saved in "pass complete mode" it's probably time to 296 // start a new pass 297 if (search.getStatus() == SearchStatusEnum.PASSCMPLET) { 298 ourLog.trace("Going to try to start next search"); 299 Optional<Search> newSearch = 300 mySearchCacheSvc.tryToMarkSearchAsInProgress(search, theRequestPartitionId); 301 if (newSearch.isPresent()) { 302 ourLog.trace("Launching new search"); 303 search = newSearch.get(); 304 String resourceType = search.getResourceType(); 305 SearchParameterMap params = search.getSearchParameterMap() 306 .orElseThrow(() -> new IllegalStateException("No map in PASSCOMPLET search")); 307 IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType); 308 309 SearchTaskParameters parameters = new SearchTaskParameters( 310 search, 311 resourceDao, 312 params, 313 resourceType, 314 theRequestDetails, 315 theRequestPartitionId, 316 myOnRemoveSearchTask, 317 mySyncSize); 318 parameters.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 319 SearchContinuationTask task = 320 (SearchContinuationTask) myBeanFactory.getBean(SearchConfig.CONTINUE_TASK, parameters); 321 myIdToSearchTask.put(search.getUuid(), task); 322 task.call(); 323 } 324 } 325 326 if (!search.getStatus().isDone()) { 327 AsyncUtil.sleep(500); 328 } 329 } 330 331 ourLog.trace("Finished looping"); 332 333 List<JpaPid> pids = fetchResultPids(theUuid, theFrom, theTo, theRequestDetails, search, theRequestPartitionId); 334 335 ourLog.trace("Fetched {} results", pids.size()); 336 337 return pids; 338 } 339 340 @Nonnull 341 private List<JpaPid> fetchResultPids( 342 String theUuid, 343 int theFrom, 344 int theTo, 345 @Nullable RequestDetails theRequestDetails, 346 Search theSearch, 347 RequestPartitionId theRequestPartitionId) { 348 List<JpaPid> pids = mySearchResultCacheSvc.fetchResultPids( 349 theSearch, theFrom, theTo, theRequestDetails, theRequestPartitionId); 350 if (pids == null) { 351 throw myExceptionSvc.newUnknownSearchException(theUuid); 352 } 353 return pids; 354 } 355 356 @Override 357 public IBundleProvider registerSearch( 358 final IFhirResourceDao<?> theCallingDao, 359 final SearchParameterMap theParams, 360 String theResourceType, 361 CacheControlDirective theCacheControlDirective, 362 RequestDetails theRequestDetails, 363 RequestPartitionId theRequestPartitionId) { 364 final String searchUuid = UUID.randomUUID().toString(); 365 366 final String queryString = theParams.toNormalizedQueryString(myContext); 367 ourLog.debug("Registering new search {}", searchUuid); 368 369 Search search = new Search(); 370 QueryParameterUtils.populateSearchEntity( 371 theParams, theResourceType, searchUuid, queryString, search, theRequestPartitionId); 372 373 myStorageInterceptorHooks.callStoragePresearchRegistered( 374 theRequestDetails, theParams, search, theRequestPartitionId); 375 376 validateSearch(theParams); 377 378 Class<? extends IBaseResource> resourceTypeClass = 379 myContext.getResourceDefinition(theResourceType).getImplementingClass(); 380 final ISearchBuilder<JpaPid> sb = mySearchBuilderFactory.newSearchBuilder(theResourceType, resourceTypeClass); 381 sb.setFetchSize(mySyncSize); 382 sb.setRequireTotal(theParams.getCount() != null); 383 384 final Integer loadSynchronousUpTo = getLoadSynchronousUpToOrNull(theCacheControlDirective); 385 boolean isOffsetQuery = theParams.isOffsetQuery(); 386 387 // todo someday - not today. 388 // SearchStrategyFactory.ISearchStrategy searchStrategy = mySearchStrategyFactory.pickStrategy(theResourceType, 389 // theParams, theRequestDetails); 390 // return searchStrategy.get(); 391 392 if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null || isOffsetQuery) { 393 if (mySearchStrategyFactory.isSupportsHSearchDirect(theResourceType, theParams, theRequestDetails)) { 394 ourLog.info("Search {} is using direct load strategy", searchUuid); 395 SearchStrategyFactory.ISearchStrategy direct = mySearchStrategyFactory.makeDirectStrategy( 396 searchUuid, theResourceType, theParams, theRequestDetails); 397 398 try { 399 return direct.get(); 400 } catch (ResourceNotFoundInIndexException theE) { 401 // some resources were not found in index, so we will inform this and resort to JPA search 402 ourLog.warn( 403 "Some resources were not found in index. Make sure all resources were indexed. Resorting to database search."); 404 } 405 } 406 407 // we need a max to fetch for synchronous searches; 408 // otherwise we'll explode memory. 409 Integer maxToLoad = getSynchronousMaxResultsToFetch(theParams, loadSynchronousUpTo); 410 ourLog.debug("Setting a max fetch value of {} for synchronous search", maxToLoad); 411 sb.setMaxResultsToFetch(maxToLoad); 412 413 ourLog.debug("Search {} is loading in synchronous mode", searchUuid); 414 return mySynchronousSearchSvc.executeQuery( 415 theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, theRequestPartitionId); 416 } 417 418 /* 419 * See if there are any cached searches whose results we can return 420 * instead 421 */ 422 SearchCacheStatusEnum cacheStatus = SearchCacheStatusEnum.MISS; 423 if (theCacheControlDirective != null && theCacheControlDirective.isNoCache()) { 424 cacheStatus = SearchCacheStatusEnum.NOT_TRIED; 425 } 426 427 if (cacheStatus != SearchCacheStatusEnum.NOT_TRIED) { 428 if (theParams.getEverythingMode() == null) { 429 if (myStorageSettings.getReuseCachedSearchResultsForMillis() != null) { 430 PersistedJpaBundleProvider foundSearchProvider = findCachedQuery( 431 theParams, theResourceType, theRequestDetails, queryString, theRequestPartitionId); 432 if (foundSearchProvider != null) { 433 foundSearchProvider.setCacheStatus(SearchCacheStatusEnum.HIT); 434 return foundSearchProvider; 435 } 436 } 437 } 438 } 439 440 PersistedJpaSearchFirstPageBundleProvider retVal = submitSearch( 441 theCallingDao, theParams, theResourceType, theRequestDetails, sb, theRequestPartitionId, search); 442 retVal.setCacheStatus(cacheStatus); 443 return retVal; 444 } 445 446 /** 447 * The max results to return if this is a synchronous search. 448 * 449 * We'll look in this order: 450 * * load synchronous up to (on params) 451 * * param count (+ offset) 452 * * StorageSettings fetch size default max 453 * * 454 */ 455 private Integer getSynchronousMaxResultsToFetch(SearchParameterMap theParams, Integer theLoadSynchronousUpTo) { 456 if (theLoadSynchronousUpTo != null) { 457 return theLoadSynchronousUpTo; 458 } 459 460 if (theParams.getCount() != null) { 461 int valToReturn = theParams.getCount() + 1; 462 if (theParams.getOffset() != null) { 463 valToReturn += theParams.getOffset(); 464 } 465 return valToReturn; 466 } 467 468 if (myStorageSettings.getFetchSizeDefaultMaximum() != null) { 469 return myStorageSettings.getFetchSizeDefaultMaximum(); 470 } 471 472 return myStorageSettings.getInternalSynchronousSearchSize(); 473 } 474 475 private void validateSearch(SearchParameterMap theParams) { 476 assert checkNoDuplicateParameters(theParams) 477 : "Duplicate parameters found in query: " + theParams.toNormalizedQueryString(myContext); 478 479 validateIncludes(theParams.getIncludes(), Constants.PARAM_INCLUDE); 480 validateIncludes(theParams.getRevIncludes(), Constants.PARAM_REVINCLUDE); 481 } 482 483 /** 484 * This method detects whether we have any duplicate lists of parameters and returns 485 * {@literal true} if none are found. For example, the following query would result 486 * in this method returning {@literal false}: 487 * <code>Patient?name=bart,homer&name=bart,homer</code> 488 * <p> 489 * This is not an optimized test, and it's not technically even prohibited to have 490 * duplicates like these in queries so this method should only be called as a 491 * part of an {@literal assert} statement to catch errors in tests. 492 */ 493 private boolean checkNoDuplicateParameters(SearchParameterMap theParams) { 494 HashSet<List<IQueryParameterType>> lists = new HashSet<>(); 495 for (List<List<IQueryParameterType>> andList : theParams.values()) { 496 497 lists.clear(); 498 for (int i = 0; i < andList.size(); i++) { 499 List<IQueryParameterType> orListI = andList.get(i); 500 if (!orListI.isEmpty() && !lists.add(orListI)) { 501 return false; 502 } 503 } 504 } 505 return true; 506 } 507 508 private void validateIncludes(Set<Include> includes, String name) { 509 for (Include next : includes) { 510 String value = next.getValue(); 511 if (value.equals(Constants.INCLUDE_STAR) || isBlank(value)) { 512 continue; 513 } 514 515 String paramType = next.getParamType(); 516 String paramName = next.getParamName(); 517 String paramTargetType = next.getParamTargetType(); 518 519 if (isBlank(paramType) || isBlank(paramName)) { 520 String msg = myContext 521 .getLocalizer() 522 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidInclude", name, value, ""); 523 throw new InvalidRequestException(Msg.code(2018) + msg); 524 } 525 526 if (!myDaoRegistry.isResourceTypeSupported(paramType)) { 527 String resourceTypeMsg = myContext 528 .getLocalizer() 529 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramType); 530 String msg = myContext 531 .getLocalizer() 532 .getMessage( 533 SearchCoordinatorSvcImpl.class, 534 "invalidInclude", 535 UrlUtil.sanitizeUrlPart(name), 536 UrlUtil.sanitizeUrlPart(value), 537 resourceTypeMsg); // last param is pre-sanitized 538 throw new InvalidRequestException(Msg.code(2017) + msg); 539 } 540 541 if (isNotBlank(paramTargetType) && !myDaoRegistry.isResourceTypeSupported(paramTargetType)) { 542 String resourceTypeMsg = myContext 543 .getLocalizer() 544 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramTargetType); 545 String msg = myContext 546 .getLocalizer() 547 .getMessage( 548 SearchCoordinatorSvcImpl.class, 549 "invalidInclude", 550 UrlUtil.sanitizeUrlPart(name), 551 UrlUtil.sanitizeUrlPart(value), 552 resourceTypeMsg); // last param is pre-sanitized 553 throw new InvalidRequestException(Msg.code(2016) + msg); 554 } 555 556 if (!Constants.INCLUDE_STAR.equals(paramName) 557 && mySearchParamRegistry.getActiveSearchParam( 558 paramType, paramName, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH) 559 == null) { 560 List<String> validNames = mySearchParamRegistry 561 .getActiveSearchParams(paramType, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH) 562 .values() 563 .stream() 564 .filter(t -> t.getParamType() == RestSearchParameterTypeEnum.REFERENCE) 565 .map(t -> UrlUtil.sanitizeUrlPart(t.getName())) 566 .sorted() 567 .collect(Collectors.toList()); 568 String searchParamMessage = myContext 569 .getLocalizer() 570 .getMessage( 571 BaseStorageDao.class, 572 "invalidSearchParameter", 573 UrlUtil.sanitizeUrlPart(paramName), 574 UrlUtil.sanitizeUrlPart(paramType), 575 validNames); 576 String msg = myContext 577 .getLocalizer() 578 .getMessage( 579 SearchCoordinatorSvcImpl.class, 580 "invalidInclude", 581 UrlUtil.sanitizeUrlPart(name), 582 UrlUtil.sanitizeUrlPart(value), 583 searchParamMessage); // last param is pre-sanitized 584 throw new InvalidRequestException(Msg.code(2015) + msg); 585 } 586 } 587 } 588 589 @Override 590 public Optional<Integer> getSearchTotal( 591 String theUuid, @Nullable RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) { 592 SearchTask task = myIdToSearchTask.get(theUuid); 593 if (task != null) { 594 return Optional.ofNullable(task.awaitInitialSync()); 595 } 596 597 /* 598 * In case there is no running search, if the total is listed as accurate we know one is coming 599 * so let's wait a bit for it to show up 600 */ 601 Optional<Search> search = myTxService 602 .withRequest(theRequestDetails) 603 .execute(() -> mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId)); 604 if (search.isPresent()) { 605 Optional<SearchParameterMap> searchParameterMap = search.get().getSearchParameterMap(); 606 if (searchParameterMap.isPresent() 607 && searchParameterMap.get().getSearchTotalMode() == SearchTotalModeEnum.ACCURATE) { 608 for (int i = 0; i < 10; i++) { 609 if (search.isPresent()) { 610 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search.get()); 611 if (search.get().getTotalCount() != null) { 612 return Optional.of(search.get().getTotalCount()); 613 } 614 } 615 search = mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId); 616 } 617 } 618 } 619 620 return Optional.empty(); 621 } 622 623 @Nonnull 624 private PersistedJpaSearchFirstPageBundleProvider submitSearch( 625 IDao theCallingDao, 626 SearchParameterMap theParams, 627 String theResourceType, 628 RequestDetails theRequestDetails, 629 ISearchBuilder<JpaPid> theSb, 630 RequestPartitionId theRequestPartitionId, 631 Search theSearch) { 632 StopWatch w = new StopWatch(); 633 634 SearchTaskParameters stp = new SearchTaskParameters( 635 theSearch, 636 theCallingDao, 637 theParams, 638 theResourceType, 639 theRequestDetails, 640 theRequestPartitionId, 641 myOnRemoveSearchTask, 642 mySyncSize); 643 stp.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 644 SearchTask task = (SearchTask) myBeanFactory.getBean(SearchConfig.SEARCH_TASK, stp); 645 myIdToSearchTask.put(theSearch.getUuid(), task); 646 task.call(); 647 648 PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage( 649 theRequestDetails, task, theSb, theRequestPartitionId); 650 651 ourLog.debug("Search initial phase completed in {}ms", w.getMillis()); 652 return retVal; 653 } 654 655 @Nullable 656 private PersistedJpaBundleProvider findCachedQuery( 657 SearchParameterMap theParams, 658 String theResourceType, 659 RequestDetails theRequestDetails, 660 String theQueryString, 661 RequestPartitionId theRequestPartitionId) { 662 // May be null 663 return myTxService 664 .withRequest(theRequestDetails) 665 .withRequestPartitionId(theRequestPartitionId) 666 .execute(() -> { 667 IInterceptorBroadcaster compositeBroadcaster = 668 CompositeInterceptorBroadcaster.newCompositeBroadcaster( 669 myInterceptorBroadcaster, theRequestDetails); 670 671 // Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH 672 673 HookParams params = new HookParams() 674 .add(SearchParameterMap.class, theParams) 675 .add(RequestDetails.class, theRequestDetails) 676 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 677 boolean canUseCache = 678 compositeBroadcaster.callHooks(Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH, params); 679 if (!canUseCache) { 680 return null; 681 } 682 683 // Check for a search matching the given hash 684 Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType, theRequestPartitionId); 685 if (searchToUse == null) { 686 return null; 687 } 688 689 ourLog.debug("Reusing search {} from cache", searchToUse.getUuid()); 690 // Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED 691 params = new HookParams() 692 .add(SearchParameterMap.class, theParams) 693 .add(RequestDetails.class, theRequestDetails) 694 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 695 compositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, params); 696 697 return myPersistedJpaBundleProviderFactory.newInstance(theRequestDetails, searchToUse.getUuid()); 698 }); 699 } 700 701 @Nullable 702 private Search findSearchToUseOrNull( 703 String theQueryString, String theResourceType, RequestPartitionId theRequestPartitionId) { 704 // createdCutoff is in recent past 705 final Instant createdCutoff = 706 Instant.now().minus(myStorageSettings.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS); 707 708 Optional<Search> candidate = mySearchCacheSvc.findCandidatesForReuse( 709 theResourceType, theQueryString, createdCutoff, theRequestPartitionId); 710 return candidate.orElse(null); 711 } 712 713 @Nullable 714 private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) { 715 final Integer loadSynchronousUpTo; 716 if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) { 717 if (theCacheControlDirective.getMaxResults() != null) { 718 loadSynchronousUpTo = theCacheControlDirective.getMaxResults(); 719 if (loadSynchronousUpTo > myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()) { 720 throw new InvalidRequestException(Msg.code(1165) + Constants.HEADER_CACHE_CONTROL + " header " 721 + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed " 722 + myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()); 723 } 724 } else { 725 loadSynchronousUpTo = 100; 726 } 727 } else { 728 loadSynchronousUpTo = null; 729 } 730 return loadSynchronousUpTo; 731 } 732 733 /** 734 * Creates a {@link Pageable} using a start and end index 735 */ 736 @SuppressWarnings("WeakerAccess") 737 @Nullable 738 public static Pageable toPage(final int theFromIndex, int theToIndex) { 739 int pageSize = theToIndex - theFromIndex; 740 if (pageSize < 1) { 741 return null; 742 } 743 744 int pageIndex = theFromIndex / pageSize; 745 746 return new PageRequest(pageIndex, pageSize, Sort.unsorted()) { 747 private static final long serialVersionUID = 1L; 748 749 @Override 750 public long getOffset() { 751 return theFromIndex; 752 } 753 }; 754 } 755}