View Javadoc
1   package ca.uhn.fhir.jpa.search;
2   
3   /*-
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2018 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%family
21   */
22  
23  import ca.uhn.fhir.context.FhirContext;
24  import ca.uhn.fhir.jpa.dao.*;
25  import ca.uhn.fhir.jpa.dao.data.ISearchDao;
26  import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao;
27  import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
28  import ca.uhn.fhir.jpa.entity.*;
29  import ca.uhn.fhir.model.api.Include;
30  import ca.uhn.fhir.rest.api.CacheControlDirective;
31  import ca.uhn.fhir.rest.api.Constants;
32  import ca.uhn.fhir.rest.api.SearchTotalModeEnum;
33  import ca.uhn.fhir.rest.api.SummaryEnum;
34  import ca.uhn.fhir.rest.api.server.IBundleProvider;
35  import ca.uhn.fhir.rest.server.IPagingProvider;
36  import ca.uhn.fhir.rest.server.SimpleBundleProvider;
37  import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
38  import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
39  import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
40  import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
41  import ca.uhn.fhir.rest.server.method.PageMethodBinding;
42  import ca.uhn.fhir.util.StopWatch;
43  import com.google.common.annotations.VisibleForTesting;
44  import com.google.common.collect.Lists;
45  import org.apache.commons.lang3.Validate;
46  import org.apache.commons.lang3.exception.ExceptionUtils;
47  import org.apache.commons.lang3.time.DateUtils;
48  import org.hl7.fhir.instance.model.api.IBaseResource;
49  import org.springframework.beans.factory.annotation.Autowired;
50  import org.springframework.data.domain.*;
51  import org.springframework.orm.jpa.JpaDialect;
52  import org.springframework.orm.jpa.JpaTransactionManager;
53  import org.springframework.orm.jpa.vendor.HibernateJpaDialect;
54  import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
55  import org.springframework.transaction.PlatformTransactionManager;
56  import org.springframework.transaction.TransactionDefinition;
57  import org.springframework.transaction.TransactionStatus;
58  import org.springframework.transaction.annotation.Propagation;
59  import org.springframework.transaction.annotation.Transactional;
60  import org.springframework.transaction.support.TransactionCallbackWithoutResult;
61  import org.springframework.transaction.support.TransactionTemplate;
62  
63  import javax.annotation.Nullable;
64  import javax.annotation.PostConstruct;
65  import javax.persistence.EntityManager;
66  import java.util.*;
67  import java.util.concurrent.*;
68  
69  import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
70  
71  public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
72  	public static final int DEFAULT_SYNC_SIZE = 250;
73  
74  	private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);
75  	private final ConcurrentHashMap<String, BaseTask> myIdToSearchTask = new ConcurrentHashMap<>();
76  	@Autowired
77  	private FhirContext myContext;
78  	@Autowired
79  	private DaoConfig myDaoConfig;
80  	@Autowired
81  	private EntityManager myEntityManager;
82  	private ExecutorService myExecutor;
83  	private Integer myLoadingThrottleForUnitTests = null;
84  	private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE;
85  	private boolean myNeverUseLocalSearchForUnitTests;
86  	@Autowired
87  	private ISearchDao mySearchDao;
88  	@Autowired
89  	private ISearchIncludeDao mySearchIncludeDao;
90  	@Autowired
91  	private ISearchResultDao mySearchResultDao;
92  	@Autowired
93  	private PlatformTransactionManager myManagedTxManager;
94  	@Autowired
95  	private DaoRegistry myDaoRegistry;
96  	@Autowired
97  	private IPagingProvider myPagingProvider;
98  
99  	private int mySyncSize = DEFAULT_SYNC_SIZE;
100 	/** Set in {@link #start()} */
101 	private boolean myCustomIsolationSupported;
102 
103 	@PostConstruct
104 	public void start() {
105 		if (myManagedTxManager instanceof JpaTransactionManager) {
106 			JpaDialect jpaDialect = ((JpaTransactionManager) myManagedTxManager).getJpaDialect();
107 			if (jpaDialect instanceof HibernateJpaDialect) {
108 				myCustomIsolationSupported = true;
109 			}
110 		}
111 		if (myCustomIsolationSupported == false) {
112 			ourLog.warn("JPA dialect does not support transaction isolation! This can have an impact on search performance.");
113 		}
114 	}
115 
116 	/**
117 	 * Constructor
118 	 */
119 	public SearchCoordinatorSvcImpl() {
120 		CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("search_coord_");
121 		myExecutor = Executors.newCachedThreadPool(threadFactory);
122 	}
123 
124 	@Override
125 	public void cancelAllActiveSearches() {
126 		for (BaseTask next : myIdToSearchTask.values()) {
127 			next.requestImmediateAbort();
128 			try {
129 				next.getCompletionLatch().await(30, TimeUnit.SECONDS);
130 			} catch (InterruptedException e) {
131 				ourLog.warn("Failed to wait for completion", e);
132 			}
133 		}
134 	}
135 
136 	/**
137 	 * This method is called by the HTTP client processing thread in order to
138 	 * fetch resources.
139 	 */
140 	@Override
141 	@Transactional(propagation = Propagation.NEVER)
142 	public List<Long> getResources(final String theUuid, int theFrom, int theTo) {
143 		TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
144 		txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
145 
146 		Search search;
147 		StopWatch sw = new StopWatch();
148 		while (true) {
149 
150 			if (myNeverUseLocalSearchForUnitTests == false) {
151 				BaseTask task = myIdToSearchTask.get(theUuid);
152 				if (task != null) {
153 					ourLog.trace("Local search found");
154 					List<Long> resourcePids = task.getResourcePids(theFrom, theTo);
155 					if (resourcePids != null) {
156 						return resourcePids;
157 					}
158 				}
159 			}
160 
161 			search = txTemplate.execute(t -> mySearchDao.findByUuid(theUuid));
162 
163 			if (search == null) {
164 				ourLog.info("Client requested unknown paging ID[{}]", theUuid);
165 				String msg = myContext.getLocalizer().getMessage(PageMethodBinding.class, "unknownSearchId", theUuid);
166 				throw new ResourceGoneException(msg);
167 			}
168 
169 			verifySearchHasntFailedOrThrowInternalErrorException(search);
170 			if (search.getStatus() == SearchStatusEnum.FINISHED) {
171 				ourLog.info("Search entity marked as finished with {} results", search.getNumFound());
172 				break;
173 			}
174 			if (search.getNumFound() >= theTo) {
175 				ourLog.info("Search entity has {} results so far", search.getNumFound());
176 				break;
177 			}
178 
179 			if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) {
180 				ourLog.error("Search {} of type {} for {}{} timed out after {}ms", search.getId(), search.getSearchType(), search.getResourceType(), search.getSearchQueryString(), sw.getMillis());
181 				throw new InternalErrorException("Request timed out after " + sw.getMillis() + "ms");
182 			}
183 
184 			// If the search was saved in "pass complete mode" it's probably time to
185 			// start a new pass
186 			if (search.getStatus() == SearchStatusEnum.PASSCMPLET) {
187 				Optional<Search> newSearch = tryToMarkSearchAsInProgress(search);
188 				if (newSearch.isPresent()) {
189 					search = newSearch.get();
190 					String resourceType = search.getResourceType();
191 					SearchParameterMap params = search.getSearchParameterMap();
192 					IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType);
193 					SearchContinuationTask task = new SearchContinuationTask(search, resourceDao, params, resourceType);
194 					myIdToSearchTask.put(search.getUuid(), task);
195 					myExecutor.submit(task);
196 				}
197 			}
198 
199 			try {
200 				Thread.sleep(500);
201 			} catch (InterruptedException e) {
202 				// ignore
203 			}
204 		}
205 
206 		final Pageable page = toPage(theFrom, theTo);
207 		if (page == null) {
208 			return Collections.emptyList();
209 		}
210 
211 		final Search foundSearch = search;
212 
213 		ourLog.trace("Loading stored search");
214 		List<Long> retVal = txTemplate.execute(theStatus -> {
215 			final List<Long> resultPids = new ArrayList<>();
216 			Page<Long> searchResultPids = mySearchResultDao.findWithSearchUuid(foundSearch, page);
217 			for (Long next : searchResultPids) {
218 				resultPids.add(next);
219 			}
220 			return resultPids;
221 		});
222 		return retVal;
223 	}
224 
225 	private Optional<Search> tryToMarkSearchAsInProgress(Search theSearch) {
226 		ourLog.trace("Going to try to change search status from {} to {}", theSearch.getStatus(), SearchStatusEnum.LOADING);
227 		try {
228 			TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
229 			txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
230 			txTemplate.afterPropertiesSet();
231 			return txTemplate.execute(t -> {
232 				Search search = mySearchDao.findById(theSearch.getId()).orElse(theSearch);
233 
234 				if (search.getStatus() != SearchStatusEnum.PASSCMPLET) {
235 					throw new IllegalStateException("Can't change to LOADING because state is " + theSearch.getStatus());
236 				}
237 				search.setStatus(SearchStatusEnum.LOADING);
238 				Search newSearch = mySearchDao.save(search);
239 				return Optional.of(newSearch);
240 			});
241 		} catch (Exception e) {
242 			ourLog.warn("Failed to activate search: {}", e.toString());
243 			// FIXME: aaaaa
244 			ourLog.info("Failed to activate search", e);
245 			return Optional.empty();
246 		}
247 	}
248 
249 	private void populateBundleProvider(PersistedJpaBundleProvider theRetVal) {
250 		theRetVal.setContext(myContext);
251 		theRetVal.setEntityManager(myEntityManager);
252 		theRetVal.setPlatformTransactionManager(myManagedTxManager);
253 		theRetVal.setSearchDao(mySearchDao);
254 		theRetVal.setSearchCoordinatorSvc(this);
255 	}
256 
257 	@Override
258 	public IBundleProvider registerSearch(final IDao theCallingDao, final SearchParameterMap theParams, String theResourceType, CacheControlDirective theCacheControlDirective) {
259 		StopWatch w = new StopWatch();
260 		final String searchUuid = UUID.randomUUID().toString();
261 
262 		ourLog.debug("Registering new search {}", searchUuid);
263 
264 		Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(theResourceType).getImplementingClass();
265 		final ISearchBuilder sb = theCallingDao.newSearchBuilder();
266 		sb.setType(resourceTypeClass, theResourceType);
267 		sb.setFetchSize(mySyncSize);
268 
269 		final Integer loadSynchronousUpTo;
270 		if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) {
271 			if (theCacheControlDirective.getMaxResults() != null) {
272 				loadSynchronousUpTo = theCacheControlDirective.getMaxResults();
273 				if (loadSynchronousUpTo > myDaoConfig.getCacheControlNoStoreMaxResultsUpperLimit()) {
274 					throw new InvalidRequestException(Constants.HEADER_CACHE_CONTROL + " header " + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed " + myDaoConfig.getCacheControlNoStoreMaxResultsUpperLimit());
275 				}
276 			} else {
277 				loadSynchronousUpTo = 100;
278 			}
279 		} else {
280 			loadSynchronousUpTo = null;
281 		}
282 
283 		if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null) {
284 
285 			ourLog.debug("Search {} is loading in synchronous mode", searchUuid);
286 
287 			// Execute the query and make sure we return distinct results
288 			TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
289 			txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
290 			return txTemplate.execute(t -> {
291 
292 				// Load the results synchronously
293 				final List<Long> pids = new ArrayList<>();
294 
295 				Iterator<Long> resultIter = sb.createQuery(theParams, searchUuid);
296 				while (resultIter.hasNext()) {
297 					pids.add(resultIter.next());
298 					if (loadSynchronousUpTo != null && pids.size() >= loadSynchronousUpTo) {
299 						break;
300 					}
301 					if (theParams.getLoadSynchronousUpTo() != null && pids.size() >= theParams.getLoadSynchronousUpTo()) {
302 						break;
303 					}
304 				}
305 
306 				/*
307 				 * For synchronous queries, we load all the includes right away
308 				 * since we're returning a static bundle with all the results
309 				 * pre-loaded. This is ok because syncronous requests are not
310 				 * expected to be paged
311 				 *
312 				 * On the other hand for async queries we load includes/revincludes
313 				 * individually for pages as we return them to clients
314 				 */
315 				final Set<Long> includedPids = new HashSet<>();
316 				includedPids.addAll(sb.loadIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated(), "(synchronous)"));
317 				includedPids.addAll(sb.loadIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated(), "(synchronous)"));
318 
319 				List<IBaseResource> resources = new ArrayList<>();
320 				sb.loadResourcesByPid(pids, resources, includedPids, false, myEntityManager, myContext, theCallingDao);
321 				return new SimpleBundleProvider(resources);
322 			});
323 		}
324 
325 		/*
326 		 * See if there are any cached searches whose results we can return
327 		 * instead
328 		 */
329 		boolean useCache = true;
330 		if (theCacheControlDirective != null && theCacheControlDirective.isNoCache() == true) {
331 			useCache = false;
332 		}
333 		final String queryString = theParams.toNormalizedQueryString(myContext);
334 		if (theParams.getEverythingMode() == null) {
335 			if (myDaoConfig.getReuseCachedSearchResultsForMillis() != null && useCache) {
336 
337 				final Date createdCutoff = new Date(System.currentTimeMillis() - myDaoConfig.getReuseCachedSearchResultsForMillis());
338 				final String resourceType = theResourceType;
339 
340 				TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
341 				PersistedJpaBundleProvider foundSearchProvider = txTemplate.execute(t -> {
342 					Search searchToUse = null;
343 
344 					int hashCode = queryString.hashCode();
345 					Collection<Search> candidates = mySearchDao.find(resourceType, hashCode, createdCutoff);
346 					for (Search nextCandidateSearch : candidates) {
347 						if (queryString.equals(nextCandidateSearch.getSearchQueryString())) {
348 							searchToUse = nextCandidateSearch;
349 						}
350 					}
351 
352 					PersistedJpaBundleProvider retVal = null;
353 					if (searchToUse != null) {
354 						ourLog.info("Reusing search {} from cache", searchToUse.getUuid());
355 						searchToUse.setSearchLastReturned(new Date());
356 						mySearchDao.updateSearchLastReturned(searchToUse.getId(), new Date());
357 
358 						retVal = new PersistedJpaBundleProvider(searchToUse.getUuid(), theCallingDao);
359 						retVal.setCacheHit(true);
360 
361 						populateBundleProvider(retVal);
362 					}
363 
364 					return retVal;
365 				});
366 
367 				if (foundSearchProvider != null) {
368 					return foundSearchProvider;
369 				}
370 
371 			}
372 		}
373 
374 		Search search = new Search();
375 		populateSearchEntity(theParams, theResourceType, searchUuid, queryString, search);
376 
377 		SearchTask task = new SearchTask(search, theCallingDao, theParams, theResourceType);
378 		myIdToSearchTask.put(search.getUuid(), task);
379 		myExecutor.submit(task);
380 
381 		PersistedJpaSearchFirstPageBundleProvider retVal = new PersistedJpaSearchFirstPageBundleProvider(search, theCallingDao, task, sb, myManagedTxManager);
382 		populateBundleProvider(retVal);
383 
384 		ourLog.debug("Search initial phase completed in {}ms", w.getMillis());
385 		return retVal;
386 
387 	}
388 
389 	@VisibleForTesting
390 	void setContextForUnitTest(FhirContext theCtx) {
391 		myContext = theCtx;
392 	}
393 
394 	@VisibleForTesting
395 	void setDaoConfigForUnitTest(DaoConfig theDaoConfig) {
396 		myDaoConfig = theDaoConfig;
397 	}
398 
399 	@VisibleForTesting
400 	void setEntityManagerForUnitTest(EntityManager theEntityManager) {
401 		myEntityManager = theEntityManager;
402 	}
403 
404 	@VisibleForTesting
405 	public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) {
406 		myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests;
407 	}
408 
409 	@VisibleForTesting
410 	void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) {
411 		myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults;
412 	}
413 
414 	@VisibleForTesting
415 	public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) {
416 		myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests;
417 	}
418 
419 	@VisibleForTesting
420 	void setSearchDaoForUnitTest(ISearchDao theSearchDao) {
421 		mySearchDao = theSearchDao;
422 	}
423 
424 	@VisibleForTesting
425 	void setSearchDaoIncludeForUnitTest(ISearchIncludeDao theSearchIncludeDao) {
426 		mySearchIncludeDao = theSearchIncludeDao;
427 	}
428 
429 	@VisibleForTesting
430 	void setSearchDaoResultForUnitTest(ISearchResultDao theSearchResultDao) {
431 		mySearchResultDao = theSearchResultDao;
432 	}
433 
434 	@VisibleForTesting
435 	public void setSyncSizeForUnitTests(int theSyncSize) {
436 		mySyncSize = theSyncSize;
437 	}
438 
439 	@VisibleForTesting
440 	void setTransactionManagerForUnitTest(PlatformTransactionManager theTxManager) {
441 		myManagedTxManager = theTxManager;
442 	}
443 
444 	@VisibleForTesting
445 	public void setDaoRegistryForUnitTest(DaoRegistry theDaoRegistry) {
446 		myDaoRegistry = theDaoRegistry;
447 	}
448 
449 	public abstract class BaseTask implements Callable<Void> {
450 		private final SearchParameterMap myParams;
451 		private final IDao myCallingDao;
452 		private final String myResourceType;
453 		private final ArrayList<Long> mySyncedPids = new ArrayList<>();
454 		private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1);
455 		private final CountDownLatch myCompletionLatch;
456 		private final ArrayList<Long> myUnsyncedPids = new ArrayList<>();
457 		private Search mySearch;
458 		private boolean myAbortRequested;
459 		private int myCountSaved = 0;
460 		private boolean myAdditionalPrefetchThresholdsRemaining;
461 		private List<Long> myPreviouslyAddedResourcePids;
462 		private Integer myMaxResultsToFetch;
463 		private int myCountFetchedDuringThisPass;
464 		/**
465 		 * Constructor
466 		 */
467 		protected BaseTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType) {
468 			mySearch = theSearch;
469 			myCallingDao = theCallingDao;
470 			myParams = theParams;
471 			myResourceType = theResourceType;
472 			myCompletionLatch = new CountDownLatch(1);
473 		}
474 
475 		protected Search getSearch() {
476 			return mySearch;
477 		}
478 
479 		protected CountDownLatch getInitialCollectionLatch() {
480 			return myInitialCollectionLatch;
481 		}
482 
483 		protected void setPreviouslyAddedResourcePids(List<Long> thePreviouslyAddedResourcePids) {
484 			myPreviouslyAddedResourcePids = thePreviouslyAddedResourcePids;
485 			myCountSaved = myPreviouslyAddedResourcePids.size();
486 		}
487 
488 		private ISearchBuilder newSearchBuilder() {
489 			Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
490 			ISearchBuilder sb = myCallingDao.newSearchBuilder();
491 			sb.setType(resourceTypeClass, myResourceType);
492 
493 			return sb;
494 		}
495 
496 		public List<Long> getResourcePids(int theFromIndex, int theToIndex) {
497 			ourLog.debug("Requesting search PIDs from {}-{}", theFromIndex, theToIndex);
498 
499 			boolean keepWaiting;
500 			do {
501 				synchronized (mySyncedPids) {
502 					ourLog.trace("Search status is {}", mySearch.getStatus());
503 					boolean haveEnoughResults = mySyncedPids.size() >= theToIndex;
504 					if (!haveEnoughResults) {
505 						switch (mySearch.getStatus()) {
506 							case LOADING:
507 								keepWaiting = true;
508 								break;
509 							case PASSCMPLET:
510 								/*
511 								 * If we get here, it means that the user requested resources that crossed the
512 								 * current pre-fetch boundary. For example, if the prefetch threshold is 50 and the
513 								 * user has requested resources 0-60, then they would get 0-50 back but the search
514 								 * coordinator would then stop searching.SearchCoordinatorSvcImplTest
515 								 */
516 								// FIXME: aaaaaaaa
517 //								List<Long> remainingResources = SearchCoordinatorSvcImpl.this.getResources(mySearch.getUuid(), mySyncedPids.size(), theToIndex);
518 //								ourLog.debug("Adding {} resources to the existing {} synced resource IDs", remainingResources.size(), mySyncedPids.size());
519 //								mySyncedPids.addAll(remainingResources);
520 								keepWaiting = false;
521 								break;
522 							case FAILED:
523 							case FINISHED:
524 							default:
525 								keepWaiting = false;
526 								break;
527 						}
528 					} else {
529 						keepWaiting = false;
530 					}
531 				}
532 
533 				if (keepWaiting) {
534 					ourLog.info("Waiting, as we only have {} results", mySyncedPids.size());
535 					try {
536 						Thread.sleep(500);
537 					} catch (InterruptedException theE) {
538 						// ignore
539 					}
540 				}
541 			} while (keepWaiting);
542 
543 			ourLog.info("Proceeding, as we have {} results", mySyncedPids.size());
544 
545 			ArrayList<Long> retVal = new ArrayList<>();
546 			synchronized (mySyncedPids) {
547 				verifySearchHasntFailedOrThrowInternalErrorException(mySearch);
548 
549 				int toIndex = theToIndex;
550 				if (mySyncedPids.size() < toIndex) {
551 					toIndex = mySyncedPids.size();
552 				}
553 				for (int i = theFromIndex; i < toIndex; i++) {
554 					retVal.add(mySyncedPids.get(i));
555 				}
556 			}
557 
558 			ourLog.trace("Done syncing results - Wanted {}-{} and returning {} of {}", theFromIndex, theToIndex, retVal.size(), mySyncedPids.size());
559 
560 			return retVal;
561 		}
562 
563 		protected void saveSearch() {
564 			TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
565 			txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
566 			txTemplate.execute(new TransactionCallbackWithoutResult() {
567 				@Override
568 				protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
569 					doSaveSearch();
570 				}
571 
572 			});
573 		}
574 
575 		private void saveUnsynced(final IResultIterator theResultIter) {
576 			TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
577 			txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
578 			txTemplate.execute(new TransactionCallbackWithoutResult() {
579 				@Override
580 				protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
581 					if (mySearch.getId() == null) {
582 						doSaveSearch();
583 					}
584 
585 					List<SearchResult> resultsToSave = Lists.newArrayList();
586 					for (Long nextPid : myUnsyncedPids) {
587 						SearchResult nextResult = new SearchResult(mySearch);
588 						nextResult.setResourcePid(nextPid);
589 						nextResult.setOrder(myCountSaved++);
590 						resultsToSave.add(nextResult);
591 						ourLog.trace("Saving ORDER[{}] Resource {}", nextResult.getOrder(), nextResult.getResourcePid());
592 					}
593 					mySearchResultDao.saveAll(resultsToSave);
594 
595 					synchronized (mySyncedPids) {
596 						int numSyncedThisPass = myUnsyncedPids.size();
597 						ourLog.trace("Syncing {} search results", numSyncedThisPass);
598 						mySyncedPids.addAll(myUnsyncedPids);
599 						myUnsyncedPids.clear();
600 
601 						if (theResultIter.hasNext() == false) {
602 							mySearch.setNumFound(myCountSaved);
603 							int loadedCountThisPass = theResultIter.getSkippedCount() + myCountSaved;
604 							if (myMaxResultsToFetch != null && loadedCountThisPass < myMaxResultsToFetch) {
605 								mySearch.setStatus(SearchStatusEnum.FINISHED);
606 								mySearch.setTotalCount(myCountSaved);
607 							} else if (myAdditionalPrefetchThresholdsRemaining) {
608 								ourLog.trace("Setting search status to PASSCMPLET");
609 								mySearch.setStatus(SearchStatusEnum.PASSCMPLET);
610 								mySearch.setSearchParameterMap(myParams);
611 							} else {
612 								mySearch.setStatus(SearchStatusEnum.FINISHED);
613 								mySearch.setTotalCount(myCountSaved);
614 							}
615 						}
616 					}
617 
618 					mySearch.setNumFound(myCountSaved);
619 
620 					int numSynced;
621 					synchronized (mySyncedPids) {
622 						numSynced = mySyncedPids.size();
623 					}
624 
625 					if (myDaoConfig.getCountSearchResultsUpTo() == null ||
626 						myDaoConfig.getCountSearchResultsUpTo() <= 0 ||
627 						myDaoConfig.getCountSearchResultsUpTo() <= numSynced) {
628 						myInitialCollectionLatch.countDown();
629 					}
630 
631 					doSaveSearch();
632 
633 				}
634 			});
635 
636 		}
637 
638 		public boolean isNotAborted() {
639 			return myAbortRequested == false;
640 		}
641 
642 		protected void markComplete() {
643 			myCompletionLatch.countDown();
644 		}
645 
646 		public CountDownLatch getCompletionLatch() {
647 			return myCompletionLatch;
648 		}
649 
650 		/**
651 		 * Request that the task abort as soon as possible
652 		 */
653 		public void requestImmediateAbort() {
654 			myAbortRequested = true;
655 		}
656 
657 		/**
658 		 * This is the method which actually performs the search.
659 		 * It is called automatically by the thread pool.
660 		 */
661 		@Override
662 		public Void call() {
663 			StopWatch sw = new StopWatch();
664 
665 			try {
666 				// Create an initial search in the DB and give it an ID
667 				saveSearch();
668 
669 				TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
670 				txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
671 
672 				if (myCustomIsolationSupported) {
673 					txTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
674 				}
675 
676 				txTemplate.execute(new TransactionCallbackWithoutResult() {
677 					@Override
678 					protected void doInTransactionWithoutResult(TransactionStatus theStatus) {
679 						doSearch();
680 					}
681 				});
682 
683 				ourLog.info("Completed search for [{}{}] and found {} resources in {}ms", mySearch.getResourceType(), mySearch.getSearchQueryString(), mySyncedPids.size(), sw.getMillis());
684 
685 			} catch (Throwable t) {
686 
687 				/*
688 				 * Don't print a stack trace for client errors (i.e. requests that
689 				 * aren't valid because the client screwed up).. that's just noise
690 				 * in the logs and who needs that.
691 				 */
692 				boolean logged = false;
693 				if (t instanceof BaseServerResponseException) {
694 					BaseServerResponseException exception = (BaseServerResponseException) t;
695 					if (exception.getStatusCode() >= 400 && exception.getStatusCode() < 500) {
696 						logged = true;
697 						ourLog.warn("Failed during search due to invalid request: {}", t.toString());
698 					}
699 				}
700 
701 				if (!logged) {
702 					ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t);
703 				}
704 				myUnsyncedPids.clear();
705 
706 				Throwable rootCause = ExceptionUtils.getRootCause(t);
707 				rootCause = defaultIfNull(rootCause, t);
708 
709 				String failureMessage = rootCause.getMessage();
710 
711 				int failureCode = InternalErrorException.STATUS_CODE;
712 				if (t instanceof BaseServerResponseException) {
713 					failureCode = ((BaseServerResponseException) t).getStatusCode();
714 				}
715 
716 				mySearch.setFailureMessage(failureMessage);
717 				mySearch.setFailureCode(failureCode);
718 				mySearch.setStatus(SearchStatusEnum.FAILED);
719 
720 				saveSearch();
721 
722 			} finally {
723 
724 				myIdToSearchTask.remove(mySearch.getUuid());
725 				myInitialCollectionLatch.countDown();
726 				markComplete();
727 
728 			}
729 			return null;
730 		}
731 
732 		private void doSaveSearch() {
733 
734 			Search newSearch;
735 			if (mySearch.getId() == null) {
736 				newSearch = mySearchDao.save(mySearch);
737 				for (SearchInclude next : mySearch.getIncludes()) {
738 					mySearchIncludeDao.save(next);
739 				}
740 			} else {
741 				newSearch = mySearchDao.save(mySearch);
742 			}
743 
744 			// mySearchDao.save is not supposed to return null, but in unit tests
745 			// it can if the mock search dao isn't set up to handle that
746 			if (newSearch != null) {
747 				mySearch = newSearch;
748 			}
749 		}
750 
751 		/**
752 		 * This method actually creates the database query to perform the
753 		 * search, and starts it.
754 		 */
755 		private void doSearch() {
756 
757 			/*
758 			 * If the user has explicitly requested a _count, perform a
759 			 *
760 			 * SELECT COUNT(*) ....
761 			 *
762 			 * before doing anything else.
763 			 */
764 			boolean wantOnlyCount = SummaryEnum.COUNT.equals(myParams.getSummaryMode());
765 			boolean wantCount = wantOnlyCount || SearchTotalModeEnum.ACCURATE.equals(myParams.getSearchTotalMode());
766 			if (wantCount) {
767 				ourLog.trace("Performing count");
768 				ISearchBuilder sb = newSearchBuilder();
769 				Iterator<Long> countIterator = sb.createCountQuery(myParams, mySearch.getUuid());
770 				Long count = countIterator.next();
771 				ourLog.trace("Got count {}", count);
772 
773 				TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
774 				txTemplate.execute(new TransactionCallbackWithoutResult() {
775 					@Override
776 					protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
777 						mySearch.setTotalCount(count.intValue());
778 						if (wantOnlyCount) {
779 							mySearch.setStatus(SearchStatusEnum.FINISHED);
780 						}
781 						doSaveSearch();
782 						mySearchDao.flush();
783 					}
784 				});
785 				if (wantOnlyCount) {
786 					return;
787 				}
788 			}
789 
790 			ourLog.trace("Done count");
791 			ISearchBuilder sb = newSearchBuilder();
792 
793 			/*
794 			 * Figure out how many results we're actually going to fetch from the
795 			 * database in this pass. This calculation takes into consideration the
796 			 * "pre-fetch thresholds" specified in DaoConfig#getSearchPreFetchThresholds()
797 			 * as well as the value of the _count parameter.
798 			 */
799 			int currentlyLoaded = defaultIfNull(mySearch.getNumFound(), 0);
800 			int minWanted = 0;
801 			if (myParams.getCount() != null) {
802 				minWanted = myParams.getCount();
803 				minWanted = Math.max(minWanted, myPagingProvider.getMaximumPageSize());
804 				minWanted += currentlyLoaded;
805 			}
806 
807 			for (Iterator<Integer> iter = myDaoConfig.getSearchPreFetchThresholds().iterator(); iter.hasNext(); ) {
808 				int next = iter.next();
809 				if (next != -1 && next <= currentlyLoaded) {
810 					continue;
811 				}
812 
813 				if (next == -1) {
814 					sb.setMaxResultsToFetch(null);
815 				} else {
816 					myMaxResultsToFetch = Math.max(next, minWanted);
817 					sb.setMaxResultsToFetch(myMaxResultsToFetch);
818 				}
819 
820 				if (iter.hasNext()) {
821 					myAdditionalPrefetchThresholdsRemaining = true;
822 				}
823 
824 				// If we get here's we've found an appropriate threshold
825 				break;
826 			}
827 
828 			/*
829 			 * Provide any PID we loaded in previous seasrch passes to the
830 			 * SearchBuilder so that we don't get duplicates coming from running
831 			 * the same query again.
832 			 *
833 			 * We could possibly accomplish this in a different way by using sorted
834 			 * results in our SQL query and specifying an offset. I don't actually
835 			 * know if that would be faster or not. At some point should test this
836 			 * idea.
837 			 */
838 			if (myPreviouslyAddedResourcePids != null) {
839 				sb.setPreviouslyAddedResourcePids(myPreviouslyAddedResourcePids);
840 				mySyncedPids.addAll(myPreviouslyAddedResourcePids);
841 			}
842 
843 			/*
844 			 * Construct the SQL query we'll be sending to the database
845 			 */
846 			IResultIterator theResultIterator = sb.createQuery(myParams, mySearch.getUuid());
847 			assert (theResultIterator != null);
848 
849 			/*
850 			 * The following loop actually loads the PIDs of the resources
851 			 * matching the search off of the disk and into memory. After
852 			 * every X results, we commit to the HFJ_SEARCH table.
853 			 */
854 			int syncSize = mySyncSize;
855 			while (theResultIterator.hasNext()) {
856 				myUnsyncedPids.add(theResultIterator.next());
857 				myCountFetchedDuringThisPass++;
858 
859 				boolean shouldSync = myUnsyncedPids.size() >= syncSize;
860 
861 				if (myDaoConfig.getCountSearchResultsUpTo() != null &&
862 					myDaoConfig.getCountSearchResultsUpTo() > 0 &&
863 					myDaoConfig.getCountSearchResultsUpTo() < myUnsyncedPids.size()) {
864 					shouldSync = false;
865 				}
866 
867 				if (myUnsyncedPids.size() > 50000) {
868 					shouldSync = true;
869 				}
870 
871 				// If no abort was requested, bail out
872 				Validate.isTrue(isNotAborted(), "Abort has been requested");
873 
874 				if (shouldSync) {
875 					saveUnsynced(theResultIterator);
876 				}
877 
878 				if (myLoadingThrottleForUnitTests != null) {
879 					try {
880 						Thread.sleep(myLoadingThrottleForUnitTests);
881 					} catch (InterruptedException e) {
882 						// ignore
883 					}
884 				}
885 
886 			}
887 
888 			// If no abort was requested, bail out
889 			Validate.isTrue(isNotAborted(), "Abort has been requested");
890 
891 			saveUnsynced(theResultIterator);
892 		}
893 	}
894 
895 
896 	public class SearchContinuationTask extends BaseTask {
897 
898 		public SearchContinuationTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType) {
899 			super(theSearch, theCallingDao, theParams, theResourceType);
900 		}
901 
902 		@Override
903 		public Void call() {
904 			try {
905 				TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
906 				txTemplate.afterPropertiesSet();
907 				txTemplate.execute(t -> {
908 					List<Long> previouslyAddedResourcePids = mySearchResultDao.findWithSearchUuid(getSearch());
909 					ourLog.debug("Have {} previously added IDs in search: {}", previouslyAddedResourcePids.size(), getSearch().getUuid());
910 					setPreviouslyAddedResourcePids(previouslyAddedResourcePids);
911 					return null;
912 				});
913 			} catch (Throwable e) {
914 				ourLog.error("Failure processing search", e);
915 				getSearch().setFailureMessage(e.toString());
916 				getSearch().setStatus(SearchStatusEnum.FAILED);
917 
918 				saveSearch();
919 				return null;
920 			}
921 
922 			return super.call();
923 		}
924 
925 		@Override
926 		public List<Long> getResourcePids(int theFromIndex, int theToIndex) {
927 			return super.getResourcePids(theFromIndex, theToIndex);
928 		}
929 	}
930 
931 	/**
932 	 * A search task is a Callable task that runs in
933 	 * a thread pool to handle an individual search. One instance
934 	 * is created for any requested search and runs from the
935 	 * beginning to the end of the search.
936 	 * <p>
937 	 * Understand:
938 	 * This class executes in its own thread separate from the
939 	 * web server client thread that made the request. We do that
940 	 * so that we can return to the client as soon as possible,
941 	 * but keep the search going in the background (and have
942 	 * the next page of results ready to go when the client asks).
943 	 */
944 	public class SearchTask extends BaseTask {
945 
946 		/**
947 		 * Constructor
948 		 */
949 		public SearchTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType) {
950 			super(theSearch, theCallingDao, theParams, theResourceType);
951 		}
952 
953 		/**
954 		 * This method is called by the server HTTP thread, and
955 		 * will block until at least one page of results have been
956 		 * fetched from the DB, and will never block after that.
957 		 */
958 		public Integer awaitInitialSync() {
959 			ourLog.trace("Awaiting initial sync");
960 			do {
961 				try {
962 					if (getInitialCollectionLatch().await(250, TimeUnit.MILLISECONDS)) {
963 						break;
964 					}
965 				} catch (InterruptedException e) {
966 					// Shouldn't happen
967 					throw new InternalErrorException(e);
968 				}
969 			} while (getSearch().getStatus() == SearchStatusEnum.LOADING);
970 			ourLog.trace("Initial sync completed");
971 
972 			return getSearch().getTotalCount();
973 		}
974 
975 	}
976 
977 	public static void populateSearchEntity(SearchParameterMap theParams, String theResourceType, String theSearchUuid, String theQueryString, Search theSearch) {
978 		theSearch.setDeleted(false);
979 		theSearch.setUuid(theSearchUuid);
980 		theSearch.setCreated(new Date());
981 		theSearch.setSearchLastReturned(new Date());
982 		theSearch.setTotalCount(null);
983 		theSearch.setNumFound(0);
984 		theSearch.setPreferredPageSize(theParams.getCount());
985 		theSearch.setSearchType(theParams.getEverythingMode() != null ? SearchTypeEnum.EVERYTHING : SearchTypeEnum.SEARCH);
986 		theSearch.setLastUpdated(theParams.getLastUpdated());
987 		theSearch.setResourceType(theResourceType);
988 		theSearch.setStatus(SearchStatusEnum.LOADING);
989 
990 		theSearch.setSearchQueryString(theQueryString);
991 		theSearch.setSearchQueryStringHash(theQueryString.hashCode());
992 
993 		for (Include next : theParams.getIncludes()) {
994 			theSearch.addInclude(new SearchInclude(theSearch, next.getValue(), false, next.isRecurse()));
995 		}
996 		for (Include next : theParams.getRevIncludes()) {
997 			theSearch.addInclude(new SearchInclude(theSearch, next.getValue(), true, next.isRecurse()));
998 		}
999 	}
1000 
1001 	/**
1002 	 * Creates a {@link Pageable} using a start and end index
1003 	 */
1004 	@SuppressWarnings("WeakerAccess")
1005 	public static @Nullable
1006 	Pageable toPage(final int theFromIndex, int theToIndex) {
1007 		int pageSize = theToIndex - theFromIndex;
1008 		if (pageSize < 1) {
1009 			return null;
1010 		}
1011 
1012 		int pageIndex = theFromIndex / pageSize;
1013 
1014 		Pageable page = new AbstractPageRequest(pageIndex, pageSize) {
1015 			private static final long serialVersionUID = 1L;
1016 
1017 			@Override
1018 			public long getOffset() {
1019 				return theFromIndex;
1020 			}
1021 
1022 			@Override
1023 			public Sort getSort() {
1024 				return Sort.unsorted();
1025 			}
1026 
1027 			@Override
1028 			public Pageable next() {
1029 				return null;
1030 			}
1031 
1032 			@Override
1033 			public Pageable previous() {
1034 				return null;
1035 			}
1036 
1037 			@Override
1038 			public Pageable first() {
1039 				return null;
1040 			}
1041 		};
1042 
1043 		return page;
1044 	}
1045 
1046 	static void verifySearchHasntFailedOrThrowInternalErrorException(Search theSearch) {
1047 		if (theSearch.getStatus() == SearchStatusEnum.FAILED) {
1048 			Integer status = theSearch.getFailureCode();
1049 			status = defaultIfNull(status, 500);
1050 
1051 			String message = theSearch.getFailureMessage();
1052 			throw BaseServerResponseException.newInstance(status, message);
1053 		}
1054 	}
1055 
1056 }