View Javadoc
1   package ca.uhn.fhir.jpa.search;
2   
3   /*-
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2019 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  import ca.uhn.fhir.context.FhirContext;
24  import ca.uhn.fhir.interceptor.api.HookParams;
25  import ca.uhn.fhir.jpa.dao.*;
26  import ca.uhn.fhir.jpa.dao.data.ISearchDao;
27  import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao;
28  import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
29  import ca.uhn.fhir.jpa.entity.*;
30  import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
31  import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
32  import ca.uhn.fhir.interceptor.api.Pointcut;
33  import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
34  import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
35  import ca.uhn.fhir.model.api.Include;
36  import ca.uhn.fhir.rest.api.CacheControlDirective;
37  import ca.uhn.fhir.rest.api.Constants;
38  import ca.uhn.fhir.rest.api.SearchTotalModeEnum;
39  import ca.uhn.fhir.rest.api.SummaryEnum;
40  import ca.uhn.fhir.rest.api.server.IBundleProvider;
41  import ca.uhn.fhir.rest.server.IPagingProvider;
42  import ca.uhn.fhir.rest.server.SimpleBundleProvider;
43  import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
44  import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
45  import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
46  import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
47  import ca.uhn.fhir.rest.server.method.PageMethodBinding;
48  import ca.uhn.fhir.util.StopWatch;
49  import com.google.common.annotations.VisibleForTesting;
50  import com.google.common.collect.Lists;
51  import org.apache.commons.lang3.Validate;
52  import org.apache.commons.lang3.exception.ExceptionUtils;
53  import org.apache.commons.lang3.time.DateUtils;
54  import org.hl7.fhir.instance.model.api.IBaseResource;
55  import org.springframework.beans.factory.annotation.Autowired;
56  import org.springframework.data.domain.AbstractPageRequest;
57  import org.springframework.data.domain.Page;
58  import org.springframework.data.domain.Pageable;
59  import org.springframework.data.domain.Sort;
60  import org.springframework.orm.jpa.JpaDialect;
61  import org.springframework.orm.jpa.JpaTransactionManager;
62  import org.springframework.orm.jpa.vendor.HibernateJpaDialect;
63  import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
64  import org.springframework.stereotype.Component;
65  import org.springframework.transaction.PlatformTransactionManager;
66  import org.springframework.transaction.TransactionDefinition;
67  import org.springframework.transaction.TransactionStatus;
68  import org.springframework.transaction.annotation.Propagation;
69  import org.springframework.transaction.annotation.Transactional;
70  import org.springframework.transaction.support.TransactionCallbackWithoutResult;
71  import org.springframework.transaction.support.TransactionTemplate;
72  
73  import javax.annotation.Nullable;
74  import javax.annotation.PostConstruct;
75  import javax.persistence.EntityManager;
76  import java.io.IOException;
77  import java.util.*;
78  import java.util.List;
79  import java.util.concurrent.*;
80  
81  import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
82  
83  @Component("mySearchCoordinatorSvc")
84  public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
85  	public static final int DEFAULT_SYNC_SIZE = 250;
86  
87  	private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);
88  	private final ConcurrentHashMap<String, BaseTask> myIdToSearchTask = new ConcurrentHashMap<>();
89  	@Autowired
90  	private FhirContext myContext;
91  	@Autowired
92  	private DaoConfig myDaoConfig;
93  	@Autowired
94  	private EntityManager myEntityManager;
95  	private ExecutorService myExecutor;
96  	private Integer myLoadingThrottleForUnitTests = null;
97  	private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE;
98  	private boolean myNeverUseLocalSearchForUnitTests;
99  	@Autowired
100 	private ISearchDao mySearchDao;
101 	@Autowired
102 	private ISearchIncludeDao mySearchIncludeDao;
103 	@Autowired
104 	private ISearchResultDao mySearchResultDao;
105 	@Autowired
106 	private IInterceptorBroadcaster myInterceptorBroadcaster;
107 	@Autowired
108 	private PlatformTransactionManager myManagedTxManager;
109 	@Autowired
110 	private DaoRegistry myDaoRegistry;
111 	@Autowired
112 	private IPagingProvider myPagingProvider;
113 
114 	private int mySyncSize = DEFAULT_SYNC_SIZE;
115 	/**
116 	 * Set in {@link #start()}
117 	 */
118 	private boolean myCustomIsolationSupported;
119 
120 	/**
121 	 * Constructor
122 	 */
123 	public SearchCoordinatorSvcImpl() {
124 		CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("search_coord_");
125 		myExecutor = Executors.newCachedThreadPool(threadFactory);
126 	}
127 
128 	@PostConstruct
129 	public void start() {
130 		if (myManagedTxManager instanceof JpaTransactionManager) {
131 			JpaDialect jpaDialect = ((JpaTransactionManager) myManagedTxManager).getJpaDialect();
132 			if (jpaDialect instanceof HibernateJpaDialect) {
133 				myCustomIsolationSupported = true;
134 			}
135 		}
136 		if (myCustomIsolationSupported == false) {
137 			ourLog.warn("JPA dialect does not support transaction isolation! This can have an impact on search performance.");
138 		}
139 	}
140 
141 	@Override
142 	public void cancelAllActiveSearches() {
143 		for (BaseTask next : myIdToSearchTask.values()) {
144 			next.requestImmediateAbort();
145 			try {
146 				next.getCompletionLatch().await(30, TimeUnit.SECONDS);
147 			} catch (InterruptedException e) {
148 				ourLog.warn("Failed to wait for completion", e);
149 			}
150 		}
151 	}
152 
153 	/**
154 	 * This method is called by the HTTP client processing thread in order to
155 	 * fetch resources.
156 	 */
157 	@Override
158 	@Transactional(propagation = Propagation.NEVER)
159 	public List<Long> getResources(final String theUuid, int theFrom, int theTo) {
160 		TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
161 		txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
162 
163 		Search search;
164 		StopWatch sw = new StopWatch();
165 		while (true) {
166 
167 			if (myNeverUseLocalSearchForUnitTests == false) {
168 				BaseTask task = myIdToSearchTask.get(theUuid);
169 				if (task != null) {
170 					ourLog.trace("Local search found");
171 					List<Long> resourcePids = task.getResourcePids(theFrom, theTo);
172 					if (resourcePids != null) {
173 						return resourcePids;
174 					}
175 				}
176 			}
177 
178 			search = txTemplate.execute(t -> mySearchDao.findByUuid(theUuid));
179 
180 			if (search == null) {
181 				ourLog.debug("Client requested unknown paging ID[{}]", theUuid);
182 				String msg = myContext.getLocalizer().getMessage(PageMethodBinding.class, "unknownSearchId", theUuid);
183 				throw new ResourceGoneException(msg);
184 			}
185 
186 			verifySearchHasntFailedOrThrowInternalErrorException(search);
187 			if (search.getStatus() == SearchStatusEnum.FINISHED) {
188 				ourLog.debug("Search entity marked as finished with {} results", search.getNumFound());
189 				break;
190 			}
191 			if (search.getNumFound() >= theTo) {
192 				ourLog.debug("Search entity has {} results so far", search.getNumFound());
193 				break;
194 			}
195 
196 			if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) {
197 				ourLog.error("Search {} of type {} for {}{} timed out after {}ms", search.getId(), search.getSearchType(), search.getResourceType(), search.getSearchQueryString(), sw.getMillis());
198 				throw new InternalErrorException("Request timed out after " + sw.getMillis() + "ms");
199 			}
200 
201 			// If the search was saved in "pass complete mode" it's probably time to
202 			// start a new pass
203 			if (search.getStatus() == SearchStatusEnum.PASSCMPLET) {
204 				Optional<Search> newSearch = tryToMarkSearchAsInProgress(search);
205 				if (newSearch.isPresent()) {
206 					search = newSearch.get();
207 					String resourceType = search.getResourceType();
208 					SearchParameterMap params = search.getSearchParameterMap();
209 					IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType);
210 					SearchContinuationTask task = new SearchContinuationTask(search, resourceDao, params, resourceType);
211 					myIdToSearchTask.put(search.getUuid(), task);
212 					myExecutor.submit(task);
213 				}
214 			}
215 
216 			try {
217 				Thread.sleep(500);
218 			} catch (InterruptedException e) {
219 				// ignore
220 			}
221 		}
222 
223 		final Pageable page = toPage(theFrom, theTo);
224 		if (page == null) {
225 			return Collections.emptyList();
226 		}
227 
228 		final Search foundSearch = search;
229 
230 		ourLog.trace("Loading stored search");
231 		List<Long> retVal = txTemplate.execute(theStatus -> {
232 			final List<Long> resultPids = new ArrayList<>();
233 			Page<Long> searchResultPids = mySearchResultDao.findWithSearchUuid(foundSearch, page);
234 			for (Long next : searchResultPids) {
235 				resultPids.add(next);
236 			}
237 			return resultPids;
238 		});
239 		return retVal;
240 	}
241 
242 	private Optional<Search> tryToMarkSearchAsInProgress(Search theSearch) {
243 		ourLog.trace("Going to try to change search status from {} to {}", theSearch.getStatus(), SearchStatusEnum.LOADING);
244 		try {
245 			TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
246 			txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
247 			txTemplate.afterPropertiesSet();
248 			return txTemplate.execute(t -> {
249 				Search search = mySearchDao.findById(theSearch.getId()).orElse(theSearch);
250 
251 				if (search.getStatus() != SearchStatusEnum.PASSCMPLET) {
252 					throw new IllegalStateException("Can't change to LOADING because state is " + theSearch.getStatus());
253 				}
254 				search.setStatus(SearchStatusEnum.LOADING);
255 				Search newSearch = mySearchDao.save(search);
256 				return Optional.of(newSearch);
257 			});
258 		} catch (Exception e) {
259 			ourLog.warn("Failed to activate search: {}", e.toString());
260 			ourLog.trace("Failed to activate search", e);
261 			return Optional.empty();
262 		}
263 	}
264 
265 	private void populateBundleProvider(PersistedJpaBundleProvider theRetVal) {
266 		theRetVal.setContext(myContext);
267 		theRetVal.setEntityManager(myEntityManager);
268 		theRetVal.setPlatformTransactionManager(myManagedTxManager);
269 		theRetVal.setSearchDao(mySearchDao);
270 		theRetVal.setSearchCoordinatorSvc(this);
271 	}
272 
273 	@Override
274 	public IBundleProvider registerSearch(final IDao theCallingDao, final SearchParameterMap theParams, String theResourceType, CacheControlDirective theCacheControlDirective) {
275 		StopWatch w = new StopWatch();
276 		final String searchUuid = UUID.randomUUID().toString();
277 
278 		ourLog.debug("Registering new search {}", searchUuid);
279 
280 		Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(theResourceType).getImplementingClass();
281 		final ISearchBuilder sb = theCallingDao.newSearchBuilder();
282 		sb.setType(resourceTypeClass, theResourceType);
283 		sb.setFetchSize(mySyncSize);
284 
285 		final Integer loadSynchronousUpTo;
286 		if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) {
287 			if (theCacheControlDirective.getMaxResults() != null) {
288 				loadSynchronousUpTo = theCacheControlDirective.getMaxResults();
289 				if (loadSynchronousUpTo > myDaoConfig.getCacheControlNoStoreMaxResultsUpperLimit()) {
290 					throw new InvalidRequestException(Constants.HEADER_CACHE_CONTROL + " header " + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed " + myDaoConfig.getCacheControlNoStoreMaxResultsUpperLimit());
291 				}
292 			} else {
293 				loadSynchronousUpTo = 100;
294 			}
295 		} else {
296 			loadSynchronousUpTo = null;
297 		}
298 
299 		if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null) {
300 
301 			ourLog.debug("Search {} is loading in synchronous mode", searchUuid);
302 			SearchRuntimeDetails searchRuntimeDetails = new SearchRuntimeDetails(searchUuid);
303 			searchRuntimeDetails.setLoadSynchronous(true);
304 
305 			// Execute the query and make sure we return distinct results
306 			TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
307 			txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
308 			return txTemplate.execute(t -> {
309 
310 				// Load the results synchronously
311 				final List<Long> pids = new ArrayList<>();
312 
313 				try (IResultIterator resultIter = sb.createQuery(theParams, searchRuntimeDetails)) {
314 					while (resultIter.hasNext()) {
315 						pids.add(resultIter.next());
316 						if (loadSynchronousUpTo != null && pids.size() >= loadSynchronousUpTo) {
317 							break;
318 						}
319 						if (theParams.getLoadSynchronousUpTo() != null && pids.size() >= theParams.getLoadSynchronousUpTo()) {
320 							break;
321 						}
322 					}
323 				} catch (IOException e) {
324 					ourLog.error("IO failure during database access", e);
325 					throw new InternalErrorException(e);
326 				}
327 
328 				/*
329 				 * For synchronous queries, we load all the includes right away
330 				 * since we're returning a static bundle with all the results
331 				 * pre-loaded. This is ok because syncronous requests are not
332 				 * expected to be paged
333 				 *
334 				 * On the other hand for async queries we load includes/revincludes
335 				 * individually for pages as we return them to clients
336 				 */
337 				final Set<Long> includedPids = new HashSet<>();
338 				includedPids.addAll(sb.loadIncludes(myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated(), "(synchronous)"));
339 				includedPids.addAll(sb.loadIncludes(myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated(), "(synchronous)"));
340 
341 				List<IBaseResource> resources = new ArrayList<>();
342 				sb.loadResourcesByPid(pids, resources, includedPids, false, myEntityManager, myContext, theCallingDao);
343 				return new SimpleBundleProvider(resources);
344 			});
345 		}
346 
347 		/*
348 		 * See if there are any cached searches whose results we can return
349 		 * instead
350 		 */
351 		boolean useCache = true;
352 		if (theCacheControlDirective != null && theCacheControlDirective.isNoCache() == true) {
353 			useCache = false;
354 		}
355 		final String queryString = theParams.toNormalizedQueryString(myContext);
356 		if (theParams.getEverythingMode() == null) {
357 			if (myDaoConfig.getReuseCachedSearchResultsForMillis() != null && useCache) {
358 
359 				final Date createdCutoff = new Date(System.currentTimeMillis() - myDaoConfig.getReuseCachedSearchResultsForMillis());
360 				final String resourceType = theResourceType;
361 
362 				TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
363 				PersistedJpaBundleProvider foundSearchProvider = txTemplate.execute(t -> {
364 					Search searchToUse = null;
365 
366 					int hashCode = queryString.hashCode();
367 					Collection<Search> candidates = mySearchDao.find(resourceType, hashCode, createdCutoff);
368 					for (Search nextCandidateSearch : candidates) {
369 						if (queryString.equals(nextCandidateSearch.getSearchQueryString())) {
370 							searchToUse = nextCandidateSearch;
371 						}
372 					}
373 
374 					PersistedJpaBundleProvider retVal = null;
375 					if (searchToUse != null) {
376 						ourLog.debug("Reusing search {} from cache", searchToUse.getUuid());
377 						searchToUse.setSearchLastReturned(new Date());
378 						mySearchDao.updateSearchLastReturned(searchToUse.getId(), new Date());
379 
380 						retVal = new PersistedJpaBundleProvider(searchToUse.getUuid(), theCallingDao);
381 						retVal.setCacheHit(true);
382 
383 						populateBundleProvider(retVal);
384 					}
385 
386 					return retVal;
387 				});
388 
389 				if (foundSearchProvider != null) {
390 					return foundSearchProvider;
391 				}
392 
393 			}
394 		}
395 
396 		Search search = new Search();
397 		populateSearchEntity(theParams, theResourceType, searchUuid, queryString, search);
398 
399 		SearchTask task = new SearchTask(search, theCallingDao, theParams, theResourceType);
400 		myIdToSearchTask.put(search.getUuid(), task);
401 		myExecutor.submit(task);
402 
403 		PersistedJpaSearchFirstPageBundleProvider retVal = new PersistedJpaSearchFirstPageBundleProvider(search, theCallingDao, task, sb, myManagedTxManager);
404 		populateBundleProvider(retVal);
405 
406 		ourLog.debug("Search initial phase completed in {}ms", w.getMillis());
407 		return retVal;
408 
409 	}
410 
411 	@VisibleForTesting
412 	void setContextForUnitTest(FhirContext theCtx) {
413 		myContext = theCtx;
414 	}
415 
416 	@VisibleForTesting
417 	void setDaoConfigForUnitTest(DaoConfig theDaoConfig) {
418 		myDaoConfig = theDaoConfig;
419 	}
420 
421 	@VisibleForTesting
422 	void setEntityManagerForUnitTest(EntityManager theEntityManager) {
423 		myEntityManager = theEntityManager;
424 	}
425 
426 	@VisibleForTesting
427 	public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) {
428 		myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests;
429 	}
430 
431 	@VisibleForTesting
432 	void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) {
433 		myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults;
434 	}
435 
436 	@VisibleForTesting
437 	public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) {
438 		myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests;
439 	}
440 
441 	@VisibleForTesting
442 	void setSearchDaoForUnitTest(ISearchDao theSearchDao) {
443 		mySearchDao = theSearchDao;
444 	}
445 
446 	@VisibleForTesting
447 	void setSearchDaoIncludeForUnitTest(ISearchIncludeDao theSearchIncludeDao) {
448 		mySearchIncludeDao = theSearchIncludeDao;
449 	}
450 
451 	@VisibleForTesting
452 	void setSearchDaoResultForUnitTest(ISearchResultDao theSearchResultDao) {
453 		mySearchResultDao = theSearchResultDao;
454 	}
455 
456 	@VisibleForTesting
457 	public void setSyncSizeForUnitTests(int theSyncSize) {
458 		mySyncSize = theSyncSize;
459 	}
460 
461 	@VisibleForTesting
462 	void setTransactionManagerForUnitTest(PlatformTransactionManager theTxManager) {
463 		myManagedTxManager = theTxManager;
464 	}
465 
466 	@VisibleForTesting
467 	void setDaoRegistryForUnitTest(DaoRegistry theDaoRegistry) {
468 		myDaoRegistry = theDaoRegistry;
469 	}
470 
471 	@VisibleForTesting
472 	void setInterceptorBroadcasterForUnitTest(IInterceptorBroadcaster theInterceptorBroadcaster) {
473 		myInterceptorBroadcaster = theInterceptorBroadcaster;
474 	}
475 
476 	public abstract class BaseTask implements Callable<Void> {
477 		private final SearchParameterMap myParams;
478 		private final IDao myCallingDao;
479 		private final String myResourceType;
480 		private final ArrayList<Long> mySyncedPids = new ArrayList<>();
481 		private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1);
482 		private final CountDownLatch myCompletionLatch;
483 		private final ArrayList<Long> myUnsyncedPids = new ArrayList<>();
484 		private Search mySearch;
485 		private boolean myAbortRequested;
486 		private int myCountSavedTotal = 0;
487 		private int myCountSavedThisPass = 0;
488 		private boolean myAdditionalPrefetchThresholdsRemaining;
489 		private List<Long> myPreviouslyAddedResourcePids;
490 		private Integer myMaxResultsToFetch;
491 		private SearchRuntimeDetails mySearchRuntimeDetails;
492 
493 		/**
494 		 * Constructor
495 		 */
496 		protected BaseTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType) {
497 			mySearch = theSearch;
498 			myCallingDao = theCallingDao;
499 			myParams = theParams;
500 			myResourceType = theResourceType;
501 			myCompletionLatch = new CountDownLatch(1);
502 			mySearchRuntimeDetails = new SearchRuntimeDetails(mySearch.getUuid());
503 			mySearchRuntimeDetails.setQueryString(theParams.toNormalizedQueryString(theCallingDao.getContext()));
504 		}
505 
506 		public SearchRuntimeDetails getSearchRuntimeDetails() {
507 			return mySearchRuntimeDetails;
508 		}
509 
510 		protected Search getSearch() {
511 			return mySearch;
512 		}
513 
514 		protected CountDownLatch getInitialCollectionLatch() {
515 			return myInitialCollectionLatch;
516 		}
517 
518 		protected void setPreviouslyAddedResourcePids(List<Long> thePreviouslyAddedResourcePids) {
519 			myPreviouslyAddedResourcePids = thePreviouslyAddedResourcePids;
520 			myCountSavedTotal = myPreviouslyAddedResourcePids.size();
521 		}
522 
523 		private ISearchBuilder newSearchBuilder() {
524 			Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
525 			ISearchBuilder sb = myCallingDao.newSearchBuilder();
526 			sb.setType(resourceTypeClass, myResourceType);
527 
528 			return sb;
529 		}
530 
531 		public List<Long> getResourcePids(int theFromIndex, int theToIndex) {
532 			ourLog.debug("Requesting search PIDs from {}-{}", theFromIndex, theToIndex);
533 
534 			boolean keepWaiting;
535 			do {
536 				synchronized (mySyncedPids) {
537 					ourLog.trace("Search status is {}", mySearch.getStatus());
538 					boolean haveEnoughResults = mySyncedPids.size() >= theToIndex;
539 					if (!haveEnoughResults) {
540 						switch (mySearch.getStatus()) {
541 							case LOADING:
542 								keepWaiting = true;
543 								break;
544 							case PASSCMPLET:
545 								/*
546 								 * If we get here, it means that the user requested resources that crossed the
547 								 * current pre-fetch boundary. For example, if the prefetch threshold is 50 and the
548 								 * user has requested resources 0-60, then they would get 0-50 back but the search
549 								 * coordinator would then stop searching.SearchCoordinatorSvcImplTest
550 								 */
551 								keepWaiting = false;
552 								break;
553 							case FAILED:
554 							case FINISHED:
555 							default:
556 								keepWaiting = false;
557 								break;
558 						}
559 					} else {
560 						keepWaiting = false;
561 					}
562 				}
563 
564 				if (keepWaiting) {
565 					ourLog.info("Waiting as we only have {} results - Search status: {}", mySyncedPids.size(), mySearch.getStatus());
566 					try {
567 						Thread.sleep(500);
568 					} catch (InterruptedException theE) {
569 						// ignore
570 					}
571 				}
572 			} while (keepWaiting);
573 
574 			ourLog.debug("Proceeding, as we have {} results", mySyncedPids.size());
575 
576 			ArrayList<Long> retVal = new ArrayList<>();
577 			synchronized (mySyncedPids) {
578 				verifySearchHasntFailedOrThrowInternalErrorException(mySearch);
579 
580 				int toIndex = theToIndex;
581 				if (mySyncedPids.size() < toIndex) {
582 					toIndex = mySyncedPids.size();
583 				}
584 				for (int i = theFromIndex; i < toIndex; i++) {
585 					retVal.add(mySyncedPids.get(i));
586 				}
587 			}
588 
589 			ourLog.trace("Done syncing results - Wanted {}-{} and returning {} of {}", theFromIndex, theToIndex, retVal.size(), mySyncedPids.size());
590 
591 			return retVal;
592 		}
593 
594 		protected void saveSearch() {
595 			TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
596 			txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
597 			txTemplate.execute(new TransactionCallbackWithoutResult() {
598 				@Override
599 				protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
600 					doSaveSearch();
601 				}
602 
603 			});
604 		}
605 
606 		private void saveUnsynced(final IResultIterator theResultIter) {
607 			TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
608 			txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
609 			txTemplate.execute(new TransactionCallbackWithoutResult() {
610 				@Override
611 				protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
612 					if (mySearch.getId() == null) {
613 						doSaveSearch();
614 					}
615 
616 					List<SearchResult> resultsToSave = Lists.newArrayList();
617 					for (Long nextPid : myUnsyncedPids) {
618 						SearchResult nextResult = new SearchResult(mySearch);
619 						nextResult.setResourcePid(nextPid);
620 						nextResult.setOrder(myCountSavedTotal);
621 						resultsToSave.add(nextResult);
622 						int order = nextResult.getOrder();
623 						ourLog.trace("Saving ORDER[{}] Resource {}", order, nextResult.getResourcePid());
624 
625 						myCountSavedTotal++;
626 						myCountSavedThisPass++;
627 					}
628 
629 					mySearchResultDao.saveAll(resultsToSave);
630 
631 					synchronized (mySyncedPids) {
632 						int numSyncedThisPass = myUnsyncedPids.size();
633 						ourLog.trace("Syncing {} search results - Have more: {}", numSyncedThisPass, theResultIter.hasNext());
634 						mySyncedPids.addAll(myUnsyncedPids);
635 						myUnsyncedPids.clear();
636 
637 						if (theResultIter.hasNext() == false) {
638 							mySearch.setNumFound(myCountSavedTotal);
639 							int skippedCount = theResultIter.getSkippedCount();
640 							int totalFetched = skippedCount + myCountSavedThisPass;
641 							ourLog.trace("MaxToFetch[{}] SkippedCount[{}] CountSavedThisPass[{}] CountSavedThisTotal[{}] AdditionalPrefetchRemaining[{}]", myMaxResultsToFetch, skippedCount, myCountSavedThisPass, myCountSavedTotal, myAdditionalPrefetchThresholdsRemaining);
642 
643 							if (myMaxResultsToFetch != null && totalFetched < myMaxResultsToFetch) {
644 								ourLog.trace("Setting search status to FINISHED");
645 								mySearch.setStatus(SearchStatusEnum.FINISHED);
646 								mySearch.setTotalCount(myCountSavedTotal);
647 							} else if (myAdditionalPrefetchThresholdsRemaining) {
648 								ourLog.trace("Setting search status to PASSCMPLET");
649 								mySearch.setStatus(SearchStatusEnum.PASSCMPLET);
650 								mySearch.setSearchParameterMap(myParams);
651 							} else {
652 								ourLog.trace("Setting search status to FINISHED");
653 								mySearch.setStatus(SearchStatusEnum.FINISHED);
654 								mySearch.setTotalCount(myCountSavedTotal);
655 							}
656 						}
657 					}
658 
659 					mySearch.setNumFound(myCountSavedTotal);
660 
661 					int numSynced;
662 					synchronized (mySyncedPids) {
663 						numSynced = mySyncedPids.size();
664 					}
665 
666 					if (myDaoConfig.getCountSearchResultsUpTo() == null ||
667 						myDaoConfig.getCountSearchResultsUpTo() <= 0 ||
668 						myDaoConfig.getCountSearchResultsUpTo() <= numSynced) {
669 						myInitialCollectionLatch.countDown();
670 					}
671 
672 					doSaveSearch();
673 
674 				}
675 			});
676 
677 		}
678 
679 		public boolean isNotAborted() {
680 			return myAbortRequested == false;
681 		}
682 
683 		protected void markComplete() {
684 			myCompletionLatch.countDown();
685 		}
686 
687 		public CountDownLatch getCompletionLatch() {
688 			return myCompletionLatch;
689 		}
690 
691 		/**
692 		 * Request that the task abort as soon as possible
693 		 */
694 		public void requestImmediateAbort() {
695 			myAbortRequested = true;
696 		}
697 
698 		/**
699 		 * This is the method which actually performs the search.
700 		 * It is called automatically by the thread pool.
701 		 */
702 		@Override
703 		public Void call() {
704 			StopWatch sw = new StopWatch();
705 
706 			try {
707 				// Create an initial search in the DB and give it an ID
708 				saveSearch();
709 
710 				TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
711 				txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
712 
713 				if (myCustomIsolationSupported) {
714 					txTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
715 				}
716 
717 				txTemplate.execute(new TransactionCallbackWithoutResult() {
718 					@Override
719 					protected void doInTransactionWithoutResult(TransactionStatus theStatus) {
720 						doSearch();
721 					}
722 				});
723 
724 				mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus());
725 				if (mySearch.getStatus() == SearchStatusEnum.FINISHED) {
726 					HookParams params = new HookParams().add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
727 					myInterceptorBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_COMPLETE, params);
728 				} else {
729 					HookParams params = new HookParams().add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
730 					myInterceptorBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_PASS_COMPLETE, params);
731 				}
732 
733 				ourLog.info("Have completed search for [{}{}] and found {} resources in {}ms - Status is {}", mySearch.getResourceType(), mySearch.getSearchQueryString(), mySyncedPids.size(), sw.getMillis(), mySearch.getStatus());
734 
735 			} catch (Throwable t) {
736 
737 				/*
738 				 * Don't print a stack trace for client errors (i.e. requests that
739 				 * aren't valid because the client screwed up).. that's just noise
740 				 * in the logs and who needs that.
741 				 */
742 				boolean logged = false;
743 				if (t instanceof BaseServerResponseException) {
744 					BaseServerResponseException exception = (BaseServerResponseException) t;
745 					if (exception.getStatusCode() >= 400 && exception.getStatusCode() < 500) {
746 						logged = true;
747 						ourLog.warn("Failed during search due to invalid request: {}", t.toString());
748 					}
749 				}
750 
751 				if (!logged) {
752 					ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t);
753 				}
754 				myUnsyncedPids.clear();
755 
756 				Throwable rootCause = ExceptionUtils.getRootCause(t);
757 				rootCause = defaultIfNull(rootCause, t);
758 
759 				String failureMessage = rootCause.getMessage();
760 
761 				int failureCode = InternalErrorException.STATUS_CODE;
762 				if (t instanceof BaseServerResponseException) {
763 					failureCode = ((BaseServerResponseException) t).getStatusCode();
764 				}
765 
766 				mySearch.setFailureMessage(failureMessage);
767 				mySearch.setFailureCode(failureCode);
768 				mySearch.setStatus(SearchStatusEnum.FAILED);
769 
770 				mySearchRuntimeDetails.setSearchStatus(mySearch.getStatus());
771 				HookParams params = new HookParams().add(SearchRuntimeDetails.class, mySearchRuntimeDetails);
772 				myInterceptorBroadcaster.callHooks(Pointcut.JPA_PERFTRACE_SEARCH_FAILED, params);
773 
774 				saveSearch();
775 
776 			} finally {
777 
778 				myIdToSearchTask.remove(mySearch.getUuid());
779 				myInitialCollectionLatch.countDown();
780 				markComplete();
781 
782 			}
783 			return null;
784 		}
785 
786 		private void doSaveSearch() {
787 
788 			Search newSearch;
789 			if (mySearch.getId() == null) {
790 				newSearch = mySearchDao.save(mySearch);
791 				for (SearchInclude next : mySearch.getIncludes()) {
792 					mySearchIncludeDao.save(next);
793 				}
794 			} else {
795 				newSearch = mySearchDao.save(mySearch);
796 			}
797 
798 			// mySearchDao.save is not supposed to return null, but in unit tests
799 			// it can if the mock search dao isn't set up to handle that
800 			if (newSearch != null) {
801 				mySearch = newSearch;
802 			}
803 		}
804 
805 		/**
806 		 * This method actually creates the database query to perform the
807 		 * search, and starts it.
808 		 */
809 		private void doSearch() {
810 
811 			/*
812 			 * If the user has explicitly requested a _count, perform a
813 			 *
814 			 * SELECT COUNT(*) ....
815 			 *
816 			 * before doing anything else.
817 			 */
818 			boolean wantOnlyCount = SummaryEnum.COUNT.equals(myParams.getSummaryMode());
819 			boolean wantCount =
820 				wantOnlyCount ||
821 					SearchTotalModeEnum.ACCURATE.equals(myParams.getSearchTotalMode()) ||
822 					(myParams.getSearchTotalMode() == null && SearchTotalModeEnum.ACCURATE.equals(myDaoConfig.getDefaultTotalMode()));
823 			if (wantCount) {
824 				ourLog.trace("Performing count");
825 				ISearchBuilder sb = newSearchBuilder();
826 				Iterator<Long> countIterator = sb.createCountQuery(myParams, mySearch.getUuid());
827 				Long count = countIterator.next();
828 				ourLog.trace("Got count {}", count);
829 
830 				TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
831 				txTemplate.execute(new TransactionCallbackWithoutResult() {
832 					@Override
833 					protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
834 						mySearch.setTotalCount(count.intValue());
835 						if (wantOnlyCount) {
836 							mySearch.setStatus(SearchStatusEnum.FINISHED);
837 						}
838 						doSaveSearch();
839 						mySearchDao.flush();
840 					}
841 				});
842 				if (wantOnlyCount) {
843 					return;
844 				}
845 			}
846 
847 			ourLog.trace("Done count");
848 			ISearchBuilder sb = newSearchBuilder();
849 
850 			/*
851 			 * Figure out how many results we're actually going to fetch from the
852 			 * database in this pass. This calculation takes into consideration the
853 			 * "pre-fetch thresholds" specified in DaoConfig#getSearchPreFetchThresholds()
854 			 * as well as the value of the _count parameter.
855 			 */
856 			int currentlyLoaded = defaultIfNull(mySearch.getNumFound(), 0);
857 			int minWanted = 0;
858 			if (myParams.getCount() != null) {
859 				minWanted = myParams.getCount();
860 				minWanted = Math.max(minWanted, myPagingProvider.getMaximumPageSize());
861 				minWanted += currentlyLoaded;
862 			}
863 
864 			for (Iterator<Integer> iter = myDaoConfig.getSearchPreFetchThresholds().iterator(); iter.hasNext(); ) {
865 				int next = iter.next();
866 				if (next != -1 && next <= currentlyLoaded) {
867 					continue;
868 				}
869 
870 				if (next == -1) {
871 					sb.setMaxResultsToFetch(null);
872 				} else {
873 					myMaxResultsToFetch = Math.max(next, minWanted);
874 					sb.setMaxResultsToFetch(myMaxResultsToFetch);
875 				}
876 
877 				if (iter.hasNext()) {
878 					myAdditionalPrefetchThresholdsRemaining = true;
879 				}
880 
881 				// If we get here's we've found an appropriate threshold
882 				break;
883 			}
884 
885 			/*
886 			 * Provide any PID we loaded in previous seasrch passes to the
887 			 * SearchBuilder so that we don't get duplicates coming from running
888 			 * the same query again.
889 			 *
890 			 * We could possibly accomplish this in a different way by using sorted
891 			 * results in our SQL query and specifying an offset. I don't actually
892 			 * know if that would be faster or not. At some point should test this
893 			 * idea.
894 			 */
895 			if (myPreviouslyAddedResourcePids != null) {
896 				sb.setPreviouslyAddedResourcePids(myPreviouslyAddedResourcePids);
897 				mySyncedPids.addAll(myPreviouslyAddedResourcePids);
898 			}
899 
900 			/*
901 			 * Construct the SQL query we'll be sending to the database
902 			 */
903 			try (IResultIterator resultIterator = sb.createQuery(myParams, mySearchRuntimeDetails)) {
904 				assert (resultIterator != null);
905 
906 				/*
907 				 * The following loop actually loads the PIDs of the resources
908 				 * matching the search off of the disk and into memory. After
909 				 * every X results, we commit to the HFJ_SEARCH table.
910 				 */
911 				int syncSize = mySyncSize;
912 				while (resultIterator.hasNext()) {
913 					myUnsyncedPids.add(resultIterator.next());
914 
915 					boolean shouldSync = myUnsyncedPids.size() >= syncSize;
916 
917 					if (myDaoConfig.getCountSearchResultsUpTo() != null &&
918 						myDaoConfig.getCountSearchResultsUpTo() > 0 &&
919 						myDaoConfig.getCountSearchResultsUpTo() < myUnsyncedPids.size()) {
920 						shouldSync = false;
921 					}
922 
923 					if (myUnsyncedPids.size() > 50000) {
924 						shouldSync = true;
925 					}
926 
927 					// If no abort was requested, bail out
928 					Validate.isTrue(isNotAborted(), "Abort has been requested");
929 
930 					if (shouldSync) {
931 						saveUnsynced(resultIterator);
932 					}
933 
934 					if (myLoadingThrottleForUnitTests != null) {
935 						try {
936 							Thread.sleep(myLoadingThrottleForUnitTests);
937 						} catch (InterruptedException e) {
938 							// ignore
939 						}
940 					}
941 
942 				}
943 
944 				// If no abort was requested, bail out
945 				Validate.isTrue(isNotAborted(), "Abort has been requested");
946 
947 				saveUnsynced(resultIterator);
948 
949 			} catch (IOException e) {
950 				ourLog.error("IO failure during database access", e);
951 				throw new InternalErrorException(e);
952 			}
953 		}
954 	}
955 
956 
957 	public class SearchContinuationTask extends BaseTask {
958 
959 		public SearchContinuationTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType) {
960 			super(theSearch, theCallingDao, theParams, theResourceType);
961 		}
962 
963 		@Override
964 		public Void call() {
965 			try {
966 				TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
967 				txTemplate.afterPropertiesSet();
968 				txTemplate.execute(t -> {
969 					List<Long> previouslyAddedResourcePids = mySearchResultDao.findWithSearchUuidOrderIndependent(getSearch());
970 					ourLog.debug("Have {} previously added IDs in search: {}", previouslyAddedResourcePids.size(), getSearch().getUuid());
971 					setPreviouslyAddedResourcePids(previouslyAddedResourcePids);
972 					return null;
973 				});
974 			} catch (Throwable e) {
975 				ourLog.error("Failure processing search", e);
976 				getSearch().setFailureMessage(e.toString());
977 				getSearch().setStatus(SearchStatusEnum.FAILED);
978 
979 				saveSearch();
980 				return null;
981 			}
982 
983 			return super.call();
984 		}
985 
986 		@Override
987 		public List<Long> getResourcePids(int theFromIndex, int theToIndex) {
988 			return super.getResourcePids(theFromIndex, theToIndex);
989 		}
990 	}
991 
992 	/**
993 	 * A search task is a Callable task that runs in
994 	 * a thread pool to handle an individual search. One instance
995 	 * is created for any requested search and runs from the
996 	 * beginning to the end of the search.
997 	 * <p>
998 	 * Understand:
999 	 * This class executes in its own thread separate from the
1000 	 * web server client thread that made the request. We do that
1001 	 * so that we can return to the client as soon as possible,
1002 	 * but keep the search going in the background (and have
1003 	 * the next page of results ready to go when the client asks).
1004 	 */
1005 	public class SearchTask extends BaseTask {
1006 
1007 		/**
1008 		 * Constructor
1009 		 */
1010 		public SearchTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType) {
1011 			super(theSearch, theCallingDao, theParams, theResourceType);
1012 		}
1013 
1014 		/**
1015 		 * This method is called by the server HTTP thread, and
1016 		 * will block until at least one page of results have been
1017 		 * fetched from the DB, and will never block after that.
1018 		 */
1019 		public Integer awaitInitialSync() {
1020 			ourLog.trace("Awaiting initial sync");
1021 			do {
1022 				try {
1023 					if (getInitialCollectionLatch().await(250, TimeUnit.MILLISECONDS)) {
1024 						break;
1025 					}
1026 				} catch (InterruptedException e) {
1027 					// Shouldn't happen
1028 					throw new InternalErrorException(e);
1029 				}
1030 			} while (getSearch().getStatus() == SearchStatusEnum.LOADING);
1031 			ourLog.trace("Initial sync completed");
1032 
1033 			return getSearch().getTotalCount();
1034 		}
1035 
1036 	}
1037 
1038 	public static void populateSearchEntity(SearchParameterMap theParams, String theResourceType, String theSearchUuid, String theQueryString, Search theSearch) {
1039 		theSearch.setDeleted(false);
1040 		theSearch.setUuid(theSearchUuid);
1041 		theSearch.setCreated(new Date());
1042 		theSearch.setSearchLastReturned(new Date());
1043 		theSearch.setTotalCount(null);
1044 		theSearch.setNumFound(0);
1045 		theSearch.setPreferredPageSize(theParams.getCount());
1046 		theSearch.setSearchType(theParams.getEverythingMode() != null ? SearchTypeEnum.EVERYTHING : SearchTypeEnum.SEARCH);
1047 		theSearch.setLastUpdated(theParams.getLastUpdated());
1048 		theSearch.setResourceType(theResourceType);
1049 		theSearch.setStatus(SearchStatusEnum.LOADING);
1050 
1051 		theSearch.setSearchQueryString(theQueryString);
1052 		theSearch.setSearchQueryStringHash(theQueryString.hashCode());
1053 
1054 		for (Include next : theParams.getIncludes()) {
1055 			theSearch.addInclude(new SearchInclude(theSearch, next.getValue(), false, next.isRecurse()));
1056 		}
1057 		for (Include next : theParams.getRevIncludes()) {
1058 			theSearch.addInclude(new SearchInclude(theSearch, next.getValue(), true, next.isRecurse()));
1059 		}
1060 	}
1061 
1062 	/**
1063 	 * Creates a {@link Pageable} using a start and end index
1064 	 */
1065 	@SuppressWarnings("WeakerAccess")
1066 	public static @Nullable
1067 	Pageable toPage(final int theFromIndex, int theToIndex) {
1068 		int pageSize = theToIndex - theFromIndex;
1069 		if (pageSize < 1) {
1070 			return null;
1071 		}
1072 
1073 		int pageIndex = theFromIndex / pageSize;
1074 
1075 		Pageable page = new AbstractPageRequest(pageIndex, pageSize) {
1076 			private static final long serialVersionUID = 1L;
1077 
1078 			@Override
1079 			public long getOffset() {
1080 				return theFromIndex;
1081 			}
1082 
1083 			@Override
1084 			public Sort getSort() {
1085 				return Sort.unsorted();
1086 			}
1087 
1088 			@Override
1089 			public Pageable next() {
1090 				return null;
1091 			}
1092 
1093 			@Override
1094 			public Pageable previous() {
1095 				return null;
1096 			}
1097 
1098 			@Override
1099 			public Pageable first() {
1100 				return null;
1101 			}
1102 		};
1103 
1104 		return page;
1105 	}
1106 
1107 	static void verifySearchHasntFailedOrThrowInternalErrorException(Search theSearch) {
1108 		if (theSearch.getStatus() == SearchStatusEnum.FAILED) {
1109 			Integer status = theSearch.getFailureCode();
1110 			status = defaultIfNull(status, 500);
1111 
1112 			String message = theSearch.getFailureMessage();
1113 			throw BaseServerResponseException.newInstance(status, message);
1114 		}
1115 	}
1116 
1117 }