
001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L%family 019 */ 020package ca.uhn.fhir.jpa.search; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.i18n.Msg; 024import ca.uhn.fhir.interceptor.api.HookParams; 025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 026import ca.uhn.fhir.interceptor.api.Pointcut; 027import ca.uhn.fhir.interceptor.model.RequestPartitionId; 028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 029import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 030import ca.uhn.fhir.jpa.api.dao.IDao; 031import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 032import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; 033import ca.uhn.fhir.jpa.config.SearchConfig; 034import ca.uhn.fhir.jpa.dao.BaseStorageDao; 035import ca.uhn.fhir.jpa.dao.ISearchBuilder; 036import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 037import ca.uhn.fhir.jpa.dao.search.ResourceNotFoundInIndexException; 038import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 039import ca.uhn.fhir.jpa.entity.Search; 040import ca.uhn.fhir.jpa.model.dao.JpaPid; 041import ca.uhn.fhir.jpa.model.search.SearchStatusEnum; 042import ca.uhn.fhir.jpa.search.builder.StorageInterceptorHooksFacade; 043import ca.uhn.fhir.jpa.search.builder.tasks.SearchContinuationTask; 044import ca.uhn.fhir.jpa.search.builder.tasks.SearchTask; 045import ca.uhn.fhir.jpa.search.builder.tasks.SearchTaskParameters; 046import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc; 047import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc; 048import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum; 049import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 050import ca.uhn.fhir.jpa.util.QueryParameterUtils; 051import ca.uhn.fhir.model.api.IQueryParameterType; 052import ca.uhn.fhir.model.api.Include; 053import ca.uhn.fhir.rest.api.CacheControlDirective; 054import ca.uhn.fhir.rest.api.Constants; 055import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum; 056import ca.uhn.fhir.rest.api.SearchTotalModeEnum; 057import ca.uhn.fhir.rest.api.server.IBundleProvider; 058import ca.uhn.fhir.rest.api.server.RequestDetails; 059import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 060import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 061import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; 062import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; 063import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; 064import ca.uhn.fhir.util.AsyncUtil; 065import ca.uhn.fhir.util.StopWatch; 066import ca.uhn.fhir.util.UrlUtil; 067import com.google.common.annotations.VisibleForTesting; 068import jakarta.annotation.Nonnull; 069import jakarta.annotation.Nullable; 070import org.apache.commons.lang3.time.DateUtils; 071import org.hl7.fhir.instance.model.api.IBaseResource; 072import org.springframework.beans.factory.BeanFactory; 073import org.springframework.data.domain.PageRequest; 074import org.springframework.data.domain.Pageable; 075import org.springframework.data.domain.Sort; 076import org.springframework.stereotype.Component; 077import org.springframework.transaction.support.TransactionSynchronizationManager; 078 079import java.time.Instant; 080import java.time.temporal.ChronoUnit; 081import java.util.Date; 082import java.util.HashSet; 083import java.util.List; 084import java.util.Optional; 085import java.util.Set; 086import java.util.UUID; 087import java.util.concurrent.Callable; 088import java.util.concurrent.ConcurrentHashMap; 089import java.util.concurrent.TimeUnit; 090import java.util.function.Consumer; 091import java.util.stream.Collectors; 092 093import static ca.uhn.fhir.jpa.util.QueryParameterUtils.DEFAULT_SYNC_SIZE; 094import static org.apache.commons.lang3.StringUtils.isBlank; 095import static org.apache.commons.lang3.StringUtils.isNotBlank; 096 097@Component("mySearchCoordinatorSvc") 098public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc<JpaPid> { 099 100 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class); 101 public static final int SEARCH_EXPIRY_OFFSET_MINUTES = 10; 102 103 private final FhirContext myContext; 104 private final JpaStorageSettings myStorageSettings; 105 private final IInterceptorBroadcaster myInterceptorBroadcaster; 106 private final HapiTransactionService myTxService; 107 private final ISearchCacheSvc mySearchCacheSvc; 108 private final ISearchResultCacheSvc mySearchResultCacheSvc; 109 private final DaoRegistry myDaoRegistry; 110 private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory; 111 private final ISynchronousSearchSvc mySynchronousSearchSvc; 112 private final PersistedJpaBundleProviderFactory myPersistedJpaBundleProviderFactory; 113 private final ISearchParamRegistry mySearchParamRegistry; 114 private final SearchStrategyFactory mySearchStrategyFactory; 115 private final ExceptionService myExceptionSvc; 116 private final BeanFactory myBeanFactory; 117 private ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>(); 118 119 private final Consumer<String> myOnRemoveSearchTask = myIdToSearchTask::remove; 120 121 private final StorageInterceptorHooksFacade myStorageInterceptorHooks; 122 private Integer myLoadingThrottleForUnitTests = null; 123 private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE; 124 private boolean myNeverUseLocalSearchForUnitTests; 125 private int mySyncSize = DEFAULT_SYNC_SIZE; 126 127 /** 128 * Constructor 129 */ 130 public SearchCoordinatorSvcImpl( 131 FhirContext theContext, 132 JpaStorageSettings theStorageSettings, 133 IInterceptorBroadcaster theInterceptorBroadcaster, 134 HapiTransactionService theTxService, 135 ISearchCacheSvc theSearchCacheSvc, 136 ISearchResultCacheSvc theSearchResultCacheSvc, 137 DaoRegistry theDaoRegistry, 138 SearchBuilderFactory<JpaPid> theSearchBuilderFactory, 139 ISynchronousSearchSvc theSynchronousSearchSvc, 140 PersistedJpaBundleProviderFactory thePersistedJpaBundleProviderFactory, 141 ISearchParamRegistry theSearchParamRegistry, 142 SearchStrategyFactory theSearchStrategyFactory, 143 ExceptionService theExceptionSvc, 144 BeanFactory theBeanFactory) { 145 super(); 146 myContext = theContext; 147 myStorageSettings = theStorageSettings; 148 myInterceptorBroadcaster = theInterceptorBroadcaster; 149 myTxService = theTxService; 150 mySearchCacheSvc = theSearchCacheSvc; 151 mySearchResultCacheSvc = theSearchResultCacheSvc; 152 myDaoRegistry = theDaoRegistry; 153 mySearchBuilderFactory = theSearchBuilderFactory; 154 mySynchronousSearchSvc = theSynchronousSearchSvc; 155 myPersistedJpaBundleProviderFactory = thePersistedJpaBundleProviderFactory; 156 mySearchParamRegistry = theSearchParamRegistry; 157 mySearchStrategyFactory = theSearchStrategyFactory; 158 myExceptionSvc = theExceptionSvc; 159 myBeanFactory = theBeanFactory; 160 161 myStorageInterceptorHooks = new StorageInterceptorHooksFacade(myInterceptorBroadcaster); 162 } 163 164 @VisibleForTesting 165 Set<String> getActiveSearchIds() { 166 return myIdToSearchTask.keySet(); 167 } 168 169 @VisibleForTesting 170 public void setIdToSearchTaskMapForUnitTests(ConcurrentHashMap<String, SearchTask> theIdToSearchTaskMap) { 171 myIdToSearchTask = theIdToSearchTaskMap; 172 } 173 174 @VisibleForTesting 175 public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) { 176 myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests; 177 } 178 179 @VisibleForTesting 180 public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) { 181 myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests; 182 } 183 184 @VisibleForTesting 185 public void setSyncSizeForUnitTests(int theSyncSize) { 186 mySyncSize = theSyncSize; 187 } 188 189 @Override 190 public void cancelAllActiveSearches() { 191 for (SearchTask next : myIdToSearchTask.values()) { 192 ourLog.info( 193 "Requesting immediate abort of search: {}", next.getSearch().getUuid()); 194 next.requestImmediateAbort(); 195 AsyncUtil.awaitLatchAndIgnoreInterrupt(next.getCompletionLatch(), 30, TimeUnit.SECONDS); 196 } 197 } 198 199 @SuppressWarnings("SameParameterValue") 200 @VisibleForTesting 201 public void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) { 202 myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults; 203 } 204 205 /** 206 * This method is called by the HTTP client processing thread in order to 207 * fetch resources. 208 * <p> 209 * This method must not be called from inside a transaction. The rationale is that 210 * the {@link Search} entity is treated as a piece of shared state across client threads 211 * accessing the same thread, so we need to be able to update that table in a transaction 212 * and commit it right away in order for that to work. Examples of needing to do this 213 * include if two different clients request the same search and are both paging at the 214 * same time, but also includes clients that are hacking the paging links to 215 * fetch multiple pages of a search result in parallel. In both cases we need to only 216 * let one of them actually activate the search, or we will have conflicts. The other thread 217 * just needs to wait until the first one actually fetches more results. 218 */ 219 @Override 220 public List<JpaPid> getResources( 221 final String theUuid, 222 int theFrom, 223 int theTo, 224 @Nullable RequestDetails theRequestDetails, 225 RequestPartitionId theRequestPartitionId) { 226 assert !TransactionSynchronizationManager.isActualTransactionActive(); 227 228 // If we're actively searching right now, don't try to do anything until at least one batch has been 229 // persisted in the DB 230 SearchTask searchTask = myIdToSearchTask.get(theUuid); 231 if (searchTask != null) { 232 searchTask.awaitInitialSync(); 233 } 234 235 ourLog.trace("About to start looking for resources {}-{}", theFrom, theTo); 236 237 Search search; 238 StopWatch sw = new StopWatch(); 239 while (true) { 240 241 if (myNeverUseLocalSearchForUnitTests == false) { 242 if (searchTask != null) { 243 ourLog.trace("Local search found"); 244 List<JpaPid> resourcePids = searchTask.getResourcePids(theFrom, theTo); 245 ourLog.trace( 246 "Local search returned {} pids, wanted {}-{} - Search: {}", 247 resourcePids.size(), 248 theFrom, 249 theTo, 250 searchTask.getSearch()); 251 252 /* 253 * Generally, if a search task is open, the fastest possible thing is to just return its results. This 254 * will work most of the time, but can fail if the task hit a search threshold and the client is requesting 255 * results beyond that threashold. In that case, we'll keep going below, since that will trigger another 256 * task. 257 */ 258 if ((searchTask.getSearch().getNumFound() 259 - searchTask.getSearch().getNumBlocked()) 260 >= theTo 261 || resourcePids.size() == (theTo - theFrom)) { 262 return resourcePids; 263 } 264 } 265 } 266 267 Callable<Search> searchCallback = () -> mySearchCacheSvc 268 .fetchByUuid(theUuid, theRequestPartitionId) 269 .orElseThrow(() -> myExceptionSvc.newUnknownSearchException(theUuid)); 270 search = myTxService 271 .withRequest(theRequestDetails) 272 .withRequestPartitionId(theRequestPartitionId) 273 .execute(searchCallback); 274 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search); 275 276 if (search.getStatus() == SearchStatusEnum.FINISHED) { 277 ourLog.trace("Search entity marked as finished with {} results", search.getNumFound()); 278 break; 279 } 280 if ((search.getNumFound() - search.getNumBlocked()) >= theTo) { 281 ourLog.trace("Search entity has {} results so far", search.getNumFound()); 282 break; 283 } 284 285 if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) { 286 ourLog.error( 287 "Search {} of type {} for {}{} timed out after {}ms with status {}.", 288 search.getId(), 289 search.getSearchType(), 290 search.getResourceType(), 291 search.getSearchQueryString(), 292 sw.getMillis(), 293 search.getStatus()); 294 throw new InternalErrorException(Msg.code(1163) + "Request timed out after " + sw.getMillis() + "ms"); 295 } 296 297 // If the search was saved in "pass complete mode" it's probably time to 298 // start a new pass 299 if (search.getStatus() == SearchStatusEnum.PASSCMPLET) { 300 ourLog.trace("Going to try to start next search"); 301 Optional<Search> newSearch = 302 mySearchCacheSvc.tryToMarkSearchAsInProgress(search, theRequestPartitionId); 303 if (newSearch.isPresent()) { 304 ourLog.trace("Launching new search"); 305 search = newSearch.get(); 306 String resourceType = search.getResourceType(); 307 SearchParameterMap params = search.getSearchParameterMap() 308 .orElseThrow(() -> new IllegalStateException("No map in PASSCOMPLET search")); 309 IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType); 310 311 SearchTaskParameters parameters = new SearchTaskParameters( 312 search, 313 resourceDao, 314 params, 315 resourceType, 316 theRequestDetails, 317 theRequestPartitionId, 318 myOnRemoveSearchTask, 319 mySyncSize); 320 parameters.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 321 SearchContinuationTask task = 322 (SearchContinuationTask) myBeanFactory.getBean(SearchConfig.CONTINUE_TASK, parameters); 323 myIdToSearchTask.put(search.getUuid(), task); 324 task.call(); 325 } 326 } 327 328 if (!search.getStatus().isDone()) { 329 AsyncUtil.sleep(500); 330 } 331 } 332 333 ourLog.trace("Finished looping"); 334 335 List<JpaPid> pids = fetchResultPids(theUuid, theFrom, theTo, theRequestDetails, search, theRequestPartitionId); 336 337 updateSearchExpiryOrNull(search, theRequestPartitionId); 338 339 ourLog.trace("Fetched {} results", pids.size()); 340 341 return pids; 342 } 343 344 @Nonnull 345 private List<JpaPid> fetchResultPids( 346 String theUuid, 347 int theFrom, 348 int theTo, 349 @Nullable RequestDetails theRequestDetails, 350 Search theSearch, 351 RequestPartitionId theRequestPartitionId) { 352 List<JpaPid> pids = mySearchResultCacheSvc.fetchResultPids( 353 theSearch, theFrom, theTo, theRequestDetails, theRequestPartitionId); 354 if (pids == null) { 355 throw myExceptionSvc.newUnknownSearchException(theUuid); 356 } 357 return pids; 358 } 359 360 private void updateSearchExpiryOrNull(Search theSearch, RequestPartitionId theRequestPartitionId) { 361 // The created time may be null in some unit tests 362 if (theSearch.getCreated() != null) { 363 // start tracking last-access-time for this search when it is more than halfway to expire by created time 364 // we do this to avoid generating excessive write traffic on busy cached searches. 365 long expireAfterMillis = myStorageSettings.getExpireSearchResultsAfterMillis(); 366 long createdCutoff = theSearch.getCreated().getTime() + expireAfterMillis; 367 if (createdCutoff - System.currentTimeMillis() < expireAfterMillis / 2) { 368 theSearch.setExpiryOrNull(DateUtils.addMinutes(new Date(), SEARCH_EXPIRY_OFFSET_MINUTES)); 369 // TODO: A nice future enhancement might be to make this flush be asynchronous 370 mySearchCacheSvc.save(theSearch, theRequestPartitionId); 371 } 372 } 373 } 374 375 @Override 376 public IBundleProvider registerSearch( 377 final IFhirResourceDao<?> theCallingDao, 378 final SearchParameterMap theParams, 379 String theResourceType, 380 CacheControlDirective theCacheControlDirective, 381 RequestDetails theRequestDetails, 382 RequestPartitionId theRequestPartitionId) { 383 final String searchUuid = UUID.randomUUID().toString(); 384 385 final String queryString = theParams.toNormalizedQueryString(myContext); 386 ourLog.debug("Registering new search {}", searchUuid); 387 388 Search search = new Search(); 389 QueryParameterUtils.populateSearchEntity( 390 theParams, theResourceType, searchUuid, queryString, search, theRequestPartitionId); 391 392 myStorageInterceptorHooks.callStoragePresearchRegistered( 393 theRequestDetails, theParams, search, theRequestPartitionId); 394 395 validateSearch(theParams); 396 397 Class<? extends IBaseResource> resourceTypeClass = 398 myContext.getResourceDefinition(theResourceType).getImplementingClass(); 399 final ISearchBuilder<JpaPid> sb = mySearchBuilderFactory.newSearchBuilder(theResourceType, resourceTypeClass); 400 sb.setFetchSize(mySyncSize); 401 sb.setRequireTotal(theParams.getCount() != null); 402 403 final Integer loadSynchronousUpTo = getLoadSynchronousUpToOrNull(theCacheControlDirective); 404 boolean isOffsetQuery = theParams.isOffsetQuery(); 405 406 // todo someday - not today. 407 // SearchStrategyFactory.ISearchStrategy searchStrategy = mySearchStrategyFactory.pickStrategy(theResourceType, 408 // theParams, theRequestDetails); 409 // return searchStrategy.get(); 410 411 if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null || isOffsetQuery) { 412 if (mySearchStrategyFactory.isSupportsHSearchDirect(theResourceType, theParams, theRequestDetails)) { 413 ourLog.info("Search {} is using direct load strategy", searchUuid); 414 SearchStrategyFactory.ISearchStrategy direct = mySearchStrategyFactory.makeDirectStrategy( 415 searchUuid, theResourceType, theParams, theRequestDetails); 416 417 try { 418 return direct.get(); 419 } catch (ResourceNotFoundInIndexException theE) { 420 // some resources were not found in index, so we will inform this and resort to JPA search 421 ourLog.warn( 422 "Some resources were not found in index. Make sure all resources were indexed. Resorting to database search."); 423 } 424 } 425 426 // we need a max to fetch for synchronous searches; 427 // otherwise we'll explode memory. 428 Integer maxToLoad = getSynchronousMaxResultsToFetch(theParams, loadSynchronousUpTo); 429 ourLog.debug("Setting a max fetch value of {} for synchronous search", maxToLoad); 430 sb.setMaxResultsToFetch(maxToLoad); 431 432 ourLog.debug("Search {} is loading in synchronous mode", searchUuid); 433 return mySynchronousSearchSvc.executeQuery( 434 theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, theRequestPartitionId); 435 } 436 437 /* 438 * See if there are any cached searches whose results we can return 439 * instead 440 */ 441 SearchCacheStatusEnum cacheStatus = SearchCacheStatusEnum.MISS; 442 if (theCacheControlDirective != null && theCacheControlDirective.isNoCache()) { 443 cacheStatus = SearchCacheStatusEnum.NOT_TRIED; 444 } 445 446 if (cacheStatus != SearchCacheStatusEnum.NOT_TRIED) { 447 if (theParams.getEverythingMode() == null) { 448 if (myStorageSettings.getReuseCachedSearchResultsForMillis() != null) { 449 PersistedJpaBundleProvider foundSearchProvider = findCachedQuery( 450 theParams, theResourceType, theRequestDetails, queryString, theRequestPartitionId); 451 if (foundSearchProvider != null) { 452 foundSearchProvider.setCacheStatus(SearchCacheStatusEnum.HIT); 453 return foundSearchProvider; 454 } 455 } 456 } 457 } 458 459 PersistedJpaSearchFirstPageBundleProvider retVal = submitSearch( 460 theCallingDao, theParams, theResourceType, theRequestDetails, sb, theRequestPartitionId, search); 461 retVal.setCacheStatus(cacheStatus); 462 return retVal; 463 } 464 465 /** 466 * The max results to return if this is a synchronous search. 467 * <p> 468 * We'll look in this order: 469 * * load synchronous up to (on params) 470 * * param count (+ offset) 471 * * StorageSettings fetch size default max 472 * * 473 */ 474 private Integer getSynchronousMaxResultsToFetch(SearchParameterMap theParams, Integer theLoadSynchronousUpTo) { 475 if (theLoadSynchronousUpTo != null) { 476 return theLoadSynchronousUpTo; 477 } 478 479 if (theParams.getCount() != null) { 480 int valToReturn = theParams.getCount() + 1; 481 if (theParams.getOffset() != null) { 482 valToReturn += theParams.getOffset(); 483 } 484 return valToReturn; 485 } 486 487 if (myStorageSettings.getFetchSizeDefaultMaximum() != null) { 488 return myStorageSettings.getFetchSizeDefaultMaximum(); 489 } 490 491 return myStorageSettings.getInternalSynchronousSearchSize(); 492 } 493 494 private void validateSearch(SearchParameterMap theParams) { 495 assert checkNoDuplicateParameters(theParams) 496 : "Duplicate parameters found in query: " + theParams.toNormalizedQueryString(myContext); 497 498 validateIncludes(theParams.getIncludes(), Constants.PARAM_INCLUDE); 499 validateIncludes(theParams.getRevIncludes(), Constants.PARAM_REVINCLUDE); 500 } 501 502 /** 503 * This method detects whether we have any duplicate lists of parameters and returns 504 * {@literal true} if none are found. For example, the following query would result 505 * in this method returning {@literal false}: 506 * <code>Patient?name=bart,homer&name=bart,homer</code> 507 * <p> 508 * This is not an optimized test, and it's not technically even prohibited to have 509 * duplicates like these in queries so this method should only be called as a 510 * part of an {@literal assert} statement to catch errors in tests. 511 */ 512 private boolean checkNoDuplicateParameters(SearchParameterMap theParams) { 513 HashSet<List<IQueryParameterType>> lists = new HashSet<>(); 514 for (List<List<IQueryParameterType>> andList : theParams.values()) { 515 516 lists.clear(); 517 for (int i = 0; i < andList.size(); i++) { 518 List<IQueryParameterType> orListI = andList.get(i); 519 if (!orListI.isEmpty() && !lists.add(orListI)) { 520 return false; 521 } 522 } 523 } 524 return true; 525 } 526 527 private void validateIncludes(Set<Include> includes, String name) { 528 for (Include next : includes) { 529 String value = next.getValue(); 530 if (value.equals(Constants.INCLUDE_STAR) || isBlank(value)) { 531 continue; 532 } 533 534 String paramType = next.getParamType(); 535 String paramName = next.getParamName(); 536 String paramTargetType = next.getParamTargetType(); 537 538 if (isBlank(paramType) || isBlank(paramName)) { 539 String msg = myContext 540 .getLocalizer() 541 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidInclude", name, value, ""); 542 throw new InvalidRequestException(Msg.code(2018) + msg); 543 } 544 545 if (!myDaoRegistry.isResourceTypeSupported(paramType)) { 546 String resourceTypeMsg = myContext 547 .getLocalizer() 548 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramType); 549 String msg = myContext 550 .getLocalizer() 551 .getMessage( 552 SearchCoordinatorSvcImpl.class, 553 "invalidInclude", 554 UrlUtil.sanitizeUrlPart(name), 555 UrlUtil.sanitizeUrlPart(value), 556 resourceTypeMsg); // last param is pre-sanitized 557 throw new InvalidRequestException(Msg.code(2017) + msg); 558 } 559 560 if (isNotBlank(paramTargetType) && !myDaoRegistry.isResourceTypeSupported(paramTargetType)) { 561 String resourceTypeMsg = myContext 562 .getLocalizer() 563 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramTargetType); 564 String msg = myContext 565 .getLocalizer() 566 .getMessage( 567 SearchCoordinatorSvcImpl.class, 568 "invalidInclude", 569 UrlUtil.sanitizeUrlPart(name), 570 UrlUtil.sanitizeUrlPart(value), 571 resourceTypeMsg); // last param is pre-sanitized 572 throw new InvalidRequestException(Msg.code(2016) + msg); 573 } 574 575 if (!Constants.INCLUDE_STAR.equals(paramName) 576 && mySearchParamRegistry.getActiveSearchParam( 577 paramType, paramName, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH) 578 == null) { 579 List<String> validNames = mySearchParamRegistry 580 .getActiveSearchParams(paramType, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH) 581 .values() 582 .stream() 583 .filter(t -> t.getParamType() == RestSearchParameterTypeEnum.REFERENCE) 584 .map(t -> UrlUtil.sanitizeUrlPart(t.getName())) 585 .sorted() 586 .collect(Collectors.toList()); 587 String searchParamMessage = myContext 588 .getLocalizer() 589 .getMessage( 590 BaseStorageDao.class, 591 "invalidSearchParameter", 592 UrlUtil.sanitizeUrlPart(paramName), 593 UrlUtil.sanitizeUrlPart(paramType), 594 validNames); 595 String msg = myContext 596 .getLocalizer() 597 .getMessage( 598 SearchCoordinatorSvcImpl.class, 599 "invalidInclude", 600 UrlUtil.sanitizeUrlPart(name), 601 UrlUtil.sanitizeUrlPart(value), 602 searchParamMessage); // last param is pre-sanitized 603 throw new InvalidRequestException(Msg.code(2015) + msg); 604 } 605 } 606 } 607 608 @Override 609 public Optional<Integer> getSearchTotal( 610 String theUuid, @Nullable RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) { 611 SearchTask task = myIdToSearchTask.get(theUuid); 612 if (task != null) { 613 return Optional.ofNullable(task.awaitInitialSync()); 614 } 615 616 /* 617 * In case there is no running search, if the total is listed as accurate we know one is coming 618 * so let's wait a bit for it to show up 619 */ 620 Optional<Search> search = myTxService 621 .withRequest(theRequestDetails) 622 .execute(() -> mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId)); 623 if (search.isPresent()) { 624 Optional<SearchParameterMap> searchParameterMap = search.get().getSearchParameterMap(); 625 if (searchParameterMap.isPresent() 626 && searchParameterMap.get().getSearchTotalMode() == SearchTotalModeEnum.ACCURATE) { 627 for (int i = 0; i < 10; i++) { 628 if (search.isPresent()) { 629 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search.get()); 630 if (search.get().getTotalCount() != null) { 631 return Optional.of(search.get().getTotalCount()); 632 } 633 } 634 search = mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId); 635 } 636 } 637 } 638 639 return Optional.empty(); 640 } 641 642 @Nonnull 643 private PersistedJpaSearchFirstPageBundleProvider submitSearch( 644 IDao theCallingDao, 645 SearchParameterMap theParams, 646 String theResourceType, 647 RequestDetails theRequestDetails, 648 ISearchBuilder<JpaPid> theSb, 649 RequestPartitionId theRequestPartitionId, 650 Search theSearch) { 651 StopWatch w = new StopWatch(); 652 653 SearchTaskParameters stp = new SearchTaskParameters( 654 theSearch, 655 theCallingDao, 656 theParams, 657 theResourceType, 658 theRequestDetails, 659 theRequestPartitionId, 660 myOnRemoveSearchTask, 661 mySyncSize); 662 stp.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 663 SearchTask task = (SearchTask) myBeanFactory.getBean(SearchConfig.SEARCH_TASK, stp); 664 myIdToSearchTask.put(theSearch.getUuid(), task); 665 task.call(); 666 667 PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage( 668 theRequestDetails, task, theSb, theRequestPartitionId); 669 670 ourLog.debug("Search initial phase completed in {}ms", w.getMillis()); 671 return retVal; 672 } 673 674 @Nullable 675 private PersistedJpaBundleProvider findCachedQuery( 676 SearchParameterMap theParams, 677 String theResourceType, 678 RequestDetails theRequestDetails, 679 String theQueryString, 680 RequestPartitionId theRequestPartitionId) { 681 // May be null 682 return myTxService 683 .withRequest(theRequestDetails) 684 .withRequestPartitionId(theRequestPartitionId) 685 .execute(() -> { 686 IInterceptorBroadcaster compositeBroadcaster = 687 CompositeInterceptorBroadcaster.newCompositeBroadcaster( 688 myInterceptorBroadcaster, theRequestDetails); 689 690 // Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH 691 692 HookParams params = new HookParams() 693 .add(SearchParameterMap.class, theParams) 694 .add(RequestDetails.class, theRequestDetails) 695 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 696 boolean canUseCache = 697 compositeBroadcaster.callHooks(Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH, params); 698 if (!canUseCache) { 699 return null; 700 } 701 702 // Check for a search matching the given hash 703 Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType, theRequestPartitionId); 704 if (searchToUse == null) { 705 return null; 706 } 707 708 ourLog.debug("Reusing search {} from cache", searchToUse.getUuid()); 709 // Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED 710 params = new HookParams() 711 .add(SearchParameterMap.class, theParams) 712 .add(RequestDetails.class, theRequestDetails) 713 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 714 compositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, params); 715 716 return myPersistedJpaBundleProviderFactory.newInstance(theRequestDetails, searchToUse.getUuid()); 717 }); 718 } 719 720 @Nullable 721 private Search findSearchToUseOrNull( 722 String theQueryString, String theResourceType, RequestPartitionId theRequestPartitionId) { 723 // createdCutoff is in recent past 724 final Instant createdCutoff = 725 Instant.now().minus(myStorageSettings.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS); 726 727 Optional<Search> candidate = mySearchCacheSvc.findCandidatesForReuse( 728 theResourceType, theQueryString, createdCutoff, theRequestPartitionId); 729 return candidate.orElse(null); 730 } 731 732 @Nullable 733 private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) { 734 final Integer loadSynchronousUpTo; 735 if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) { 736 if (theCacheControlDirective.getMaxResults() != null) { 737 loadSynchronousUpTo = theCacheControlDirective.getMaxResults(); 738 if (loadSynchronousUpTo > myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()) { 739 throw new InvalidRequestException(Msg.code(1165) + Constants.HEADER_CACHE_CONTROL + " header " 740 + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed " 741 + myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()); 742 } 743 } else { 744 loadSynchronousUpTo = 100; 745 } 746 } else { 747 loadSynchronousUpTo = null; 748 } 749 return loadSynchronousUpTo; 750 } 751 752 /** 753 * Creates a {@link Pageable} using a start and end index 754 */ 755 @SuppressWarnings("WeakerAccess") 756 @Nullable 757 public static Pageable toPage(final int theFromIndex, int theToIndex) { 758 int pageSize = theToIndex - theFromIndex; 759 if (pageSize < 1) { 760 return null; 761 } 762 763 int pageIndex = theFromIndex / pageSize; 764 765 return new PageRequest(pageIndex, pageSize, Sort.unsorted()) { 766 private static final long serialVersionUID = 1L; 767 768 @Override 769 public long getOffset() { 770 return theFromIndex; 771 } 772 }; 773 } 774}