
001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L%family 019 */ 020package ca.uhn.fhir.jpa.search; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.i18n.Msg; 024import ca.uhn.fhir.interceptor.api.HookParams; 025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 026import ca.uhn.fhir.interceptor.api.Pointcut; 027import ca.uhn.fhir.interceptor.model.RequestPartitionId; 028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 029import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 030import ca.uhn.fhir.jpa.api.dao.IDao; 031import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 032import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; 033import ca.uhn.fhir.jpa.config.SearchConfig; 034import ca.uhn.fhir.jpa.dao.BaseStorageDao; 035import ca.uhn.fhir.jpa.dao.ISearchBuilder; 036import ca.uhn.fhir.jpa.dao.NonPersistedSearch; 037import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 038import ca.uhn.fhir.jpa.dao.search.ResourceNotFoundInIndexException; 039import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 040import ca.uhn.fhir.jpa.entity.Search; 041import ca.uhn.fhir.jpa.model.dao.JpaPid; 042import ca.uhn.fhir.jpa.model.search.SearchStatusEnum; 043import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; 044import ca.uhn.fhir.jpa.search.builder.StorageInterceptorHooksFacade; 045import ca.uhn.fhir.jpa.search.builder.tasks.SearchContinuationTask; 046import ca.uhn.fhir.jpa.search.builder.tasks.SearchTask; 047import ca.uhn.fhir.jpa.search.builder.tasks.SearchTaskParameters; 048import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc; 049import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc; 050import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum; 051import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 052import ca.uhn.fhir.jpa.util.QueryParameterUtils; 053import ca.uhn.fhir.model.api.IQueryParameterType; 054import ca.uhn.fhir.model.api.Include; 055import ca.uhn.fhir.rest.api.CacheControlDirective; 056import ca.uhn.fhir.rest.api.Constants; 057import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum; 058import ca.uhn.fhir.rest.api.SearchTotalModeEnum; 059import ca.uhn.fhir.rest.api.server.IBundleProvider; 060import ca.uhn.fhir.rest.api.server.RequestDetails; 061import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 062import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 063import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 064import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; 065import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; 066import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; 067import ca.uhn.fhir.util.AsyncUtil; 068import ca.uhn.fhir.util.StopWatch; 069import ca.uhn.fhir.util.UrlUtil; 070import com.google.common.annotations.VisibleForTesting; 071import jakarta.annotation.Nonnull; 072import jakarta.annotation.Nullable; 073import org.apache.commons.lang3.time.DateUtils; 074import org.hl7.fhir.instance.model.api.IBaseResource; 075import org.springframework.beans.factory.BeanFactory; 076import org.springframework.data.domain.PageRequest; 077import org.springframework.data.domain.Pageable; 078import org.springframework.data.domain.Sort; 079import org.springframework.stereotype.Component; 080import org.springframework.transaction.support.TransactionSynchronizationManager; 081 082import java.time.Instant; 083import java.time.temporal.ChronoUnit; 084import java.util.Date; 085import java.util.HashSet; 086import java.util.List; 087import java.util.Optional; 088import java.util.Set; 089import java.util.UUID; 090import java.util.concurrent.Callable; 091import java.util.concurrent.ConcurrentHashMap; 092import java.util.concurrent.TimeUnit; 093import java.util.function.Consumer; 094import java.util.stream.Collectors; 095 096import static ca.uhn.fhir.jpa.util.QueryParameterUtils.DEFAULT_SYNC_SIZE; 097import static org.apache.commons.lang3.StringUtils.isBlank; 098import static org.apache.commons.lang3.StringUtils.isNotBlank; 099 100@Component("mySearchCoordinatorSvc") 101public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc<JpaPid> { 102 103 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class); 104 public static final int SEARCH_EXPIRY_OFFSET_MINUTES = 10; 105 106 private final FhirContext myContext; 107 private final JpaStorageSettings myStorageSettings; 108 private final IInterceptorBroadcaster myInterceptorBroadcaster; 109 private final HapiTransactionService myTxService; 110 private final ISearchCacheSvc mySearchCacheSvc; 111 private final ISearchResultCacheSvc mySearchResultCacheSvc; 112 private final DaoRegistry myDaoRegistry; 113 private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory; 114 private final ISynchronousSearchSvc mySynchronousSearchSvc; 115 private final PersistedJpaBundleProviderFactory myPersistedJpaBundleProviderFactory; 116 private final ISearchParamRegistry mySearchParamRegistry; 117 private final SearchStrategyFactory mySearchStrategyFactory; 118 private final ExceptionService myExceptionSvc; 119 private final BeanFactory myBeanFactory; 120 private final IRequestPartitionHelperSvc myRequestPartitionHelperSvc; 121 private ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>(); 122 123 private final Consumer<String> myOnRemoveSearchTask = myIdToSearchTask::remove; 124 125 private final StorageInterceptorHooksFacade myStorageInterceptorHooks; 126 private Integer myLoadingThrottleForUnitTests = null; 127 private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE; 128 private boolean myNeverUseLocalSearchForUnitTests; 129 private int mySyncSize = DEFAULT_SYNC_SIZE; 130 131 /** 132 * Constructor 133 */ 134 public SearchCoordinatorSvcImpl( 135 FhirContext theContext, 136 JpaStorageSettings theStorageSettings, 137 IInterceptorBroadcaster theInterceptorBroadcaster, 138 HapiTransactionService theTxService, 139 ISearchCacheSvc theSearchCacheSvc, 140 ISearchResultCacheSvc theSearchResultCacheSvc, 141 DaoRegistry theDaoRegistry, 142 SearchBuilderFactory<JpaPid> theSearchBuilderFactory, 143 ISynchronousSearchSvc theSynchronousSearchSvc, 144 PersistedJpaBundleProviderFactory thePersistedJpaBundleProviderFactory, 145 ISearchParamRegistry theSearchParamRegistry, 146 SearchStrategyFactory theSearchStrategyFactory, 147 ExceptionService theExceptionSvc, 148 BeanFactory theBeanFactory, 149 IRequestPartitionHelperSvc theRequestPartitionHelperSvc) { 150 super(); 151 myContext = theContext; 152 myStorageSettings = theStorageSettings; 153 myInterceptorBroadcaster = theInterceptorBroadcaster; 154 myTxService = theTxService; 155 mySearchCacheSvc = theSearchCacheSvc; 156 mySearchResultCacheSvc = theSearchResultCacheSvc; 157 myDaoRegistry = theDaoRegistry; 158 mySearchBuilderFactory = theSearchBuilderFactory; 159 mySynchronousSearchSvc = theSynchronousSearchSvc; 160 myPersistedJpaBundleProviderFactory = thePersistedJpaBundleProviderFactory; 161 mySearchParamRegistry = theSearchParamRegistry; 162 mySearchStrategyFactory = theSearchStrategyFactory; 163 myExceptionSvc = theExceptionSvc; 164 myBeanFactory = theBeanFactory; 165 myRequestPartitionHelperSvc = theRequestPartitionHelperSvc; 166 167 myStorageInterceptorHooks = new StorageInterceptorHooksFacade(myInterceptorBroadcaster); 168 } 169 170 @VisibleForTesting 171 Set<String> getActiveSearchIds() { 172 return myIdToSearchTask.keySet(); 173 } 174 175 @VisibleForTesting 176 public void setIdToSearchTaskMapForUnitTests(ConcurrentHashMap<String, SearchTask> theIdToSearchTaskMap) { 177 myIdToSearchTask = theIdToSearchTaskMap; 178 } 179 180 @VisibleForTesting 181 public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) { 182 myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests; 183 } 184 185 @VisibleForTesting 186 public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) { 187 myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests; 188 } 189 190 @VisibleForTesting 191 public void setSyncSizeForUnitTests(int theSyncSize) { 192 mySyncSize = theSyncSize; 193 } 194 195 @Override 196 public void cancelAllActiveSearches() { 197 for (SearchTask next : myIdToSearchTask.values()) { 198 ourLog.info( 199 "Requesting immediate abort of search: {}", next.getSearch().getUuid()); 200 next.requestImmediateAbort(); 201 AsyncUtil.awaitLatchAndIgnoreInterrupt(next.getCompletionLatch(), 30, TimeUnit.SECONDS); 202 } 203 } 204 205 @SuppressWarnings("SameParameterValue") 206 @VisibleForTesting 207 public void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) { 208 myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults; 209 } 210 211 /** 212 * This method is called by the HTTP client processing thread in order to 213 * fetch resources. 214 * <p> 215 * This method must not be called from inside a transaction. The rationale is that 216 * the {@link Search} entity is treated as a piece of shared state across client threads 217 * accessing the same thread, so we need to be able to update that table in a transaction 218 * and commit it right away in order for that to work. Examples of needing to do this 219 * include if two different clients request the same search and are both paging at the 220 * same time, but also includes clients that are hacking the paging links to 221 * fetch multiple pages of a search result in parallel. In both cases we need to only 222 * let one of them actually activate the search, or we will have conflicts. The other thread 223 * just needs to wait until the first one actually fetches more results. 224 */ 225 @Override 226 public List<JpaPid> getResources( 227 final String theUuid, 228 int theFrom, 229 int theTo, 230 @Nullable RequestDetails theRequestDetails, 231 RequestPartitionId theRequestPartitionId) { 232 assert !TransactionSynchronizationManager.isActualTransactionActive(); 233 234 // If we're actively searching right now, don't try to do anything until at least one batch has been 235 // persisted in the DB 236 SearchTask searchTask = myIdToSearchTask.get(theUuid); 237 if (searchTask != null) { 238 searchTask.awaitInitialSync(); 239 } 240 241 ourLog.trace("About to start looking for resources {}-{}", theFrom, theTo); 242 243 Search search; 244 StopWatch sw = new StopWatch(); 245 while (true) { 246 247 if (myNeverUseLocalSearchForUnitTests == false) { 248 if (searchTask != null) { 249 ourLog.trace("Local search found"); 250 List<JpaPid> resourcePids = searchTask.getResourcePids(theFrom, theTo); 251 ourLog.trace( 252 "Local search returned {} pids, wanted {}-{} - Search: {}", 253 resourcePids.size(), 254 theFrom, 255 theTo, 256 searchTask.getSearch()); 257 258 /* 259 * Generally, if a search task is open, the fastest possible thing is to just return its results. This 260 * will work most of the time, but can fail if the task hit a search threshold and the client is requesting 261 * results beyond that threashold. In that case, we'll keep going below, since that will trigger another 262 * task. 263 */ 264 if ((searchTask.getSearch().getNumFound() 265 - searchTask.getSearch().getNumBlocked()) 266 >= theTo 267 || resourcePids.size() == (theTo - theFrom)) { 268 return resourcePids; 269 } 270 } 271 } 272 273 Callable<Search> searchCallback = () -> mySearchCacheSvc 274 .fetchByUuid(theUuid, theRequestPartitionId) 275 .orElseThrow(() -> myExceptionSvc.newUnknownSearchException(theUuid)); 276 search = myTxService 277 .withRequest(theRequestDetails) 278 .withRequestPartitionId(theRequestPartitionId) 279 .execute(searchCallback); 280 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search); 281 282 if (search.getStatus() == SearchStatusEnum.FINISHED) { 283 ourLog.trace("Search entity marked as finished with {} results", search.getNumFound()); 284 break; 285 } 286 if ((search.getNumFound() - search.getNumBlocked()) >= theTo) { 287 ourLog.trace("Search entity has {} results so far", search.getNumFound()); 288 break; 289 } 290 291 if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) { 292 ourLog.error( 293 "Search {} of type {} for {}{} timed out after {}ms with status {}.", 294 search.getId(), 295 search.getSearchType(), 296 search.getResourceType(), 297 search.getSearchQueryString(), 298 sw.getMillis(), 299 search.getStatus()); 300 throw new InternalErrorException(Msg.code(1163) + "Request timed out after " + sw.getMillis() + "ms"); 301 } 302 303 // If the search was saved in "pass complete mode" it's probably time to 304 // start a new pass 305 if (search.getStatus() == SearchStatusEnum.PASSCMPLET) { 306 ourLog.trace("Going to try to start next search"); 307 Optional<Search> newSearch = 308 mySearchCacheSvc.tryToMarkSearchAsInProgress(search, theRequestPartitionId); 309 if (newSearch.isPresent()) { 310 ourLog.trace("Launching new search"); 311 search = newSearch.get(); 312 String resourceType = search.getResourceType(); 313 SearchParameterMap params = search.getSearchParameterMap() 314 .orElseThrow(() -> new IllegalStateException("No map in PASSCOMPLET search")); 315 IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType); 316 317 SearchTaskParameters parameters = new SearchTaskParameters( 318 search, 319 resourceDao, 320 params, 321 resourceType, 322 theRequestDetails, 323 theRequestPartitionId, 324 myOnRemoveSearchTask, 325 mySyncSize); 326 parameters.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 327 SearchContinuationTask task = 328 (SearchContinuationTask) myBeanFactory.getBean(SearchConfig.CONTINUE_TASK, parameters); 329 myIdToSearchTask.put(search.getUuid(), task); 330 task.call(); 331 } 332 } 333 334 if (!search.getStatus().isDone()) { 335 AsyncUtil.sleep(500); 336 } 337 } 338 339 ourLog.trace("Finished looping"); 340 341 List<JpaPid> pids = fetchResultPids(theUuid, theFrom, theTo, theRequestDetails, search, theRequestPartitionId); 342 343 updateSearchExpiryOrNull(search, theRequestPartitionId); 344 345 ourLog.trace("Fetched {} results", pids.size()); 346 347 return pids; 348 } 349 350 @Nonnull 351 private List<JpaPid> fetchResultPids( 352 String theUuid, 353 int theFrom, 354 int theTo, 355 @Nullable RequestDetails theRequestDetails, 356 Search theSearch, 357 RequestPartitionId theRequestPartitionId) { 358 List<JpaPid> pids = mySearchResultCacheSvc.fetchResultPids( 359 theSearch, theFrom, theTo, theRequestDetails, theRequestPartitionId); 360 if (pids == null) { 361 throw myExceptionSvc.newUnknownSearchException(theUuid); 362 } 363 return pids; 364 } 365 366 private void updateSearchExpiryOrNull(Search theSearch, RequestPartitionId theRequestPartitionId) { 367 // The created time may be null in some unit tests 368 if (theSearch.getCreated() != null) { 369 // start tracking last-access-time for this search when it is more than halfway to expire by created time 370 // we do this to avoid generating excessive write traffic on busy cached searches. 371 long expireAfterMillis = myStorageSettings.getExpireSearchResultsAfterMillis(); 372 long createdCutoff = theSearch.getCreated().getTime() + expireAfterMillis; 373 if (createdCutoff - System.currentTimeMillis() < expireAfterMillis / 2) { 374 theSearch.setExpiryOrNull(DateUtils.addMinutes(new Date(), SEARCH_EXPIRY_OFFSET_MINUTES)); 375 // TODO: A nice future enhancement might be to make this flush be asynchronous 376 mySearchCacheSvc.save(theSearch, theRequestPartitionId); 377 } 378 } 379 } 380 381 @Override 382 public IBundleProvider registerSearch( 383 final IFhirResourceDao<?> theCallingDao, 384 final SearchParameterMap theParams, 385 String theResourceType, 386 CacheControlDirective theCacheControlDirective, 387 @Nullable RequestDetails theRequestDetails) { 388 final String searchUuid = UUID.randomUUID().toString(); 389 390 final String queryString = theParams.toNormalizedQueryString(); 391 ourLog.debug("Registering new search {}", searchUuid); 392 393 RequestPartitionId requestPartitionId = null; 394 if (theRequestDetails instanceof SystemRequestDetails srd) { 395 requestPartitionId = srd.getRequestPartitionId(); 396 } 397 398 // If an explicit request partition wasn't provided, calculate the request 399 // partition after invoking STORAGE_PRESEARCH_REGISTERED just in case any interceptors 400 // made changes which could affect the calculated partition 401 if (requestPartitionId == null) { 402 403 // Invoke any STORAGE_PRESEARCH_PARTITION_SELECTED interceptor hooks 404 NonPersistedSearch search = new NonPersistedSearch(theResourceType); 405 search.setUuid(searchUuid); 406 myStorageInterceptorHooks.callStoragePresearchPartitionSelected(theRequestDetails, theParams, search); 407 408 requestPartitionId = myRequestPartitionHelperSvc.determineReadPartitionForRequestForSearchType( 409 theRequestDetails, theResourceType, theParams); 410 } 411 412 Search search = new Search(); 413 QueryParameterUtils.populateSearchEntity( 414 theParams, theResourceType, searchUuid, queryString, search, requestPartitionId); 415 416 // Invoke any STORAGE_PRESEARCH_REGISTERED interceptor hooks 417 myStorageInterceptorHooks.callStoragePresearchRegistered( 418 theRequestDetails, theParams, search, requestPartitionId); 419 420 validateSearch(theParams); 421 422 Class<? extends IBaseResource> resourceTypeClass = 423 myContext.getResourceDefinition(theResourceType).getImplementingClass(); 424 final ISearchBuilder<JpaPid> sb = mySearchBuilderFactory.newSearchBuilder(theResourceType, resourceTypeClass); 425 sb.setFetchSize(mySyncSize); 426 sb.setRequireTotal(theParams.getCount() != null); 427 428 final Integer loadSynchronousUpTo = getLoadSynchronousUpToOrNull(theCacheControlDirective); 429 boolean isOffsetQuery = theParams.isOffsetQuery(); 430 431 // todo someday - not today. 432 // SearchStrategyFactory.ISearchStrategy searchStrategy = mySearchStrategyFactory.pickStrategy(theResourceType, 433 // theParams, theRequestDetails); 434 // return searchStrategy.get(); 435 436 if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null || isOffsetQuery) { 437 if (mySearchStrategyFactory.isSupportsHSearchDirect(theResourceType, theParams, theRequestDetails)) { 438 ourLog.info("Search {} is using direct load strategy", searchUuid); 439 SearchStrategyFactory.ISearchStrategy direct = mySearchStrategyFactory.makeDirectStrategy( 440 searchUuid, theResourceType, theParams, theRequestDetails); 441 442 try { 443 return direct.get(); 444 } catch (ResourceNotFoundInIndexException theE) { 445 // some resources were not found in index, so we will inform this and resort to JPA search 446 ourLog.warn( 447 "Some resources were not found in index. Make sure all resources were indexed. Resorting to database search."); 448 } 449 } 450 451 // we need a max to fetch for synchronous searches; 452 // otherwise we'll explode memory. 453 Integer maxToLoad = getSynchronousMaxResultsToFetch(theParams, loadSynchronousUpTo); 454 ourLog.debug("Setting a max fetch value of {} for synchronous search", maxToLoad); 455 sb.setMaxResultsToFetch(maxToLoad); 456 457 ourLog.debug("Search {} is loading in synchronous mode", searchUuid); 458 return mySynchronousSearchSvc.executeQuery( 459 theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, requestPartitionId); 460 } 461 462 /* 463 * See if there are any cached searches whose results we can return 464 * instead 465 */ 466 SearchCacheStatusEnum cacheStatus = SearchCacheStatusEnum.MISS; 467 if (theCacheControlDirective != null && theCacheControlDirective.isNoCache()) { 468 cacheStatus = SearchCacheStatusEnum.NOT_TRIED; 469 } 470 471 if (cacheStatus != SearchCacheStatusEnum.NOT_TRIED) { 472 if (theParams.getEverythingMode() == null) { 473 if (myStorageSettings.getReuseCachedSearchResultsForMillis() != null) { 474 PersistedJpaBundleProvider foundSearchProvider = findCachedQuery( 475 theParams, theResourceType, theRequestDetails, queryString, requestPartitionId); 476 if (foundSearchProvider != null) { 477 foundSearchProvider.setCacheStatus(SearchCacheStatusEnum.HIT); 478 return foundSearchProvider; 479 } 480 } 481 } 482 } 483 484 PersistedJpaSearchFirstPageBundleProvider retVal = submitSearch( 485 theCallingDao, theParams, theResourceType, theRequestDetails, sb, requestPartitionId, search); 486 retVal.setCacheStatus(cacheStatus); 487 return retVal; 488 } 489 490 /** 491 * The max results to return if this is a synchronous search. 492 * <p> 493 * We'll look in this order: 494 * * load synchronous up to (on params) 495 * * param count (+ offset) 496 * * StorageSettings fetch size default max 497 * * 498 */ 499 private Integer getSynchronousMaxResultsToFetch(SearchParameterMap theParams, Integer theLoadSynchronousUpTo) { 500 if (theLoadSynchronousUpTo != null) { 501 return theLoadSynchronousUpTo; 502 } 503 504 if (theParams.getCount() != null) { 505 int valToReturn = theParams.getCount() + 1; 506 if (theParams.getOffset() != null) { 507 valToReturn += theParams.getOffset(); 508 } 509 return valToReturn; 510 } 511 512 if (myStorageSettings.getFetchSizeDefaultMaximum() != null) { 513 return myStorageSettings.getFetchSizeDefaultMaximum(); 514 } 515 516 return myStorageSettings.getInternalSynchronousSearchSize(); 517 } 518 519 private void validateSearch(SearchParameterMap theParams) { 520 assert checkNoDuplicateParameters(theParams) 521 : "Duplicate parameters found in query: " + theParams.toNormalizedQueryString(myContext); 522 523 validateIncludes(theParams.getIncludes(), Constants.PARAM_INCLUDE); 524 validateIncludes(theParams.getRevIncludes(), Constants.PARAM_REVINCLUDE); 525 } 526 527 /** 528 * This method detects whether we have any duplicate lists of parameters and returns 529 * {@literal true} if none are found. For example, the following query would result 530 * in this method returning {@literal false}: 531 * <code>Patient?name=bart,homer&name=bart,homer</code> 532 * <p> 533 * This is not an optimized test, and it's not technically even prohibited to have 534 * duplicates like these in queries so this method should only be called as a 535 * part of an {@literal assert} statement to catch errors in tests. 536 */ 537 private boolean checkNoDuplicateParameters(SearchParameterMap theParams) { 538 HashSet<List<IQueryParameterType>> lists = new HashSet<>(); 539 for (List<List<IQueryParameterType>> andList : theParams.values()) { 540 541 lists.clear(); 542 for (int i = 0; i < andList.size(); i++) { 543 List<IQueryParameterType> orListI = andList.get(i); 544 if (!orListI.isEmpty() && !lists.add(orListI)) { 545 return false; 546 } 547 } 548 } 549 return true; 550 } 551 552 private void validateIncludes(Set<Include> includes, String name) { 553 for (Include next : includes) { 554 String value = next.getValue(); 555 if (value.equals(Constants.INCLUDE_STAR) || isBlank(value)) { 556 continue; 557 } 558 559 String paramType = next.getParamType(); 560 String paramName = next.getParamName(); 561 String paramTargetType = next.getParamTargetType(); 562 563 if (isBlank(paramType) || isBlank(paramName)) { 564 String msg = myContext 565 .getLocalizer() 566 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidInclude", name, value, ""); 567 throw new InvalidRequestException(Msg.code(2018) + msg); 568 } 569 570 if (!myDaoRegistry.isResourceTypeSupported(paramType)) { 571 String resourceTypeMsg = myContext 572 .getLocalizer() 573 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramType); 574 String msg = myContext 575 .getLocalizer() 576 .getMessage( 577 SearchCoordinatorSvcImpl.class, 578 "invalidInclude", 579 UrlUtil.sanitizeUrlPart(name), 580 UrlUtil.sanitizeUrlPart(value), 581 resourceTypeMsg); // last param is pre-sanitized 582 throw new InvalidRequestException(Msg.code(2017) + msg); 583 } 584 585 if (isNotBlank(paramTargetType) && !myDaoRegistry.isResourceTypeSupported(paramTargetType)) { 586 String resourceTypeMsg = myContext 587 .getLocalizer() 588 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramTargetType); 589 String msg = myContext 590 .getLocalizer() 591 .getMessage( 592 SearchCoordinatorSvcImpl.class, 593 "invalidInclude", 594 UrlUtil.sanitizeUrlPart(name), 595 UrlUtil.sanitizeUrlPart(value), 596 resourceTypeMsg); // last param is pre-sanitized 597 throw new InvalidRequestException(Msg.code(2016) + msg); 598 } 599 600 if (!Constants.INCLUDE_STAR.equals(paramName) 601 && mySearchParamRegistry.getActiveSearchParam( 602 paramType, paramName, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH) 603 == null) { 604 List<String> validNames = mySearchParamRegistry 605 .getActiveSearchParams(paramType, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH) 606 .values() 607 .stream() 608 .filter(t -> t.getParamType() == RestSearchParameterTypeEnum.REFERENCE) 609 .map(t -> UrlUtil.sanitizeUrlPart(t.getName())) 610 .sorted() 611 .collect(Collectors.toList()); 612 String searchParamMessage = myContext 613 .getLocalizer() 614 .getMessage( 615 BaseStorageDao.class, 616 "invalidSearchParameter", 617 UrlUtil.sanitizeUrlPart(paramName), 618 UrlUtil.sanitizeUrlPart(paramType), 619 validNames); 620 String msg = myContext 621 .getLocalizer() 622 .getMessage( 623 SearchCoordinatorSvcImpl.class, 624 "invalidInclude", 625 UrlUtil.sanitizeUrlPart(name), 626 UrlUtil.sanitizeUrlPart(value), 627 searchParamMessage); // last param is pre-sanitized 628 throw new InvalidRequestException(Msg.code(2015) + msg); 629 } 630 } 631 } 632 633 @Override 634 public Optional<Integer> getSearchTotal( 635 String theUuid, @Nullable RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) { 636 SearchTask task = myIdToSearchTask.get(theUuid); 637 if (task != null) { 638 return Optional.ofNullable(task.awaitInitialSync()); 639 } 640 641 /* 642 * In case there is no running search, if the total is listed as accurate we know one is coming 643 * so let's wait a bit for it to show up 644 */ 645 Optional<Search> search = myTxService 646 .withRequest(theRequestDetails) 647 .execute(() -> mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId)); 648 if (search.isPresent()) { 649 Optional<SearchParameterMap> searchParameterMap = search.get().getSearchParameterMap(); 650 if (searchParameterMap.isPresent() 651 && searchParameterMap.get().getSearchTotalMode() == SearchTotalModeEnum.ACCURATE) { 652 for (int i = 0; i < 10; i++) { 653 if (search.isPresent()) { 654 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search.get()); 655 if (search.get().getTotalCount() != null) { 656 return Optional.of(search.get().getTotalCount()); 657 } 658 } 659 search = mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId); 660 } 661 } 662 } 663 664 return Optional.empty(); 665 } 666 667 @Nonnull 668 private PersistedJpaSearchFirstPageBundleProvider submitSearch( 669 IDao theCallingDao, 670 SearchParameterMap theParams, 671 String theResourceType, 672 RequestDetails theRequestDetails, 673 ISearchBuilder<JpaPid> theSb, 674 RequestPartitionId theRequestPartitionId, 675 Search theSearch) { 676 StopWatch w = new StopWatch(); 677 678 SearchTaskParameters stp = new SearchTaskParameters( 679 theSearch, 680 theCallingDao, 681 theParams, 682 theResourceType, 683 theRequestDetails, 684 theRequestPartitionId, 685 myOnRemoveSearchTask, 686 mySyncSize); 687 stp.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 688 SearchTask task = (SearchTask) myBeanFactory.getBean(SearchConfig.SEARCH_TASK, stp); 689 myIdToSearchTask.put(theSearch.getUuid(), task); 690 task.call(); 691 692 PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage( 693 theRequestDetails, task, theSb, theRequestPartitionId); 694 695 ourLog.debug("Search initial phase completed in {}ms", w.getMillis()); 696 return retVal; 697 } 698 699 @Nullable 700 private PersistedJpaBundleProvider findCachedQuery( 701 SearchParameterMap theParams, 702 String theResourceType, 703 RequestDetails theRequestDetails, 704 String theQueryString, 705 RequestPartitionId theRequestPartitionId) { 706 // May be null 707 return myTxService 708 .withRequest(theRequestDetails) 709 .withRequestPartitionId(theRequestPartitionId) 710 .execute(() -> { 711 IInterceptorBroadcaster compositeBroadcaster = 712 CompositeInterceptorBroadcaster.newCompositeBroadcaster( 713 myInterceptorBroadcaster, theRequestDetails); 714 715 // Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH 716 717 HookParams params = new HookParams() 718 .add(SearchParameterMap.class, theParams) 719 .add(RequestDetails.class, theRequestDetails) 720 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 721 boolean canUseCache = 722 compositeBroadcaster.callHooks(Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH, params); 723 if (!canUseCache) { 724 return null; 725 } 726 727 // Check for a search matching the given hash 728 Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType, theRequestPartitionId); 729 if (searchToUse == null) { 730 return null; 731 } 732 733 ourLog.debug("Reusing search {} from cache", searchToUse.getUuid()); 734 // Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED 735 params = new HookParams() 736 .add(SearchParameterMap.class, theParams) 737 .add(RequestDetails.class, theRequestDetails) 738 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 739 compositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, params); 740 741 return myPersistedJpaBundleProviderFactory.newInstance(theRequestDetails, searchToUse.getUuid()); 742 }); 743 } 744 745 @Nullable 746 private Search findSearchToUseOrNull( 747 String theQueryString, String theResourceType, RequestPartitionId theRequestPartitionId) { 748 // createdCutoff is in recent past 749 final Instant createdCutoff = 750 Instant.now().minus(myStorageSettings.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS); 751 752 Optional<Search> candidate = mySearchCacheSvc.findCandidatesForReuse( 753 theResourceType, theQueryString, createdCutoff, theRequestPartitionId); 754 return candidate.orElse(null); 755 } 756 757 @Nullable 758 private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) { 759 final Integer loadSynchronousUpTo; 760 if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) { 761 if (theCacheControlDirective.getMaxResults() != null) { 762 loadSynchronousUpTo = theCacheControlDirective.getMaxResults(); 763 if (loadSynchronousUpTo > myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()) { 764 throw new InvalidRequestException(Msg.code(1165) + Constants.HEADER_CACHE_CONTROL + " header " 765 + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed " 766 + myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()); 767 } 768 } else { 769 loadSynchronousUpTo = 100; 770 } 771 } else { 772 loadSynchronousUpTo = null; 773 } 774 return loadSynchronousUpTo; 775 } 776 777 /** 778 * Creates a {@link Pageable} using a start and end index 779 */ 780 @SuppressWarnings("WeakerAccess") 781 @Nullable 782 public static Pageable toPage(final int theFromIndex, int theToIndex) { 783 int pageSize = theToIndex - theFromIndex; 784 if (pageSize < 1) { 785 return null; 786 } 787 788 int pageIndex = theFromIndex / pageSize; 789 790 return new PageRequest(pageIndex, pageSize, Sort.unsorted()) { 791 private static final long serialVersionUID = 1L; 792 793 @Override 794 public long getOffset() { 795 return theFromIndex; 796 } 797 }; 798 } 799}