
001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L%family 019 */ 020package ca.uhn.fhir.jpa.search; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.i18n.Msg; 024import ca.uhn.fhir.interceptor.api.HookParams; 025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 026import ca.uhn.fhir.interceptor.api.Pointcut; 027import ca.uhn.fhir.interceptor.model.RequestPartitionId; 028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 029import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 030import ca.uhn.fhir.jpa.api.dao.IDao; 031import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 032import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; 033import ca.uhn.fhir.jpa.config.SearchConfig; 034import ca.uhn.fhir.jpa.dao.BaseStorageDao; 035import ca.uhn.fhir.jpa.dao.ISearchBuilder; 036import ca.uhn.fhir.jpa.dao.NonPersistedSearch; 037import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 038import ca.uhn.fhir.jpa.dao.search.ResourceNotFoundInIndexException; 039import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 040import ca.uhn.fhir.jpa.entity.Search; 041import ca.uhn.fhir.jpa.model.dao.JpaPid; 042import ca.uhn.fhir.jpa.model.search.SearchStatusEnum; 043import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; 044import ca.uhn.fhir.jpa.search.builder.StorageInterceptorHooksFacade; 045import ca.uhn.fhir.jpa.search.builder.tasks.SearchContinuationTask; 046import ca.uhn.fhir.jpa.search.builder.tasks.SearchTask; 047import ca.uhn.fhir.jpa.search.builder.tasks.SearchTaskParameters; 048import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc; 049import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc; 050import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum; 051import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 052import ca.uhn.fhir.jpa.util.QueryParameterUtils; 053import ca.uhn.fhir.model.api.IQueryParameterType; 054import ca.uhn.fhir.model.api.Include; 055import ca.uhn.fhir.rest.api.CacheControlDirective; 056import ca.uhn.fhir.rest.api.Constants; 057import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum; 058import ca.uhn.fhir.rest.api.SearchTotalModeEnum; 059import ca.uhn.fhir.rest.api.server.IBundleProvider; 060import ca.uhn.fhir.rest.api.server.RequestDetails; 061import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 062import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 063import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; 064import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; 065import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; 066import ca.uhn.fhir.util.AsyncUtil; 067import ca.uhn.fhir.util.StopWatch; 068import ca.uhn.fhir.util.UrlUtil; 069import com.google.common.annotations.VisibleForTesting; 070import jakarta.annotation.Nonnull; 071import jakarta.annotation.Nullable; 072import org.apache.commons.lang3.time.DateUtils; 073import org.hl7.fhir.instance.model.api.IBaseResource; 074import org.springframework.beans.factory.BeanFactory; 075import org.springframework.data.domain.PageRequest; 076import org.springframework.data.domain.Pageable; 077import org.springframework.data.domain.Sort; 078import org.springframework.stereotype.Component; 079import org.springframework.transaction.support.TransactionSynchronizationManager; 080 081import java.time.Instant; 082import java.time.temporal.ChronoUnit; 083import java.util.Date; 084import java.util.HashSet; 085import java.util.List; 086import java.util.Optional; 087import java.util.Set; 088import java.util.UUID; 089import java.util.concurrent.Callable; 090import java.util.concurrent.ConcurrentHashMap; 091import java.util.concurrent.TimeUnit; 092import java.util.function.Consumer; 093import java.util.stream.Collectors; 094 095import static ca.uhn.fhir.jpa.util.QueryParameterUtils.DEFAULT_SYNC_SIZE; 096import static org.apache.commons.lang3.StringUtils.isBlank; 097import static org.apache.commons.lang3.StringUtils.isNotBlank; 098 099@Component("mySearchCoordinatorSvc") 100public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc<JpaPid> { 101 102 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class); 103 public static final int SEARCH_EXPIRY_OFFSET_MINUTES = 10; 104 105 private final FhirContext myContext; 106 private final JpaStorageSettings myStorageSettings; 107 private final IInterceptorBroadcaster myInterceptorBroadcaster; 108 private final HapiTransactionService myTxService; 109 private final ISearchCacheSvc mySearchCacheSvc; 110 private final ISearchResultCacheSvc mySearchResultCacheSvc; 111 private final DaoRegistry myDaoRegistry; 112 private final SearchBuilderFactory<JpaPid> mySearchBuilderFactory; 113 private final ISynchronousSearchSvc mySynchronousSearchSvc; 114 private final PersistedJpaBundleProviderFactory myPersistedJpaBundleProviderFactory; 115 private final ISearchParamRegistry mySearchParamRegistry; 116 private final SearchStrategyFactory mySearchStrategyFactory; 117 private final ExceptionService myExceptionSvc; 118 private final BeanFactory myBeanFactory; 119 private final IRequestPartitionHelperSvc myRequestPartitionHelperSvc; 120 private ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>(); 121 122 private final Consumer<String> myOnRemoveSearchTask = myIdToSearchTask::remove; 123 124 private final StorageInterceptorHooksFacade myStorageInterceptorHooks; 125 private Integer myLoadingThrottleForUnitTests = null; 126 private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE; 127 private boolean myNeverUseLocalSearchForUnitTests; 128 private int mySyncSize = DEFAULT_SYNC_SIZE; 129 130 /** 131 * Constructor 132 */ 133 public SearchCoordinatorSvcImpl( 134 FhirContext theContext, 135 JpaStorageSettings theStorageSettings, 136 IInterceptorBroadcaster theInterceptorBroadcaster, 137 HapiTransactionService theTxService, 138 ISearchCacheSvc theSearchCacheSvc, 139 ISearchResultCacheSvc theSearchResultCacheSvc, 140 DaoRegistry theDaoRegistry, 141 SearchBuilderFactory<JpaPid> theSearchBuilderFactory, 142 ISynchronousSearchSvc theSynchronousSearchSvc, 143 PersistedJpaBundleProviderFactory thePersistedJpaBundleProviderFactory, 144 ISearchParamRegistry theSearchParamRegistry, 145 SearchStrategyFactory theSearchStrategyFactory, 146 ExceptionService theExceptionSvc, 147 BeanFactory theBeanFactory, 148 IRequestPartitionHelperSvc theRequestPartitionHelperSvc) { 149 super(); 150 myContext = theContext; 151 myStorageSettings = theStorageSettings; 152 myInterceptorBroadcaster = theInterceptorBroadcaster; 153 myTxService = theTxService; 154 mySearchCacheSvc = theSearchCacheSvc; 155 mySearchResultCacheSvc = theSearchResultCacheSvc; 156 myDaoRegistry = theDaoRegistry; 157 mySearchBuilderFactory = theSearchBuilderFactory; 158 mySynchronousSearchSvc = theSynchronousSearchSvc; 159 myPersistedJpaBundleProviderFactory = thePersistedJpaBundleProviderFactory; 160 mySearchParamRegistry = theSearchParamRegistry; 161 mySearchStrategyFactory = theSearchStrategyFactory; 162 myExceptionSvc = theExceptionSvc; 163 myBeanFactory = theBeanFactory; 164 myRequestPartitionHelperSvc = theRequestPartitionHelperSvc; 165 166 myStorageInterceptorHooks = new StorageInterceptorHooksFacade(myInterceptorBroadcaster); 167 } 168 169 @VisibleForTesting 170 Set<String> getActiveSearchIds() { 171 return myIdToSearchTask.keySet(); 172 } 173 174 @VisibleForTesting 175 public void setIdToSearchTaskMapForUnitTests(ConcurrentHashMap<String, SearchTask> theIdToSearchTaskMap) { 176 myIdToSearchTask = theIdToSearchTaskMap; 177 } 178 179 @VisibleForTesting 180 public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) { 181 myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests; 182 } 183 184 @VisibleForTesting 185 public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) { 186 myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests; 187 } 188 189 @VisibleForTesting 190 public void setSyncSizeForUnitTests(int theSyncSize) { 191 mySyncSize = theSyncSize; 192 } 193 194 @Override 195 public void cancelAllActiveSearches() { 196 for (SearchTask next : myIdToSearchTask.values()) { 197 ourLog.info( 198 "Requesting immediate abort of search: {}", next.getSearch().getUuid()); 199 next.requestImmediateAbort(); 200 AsyncUtil.awaitLatchAndIgnoreInterrupt(next.getCompletionLatch(), 30, TimeUnit.SECONDS); 201 } 202 } 203 204 @SuppressWarnings("SameParameterValue") 205 @VisibleForTesting 206 public void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) { 207 myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults; 208 } 209 210 /** 211 * This method is called by the HTTP client processing thread in order to 212 * fetch resources. 213 * <p> 214 * This method must not be called from inside a transaction. The rationale is that 215 * the {@link Search} entity is treated as a piece of shared state across client threads 216 * accessing the same thread, so we need to be able to update that table in a transaction 217 * and commit it right away in order for that to work. Examples of needing to do this 218 * include if two different clients request the same search and are both paging at the 219 * same time, but also includes clients that are hacking the paging links to 220 * fetch multiple pages of a search result in parallel. In both cases we need to only 221 * let one of them actually activate the search, or we will have conflicts. The other thread 222 * just needs to wait until the first one actually fetches more results. 223 */ 224 @Override 225 public List<JpaPid> getResources( 226 final String theUuid, 227 int theFrom, 228 int theTo, 229 @Nullable RequestDetails theRequestDetails, 230 RequestPartitionId theRequestPartitionId) { 231 assert !TransactionSynchronizationManager.isActualTransactionActive(); 232 233 // If we're actively searching right now, don't try to do anything until at least one batch has been 234 // persisted in the DB 235 SearchTask searchTask = myIdToSearchTask.get(theUuid); 236 if (searchTask != null) { 237 searchTask.awaitInitialSync(); 238 } 239 240 ourLog.trace("About to start looking for resources {}-{}", theFrom, theTo); 241 242 Search search; 243 StopWatch sw = new StopWatch(); 244 while (true) { 245 246 if (myNeverUseLocalSearchForUnitTests == false) { 247 if (searchTask != null) { 248 ourLog.trace("Local search found"); 249 List<JpaPid> resourcePids = searchTask.getResourcePids(theFrom, theTo); 250 ourLog.trace( 251 "Local search returned {} pids, wanted {}-{} - Search: {}", 252 resourcePids.size(), 253 theFrom, 254 theTo, 255 searchTask.getSearch()); 256 257 /* 258 * Generally, if a search task is open, the fastest possible thing is to just return its results. This 259 * will work most of the time, but can fail if the task hit a search threshold and the client is requesting 260 * results beyond that threashold. In that case, we'll keep going below, since that will trigger another 261 * task. 262 */ 263 if ((searchTask.getSearch().getNumFound() 264 - searchTask.getSearch().getNumBlocked()) 265 >= theTo 266 || resourcePids.size() == (theTo - theFrom)) { 267 return resourcePids; 268 } 269 } 270 } 271 272 Callable<Search> searchCallback = () -> mySearchCacheSvc 273 .fetchByUuid(theUuid, theRequestPartitionId) 274 .orElseThrow(() -> myExceptionSvc.newUnknownSearchException(theUuid)); 275 search = myTxService 276 .withRequest(theRequestDetails) 277 .withRequestPartitionId(theRequestPartitionId) 278 .execute(searchCallback); 279 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search); 280 281 if (search.getStatus() == SearchStatusEnum.FINISHED) { 282 ourLog.trace("Search entity marked as finished with {} results", search.getNumFound()); 283 break; 284 } 285 if ((search.getNumFound() - search.getNumBlocked()) >= theTo) { 286 ourLog.trace("Search entity has {} results so far", search.getNumFound()); 287 break; 288 } 289 290 if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) { 291 ourLog.error( 292 "Search {} of type {} for {}{} timed out after {}ms with status {}.", 293 search.getId(), 294 search.getSearchType(), 295 search.getResourceType(), 296 search.getSearchQueryString(), 297 sw.getMillis(), 298 search.getStatus()); 299 throw new InternalErrorException(Msg.code(1163) + "Request timed out after " + sw.getMillis() + "ms"); 300 } 301 302 // If the search was saved in "pass complete mode" it's probably time to 303 // start a new pass 304 if (search.getStatus() == SearchStatusEnum.PASSCMPLET) { 305 ourLog.trace("Going to try to start next search"); 306 Optional<Search> newSearch = 307 mySearchCacheSvc.tryToMarkSearchAsInProgress(search, theRequestPartitionId); 308 if (newSearch.isPresent()) { 309 ourLog.trace("Launching new search"); 310 search = newSearch.get(); 311 String resourceType = search.getResourceType(); 312 SearchParameterMap params = search.getSearchParameterMap() 313 .orElseThrow(() -> new IllegalStateException("No map in PASSCOMPLET search")); 314 IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType); 315 316 SearchTaskParameters parameters = new SearchTaskParameters( 317 search, 318 resourceDao, 319 params, 320 resourceType, 321 theRequestDetails, 322 theRequestPartitionId, 323 myOnRemoveSearchTask, 324 mySyncSize); 325 parameters.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 326 SearchContinuationTask task = 327 (SearchContinuationTask) myBeanFactory.getBean(SearchConfig.CONTINUE_TASK, parameters); 328 myIdToSearchTask.put(search.getUuid(), task); 329 task.call(); 330 } 331 } 332 333 if (!search.getStatus().isDone()) { 334 AsyncUtil.sleep(500); 335 } 336 } 337 338 ourLog.trace("Finished looping"); 339 340 List<JpaPid> pids = fetchResultPids(theUuid, theFrom, theTo, theRequestDetails, search, theRequestPartitionId); 341 342 updateSearchExpiryOrNull(search, theRequestPartitionId); 343 344 ourLog.trace("Fetched {} results", pids.size()); 345 346 return pids; 347 } 348 349 @Nonnull 350 private List<JpaPid> fetchResultPids( 351 String theUuid, 352 int theFrom, 353 int theTo, 354 @Nullable RequestDetails theRequestDetails, 355 Search theSearch, 356 RequestPartitionId theRequestPartitionId) { 357 List<JpaPid> pids = mySearchResultCacheSvc.fetchResultPids( 358 theSearch, theFrom, theTo, theRequestDetails, theRequestPartitionId); 359 if (pids == null) { 360 throw myExceptionSvc.newUnknownSearchException(theUuid); 361 } 362 return pids; 363 } 364 365 private void updateSearchExpiryOrNull(Search theSearch, RequestPartitionId theRequestPartitionId) { 366 // The created time may be null in some unit tests 367 if (theSearch.getCreated() != null) { 368 // start tracking last-access-time for this search when it is more than halfway to expire by created time 369 // we do this to avoid generating excessive write traffic on busy cached searches. 370 long expireAfterMillis = myStorageSettings.getExpireSearchResultsAfterMillis(); 371 long createdCutoff = theSearch.getCreated().getTime() + expireAfterMillis; 372 if (createdCutoff - System.currentTimeMillis() < expireAfterMillis / 2) { 373 theSearch.setExpiryOrNull(DateUtils.addMinutes(new Date(), SEARCH_EXPIRY_OFFSET_MINUTES)); 374 // TODO: A nice future enhancement might be to make this flush be asynchronous 375 mySearchCacheSvc.save(theSearch, theRequestPartitionId); 376 } 377 } 378 } 379 380 @Override 381 public IBundleProvider registerSearch( 382 final IFhirResourceDao<?> theCallingDao, 383 final SearchParameterMap theParams, 384 String theResourceType, 385 CacheControlDirective theCacheControlDirective, 386 RequestDetails theRequestDetails, 387 @Nullable RequestPartitionId theRequestPartitionId) { 388 final String searchUuid = UUID.randomUUID().toString(); 389 390 final String queryString = theParams.toNormalizedQueryString(myContext); 391 ourLog.debug("Registering new search {}", searchUuid); 392 393 RequestPartitionId requestPartitionId = theRequestPartitionId; 394 395 // If an explicit request partition wasn't provided, calculate the request 396 // partition after invoking STORAGE_PRESEARCH_REGISTERED just in case any interceptors 397 // made changes which could affect the calculated partition 398 if (requestPartitionId == null) { 399 400 // Invoke any STORAGE_PRESEARCH_PARTITION_SELECTED interceptor hooks 401 NonPersistedSearch search = new NonPersistedSearch(theResourceType); 402 search.setUuid(searchUuid); 403 myStorageInterceptorHooks.callStoragePresearchPartitionSelected(theRequestDetails, theParams, search); 404 405 requestPartitionId = myRequestPartitionHelperSvc.determineReadPartitionForRequestForSearchType( 406 theRequestDetails, theResourceType, theParams); 407 } 408 409 Search search = new Search(); 410 QueryParameterUtils.populateSearchEntity( 411 theParams, theResourceType, searchUuid, queryString, search, requestPartitionId); 412 413 // Invoke any STORAGE_PRESEARCH_REGISTERED interceptor hooks 414 myStorageInterceptorHooks.callStoragePresearchRegistered( 415 theRequestDetails, theParams, search, requestPartitionId); 416 417 validateSearch(theParams); 418 419 Class<? extends IBaseResource> resourceTypeClass = 420 myContext.getResourceDefinition(theResourceType).getImplementingClass(); 421 final ISearchBuilder<JpaPid> sb = mySearchBuilderFactory.newSearchBuilder(theResourceType, resourceTypeClass); 422 sb.setFetchSize(mySyncSize); 423 sb.setRequireTotal(theParams.getCount() != null); 424 425 final Integer loadSynchronousUpTo = getLoadSynchronousUpToOrNull(theCacheControlDirective); 426 boolean isOffsetQuery = theParams.isOffsetQuery(); 427 428 // todo someday - not today. 429 // SearchStrategyFactory.ISearchStrategy searchStrategy = mySearchStrategyFactory.pickStrategy(theResourceType, 430 // theParams, theRequestDetails); 431 // return searchStrategy.get(); 432 433 if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null || isOffsetQuery) { 434 if (mySearchStrategyFactory.isSupportsHSearchDirect(theResourceType, theParams, theRequestDetails)) { 435 ourLog.info("Search {} is using direct load strategy", searchUuid); 436 SearchStrategyFactory.ISearchStrategy direct = mySearchStrategyFactory.makeDirectStrategy( 437 searchUuid, theResourceType, theParams, theRequestDetails); 438 439 try { 440 return direct.get(); 441 } catch (ResourceNotFoundInIndexException theE) { 442 // some resources were not found in index, so we will inform this and resort to JPA search 443 ourLog.warn( 444 "Some resources were not found in index. Make sure all resources were indexed. Resorting to database search."); 445 } 446 } 447 448 // we need a max to fetch for synchronous searches; 449 // otherwise we'll explode memory. 450 Integer maxToLoad = getSynchronousMaxResultsToFetch(theParams, loadSynchronousUpTo); 451 ourLog.debug("Setting a max fetch value of {} for synchronous search", maxToLoad); 452 sb.setMaxResultsToFetch(maxToLoad); 453 454 ourLog.debug("Search {} is loading in synchronous mode", searchUuid); 455 return mySynchronousSearchSvc.executeQuery( 456 theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, requestPartitionId); 457 } 458 459 /* 460 * See if there are any cached searches whose results we can return 461 * instead 462 */ 463 SearchCacheStatusEnum cacheStatus = SearchCacheStatusEnum.MISS; 464 if (theCacheControlDirective != null && theCacheControlDirective.isNoCache()) { 465 cacheStatus = SearchCacheStatusEnum.NOT_TRIED; 466 } 467 468 if (cacheStatus != SearchCacheStatusEnum.NOT_TRIED) { 469 if (theParams.getEverythingMode() == null) { 470 if (myStorageSettings.getReuseCachedSearchResultsForMillis() != null) { 471 PersistedJpaBundleProvider foundSearchProvider = findCachedQuery( 472 theParams, theResourceType, theRequestDetails, queryString, requestPartitionId); 473 if (foundSearchProvider != null) { 474 foundSearchProvider.setCacheStatus(SearchCacheStatusEnum.HIT); 475 return foundSearchProvider; 476 } 477 } 478 } 479 } 480 481 PersistedJpaSearchFirstPageBundleProvider retVal = submitSearch( 482 theCallingDao, theParams, theResourceType, theRequestDetails, sb, requestPartitionId, search); 483 retVal.setCacheStatus(cacheStatus); 484 return retVal; 485 } 486 487 /** 488 * The max results to return if this is a synchronous search. 489 * <p> 490 * We'll look in this order: 491 * * load synchronous up to (on params) 492 * * param count (+ offset) 493 * * StorageSettings fetch size default max 494 * * 495 */ 496 private Integer getSynchronousMaxResultsToFetch(SearchParameterMap theParams, Integer theLoadSynchronousUpTo) { 497 if (theLoadSynchronousUpTo != null) { 498 return theLoadSynchronousUpTo; 499 } 500 501 if (theParams.getCount() != null) { 502 int valToReturn = theParams.getCount() + 1; 503 if (theParams.getOffset() != null) { 504 valToReturn += theParams.getOffset(); 505 } 506 return valToReturn; 507 } 508 509 if (myStorageSettings.getFetchSizeDefaultMaximum() != null) { 510 return myStorageSettings.getFetchSizeDefaultMaximum(); 511 } 512 513 return myStorageSettings.getInternalSynchronousSearchSize(); 514 } 515 516 private void validateSearch(SearchParameterMap theParams) { 517 assert checkNoDuplicateParameters(theParams) 518 : "Duplicate parameters found in query: " + theParams.toNormalizedQueryString(myContext); 519 520 validateIncludes(theParams.getIncludes(), Constants.PARAM_INCLUDE); 521 validateIncludes(theParams.getRevIncludes(), Constants.PARAM_REVINCLUDE); 522 } 523 524 /** 525 * This method detects whether we have any duplicate lists of parameters and returns 526 * {@literal true} if none are found. For example, the following query would result 527 * in this method returning {@literal false}: 528 * <code>Patient?name=bart,homer&name=bart,homer</code> 529 * <p> 530 * This is not an optimized test, and it's not technically even prohibited to have 531 * duplicates like these in queries so this method should only be called as a 532 * part of an {@literal assert} statement to catch errors in tests. 533 */ 534 private boolean checkNoDuplicateParameters(SearchParameterMap theParams) { 535 HashSet<List<IQueryParameterType>> lists = new HashSet<>(); 536 for (List<List<IQueryParameterType>> andList : theParams.values()) { 537 538 lists.clear(); 539 for (int i = 0; i < andList.size(); i++) { 540 List<IQueryParameterType> orListI = andList.get(i); 541 if (!orListI.isEmpty() && !lists.add(orListI)) { 542 return false; 543 } 544 } 545 } 546 return true; 547 } 548 549 private void validateIncludes(Set<Include> includes, String name) { 550 for (Include next : includes) { 551 String value = next.getValue(); 552 if (value.equals(Constants.INCLUDE_STAR) || isBlank(value)) { 553 continue; 554 } 555 556 String paramType = next.getParamType(); 557 String paramName = next.getParamName(); 558 String paramTargetType = next.getParamTargetType(); 559 560 if (isBlank(paramType) || isBlank(paramName)) { 561 String msg = myContext 562 .getLocalizer() 563 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidInclude", name, value, ""); 564 throw new InvalidRequestException(Msg.code(2018) + msg); 565 } 566 567 if (!myDaoRegistry.isResourceTypeSupported(paramType)) { 568 String resourceTypeMsg = myContext 569 .getLocalizer() 570 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramType); 571 String msg = myContext 572 .getLocalizer() 573 .getMessage( 574 SearchCoordinatorSvcImpl.class, 575 "invalidInclude", 576 UrlUtil.sanitizeUrlPart(name), 577 UrlUtil.sanitizeUrlPart(value), 578 resourceTypeMsg); // last param is pre-sanitized 579 throw new InvalidRequestException(Msg.code(2017) + msg); 580 } 581 582 if (isNotBlank(paramTargetType) && !myDaoRegistry.isResourceTypeSupported(paramTargetType)) { 583 String resourceTypeMsg = myContext 584 .getLocalizer() 585 .getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramTargetType); 586 String msg = myContext 587 .getLocalizer() 588 .getMessage( 589 SearchCoordinatorSvcImpl.class, 590 "invalidInclude", 591 UrlUtil.sanitizeUrlPart(name), 592 UrlUtil.sanitizeUrlPart(value), 593 resourceTypeMsg); // last param is pre-sanitized 594 throw new InvalidRequestException(Msg.code(2016) + msg); 595 } 596 597 if (!Constants.INCLUDE_STAR.equals(paramName) 598 && mySearchParamRegistry.getActiveSearchParam( 599 paramType, paramName, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH) 600 == null) { 601 List<String> validNames = mySearchParamRegistry 602 .getActiveSearchParams(paramType, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH) 603 .values() 604 .stream() 605 .filter(t -> t.getParamType() == RestSearchParameterTypeEnum.REFERENCE) 606 .map(t -> UrlUtil.sanitizeUrlPart(t.getName())) 607 .sorted() 608 .collect(Collectors.toList()); 609 String searchParamMessage = myContext 610 .getLocalizer() 611 .getMessage( 612 BaseStorageDao.class, 613 "invalidSearchParameter", 614 UrlUtil.sanitizeUrlPart(paramName), 615 UrlUtil.sanitizeUrlPart(paramType), 616 validNames); 617 String msg = myContext 618 .getLocalizer() 619 .getMessage( 620 SearchCoordinatorSvcImpl.class, 621 "invalidInclude", 622 UrlUtil.sanitizeUrlPart(name), 623 UrlUtil.sanitizeUrlPart(value), 624 searchParamMessage); // last param is pre-sanitized 625 throw new InvalidRequestException(Msg.code(2015) + msg); 626 } 627 } 628 } 629 630 @Override 631 public Optional<Integer> getSearchTotal( 632 String theUuid, @Nullable RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) { 633 SearchTask task = myIdToSearchTask.get(theUuid); 634 if (task != null) { 635 return Optional.ofNullable(task.awaitInitialSync()); 636 } 637 638 /* 639 * In case there is no running search, if the total is listed as accurate we know one is coming 640 * so let's wait a bit for it to show up 641 */ 642 Optional<Search> search = myTxService 643 .withRequest(theRequestDetails) 644 .execute(() -> mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId)); 645 if (search.isPresent()) { 646 Optional<SearchParameterMap> searchParameterMap = search.get().getSearchParameterMap(); 647 if (searchParameterMap.isPresent() 648 && searchParameterMap.get().getSearchTotalMode() == SearchTotalModeEnum.ACCURATE) { 649 for (int i = 0; i < 10; i++) { 650 if (search.isPresent()) { 651 QueryParameterUtils.verifySearchHasntFailedOrThrowInternalErrorException(search.get()); 652 if (search.get().getTotalCount() != null) { 653 return Optional.of(search.get().getTotalCount()); 654 } 655 } 656 search = mySearchCacheSvc.fetchByUuid(theUuid, theRequestPartitionId); 657 } 658 } 659 } 660 661 return Optional.empty(); 662 } 663 664 @Nonnull 665 private PersistedJpaSearchFirstPageBundleProvider submitSearch( 666 IDao theCallingDao, 667 SearchParameterMap theParams, 668 String theResourceType, 669 RequestDetails theRequestDetails, 670 ISearchBuilder<JpaPid> theSb, 671 RequestPartitionId theRequestPartitionId, 672 Search theSearch) { 673 StopWatch w = new StopWatch(); 674 675 SearchTaskParameters stp = new SearchTaskParameters( 676 theSearch, 677 theCallingDao, 678 theParams, 679 theResourceType, 680 theRequestDetails, 681 theRequestPartitionId, 682 myOnRemoveSearchTask, 683 mySyncSize); 684 stp.setLoadingThrottleForUnitTests(myLoadingThrottleForUnitTests); 685 SearchTask task = (SearchTask) myBeanFactory.getBean(SearchConfig.SEARCH_TASK, stp); 686 myIdToSearchTask.put(theSearch.getUuid(), task); 687 task.call(); 688 689 PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage( 690 theRequestDetails, task, theSb, theRequestPartitionId); 691 692 ourLog.debug("Search initial phase completed in {}ms", w.getMillis()); 693 return retVal; 694 } 695 696 @Nullable 697 private PersistedJpaBundleProvider findCachedQuery( 698 SearchParameterMap theParams, 699 String theResourceType, 700 RequestDetails theRequestDetails, 701 String theQueryString, 702 RequestPartitionId theRequestPartitionId) { 703 // May be null 704 return myTxService 705 .withRequest(theRequestDetails) 706 .withRequestPartitionId(theRequestPartitionId) 707 .execute(() -> { 708 IInterceptorBroadcaster compositeBroadcaster = 709 CompositeInterceptorBroadcaster.newCompositeBroadcaster( 710 myInterceptorBroadcaster, theRequestDetails); 711 712 // Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH 713 714 HookParams params = new HookParams() 715 .add(SearchParameterMap.class, theParams) 716 .add(RequestDetails.class, theRequestDetails) 717 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 718 boolean canUseCache = 719 compositeBroadcaster.callHooks(Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH, params); 720 if (!canUseCache) { 721 return null; 722 } 723 724 // Check for a search matching the given hash 725 Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType, theRequestPartitionId); 726 if (searchToUse == null) { 727 return null; 728 } 729 730 ourLog.debug("Reusing search {} from cache", searchToUse.getUuid()); 731 // Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED 732 params = new HookParams() 733 .add(SearchParameterMap.class, theParams) 734 .add(RequestDetails.class, theRequestDetails) 735 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 736 compositeBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, params); 737 738 return myPersistedJpaBundleProviderFactory.newInstance(theRequestDetails, searchToUse.getUuid()); 739 }); 740 } 741 742 @Nullable 743 private Search findSearchToUseOrNull( 744 String theQueryString, String theResourceType, RequestPartitionId theRequestPartitionId) { 745 // createdCutoff is in recent past 746 final Instant createdCutoff = 747 Instant.now().minus(myStorageSettings.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS); 748 749 Optional<Search> candidate = mySearchCacheSvc.findCandidatesForReuse( 750 theResourceType, theQueryString, createdCutoff, theRequestPartitionId); 751 return candidate.orElse(null); 752 } 753 754 @Nullable 755 private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) { 756 final Integer loadSynchronousUpTo; 757 if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) { 758 if (theCacheControlDirective.getMaxResults() != null) { 759 loadSynchronousUpTo = theCacheControlDirective.getMaxResults(); 760 if (loadSynchronousUpTo > myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()) { 761 throw new InvalidRequestException(Msg.code(1165) + Constants.HEADER_CACHE_CONTROL + " header " 762 + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed " 763 + myStorageSettings.getCacheControlNoStoreMaxResultsUpperLimit()); 764 } 765 } else { 766 loadSynchronousUpTo = 100; 767 } 768 } else { 769 loadSynchronousUpTo = null; 770 } 771 return loadSynchronousUpTo; 772 } 773 774 /** 775 * Creates a {@link Pageable} using a start and end index 776 */ 777 @SuppressWarnings("WeakerAccess") 778 @Nullable 779 public static Pageable toPage(final int theFromIndex, int theToIndex) { 780 int pageSize = theToIndex - theFromIndex; 781 if (pageSize < 1) { 782 return null; 783 } 784 785 int pageIndex = theFromIndex / pageSize; 786 787 return new PageRequest(pageIndex, pageSize, Sort.unsorted()) { 788 private static final long serialVersionUID = 1L; 789 790 @Override 791 public long getOffset() { 792 return theFromIndex; 793 } 794 }; 795 } 796}