
001package ca.uhn.fhir.jpa.search; 002 003/*- 004 * #%L 005 * HAPI FHIR JPA Server 006 * %% 007 * Copyright (C) 2014 - 2022 Smile CDR, Inc. 008 * %% 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 * #L%family 021 */ 022 023import ca.uhn.fhir.context.FhirContext; 024import ca.uhn.fhir.i18n.Msg; 025import ca.uhn.fhir.interceptor.api.HookParams; 026import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 027import ca.uhn.fhir.interceptor.api.Pointcut; 028import ca.uhn.fhir.interceptor.model.RequestPartitionId; 029import ca.uhn.fhir.jpa.api.config.DaoConfig; 030import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 031import ca.uhn.fhir.jpa.api.dao.IDao; 032import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 033import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; 034import ca.uhn.fhir.jpa.dao.BaseStorageDao; 035import ca.uhn.fhir.jpa.dao.IResultIterator; 036import ca.uhn.fhir.jpa.dao.ISearchBuilder; 037import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 038import ca.uhn.fhir.jpa.entity.Search; 039import ca.uhn.fhir.jpa.entity.SearchInclude; 040import ca.uhn.fhir.jpa.entity.SearchTypeEnum; 041import ca.uhn.fhir.jpa.interceptor.JpaPreResourceAccessDetails; 042import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; 043import ca.uhn.fhir.jpa.model.search.SearchStatusEnum; 044import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; 045import ca.uhn.fhir.jpa.search.builder.predicate.ResourceLinkPredicateBuilder; 046import ca.uhn.fhir.jpa.search.cache.ISearchCacheSvc; 047import ca.uhn.fhir.jpa.search.cache.ISearchResultCacheSvc; 048import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum; 049import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 050import ca.uhn.fhir.model.api.Include; 051import ca.uhn.fhir.rest.api.CacheControlDirective; 052import ca.uhn.fhir.rest.api.Constants; 053import ca.uhn.fhir.rest.api.RestSearchParameterTypeEnum; 054import ca.uhn.fhir.rest.api.SearchTotalModeEnum; 055import ca.uhn.fhir.rest.api.server.IBundleProvider; 056import ca.uhn.fhir.rest.api.server.IPreResourceAccessDetails; 057import ca.uhn.fhir.rest.api.server.RequestDetails; 058import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; 059import ca.uhn.fhir.rest.server.IPagingProvider; 060import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; 061import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 062import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 063import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; 064import ca.uhn.fhir.rest.server.method.PageMethodBinding; 065import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails; 066import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster; 067import ca.uhn.fhir.rest.server.util.ICachedSearchDetails; 068import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; 069import ca.uhn.fhir.util.AsyncUtil; 070import ca.uhn.fhir.util.StopWatch; 071import ca.uhn.fhir.util.UrlUtil; 072import co.elastic.apm.api.ElasticApm; 073import co.elastic.apm.api.Span; 074import co.elastic.apm.api.Transaction; 075import com.google.common.annotations.VisibleForTesting; 076import org.apache.commons.lang3.Validate; 077import org.apache.commons.lang3.exception.ExceptionUtils; 078import org.apache.commons.lang3.time.DateUtils; 079import org.hl7.fhir.instance.model.api.IBaseResource; 080import org.springframework.beans.factory.annotation.Autowired; 081import org.springframework.data.domain.AbstractPageRequest; 082import org.springframework.data.domain.Pageable; 083import org.springframework.data.domain.Sort; 084import org.springframework.orm.jpa.JpaDialect; 085import org.springframework.orm.jpa.JpaTransactionManager; 086import org.springframework.orm.jpa.vendor.HibernateJpaDialect; 087import org.springframework.stereotype.Component; 088import org.springframework.transaction.PlatformTransactionManager; 089import org.springframework.transaction.TransactionDefinition; 090import org.springframework.transaction.TransactionStatus; 091import org.springframework.transaction.annotation.Propagation; 092import org.springframework.transaction.annotation.Transactional; 093import org.springframework.transaction.support.TransactionCallbackWithoutResult; 094import org.springframework.transaction.support.TransactionTemplate; 095 096import javax.annotation.Nonnull; 097import javax.annotation.Nullable; 098import javax.annotation.PostConstruct; 099import java.io.IOException; 100import java.time.Instant; 101import java.time.temporal.ChronoUnit; 102import java.util.ArrayList; 103import java.util.Date; 104import java.util.Iterator; 105import java.util.List; 106import java.util.Optional; 107import java.util.Set; 108import java.util.UUID; 109import java.util.concurrent.Callable; 110import java.util.concurrent.ConcurrentHashMap; 111import java.util.concurrent.CountDownLatch; 112import java.util.concurrent.TimeUnit; 113import java.util.stream.Collectors; 114 115import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantCount; 116import static ca.uhn.fhir.jpa.util.SearchParameterMapCalculator.isWantOnlyCount; 117import static java.util.Objects.nonNull; 118import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; 119import static org.apache.commons.lang3.StringUtils.isBlank; 120import static org.apache.commons.lang3.StringUtils.isNotBlank; 121 122@Component("mySearchCoordinatorSvc") 123public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { 124 public static final int DEFAULT_SYNC_SIZE = 250; 125 public static final String UNIT_TEST_CAPTURE_STACK = "unit_test_capture_stack"; 126 127 private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class); 128 private final ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<>(); 129 @Autowired 130 private FhirContext myContext; 131 @Autowired 132 private DaoConfig myDaoConfig; 133 134 private Integer myLoadingThrottleForUnitTests = null; 135 private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE; 136 private boolean myNeverUseLocalSearchForUnitTests; 137 @Autowired 138 private IInterceptorBroadcaster myInterceptorBroadcaster; 139 @Autowired 140 private PlatformTransactionManager myManagedTxManager; 141 @Autowired 142 private ISearchCacheSvc mySearchCacheSvc; 143 @Autowired 144 private ISearchResultCacheSvc mySearchResultCacheSvc; 145 @Autowired 146 private DaoRegistry myDaoRegistry; 147 @Autowired 148 private IPagingProvider myPagingProvider; 149 @Autowired 150 private SearchBuilderFactory mySearchBuilderFactory; 151 152 @Autowired 153 private ISynchronousSearchSvc mySynchronousSearchSvc; 154 155 private int mySyncSize = DEFAULT_SYNC_SIZE; 156 /** 157 * Set in {@link #start()} 158 */ 159 private boolean myCustomIsolationSupported; 160 @Autowired 161 private PersistedJpaBundleProviderFactory myPersistedJpaBundleProviderFactory; 162 @Autowired 163 private IRequestPartitionHelperSvc myRequestPartitionHelperService; 164 @Autowired 165 private ISearchParamRegistry mySearchParamRegistry; 166 @Autowired 167 private SearchStrategyFactory mySearchStrategyFactory; 168 169 /** 170 * Constructor 171 */ 172 @Autowired 173 public SearchCoordinatorSvcImpl() { 174 super(); 175 } 176 177 @VisibleForTesting 178 Set<String> getActiveSearchIds() { 179 return myIdToSearchTask.keySet(); 180 } 181 182 @VisibleForTesting 183 public void setSearchCacheServicesForUnitTest(ISearchCacheSvc theSearchCacheSvc, ISearchResultCacheSvc theSearchResultCacheSvc) { 184 mySearchCacheSvc = theSearchCacheSvc; 185 mySearchResultCacheSvc = theSearchResultCacheSvc; 186 } 187 188 @PostConstruct 189 public void start() { 190 if (myManagedTxManager instanceof JpaTransactionManager) { 191 JpaDialect jpaDialect = ((JpaTransactionManager) myManagedTxManager).getJpaDialect(); 192 if (jpaDialect instanceof HibernateJpaDialect) { 193 myCustomIsolationSupported = true; 194 } 195 } 196 if (myCustomIsolationSupported == false) { 197 ourLog.warn("JPA dialect does not support transaction isolation! This can have an impact on search performance."); 198 } 199 } 200 201 @Override 202 public void cancelAllActiveSearches() { 203 for (SearchTask next : myIdToSearchTask.values()) { 204 ourLog.info("Requesting immediate abort of search: {}", next.getSearch().getUuid()); 205 next.requestImmediateAbort(); 206 AsyncUtil.awaitLatchAndIgnoreInterrupt(next.getCompletionLatch(), 30, TimeUnit.SECONDS); 207 } 208 } 209 210 @SuppressWarnings("SameParameterValue") 211 @VisibleForTesting 212 void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) { 213 myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults; 214 } 215 216 /** 217 * facade over raw hook intererface 218 */ 219 public class StorageInterceptorHooks { 220 /** 221 * Interceptor call: STORAGE_PRESEARCH_REGISTERED 222 * 223 * @param theRequestDetails 224 * @param theParams 225 * @param search 226 */ 227 private void callStoragePresearchRegistered(RequestDetails theRequestDetails, SearchParameterMap theParams, Search search) { 228 HookParams params = new HookParams() 229 .add(ICachedSearchDetails.class, search) 230 .add(RequestDetails.class, theRequestDetails) 231 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails) 232 .add(SearchParameterMap.class, theParams); 233 CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PRESEARCH_REGISTERED, params); 234 } 235 //private IInterceptorBroadcaster myInterceptorBroadcaster; 236 } 237 private StorageInterceptorHooks myStorageInterceptorHooks = new StorageInterceptorHooks(); 238 239 /** 240 * This method is called by the HTTP client processing thread in order to 241 * fetch resources. 242 */ 243 @Override 244 @Transactional(propagation = Propagation.NEVER) 245 public List<ResourcePersistentId> getResources(final String theUuid, int theFrom, int theTo, @Nullable RequestDetails theRequestDetails) { 246 TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager); 247 txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); 248 249 // If we're actively searching right now, don't try to do anything until at least one batch has been 250 // persisted in the DB 251 SearchTask searchTask = myIdToSearchTask.get(theUuid); 252 if (searchTask != null) { 253 searchTask.awaitInitialSync(); 254 } 255 256 ourLog.trace("About to start looking for resources {}-{}", theFrom, theTo); 257 258 Search search; 259 StopWatch sw = new StopWatch(); 260 while (true) { 261 262 if (myNeverUseLocalSearchForUnitTests == false) { 263 if (searchTask != null) { 264 ourLog.trace("Local search found"); 265 List<ResourcePersistentId> resourcePids = searchTask.getResourcePids(theFrom, theTo); 266 ourLog.trace("Local search returned {} pids, wanted {}-{} - Search: {}", resourcePids.size(), theFrom, theTo, searchTask.getSearch()); 267 268 /* 269 * Generally, if a search task is open, the fastest possible thing is to just return its results. This 270 * will work most of the time, but can fail if the task hit a search threshold and the client is requesting 271 * results beyond that threashold. In that case, we'll keep going below, since that will trigger another 272 * task. 273 */ 274 if ((searchTask.getSearch().getNumFound() - searchTask.getSearch().getNumBlocked()) >= theTo || resourcePids.size() == (theTo - theFrom)) { 275 return resourcePids; 276 } 277 } 278 } 279 280 search = mySearchCacheSvc 281 .fetchByUuid(theUuid) 282 .orElseThrow(() -> newResourceGoneException(theUuid)); 283 284 verifySearchHasntFailedOrThrowInternalErrorException(search); 285 if (search.getStatus() == SearchStatusEnum.FINISHED) { 286 ourLog.trace("Search entity marked as finished with {} results", search.getNumFound()); 287 break; 288 } 289 if ((search.getNumFound() - search.getNumBlocked()) >= theTo) { 290 ourLog.trace("Search entity has {} results so far", search.getNumFound()); 291 break; 292 } 293 294 if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) { 295 ourLog.error("Search {} of type {} for {}{} timed out after {}ms", search.getId(), search.getSearchType(), search.getResourceType(), search.getSearchQueryString(), sw.getMillis()); 296 throw new InternalErrorException(Msg.code(1163) + "Request timed out after " + sw.getMillis() + "ms"); 297 } 298 299 // If the search was saved in "pass complete mode" it's probably time to 300 // start a new pass 301 if (search.getStatus() == SearchStatusEnum.PASSCMPLET) { 302 ourLog.trace("Going to try to start next search"); 303 Optional<Search> newSearch = mySearchCacheSvc.tryToMarkSearchAsInProgress(search); 304 if (newSearch.isPresent()) { 305 ourLog.trace("Launching new search"); 306 search = newSearch.get(); 307 String resourceType = search.getResourceType(); 308 SearchParameterMap params = search.getSearchParameterMap().orElseThrow(() -> new IllegalStateException("No map in PASSCOMPLET search")); 309 IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType); 310 RequestPartitionId requestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequestForSearchType(theRequestDetails, resourceType, params, null); 311 SearchContinuationTask task = new SearchContinuationTask(search, resourceDao, params, resourceType, theRequestDetails, requestPartitionId); 312 myIdToSearchTask.put(search.getUuid(), task); 313 task.call(); 314 } 315 } 316 317 AsyncUtil.sleep(500); 318 } 319 320 ourLog.trace("Finished looping"); 321 322 List<ResourcePersistentId> pids = mySearchResultCacheSvc.fetchResultPids(search, theFrom, theTo); 323 if (pids == null) { 324 throw newResourceGoneException(theUuid); 325 } 326 327 ourLog.trace("Fetched {} results", pids.size()); 328 329 return pids; 330 } 331 332 @Nonnull 333 private ResourceGoneException newResourceGoneException(String theUuid) { 334 ourLog.trace("Client requested unknown paging ID[{}]", theUuid); 335 String msg = myContext.getLocalizer().getMessage(PageMethodBinding.class, "unknownSearchId", theUuid); 336 return new ResourceGoneException(msg); 337 } 338 339 @Override 340 public IBundleProvider registerSearch(final IFhirResourceDao<?> theCallingDao, final SearchParameterMap theParams, String theResourceType, CacheControlDirective theCacheControlDirective, RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) { 341 final String searchUuid = UUID.randomUUID().toString(); 342 343 final String queryString = theParams.toNormalizedQueryString(myContext); 344 ourLog.debug("Registering new search {}", searchUuid); 345 346 Search search = new Search(); 347 populateSearchEntity(theParams, theResourceType, searchUuid, queryString, search, theRequestPartitionId); 348 349 myStorageInterceptorHooks.callStoragePresearchRegistered(theRequestDetails, theParams, search); 350 351 validateSearch(theParams); 352 353 Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(theResourceType).getImplementingClass(); 354 final ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(theCallingDao, theResourceType, resourceTypeClass); 355 sb.setFetchSize(mySyncSize); 356 357 final Integer loadSynchronousUpTo = getLoadSynchronousUpToOrNull(theCacheControlDirective); 358 boolean isOffsetQuery = theParams.isOffsetQuery(); 359 360 // todo someday - not today. 361// SearchStrategyFactory.ISearchStrategy searchStrategy = mySearchStrategyFactory.pickStrategy(theResourceType, theParams, theRequestDetails); 362// return searchStrategy.get(); 363 364 if (mySearchStrategyFactory.isSupportsHSearchDirect(theResourceType, theParams, theRequestDetails)) { 365 ourLog.info("Search {} is using direct load strategy", searchUuid); 366 SearchStrategyFactory.ISearchStrategy direct = mySearchStrategyFactory.makeDirectStrategy(searchUuid, theResourceType, theParams, theRequestDetails); 367 return direct.get(); 368 } 369 370 if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null || isOffsetQuery) { 371 ourLog.debug("Search {} is loading in synchronous mode", searchUuid); 372 return mySynchronousSearchSvc.executeQuery(theParams, theRequestDetails, searchUuid, sb, loadSynchronousUpTo, theRequestPartitionId); 373 } 374 375 /* 376 * See if there are any cached searches whose results we can return 377 * instead 378 */ 379 SearchCacheStatusEnum cacheStatus = SearchCacheStatusEnum.MISS; 380 if (theCacheControlDirective != null && theCacheControlDirective.isNoCache() == true) { 381 cacheStatus = SearchCacheStatusEnum.NOT_TRIED; 382 } 383 384 if (cacheStatus != SearchCacheStatusEnum.NOT_TRIED) { 385 if (theParams.getEverythingMode() == null) { 386 if (myDaoConfig.getReuseCachedSearchResultsForMillis() != null) { 387 PersistedJpaBundleProvider foundSearchProvider = findCachedQuery(theParams, theResourceType, theRequestDetails, queryString, theRequestPartitionId); 388 if (foundSearchProvider != null) { 389 foundSearchProvider.setCacheStatus(SearchCacheStatusEnum.HIT); 390 return foundSearchProvider; 391 } 392 } 393 } 394 } 395 396 PersistedJpaSearchFirstPageBundleProvider retVal = submitSearch(theCallingDao, theParams, theResourceType, theRequestDetails, searchUuid, sb, queryString, theRequestPartitionId, search); 397 retVal.setCacheStatus(cacheStatus); 398 return retVal; 399 } 400 401 private void validateSearch(SearchParameterMap theParams) { 402 validateIncludes(theParams.getIncludes(), Constants.PARAM_INCLUDE); 403 validateIncludes(theParams.getRevIncludes(), Constants.PARAM_REVINCLUDE); 404 } 405 406 private void validateIncludes(Set<Include> includes, String name) { 407 for (Include next : includes) { 408 String value = next.getValue(); 409 if (value.equals(Constants.INCLUDE_STAR) || isBlank(value)) { 410 continue; 411 } 412 413 String paramType = next.getParamType(); 414 String paramName = next.getParamName(); 415 String paramTargetType = next.getParamTargetType(); 416 417 if (isBlank(paramType) || isBlank(paramName)) { 418 String msg = myContext.getLocalizer().getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidInclude", name, value, ""); 419 throw new InvalidRequestException(Msg.code(2018) + msg); 420 } 421 422 if (!myDaoRegistry.isResourceTypeSupported(paramType)) { 423 String resourceTypeMsg = myContext.getLocalizer().getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramType); 424 String msg = myContext.getLocalizer().getMessage(SearchCoordinatorSvcImpl.class, "invalidInclude", UrlUtil.sanitizeUrlPart(name), UrlUtil.sanitizeUrlPart(value), resourceTypeMsg); // last param is pre-sanitized 425 throw new InvalidRequestException(Msg.code(2017) + msg); 426 } 427 428 if (isNotBlank(paramTargetType) && !myDaoRegistry.isResourceTypeSupported(paramTargetType)) { 429 String resourceTypeMsg = myContext.getLocalizer().getMessageSanitized(SearchCoordinatorSvcImpl.class, "invalidResourceType", paramTargetType); 430 String msg = myContext.getLocalizer().getMessage(SearchCoordinatorSvcImpl.class, "invalidInclude", UrlUtil.sanitizeUrlPart(name), UrlUtil.sanitizeUrlPart(value), resourceTypeMsg); // last param is pre-sanitized 431 throw new InvalidRequestException(Msg.code(2016) + msg); 432 } 433 434 if (!Constants.INCLUDE_STAR.equals(paramName) && mySearchParamRegistry.getActiveSearchParam(paramType, paramName) == null) { 435 List<String> validNames = mySearchParamRegistry 436 .getActiveSearchParams(paramType) 437 .values() 438 .stream() 439 .filter(t -> t.getParamType() == RestSearchParameterTypeEnum.REFERENCE) 440 .map(t -> UrlUtil.sanitizeUrlPart(t.getName())) 441 .sorted() 442 .collect(Collectors.toList()); 443 String searchParamMessage = myContext.getLocalizer().getMessage(BaseStorageDao.class, "invalidSearchParameter", UrlUtil.sanitizeUrlPart(paramName), UrlUtil.sanitizeUrlPart(paramType), validNames); 444 String msg = myContext.getLocalizer().getMessage(SearchCoordinatorSvcImpl.class, "invalidInclude", UrlUtil.sanitizeUrlPart(name), UrlUtil.sanitizeUrlPart(value), searchParamMessage); // last param is pre-sanitized 445 throw new InvalidRequestException(Msg.code(2015) + msg); 446 } 447 448 } 449 } 450 451 @Override 452 public Optional<Integer> getSearchTotal(String theUuid) { 453 SearchTask task = myIdToSearchTask.get(theUuid); 454 if (task != null) { 455 return Optional.ofNullable(task.awaitInitialSync()); 456 } 457 458 /* 459 * In case there is no running search, if the total is listed as accurate we know one is coming 460 * so let's wait a bit for it to show up 461 */ 462 TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager); 463 txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); 464 Optional<Search> search = mySearchCacheSvc.fetchByUuid(theUuid); 465 if (search.isPresent()) { 466 Optional<SearchParameterMap> searchParameterMap = search.get().getSearchParameterMap(); 467 if (searchParameterMap.isPresent() && searchParameterMap.get().getSearchTotalMode() == SearchTotalModeEnum.ACCURATE) { 468 for (int i = 0; i < 10; i++) { 469 if (search.isPresent()) { 470 verifySearchHasntFailedOrThrowInternalErrorException(search.get()); 471 if (search.get().getTotalCount() != null) { 472 return Optional.of(search.get().getTotalCount()); 473 } 474 } 475 search = mySearchCacheSvc.fetchByUuid(theUuid); 476 } 477 } 478 } 479 480 return Optional.empty(); 481 } 482 483 @Nonnull 484 private PersistedJpaSearchFirstPageBundleProvider submitSearch(IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequestDetails, String theSearchUuid, ISearchBuilder theSb, String theQueryString, RequestPartitionId theRequestPartitionId, Search theSearch) { 485 StopWatch w = new StopWatch(); 486 SearchTask task = new SearchTask(theSearch, theCallingDao, theParams, theResourceType, theRequestDetails, theRequestPartitionId); 487 myIdToSearchTask.put(theSearch.getUuid(), task); 488 task.call(); 489 490 PersistedJpaSearchFirstPageBundleProvider retVal = myPersistedJpaBundleProviderFactory.newInstanceFirstPage(theRequestDetails, theSearch, task, theSb); 491 492 ourLog.debug("Search initial phase completed in {}ms", w.getMillis()); 493 return retVal; 494 } 495 496 @Nullable 497 private PersistedJpaBundleProvider findCachedQuery(SearchParameterMap theParams, String theResourceType, RequestDetails theRequestDetails, String theQueryString, RequestPartitionId theRequestPartitionId) { 498 TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager); 499 500 // May be null 501 return txTemplate.execute(t -> { 502 503 // Interceptor call: STORAGE_PRECHECK_FOR_CACHED_SEARCH 504 HookParams params = new HookParams() 505 .add(SearchParameterMap.class, theParams) 506 .add(RequestDetails.class, theRequestDetails) 507 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 508 Object outcome = CompositeInterceptorBroadcaster.doCallHooksAndReturnObject(myInterceptorBroadcaster, theRequestDetails, Pointcut.STORAGE_PRECHECK_FOR_CACHED_SEARCH, params); 509 if (Boolean.FALSE.equals(outcome)) { 510 return null; 511 } 512 513 // Check for a search matching the given hash 514 Search searchToUse = findSearchToUseOrNull(theQueryString, theResourceType, theRequestPartitionId); 515 if (searchToUse == null) { 516 return null; 517 } 518 519 ourLog.debug("Reusing search {} from cache", searchToUse.getUuid()); 520 // Interceptor call: JPA_PERFTRACE_SEARCH_REUSING_CACHED 521 params = new HookParams() 522 .add(SearchParameterMap.class, theParams) 523 .add(RequestDetails.class, theRequestDetails) 524 .addIfMatchesType(ServletRequestDetails.class, theRequestDetails); 525 CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequestDetails, Pointcut.JPA_PERFTRACE_SEARCH_REUSING_CACHED, params); 526 527 return myPersistedJpaBundleProviderFactory.newInstance(theRequestDetails, searchToUse.getUuid()); 528 }); 529 } 530 531 @Nullable 532 private Search findSearchToUseOrNull(String theQueryString, String theResourceType, RequestPartitionId theRequestPartitionId) { 533 // createdCutoff is in recent past 534 final Instant createdCutoff = Instant.now().minus(myDaoConfig.getReuseCachedSearchResultsForMillis(), ChronoUnit.MILLIS); 535 536 Optional<Search> candidate = mySearchCacheSvc.findCandidatesForReuse(theResourceType, theQueryString, createdCutoff, theRequestPartitionId); 537 return candidate.orElse(null); 538 } 539 540 @Nullable 541 private Integer getLoadSynchronousUpToOrNull(CacheControlDirective theCacheControlDirective) { 542 final Integer loadSynchronousUpTo; 543 if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) { 544 if (theCacheControlDirective.getMaxResults() != null) { 545 loadSynchronousUpTo = theCacheControlDirective.getMaxResults(); 546 if (loadSynchronousUpTo > myDaoConfig.getCacheControlNoStoreMaxResultsUpperLimit()) { 547 throw new InvalidRequestException(Msg.code(1165) + Constants.HEADER_CACHE_CONTROL + " header " + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed " + myDaoConfig.getCacheControlNoStoreMaxResultsUpperLimit()); 548 } 549 } else { 550 loadSynchronousUpTo = 100; 551 } 552 } else { 553 loadSynchronousUpTo = null; 554 } 555 return loadSynchronousUpTo; 556 } 557 558 @VisibleForTesting 559 void setContextForUnitTest(FhirContext theCtx) { 560 myContext = theCtx; 561 } 562 563 @VisibleForTesting 564 void setDaoConfigForUnitTest(DaoConfig theDaoConfig) { 565 myDaoConfig = theDaoConfig; 566 } 567 568 @VisibleForTesting 569 public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) { 570 myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests; 571 } 572 573 @VisibleForTesting 574 public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) { 575 myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests; 576 } 577 578 @VisibleForTesting 579 public void setSyncSizeForUnitTests(int theSyncSize) { 580 mySyncSize = theSyncSize; 581 } 582 583 @VisibleForTesting 584 void setTransactionManagerForUnitTest(PlatformTransactionManager theTxManager) { 585 myManagedTxManager = theTxManager; 586 } 587 588 @VisibleForTesting 589 void setDaoRegistryForUnitTest(DaoRegistry theDaoRegistry) { 590 myDaoRegistry = theDaoRegistry; 591 } 592 593 @VisibleForTesting 594 void setInterceptorBroadcasterForUnitTest(IInterceptorBroadcaster theInterceptorBroadcaster) { 595 myInterceptorBroadcaster = theInterceptorBroadcaster; 596 } 597 598 @VisibleForTesting 599 public void setSearchBuilderFactoryForUnitTest(SearchBuilderFactory theSearchBuilderFactory) { 600 mySearchBuilderFactory = theSearchBuilderFactory; 601 } 602 603 @VisibleForTesting 604 public void setPersistedJpaBundleProviderFactoryForUnitTest(PersistedJpaBundleProviderFactory thePersistedJpaBundleProviderFactory) { 605 myPersistedJpaBundleProviderFactory = thePersistedJpaBundleProviderFactory; 606 } 607 608 @VisibleForTesting 609 public void setRequestPartitionHelperService(IRequestPartitionHelperSvc theRequestPartitionHelperService) { 610 myRequestPartitionHelperService = theRequestPartitionHelperService; 611 } 612 613 @VisibleForTesting 614 public void setSynchronousSearchSvc(ISynchronousSearchSvc theSynchronousSearchSvc) { 615 mySynchronousSearchSvc = theSynchronousSearchSvc; 616 } 617 618 public static void populateSearchEntity(SearchParameterMap theParams, String theResourceType, String theSearchUuid, String theQueryString, Search theSearch, RequestPartitionId theRequestPartitionId) { 619 theSearch.setDeleted(false); 620 theSearch.setUuid(theSearchUuid); 621 theSearch.setCreated(new Date()); 622 theSearch.setTotalCount(null); 623 theSearch.setNumFound(0); 624 theSearch.setPreferredPageSize(theParams.getCount()); 625 theSearch.setSearchType(theParams.getEverythingMode() != null ? SearchTypeEnum.EVERYTHING : SearchTypeEnum.SEARCH); 626 theSearch.setLastUpdated(theParams.getLastUpdated()); 627 theSearch.setResourceType(theResourceType); 628 theSearch.setStatus(SearchStatusEnum.LOADING); 629 theSearch.setSearchQueryString(theQueryString, theRequestPartitionId); 630 631 if (theParams.hasIncludes()) { 632 for (Include next : theParams.getIncludes()) { 633 theSearch.addInclude(new SearchInclude(theSearch, next.getValue(), false, next.isRecurse())); 634 } 635 } 636 637 for (Include next : theParams.getRevIncludes()) { 638 theSearch.addInclude(new SearchInclude(theSearch, next.getValue(), true, next.isRecurse())); 639 } 640 } 641 642 /** 643 * Creates a {@link Pageable} using a start and end index 644 */ 645 @SuppressWarnings("WeakerAccess") 646 @Nullable 647 public static Pageable toPage(final int theFromIndex, int theToIndex) { 648 int pageSize = theToIndex - theFromIndex; 649 if (pageSize < 1) { 650 return null; 651 } 652 653 int pageIndex = theFromIndex / pageSize; 654 655 Pageable page = new AbstractPageRequest(pageIndex, pageSize) { 656 private static final long serialVersionUID = 1L; 657 658 @Override 659 public long getOffset() { 660 return theFromIndex; 661 } 662 663 @Override 664 public Sort getSort() { 665 return Sort.unsorted(); 666 } 667 668 @Override 669 public Pageable next() { 670 return null; 671 } 672 673 @Override 674 public Pageable previous() { 675 return null; 676 } 677 678 @Override 679 public Pageable first() { 680 return null; 681 } 682 683 @Override 684 public Pageable withPage(int theI) { 685 return null; 686 } 687 }; 688 689 return page; 690 } 691 692 static void verifySearchHasntFailedOrThrowInternalErrorException(Search theSearch) { 693 if (theSearch.getStatus() == SearchStatusEnum.FAILED) { 694 Integer status = theSearch.getFailureCode(); 695 status = defaultIfNull(status, 500); 696 697 String message = theSearch.getFailureMessage(); 698 throw BaseServerResponseException.newInstance(status, message); 699 } 700 } 701 702 /** 703 * A search task is a Callable task that runs in 704 * a thread pool to handle an individual search. One instance 705 * is created for any requested search and runs from the 706 * beginning to the end of the search. 707 * <p> 708 * Understand: 709 * This class executes in its own thread separate from the 710 * web server client thread that made the request. We do that 711 * so that we can return to the client as soon as possible, 712 * but keep the search going in the background (and have 713 * the next page of results ready to go when the client asks). 714 */ 715 public class SearchTask implements Callable<Void> { 716 private final SearchParameterMap myParams; 717 private final IDao myCallingDao; 718 private final String myResourceType; 719 private final ArrayList<ResourcePersistentId> mySyncedPids = new ArrayList<>(); 720 private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1); 721 private final CountDownLatch myCompletionLatch; 722 private final ArrayList<ResourcePersistentId> myUnsyncedPids = new ArrayList<>(); 723 private final RequestDetails myRequest; 724 private final RequestPartitionId myRequestPartitionId; 725 private final SearchRuntimeDetails mySearchRuntimeDetails; 726 private final Transaction myParentTransaction; 727 private Search mySearch; 728 private boolean myAbortRequested; 729 private int myCountSavedTotal = 0; 730 private int myCountSavedThisPass = 0; 731 private int myCountBlockedThisPass = 0; 732 private boolean myAdditionalPrefetchThresholdsRemaining; 733 private List<ResourcePersistentId> myPreviouslyAddedResourcePids; 734 private Integer myMaxResultsToFetch; 735 736 /** 737 * Constructor 738 */ 739 protected SearchTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequest, RequestPartitionId theRequestPartitionId) { 740 mySearch = theSearch; 741 myCallingDao = theCallingDao; 742 myParams = theParams; 743 myResourceType = theResourceType; 744 myCompletionLatch = new CountDownLatch(1); 745 mySearchRuntimeDetails = new SearchRuntimeDetails(theRequest, mySearch.getUuid()); 746 mySearchRuntimeDetails.setQueryString(theParams.toNormalizedQueryString(theCallingDao.getContext())); 747 myRequestPartitionId = theRequestPartitionId; 748 myRequest = theRequest; 749 myParentTransaction = ElasticApm.currentTransaction(); 750 } 751 752 /** 753 * This method is called by the server HTTP thread, and 754 * will block until at least one page of results have been 755 * fetched from the DB, and will never block after that. 756 */ 757 Integer awaitInitialSync() { 758 ourLog.trace("Awaiting initial sync"); 759 do { 760 ourLog.trace("Search {} aborted: {}", getSearch().getUuid(), !isNotAborted()); 761 if (AsyncUtil.awaitLatchAndThrowInternalErrorExceptionOnInterrupt(getInitialCollectionLatch(), 250L, TimeUnit.MILLISECONDS)) { 762 break; 763 } 764 } while (getSearch().getStatus() == SearchStatusEnum.LOADING); 765 ourLog.trace("Initial sync completed"); 766 767 return getSearch().getTotalCount(); 768 } 769 770 protected Search getSearch() { 771 return mySearch; 772 } 773 774 CountDownLatch getInitialCollectionLatch() { 775 return myInitialCollectionLatch; 776 } 777 778 void setPreviouslyAddedResourcePids(List<ResourcePersistentId> thePreviouslyAddedResourcePids) { 779 myPreviouslyAddedResourcePids = thePreviouslyAddedResourcePids; 780 myCountSavedTotal = myPreviouslyAddedResourcePids.size(); 781 } 782 783 private ISearchBuilder newSearchBuilder() { 784 Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass(); 785 return mySearchBuilderFactory.newSearchBuilder(myCallingDao, myResourceType, resourceTypeClass); 786 } 787 788 @Nonnull 789 List<ResourcePersistentId> getResourcePids(int theFromIndex, int theToIndex) { 790 ourLog.debug("Requesting search PIDs from {}-{}", theFromIndex, theToIndex); 791 792 boolean keepWaiting; 793 do { 794 synchronized (mySyncedPids) { 795 ourLog.trace("Search status is {}", mySearch.getStatus()); 796 boolean haveEnoughResults = mySyncedPids.size() >= theToIndex; 797 if (!haveEnoughResults) { 798 switch (mySearch.getStatus()) { 799 case LOADING: 800 keepWaiting = true; 801 break; 802 case PASSCMPLET: 803 /* 804 * If we get here, it means that the user requested resources that crossed the 805 * current pre-fetch boundary. For example, if the prefetch threshold is 50 and the 806 * user has requested resources 0-60, then they would get 0-50 back but the search 807 * coordinator would then stop searching.SearchCoordinatorSvcImplTest 808 */ 809 keepWaiting = false; 810 break; 811 case FAILED: 812 case FINISHED: 813 case GONE: 814 default: 815 keepWaiting = false; 816 break; 817 } 818 } else { 819 keepWaiting = false; 820 } 821 } 822 823 if (keepWaiting) { 824 ourLog.info("Waiting as we only have {} results - Search status: {}", mySyncedPids.size(), mySearch.getStatus()); 825 AsyncUtil.sleep(500L); 826 } 827 } while (keepWaiting); 828 829 ourLog.debug("Proceeding, as we have {} results", mySyncedPids.size()); 830 831 ArrayList<ResourcePersistentId> retVal = new ArrayList<>(); 832 synchronized (mySyncedPids) { 833 verifySearchHasntFailedOrThrowInternalErrorException(mySearch); 834 835 int toIndex = theToIndex; 836 if (mySyncedPids.size() < toIndex) { 837 toIndex = mySyncedPids.size(); 838 } 839 for (int i = theFromIndex; i < toIndex; i++) { 840 retVal.add(mySyncedPids.get(i)); 841 } 842 } 843 844 ourLog.trace("Done syncing results - Wanted {}-{} and returning {} of {}", theFromIndex, theToIndex, retVal.size(), mySyncedPids.size()); 845 846 return retVal; 847 } 848 849 void saveSearch() { 850 TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager); 851 txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); 852 txTemplate.execute(new TransactionCallbackWithoutResult() { 853 @Override 854 protected void doInTransactionWithoutResult(@Nonnull TransactionStatus theArg0) { 855 doSaveSearch(); 856 } 857 858 }); 859 } 860 861 private void saveUnsynced(final IResultIterator theResultIter) { 862 TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager); 863 txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); 864 txTemplate.execute(new TransactionCallbackWithoutResult() { 865 @Override 866 protected void doInTransactionWithoutResult(@Nonnull TransactionStatus theArg0) { 867 if (mySearch.getId() == null) { 868 doSaveSearch(); 869 } 870 871 ArrayList<ResourcePersistentId> unsyncedPids = myUnsyncedPids; 872 int countBlocked = 0; 873 874 // Interceptor call: STORAGE_PREACCESS_RESOURCES 875 // This can be used to remove results from the search result details before 876 // the user has a chance to know that they were in the results 877 if (mySearchRuntimeDetails.getRequestDetails() != null && unsyncedPids.isEmpty() == false) { 878 JpaPreResourceAccessDetails accessDetails = new JpaPreResourceAccessDetails(unsyncedPids, () -> newSearchBuilder()); 879 HookParams params = new HookParams() 880 .add(IPreResourceAccessDetails.class, accessDetails) 881 .add(RequestDetails.class, mySearchRuntimeDetails.getRequestDetails()) 882 .addIfMatchesType(ServletRequestDetails.class, mySearchRuntimeDetails.getRequestDetails()); 883 CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, myRequest, Pointcut.STORAGE_PREACCESS_RESOURCES, params); 884 885 for (int i = unsyncedPids.size() - 1; i >= 0; i--) { 886 if (accessDetails.isDontReturnResourceAtIndex(i)) { 887 unsyncedPids.remove(i); 888 myCountBlockedThisPass++; 889 myCountSavedTotal++; 890 countBlocked++; 891 } 892 } 893 } 894 895 // Actually store the results in the query cache storage 896 myCountSavedTotal += unsyncedPids.size(); 897 myCountSavedThisPass += unsyncedPids.size(); 898 mySearchResultCacheSvc.storeResults(mySearch, mySyncedPids, unsyncedPids); 899 900 synchronized (mySyncedPids) { 901 int numSyncedThisPass = unsyncedPids.size(); 902 ourLog.trace("Syncing {} search results - Have more: {}", numSyncedThisPass, theResultIter.hasNext()); 903 mySyncedPids.addAll(unsyncedPids); 904 unsyncedPids.clear(); 905 906 if (theResultIter.hasNext() == false) { 907 int skippedCount = theResultIter.getSkippedCount(); 908 int nonSkippedCount = theResultIter.getNonSkippedCount(); 909 int totalFetched = skippedCount + myCountSavedThisPass + myCountBlockedThisPass; 910 ourLog.trace("MaxToFetch[{}] SkippedCount[{}] CountSavedThisPass[{}] CountSavedThisTotal[{}] AdditionalPrefetchRemaining[{}]", myMaxResultsToFetch, skippedCount, myCountSavedThisPass, myCountSavedTotal, myAdditionalPrefetchThresholdsRemaining); 911 912 if (nonSkippedCount == 0 || (myMaxResultsToFetch != null && totalFetched < myMaxResultsToFetch)) { 913 ourLog.trace("Setting search status to FINISHED"); 914 mySearch.setStatus(SearchStatusEnum.FINISHED); 915 mySearch.setTotalCount(myCountSavedTotal - countBlocked); 916 } else if (myAdditionalPrefetchThresholdsRemaining) { 917 ourLog.trace("Setting search status to PASSCMPLET"); 918 mySearch.setStatus(SearchStatusEnum.PASSCMPLET); 919 mySearch.setSearchParameterMap(myParams); 920 } else { 921 ourLog.trace("Setting search status to FINISHED"); 922 mySearch.setStatus(SearchStatusEnum.FINISHED); 923 mySearch.setTotalCount(myCountSavedTotal - countBlocked); 924 } 925 } 926 } 927 928 mySearch.setNumFound(myCountSavedTotal); 929 mySearch.setNumBlocked(mySearch.getNumBlocked() + countBlocked); 930 931 int numSynced; 932 synchronized (mySyncedPids) { 933 numSynced = mySyncedPids.size(); 934 } 935 936 if (myDaoConfig.getCountSearchResultsUpTo() == null || 937 myDaoConfig.getCountSearchResultsUpTo() <= 0 || 938 myDaoConfig.getCountSearchResultsUpTo() <= numSynced) { 939 myInitialCollectionLatch.countDown(); 940 } 941 942 doSaveSearch(); 943 944 ourLog.trace("saveUnsynced() - pre-commit"); 945 } 946 }); 947 ourLog.trace("saveUnsynced() - post-commit"); 948 949 } 950 951 boolean isNotAborted() { 952 return myAbortRequested == false; 953 } 954 955 void markComplete() { 956 myCompletionLatch.countDown(); 957 } 958 959 CountDownLatch getCompletionLatch() { 960 return myCompletionLatch; 961 } 962 963 /** 964 * Request that the task abort as soon as possible 965 */ 966 void requestImmediateAbort() { 967 myAbortRequested = true; 968 } 969 970 /** 971 * This is the method which actually performs the search. 972 * It is called automatically by the thread pool. 973 */ 974 @Override 975 public Void call() { 976 StopWatch sw = new StopWatch(); 977 Span span = myParentTransaction.startSpan("db", "query", "search"); 978 span.setName("FHIR Database Search"); 979 try { 980 // Create an initial search in the DB and give it an ID 981 saveSearch(); 982 983 TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager); 984 txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); 985 986 if (myCustomIsolationSupported) { 987 txTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED); 988 } 989 990 txTemplate.execute(new TransactionCallbackWithoutResult() { 991 @Override 992 protected void doInTransactionWithoutResult(@Nonnull TransactionStatus theStatus) { 993 doSearch(); 994 } 995 }); 996 997 mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus()); 998 if (mySearch.getStatus() == SearchStatusEnum.FINISHED) { 999 HookParams params = new HookParams() 1000 .add(RequestDetails.class, myRequest) 1001 .addIfMatchesType(ServletRequestDetails.class, myRequest) 1002 .add(SearchRuntimeDetails.class, mySearchRuntimeDetails); 1003 CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_COMPLETE, params); 1004 } else { 1005 HookParams params = new HookParams() 1006 .add(RequestDetails.class, myRequest) 1007 .addIfMatchesType(ServletRequestDetails.class, myRequest) 1008 .add(SearchRuntimeDetails.class, mySearchRuntimeDetails); 1009 CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_PASS_COMPLETE, params); 1010 } 1011 1012 ourLog.trace("Have completed search for [{}{}] and found {} resources in {}ms - Status is {}", mySearch.getResourceType(), mySearch.getSearchQueryString(), mySyncedPids.size(), sw.getMillis(), mySearch.getStatus()); 1013 1014 } catch (Throwable t) { 1015 1016 /* 1017 * Don't print a stack trace for client errors (i.e. requests that 1018 * aren't valid because the client screwed up).. that's just noise 1019 * in the logs and who needs that. 1020 */ 1021 boolean logged = false; 1022 if (t instanceof BaseServerResponseException) { 1023 BaseServerResponseException exception = (BaseServerResponseException) t; 1024 if (exception.getStatusCode() >= 400 && exception.getStatusCode() < 500) { 1025 logged = true; 1026 ourLog.warn("Failed during search due to invalid request: {}", t.toString()); 1027 } 1028 } 1029 1030 if (!logged) { 1031 ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t); 1032 } 1033 myUnsyncedPids.clear(); 1034 Throwable rootCause = ExceptionUtils.getRootCause(t); 1035 rootCause = defaultIfNull(rootCause, t); 1036 1037 String failureMessage = rootCause.getMessage(); 1038 1039 int failureCode = InternalErrorException.STATUS_CODE; 1040 if (t instanceof BaseServerResponseException) { 1041 failureCode = ((BaseServerResponseException) t).getStatusCode(); 1042 } 1043 1044 if (System.getProperty(UNIT_TEST_CAPTURE_STACK) != null) { 1045 failureMessage += "\nStack\n" + ExceptionUtils.getStackTrace(rootCause); 1046 } 1047 1048 mySearch.setFailureMessage(failureMessage); 1049 mySearch.setFailureCode(failureCode); 1050 mySearch.setStatus(SearchStatusEnum.FAILED); 1051 1052 mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus()); 1053 HookParams params = new HookParams() 1054 .add(RequestDetails.class, myRequest) 1055 .addIfMatchesType(ServletRequestDetails.class, myRequest) 1056 .add(SearchRuntimeDetails.class, mySearchRuntimeDetails); 1057 CompositeInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, myRequest, Pointcut.JPA_PERFTRACE_SEARCH_FAILED, params); 1058 1059 saveSearch(); 1060 span.captureException(t); 1061 } finally { 1062 1063 myIdToSearchTask.remove(mySearch.getUuid()); 1064 myInitialCollectionLatch.countDown(); 1065 markComplete(); 1066 span.end(); 1067 1068 } 1069 return null; 1070 } 1071 1072 private void doSaveSearch() { 1073 Search newSearch = mySearchCacheSvc.save(mySearch); 1074 1075 // mySearchDao.save is not supposed to return null, but in unit tests 1076 // it can if the mock search dao isn't set up to handle that 1077 if (newSearch != null) { 1078 mySearch = newSearch; 1079 } 1080 } 1081 1082 /** 1083 * This method actually creates the database query to perform the 1084 * search, and starts it. 1085 */ 1086 private void doSearch() { 1087 /* 1088 * If the user has explicitly requested a _count, perform a 1089 * 1090 * SELECT COUNT(*) .... 1091 * 1092 * before doing anything else. 1093 */ 1094 boolean myParamWantOnlyCount = isWantOnlyCount(myParams); 1095 boolean myParamOrDefaultWantCount = nonNull(myParams.getSearchTotalMode()) ? isWantCount(myParams) : isWantCount(myDaoConfig.getDefaultTotalMode()); 1096 1097 if (myParamWantOnlyCount || myParamOrDefaultWantCount) { 1098 ourLog.trace("Performing count"); 1099 ISearchBuilder sb = newSearchBuilder(); 1100 1101 /* 1102 * createCountQuery 1103 * NB: (see createQuery below) 1104 * Because FulltextSearchSvcImpl will (internally) 1105 * mutate the myParams (searchmap), 1106 * (specifically removing the _content and _text filters) 1107 * we will have to clone those parameters here so that 1108 * the "correct" params are used in createQuery below 1109 */ 1110 Long count = sb.createCountQuery(myParams.clone(), mySearch.getUuid(), myRequest, myRequestPartitionId); 1111 1112 ourLog.trace("Got count {}", count); 1113 1114 TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager); 1115 txTemplate.execute(new TransactionCallbackWithoutResult() { 1116 @Override 1117 protected void doInTransactionWithoutResult(@Nonnull TransactionStatus theArg0) { 1118 mySearch.setTotalCount(count.intValue()); 1119 if (myParamWantOnlyCount) { 1120 mySearch.setStatus(SearchStatusEnum.FINISHED); 1121 } 1122 doSaveSearch(); 1123 } 1124 }); 1125 if (myParamWantOnlyCount) { 1126 return; 1127 } 1128 } 1129 1130 ourLog.trace("Done count"); 1131 ISearchBuilder sb = newSearchBuilder(); 1132 1133 /* 1134 * Figure out how many results we're actually going to fetch from the 1135 * database in this pass. This calculation takes into consideration the 1136 * "pre-fetch thresholds" specified in DaoConfig#getSearchPreFetchThresholds() 1137 * as well as the value of the _count parameter. 1138 */ 1139 int currentlyLoaded = defaultIfNull(mySearch.getNumFound(), 0); 1140 int minWanted = 0; 1141 if (myParams.getCount() != null) { 1142 minWanted = myParams.getCount(); 1143 minWanted = Math.min(minWanted, myPagingProvider.getMaximumPageSize()); 1144 minWanted += currentlyLoaded; 1145 } 1146 1147 for (Iterator<Integer> iter = myDaoConfig.getSearchPreFetchThresholds().iterator(); iter.hasNext(); ) { 1148 int next = iter.next(); 1149 if (next != -1 && next <= currentlyLoaded) { 1150 continue; 1151 } 1152 1153 if (next == -1) { 1154 sb.setMaxResultsToFetch(null); 1155 } else { 1156 myMaxResultsToFetch = Math.max(next, minWanted); 1157 sb.setMaxResultsToFetch(myMaxResultsToFetch); 1158 } 1159 1160 if (iter.hasNext()) { 1161 myAdditionalPrefetchThresholdsRemaining = true; 1162 } 1163 1164 // If we get here's we've found an appropriate threshold 1165 break; 1166 } 1167 1168 /* 1169 * Provide any PID we loaded in previous search passes to the 1170 * SearchBuilder so that we don't get duplicates coming from running 1171 * the same query again. 1172 * 1173 * We could possibly accomplish this in a different way by using sorted 1174 * results in our SQL query and specifying an offset. I don't actually 1175 * know if that would be faster or not. At some point should test this 1176 * idea. 1177 */ 1178 if (myPreviouslyAddedResourcePids != null) { 1179 sb.setPreviouslyAddedResourcePids(myPreviouslyAddedResourcePids); 1180 mySyncedPids.addAll(myPreviouslyAddedResourcePids); 1181 } 1182 1183 /* 1184 * createQuery 1185 * Construct the SQL query we'll be sending to the database 1186 * 1187 * NB: (See createCountQuery above) 1188 * We will pass the original myParams here (not a copy) 1189 * because we actually _want_ the mutation of the myParams to happen. 1190 * Specifically because SearchBuilder itself will _expect_ 1191 * not to have these parameters when dumping back 1192 * to our DB. 1193 * 1194 * This is an odd implementation behaviour, but the change 1195 * for this will require a lot more handling at higher levels 1196 */ 1197 try (IResultIterator resultIterator = sb.createQuery(myParams, mySearchRuntimeDetails, myRequest, myRequestPartitionId)) { 1198 assert (resultIterator != null); 1199 1200 /* 1201 * The following loop actually loads the PIDs of the resources 1202 * matching the search off of the disk and into memory. After 1203 * every X results, we commit to the HFJ_SEARCH table. 1204 */ 1205 int syncSize = mySyncSize; 1206 while (resultIterator.hasNext()) { 1207 myUnsyncedPids.add(resultIterator.next()); 1208 1209 boolean shouldSync = myUnsyncedPids.size() >= syncSize; 1210 1211 if (myDaoConfig.getCountSearchResultsUpTo() != null && 1212 myDaoConfig.getCountSearchResultsUpTo() > 0 && 1213 myDaoConfig.getCountSearchResultsUpTo() < myUnsyncedPids.size()) { 1214 shouldSync = false; 1215 } 1216 1217 if (myUnsyncedPids.size() > 50000) { 1218 shouldSync = true; 1219 } 1220 1221 // If no abort was requested, bail out 1222 Validate.isTrue(isNotAborted(), "Abort has been requested"); 1223 1224 if (shouldSync) { 1225 saveUnsynced(resultIterator); 1226 } 1227 1228 if (myLoadingThrottleForUnitTests != null) { 1229 AsyncUtil.sleep(myLoadingThrottleForUnitTests); 1230 } 1231 1232 } 1233 1234 // If no abort was requested, bail out 1235 Validate.isTrue(isNotAborted(), "Abort has been requested"); 1236 1237 saveUnsynced(resultIterator); 1238 1239 } catch (IOException e) { 1240 ourLog.error("IO failure during database access", e); 1241 throw new InternalErrorException(Msg.code(1166) + e); 1242 } 1243 } 1244 } 1245 1246 public class SearchContinuationTask extends SearchTask { 1247 1248 public SearchContinuationTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType, RequestDetails theRequest, RequestPartitionId theRequestPartitionId) { 1249 super(theSearch, theCallingDao, theParams, theResourceType, theRequest, theRequestPartitionId); 1250 } 1251 1252 @Override 1253 public Void call() { 1254 try { 1255 TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager); 1256 txTemplate.afterPropertiesSet(); 1257 txTemplate.execute(t -> { 1258 List<ResourcePersistentId> previouslyAddedResourcePids = mySearchResultCacheSvc.fetchAllResultPids(getSearch()); 1259 if (previouslyAddedResourcePids == null) { 1260 throw newResourceGoneException(getSearch().getUuid()); 1261 } 1262 1263 ourLog.trace("Have {} previously added IDs in search: {}", previouslyAddedResourcePids.size(), getSearch().getUuid()); 1264 setPreviouslyAddedResourcePids(previouslyAddedResourcePids); 1265 return null; 1266 }); 1267 } catch (Throwable e) { 1268 ourLog.error("Failure processing search", e); 1269 getSearch().setFailureMessage(e.getMessage()); 1270 getSearch().setStatus(SearchStatusEnum.FAILED); 1271 if (e instanceof BaseServerResponseException) { 1272 getSearch().setFailureCode(((BaseServerResponseException) e).getStatusCode()); 1273 } 1274 1275 saveSearch(); 1276 return null; 1277 } 1278 1279 return super.call(); 1280 } 1281 1282 } 1283 1284}