View Javadoc
1   package ca.uhn.fhir.jpa.search;
2   
3   /*-
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2019 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%family
21   */
22  
23  import ca.uhn.fhir.context.FhirContext;
24  import ca.uhn.fhir.jpa.dao.*;
25  import ca.uhn.fhir.jpa.dao.data.ISearchDao;
26  import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao;
27  import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
28  import ca.uhn.fhir.jpa.entity.*;
29  import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
30  import ca.uhn.fhir.model.api.Include;
31  import ca.uhn.fhir.rest.api.CacheControlDirective;
32  import ca.uhn.fhir.rest.api.Constants;
33  import ca.uhn.fhir.rest.api.SearchTotalModeEnum;
34  import ca.uhn.fhir.rest.api.SummaryEnum;
35  import ca.uhn.fhir.rest.api.server.IBundleProvider;
36  import ca.uhn.fhir.rest.server.IPagingProvider;
37  import ca.uhn.fhir.rest.server.SimpleBundleProvider;
38  import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
39  import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
40  import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
41  import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
42  import ca.uhn.fhir.rest.server.method.PageMethodBinding;
43  import ca.uhn.fhir.util.StopWatch;
44  import com.google.common.annotations.VisibleForTesting;
45  import com.google.common.collect.Lists;
46  import org.apache.commons.lang3.Validate;
47  import org.apache.commons.lang3.exception.ExceptionUtils;
48  import org.apache.commons.lang3.time.DateUtils;
49  import org.hl7.fhir.instance.model.api.IBaseResource;
50  import org.springframework.beans.factory.annotation.Autowired;
51  import org.springframework.data.domain.AbstractPageRequest;
52  import org.springframework.data.domain.Page;
53  import org.springframework.data.domain.Pageable;
54  import org.springframework.data.domain.Sort;
55  import org.springframework.orm.jpa.JpaDialect;
56  import org.springframework.orm.jpa.JpaTransactionManager;
57  import org.springframework.orm.jpa.vendor.HibernateJpaDialect;
58  import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
59  import org.springframework.stereotype.Component;
60  import org.springframework.transaction.PlatformTransactionManager;
61  import org.springframework.transaction.TransactionDefinition;
62  import org.springframework.transaction.TransactionStatus;
63  import org.springframework.transaction.annotation.Propagation;
64  import org.springframework.transaction.annotation.Transactional;
65  import org.springframework.transaction.support.TransactionCallbackWithoutResult;
66  import org.springframework.transaction.support.TransactionTemplate;
67  
68  import javax.annotation.Nullable;
69  import javax.annotation.PostConstruct;
70  import javax.persistence.EntityManager;
71  import java.util.*;
72  import java.util.concurrent.*;
73  
74  import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
75  
76  @Component("mySearchCoordinatorSvc")
77  public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
78  	public static final int DEFAULT_SYNC_SIZE = 250;
79  
80  	private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);
81  	private final ConcurrentHashMap<String, BaseTask> myIdToSearchTask = new ConcurrentHashMap<>();
82  	@Autowired
83  	private FhirContext myContext;
84  	@Autowired
85  	private DaoConfig myDaoConfig;
86  	@Autowired
87  	private EntityManager myEntityManager;
88  	private ExecutorService myExecutor;
89  	private Integer myLoadingThrottleForUnitTests = null;
90  	private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE;
91  	private boolean myNeverUseLocalSearchForUnitTests;
92  	@Autowired
93  	private ISearchDao mySearchDao;
94  	@Autowired
95  	private ISearchIncludeDao mySearchIncludeDao;
96  	@Autowired
97  	private ISearchResultDao mySearchResultDao;
98  	@Autowired
99  	private PlatformTransactionManager myManagedTxManager;
100 	@Autowired
101 	private DaoRegistry myDaoRegistry;
102 	@Autowired
103 	private IPagingProvider myPagingProvider;
104 
105 	private int mySyncSize = DEFAULT_SYNC_SIZE;
106 	/**
107 	 * Set in {@link #start()}
108 	 */
109 	private boolean myCustomIsolationSupported;
110 
111 	/**
112 	 * Constructor
113 	 */
114 	public SearchCoordinatorSvcImpl() {
115 		CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("search_coord_");
116 		myExecutor = Executors.newCachedThreadPool(threadFactory);
117 	}
118 
119 	@PostConstruct
120 	public void start() {
121 		if (myManagedTxManager instanceof JpaTransactionManager) {
122 			JpaDialect jpaDialect = ((JpaTransactionManager) myManagedTxManager).getJpaDialect();
123 			if (jpaDialect instanceof HibernateJpaDialect) {
124 				myCustomIsolationSupported = true;
125 			}
126 		}
127 		if (myCustomIsolationSupported == false) {
128 			ourLog.warn("JPA dialect does not support transaction isolation! This can have an impact on search performance.");
129 		}
130 	}
131 
132 	@Override
133 	public void cancelAllActiveSearches() {
134 		for (BaseTask next : myIdToSearchTask.values()) {
135 			next.requestImmediateAbort();
136 			try {
137 				next.getCompletionLatch().await(30, TimeUnit.SECONDS);
138 			} catch (InterruptedException e) {
139 				ourLog.warn("Failed to wait for completion", e);
140 			}
141 		}
142 	}
143 
144 	/**
145 	 * This method is called by the HTTP client processing thread in order to
146 	 * fetch resources.
147 	 */
148 	@Override
149 	@Transactional(propagation = Propagation.NEVER)
150 	public List<Long> getResources(final String theUuid, int theFrom, int theTo) {
151 		TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
152 		txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
153 
154 		Search search;
155 		StopWatch sw = new StopWatch();
156 		while (true) {
157 
158 			if (myNeverUseLocalSearchForUnitTests == false) {
159 				BaseTask task = myIdToSearchTask.get(theUuid);
160 				if (task != null) {
161 					ourLog.trace("Local search found");
162 					List<Long> resourcePids = task.getResourcePids(theFrom, theTo);
163 					if (resourcePids != null) {
164 						return resourcePids;
165 					}
166 				}
167 			}
168 
169 			search = txTemplate.execute(t -> mySearchDao.findByUuid(theUuid));
170 
171 			if (search == null) {
172 				ourLog.info("Client requested unknown paging ID[{}]", theUuid);
173 				String msg = myContext.getLocalizer().getMessage(PageMethodBinding.class, "unknownSearchId", theUuid);
174 				throw new ResourceGoneException(msg);
175 			}
176 
177 			verifySearchHasntFailedOrThrowInternalErrorException(search);
178 			if (search.getStatus() == SearchStatusEnum.FINISHED) {
179 				ourLog.info("Search entity marked as finished with {} results", search.getNumFound());
180 				break;
181 			}
182 			if (search.getNumFound() >= theTo) {
183 				ourLog.info("Search entity has {} results so far", search.getNumFound());
184 				break;
185 			}
186 
187 			if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) {
188 				ourLog.error("Search {} of type {} for {}{} timed out after {}ms", search.getId(), search.getSearchType(), search.getResourceType(), search.getSearchQueryString(), sw.getMillis());
189 				throw new InternalErrorException("Request timed out after " + sw.getMillis() + "ms");
190 			}
191 
192 			// If the search was saved in "pass complete mode" it's probably time to
193 			// start a new pass
194 			if (search.getStatus() == SearchStatusEnum.PASSCMPLET) {
195 				Optional<Search> newSearch = tryToMarkSearchAsInProgress(search);
196 				if (newSearch.isPresent()) {
197 					search = newSearch.get();
198 					String resourceType = search.getResourceType();
199 					SearchParameterMap params = search.getSearchParameterMap();
200 					IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(resourceType);
201 					SearchContinuationTask task = new SearchContinuationTask(search, resourceDao, params, resourceType);
202 					myIdToSearchTask.put(search.getUuid(), task);
203 					myExecutor.submit(task);
204 				}
205 			}
206 
207 			try {
208 				Thread.sleep(500);
209 			} catch (InterruptedException e) {
210 				// ignore
211 			}
212 		}
213 
214 		final Pageable page = toPage(theFrom, theTo);
215 		if (page == null) {
216 			return Collections.emptyList();
217 		}
218 
219 		final Search foundSearch = search;
220 
221 		ourLog.trace("Loading stored search");
222 		List<Long> retVal = txTemplate.execute(theStatus -> {
223 			final List<Long> resultPids = new ArrayList<>();
224 			Page<Long> searchResultPids = mySearchResultDao.findWithSearchUuid(foundSearch, page);
225 			for (Long next : searchResultPids) {
226 				resultPids.add(next);
227 			}
228 			return resultPids;
229 		});
230 		return retVal;
231 	}
232 
233 	private Optional<Search> tryToMarkSearchAsInProgress(Search theSearch) {
234 		ourLog.trace("Going to try to change search status from {} to {}", theSearch.getStatus(), SearchStatusEnum.LOADING);
235 		try {
236 			TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
237 			txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
238 			txTemplate.afterPropertiesSet();
239 			return txTemplate.execute(t -> {
240 				Search search = mySearchDao.findById(theSearch.getId()).orElse(theSearch);
241 
242 				if (search.getStatus() != SearchStatusEnum.PASSCMPLET) {
243 					throw new IllegalStateException("Can't change to LOADING because state is " + theSearch.getStatus());
244 				}
245 				search.setStatus(SearchStatusEnum.LOADING);
246 				Search newSearch = mySearchDao.save(search);
247 				return Optional.of(newSearch);
248 			});
249 		} catch (Exception e) {
250 			ourLog.warn("Failed to activate search: {}", e.toString());
251 			ourLog.trace("Failed to activate search", e);
252 			return Optional.empty();
253 		}
254 	}
255 
256 	private void populateBundleProvider(PersistedJpaBundleProvider theRetVal) {
257 		theRetVal.setContext(myContext);
258 		theRetVal.setEntityManager(myEntityManager);
259 		theRetVal.setPlatformTransactionManager(myManagedTxManager);
260 		theRetVal.setSearchDao(mySearchDao);
261 		theRetVal.setSearchCoordinatorSvc(this);
262 	}
263 
264 	@Override
265 	public IBundleProvider registerSearch(final IDao theCallingDao, final SearchParameterMap theParams, String theResourceType, CacheControlDirective theCacheControlDirective) {
266 		StopWatch w = new StopWatch();
267 		final String searchUuid = UUID.randomUUID().toString();
268 
269 		ourLog.debug("Registering new search {}", searchUuid);
270 
271 		Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(theResourceType).getImplementingClass();
272 		final ISearchBuilder sb = theCallingDao.newSearchBuilder();
273 		sb.setType(resourceTypeClass, theResourceType);
274 		sb.setFetchSize(mySyncSize);
275 
276 		final Integer loadSynchronousUpTo;
277 		if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) {
278 			if (theCacheControlDirective.getMaxResults() != null) {
279 				loadSynchronousUpTo = theCacheControlDirective.getMaxResults();
280 				if (loadSynchronousUpTo > myDaoConfig.getCacheControlNoStoreMaxResultsUpperLimit()) {
281 					throw new InvalidRequestException(Constants.HEADER_CACHE_CONTROL + " header " + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed " + myDaoConfig.getCacheControlNoStoreMaxResultsUpperLimit());
282 				}
283 			} else {
284 				loadSynchronousUpTo = 100;
285 			}
286 		} else {
287 			loadSynchronousUpTo = null;
288 		}
289 
290 		if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null) {
291 
292 			ourLog.debug("Search {} is loading in synchronous mode", searchUuid);
293 
294 			// Execute the query and make sure we return distinct results
295 			TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
296 			txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
297 			return txTemplate.execute(t -> {
298 
299 				// Load the results synchronously
300 				final List<Long> pids = new ArrayList<>();
301 
302 				Iterator<Long> resultIter = sb.createQuery(theParams, searchUuid);
303 				while (resultIter.hasNext()) {
304 					pids.add(resultIter.next());
305 					if (loadSynchronousUpTo != null && pids.size() >= loadSynchronousUpTo) {
306 						break;
307 					}
308 					if (theParams.getLoadSynchronousUpTo() != null && pids.size() >= theParams.getLoadSynchronousUpTo()) {
309 						break;
310 					}
311 				}
312 
313 				/*
314 				 * For synchronous queries, we load all the includes right away
315 				 * since we're returning a static bundle with all the results
316 				 * pre-loaded. This is ok because syncronous requests are not
317 				 * expected to be paged
318 				 *
319 				 * On the other hand for async queries we load includes/revincludes
320 				 * individually for pages as we return them to clients
321 				 */
322 				final Set<Long> includedPids = new HashSet<>();
323 				includedPids.addAll(sb.loadIncludes(myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated(), "(synchronous)"));
324 				includedPids.addAll(sb.loadIncludes(myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated(), "(synchronous)"));
325 
326 				List<IBaseResource> resources = new ArrayList<>();
327 				sb.loadResourcesByPid(pids, resources, includedPids, false, myEntityManager, myContext, theCallingDao);
328 				return new SimpleBundleProvider(resources);
329 			});
330 		}
331 
332 		/*
333 		 * See if there are any cached searches whose results we can return
334 		 * instead
335 		 */
336 		boolean useCache = true;
337 		if (theCacheControlDirective != null && theCacheControlDirective.isNoCache() == true) {
338 			useCache = false;
339 		}
340 		final String queryString = theParams.toNormalizedQueryString(myContext);
341 		if (theParams.getEverythingMode() == null) {
342 			if (myDaoConfig.getReuseCachedSearchResultsForMillis() != null && useCache) {
343 
344 				final Date createdCutoff = new Date(System.currentTimeMillis() - myDaoConfig.getReuseCachedSearchResultsForMillis());
345 				final String resourceType = theResourceType;
346 
347 				TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
348 				PersistedJpaBundleProvider foundSearchProvider = txTemplate.execute(t -> {
349 					Search searchToUse = null;
350 
351 					int hashCode = queryString.hashCode();
352 					Collection<Search> candidates = mySearchDao.find(resourceType, hashCode, createdCutoff);
353 					for (Search nextCandidateSearch : candidates) {
354 						if (queryString.equals(nextCandidateSearch.getSearchQueryString())) {
355 							searchToUse = nextCandidateSearch;
356 						}
357 					}
358 
359 					PersistedJpaBundleProvider retVal = null;
360 					if (searchToUse != null) {
361 						ourLog.info("Reusing search {} from cache", searchToUse.getUuid());
362 						searchToUse.setSearchLastReturned(new Date());
363 						mySearchDao.updateSearchLastReturned(searchToUse.getId(), new Date());
364 
365 						retVal = new PersistedJpaBundleProvider(searchToUse.getUuid(), theCallingDao);
366 						retVal.setCacheHit(true);
367 
368 						populateBundleProvider(retVal);
369 					}
370 
371 					return retVal;
372 				});
373 
374 				if (foundSearchProvider != null) {
375 					return foundSearchProvider;
376 				}
377 
378 			}
379 		}
380 
381 		Search search = new Search();
382 		populateSearchEntity(theParams, theResourceType, searchUuid, queryString, search);
383 
384 		SearchTask task = new SearchTask(search, theCallingDao, theParams, theResourceType);
385 		myIdToSearchTask.put(search.getUuid(), task);
386 		myExecutor.submit(task);
387 
388 		PersistedJpaSearchFirstPageBundleProvider retVal = new PersistedJpaSearchFirstPageBundleProvider(search, theCallingDao, task, sb, myManagedTxManager);
389 		populateBundleProvider(retVal);
390 
391 		ourLog.debug("Search initial phase completed in {}ms", w.getMillis());
392 		return retVal;
393 
394 	}
395 
396 	@VisibleForTesting
397 	void setContextForUnitTest(FhirContext theCtx) {
398 		myContext = theCtx;
399 	}
400 
401 	@VisibleForTesting
402 	void setDaoConfigForUnitTest(DaoConfig theDaoConfig) {
403 		myDaoConfig = theDaoConfig;
404 	}
405 
406 	@VisibleForTesting
407 	void setEntityManagerForUnitTest(EntityManager theEntityManager) {
408 		myEntityManager = theEntityManager;
409 	}
410 
411 	@VisibleForTesting
412 	public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) {
413 		myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests;
414 	}
415 
416 	@VisibleForTesting
417 	void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) {
418 		myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults;
419 	}
420 
421 	@VisibleForTesting
422 	public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) {
423 		myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests;
424 	}
425 
426 	@VisibleForTesting
427 	void setSearchDaoForUnitTest(ISearchDao theSearchDao) {
428 		mySearchDao = theSearchDao;
429 	}
430 
431 	@VisibleForTesting
432 	void setSearchDaoIncludeForUnitTest(ISearchIncludeDao theSearchIncludeDao) {
433 		mySearchIncludeDao = theSearchIncludeDao;
434 	}
435 
436 	@VisibleForTesting
437 	void setSearchDaoResultForUnitTest(ISearchResultDao theSearchResultDao) {
438 		mySearchResultDao = theSearchResultDao;
439 	}
440 
441 	@VisibleForTesting
442 	public void setSyncSizeForUnitTests(int theSyncSize) {
443 		mySyncSize = theSyncSize;
444 	}
445 
446 	@VisibleForTesting
447 	void setTransactionManagerForUnitTest(PlatformTransactionManager theTxManager) {
448 		myManagedTxManager = theTxManager;
449 	}
450 
451 	@VisibleForTesting
452 	public void setDaoRegistryForUnitTest(DaoRegistry theDaoRegistry) {
453 		myDaoRegistry = theDaoRegistry;
454 	}
455 
456 	public abstract class BaseTask implements Callable<Void> {
457 		private final SearchParameterMap myParams;
458 		private final IDao myCallingDao;
459 		private final String myResourceType;
460 		private final ArrayList<Long> mySyncedPids = new ArrayList<>();
461 		private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1);
462 		private final CountDownLatch myCompletionLatch;
463 		private final ArrayList<Long> myUnsyncedPids = new ArrayList<>();
464 		private Search mySearch;
465 		private boolean myAbortRequested;
466 		private int myCountSaved = 0;
467 		private boolean myAdditionalPrefetchThresholdsRemaining;
468 		private List<Long> myPreviouslyAddedResourcePids;
469 		private Integer myMaxResultsToFetch;
470 		private int myCountFetchedDuringThisPass;
471 
472 		/**
473 		 * Constructor
474 		 */
475 		protected BaseTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType) {
476 			mySearch = theSearch;
477 			myCallingDao = theCallingDao;
478 			myParams = theParams;
479 			myResourceType = theResourceType;
480 			myCompletionLatch = new CountDownLatch(1);
481 		}
482 
483 		protected Search getSearch() {
484 			return mySearch;
485 		}
486 
487 		protected CountDownLatch getInitialCollectionLatch() {
488 			return myInitialCollectionLatch;
489 		}
490 
491 		protected void setPreviouslyAddedResourcePids(List<Long> thePreviouslyAddedResourcePids) {
492 			myPreviouslyAddedResourcePids = thePreviouslyAddedResourcePids;
493 			myCountSaved = myPreviouslyAddedResourcePids.size();
494 		}
495 
496 		private ISearchBuilder newSearchBuilder() {
497 			Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
498 			ISearchBuilder sb = myCallingDao.newSearchBuilder();
499 			sb.setType(resourceTypeClass, myResourceType);
500 
501 			return sb;
502 		}
503 
504 		public List<Long> getResourcePids(int theFromIndex, int theToIndex) {
505 			ourLog.debug("Requesting search PIDs from {}-{}", theFromIndex, theToIndex);
506 
507 			boolean keepWaiting;
508 			do {
509 				synchronized (mySyncedPids) {
510 					ourLog.trace("Search status is {}", mySearch.getStatus());
511 					boolean haveEnoughResults = mySyncedPids.size() >= theToIndex;
512 					if (!haveEnoughResults) {
513 						switch (mySearch.getStatus()) {
514 							case LOADING:
515 								keepWaiting = true;
516 								break;
517 							case PASSCMPLET:
518 								/*
519 								 * If we get here, it means that the user requested resources that crossed the
520 								 * current pre-fetch boundary. For example, if the prefetch threshold is 50 and the
521 								 * user has requested resources 0-60, then they would get 0-50 back but the search
522 								 * coordinator would then stop searching.SearchCoordinatorSvcImplTest
523 								 */
524 								keepWaiting = false;
525 								break;
526 							case FAILED:
527 							case FINISHED:
528 							default:
529 								keepWaiting = false;
530 								break;
531 						}
532 					} else {
533 						keepWaiting = false;
534 					}
535 				}
536 
537 				if (keepWaiting) {
538 					ourLog.info("Waiting, as we only have {} results", mySyncedPids.size());
539 					try {
540 						Thread.sleep(500);
541 					} catch (InterruptedException theE) {
542 						// ignore
543 					}
544 				}
545 			} while (keepWaiting);
546 
547 			ourLog.info("Proceeding, as we have {} results", mySyncedPids.size());
548 
549 			ArrayList<Long> retVal = new ArrayList<>();
550 			synchronized (mySyncedPids) {
551 				verifySearchHasntFailedOrThrowInternalErrorException(mySearch);
552 
553 				int toIndex = theToIndex;
554 				if (mySyncedPids.size() < toIndex) {
555 					toIndex = mySyncedPids.size();
556 				}
557 				for (int i = theFromIndex; i < toIndex; i++) {
558 					retVal.add(mySyncedPids.get(i));
559 				}
560 			}
561 
562 			ourLog.trace("Done syncing results - Wanted {}-{} and returning {} of {}", theFromIndex, theToIndex, retVal.size(), mySyncedPids.size());
563 
564 			return retVal;
565 		}
566 
567 		protected void saveSearch() {
568 			TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
569 			txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
570 			txTemplate.execute(new TransactionCallbackWithoutResult() {
571 				@Override
572 				protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
573 					doSaveSearch();
574 				}
575 
576 			});
577 		}
578 
579 		private void saveUnsynced(final IResultIterator theResultIter) {
580 			TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
581 			txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
582 			txTemplate.execute(new TransactionCallbackWithoutResult() {
583 				@Override
584 				protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
585 					if (mySearch.getId() == null) {
586 						doSaveSearch();
587 					}
588 
589 					List<SearchResult> resultsToSave = Lists.newArrayList();
590 					for (Long nextPid : myUnsyncedPids) {
591 						SearchResult nextResult = new SearchResult(mySearch);
592 						nextResult.setResourcePid(nextPid);
593 						nextResult.setOrder(myCountSaved++);
594 						resultsToSave.add(nextResult);
595 						ourLog.trace("Saving ORDER[{}] Resource {}", nextResult.getOrder(), nextResult.getResourcePid());
596 					}
597 					mySearchResultDao.saveAll(resultsToSave);
598 
599 					synchronized (mySyncedPids) {
600 						int numSyncedThisPass = myUnsyncedPids.size();
601 						ourLog.trace("Syncing {} search results", numSyncedThisPass);
602 						mySyncedPids.addAll(myUnsyncedPids);
603 						myUnsyncedPids.clear();
604 
605 						if (theResultIter.hasNext() == false) {
606 							mySearch.setNumFound(myCountSaved);
607 							int loadedCountThisPass = theResultIter.getSkippedCount() + myCountSaved;
608 							if (myMaxResultsToFetch != null && loadedCountThisPass < myMaxResultsToFetch) {
609 								mySearch.setStatus(SearchStatusEnum.FINISHED);
610 								mySearch.setTotalCount(myCountSaved);
611 							} else if (myAdditionalPrefetchThresholdsRemaining) {
612 								ourLog.trace("Setting search status to PASSCMPLET");
613 								mySearch.setStatus(SearchStatusEnum.PASSCMPLET);
614 								mySearch.setSearchParameterMap(myParams);
615 							} else {
616 								mySearch.setStatus(SearchStatusEnum.FINISHED);
617 								mySearch.setTotalCount(myCountSaved);
618 							}
619 						}
620 					}
621 
622 					mySearch.setNumFound(myCountSaved);
623 
624 					int numSynced;
625 					synchronized (mySyncedPids) {
626 						numSynced = mySyncedPids.size();
627 					}
628 
629 					if (myDaoConfig.getCountSearchResultsUpTo() == null ||
630 						myDaoConfig.getCountSearchResultsUpTo() <= 0 ||
631 						myDaoConfig.getCountSearchResultsUpTo() <= numSynced) {
632 						myInitialCollectionLatch.countDown();
633 					}
634 
635 					doSaveSearch();
636 
637 				}
638 			});
639 
640 		}
641 
642 		public boolean isNotAborted() {
643 			return myAbortRequested == false;
644 		}
645 
646 		protected void markComplete() {
647 			myCompletionLatch.countDown();
648 		}
649 
650 		public CountDownLatch getCompletionLatch() {
651 			return myCompletionLatch;
652 		}
653 
654 		/**
655 		 * Request that the task abort as soon as possible
656 		 */
657 		public void requestImmediateAbort() {
658 			myAbortRequested = true;
659 		}
660 
661 		/**
662 		 * This is the method which actually performs the search.
663 		 * It is called automatically by the thread pool.
664 		 */
665 		@Override
666 		public Void call() {
667 			StopWatch sw = new StopWatch();
668 
669 			try {
670 				// Create an initial search in the DB and give it an ID
671 				saveSearch();
672 
673 				TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
674 				txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
675 
676 				if (myCustomIsolationSupported) {
677 					txTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
678 				}
679 
680 				txTemplate.execute(new TransactionCallbackWithoutResult() {
681 					@Override
682 					protected void doInTransactionWithoutResult(TransactionStatus theStatus) {
683 						doSearch();
684 					}
685 				});
686 
687 				ourLog.info("Completed search for [{}{}] and found {} resources in {}ms", mySearch.getResourceType(), mySearch.getSearchQueryString(), mySyncedPids.size(), sw.getMillis());
688 
689 			} catch (Throwable t) {
690 
691 				/*
692 				 * Don't print a stack trace for client errors (i.e. requests that
693 				 * aren't valid because the client screwed up).. that's just noise
694 				 * in the logs and who needs that.
695 				 */
696 				boolean logged = false;
697 				if (t instanceof BaseServerResponseException) {
698 					BaseServerResponseException exception = (BaseServerResponseException) t;
699 					if (exception.getStatusCode() >= 400 && exception.getStatusCode() < 500) {
700 						logged = true;
701 						ourLog.warn("Failed during search due to invalid request: {}", t.toString());
702 					}
703 				}
704 
705 				if (!logged) {
706 					ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t);
707 				}
708 				myUnsyncedPids.clear();
709 
710 				Throwable rootCause = ExceptionUtils.getRootCause(t);
711 				rootCause = defaultIfNull(rootCause, t);
712 
713 				String failureMessage = rootCause.getMessage();
714 
715 				int failureCode = InternalErrorException.STATUS_CODE;
716 				if (t instanceof BaseServerResponseException) {
717 					failureCode = ((BaseServerResponseException) t).getStatusCode();
718 				}
719 
720 				mySearch.setFailureMessage(failureMessage);
721 				mySearch.setFailureCode(failureCode);
722 				mySearch.setStatus(SearchStatusEnum.FAILED);
723 
724 				saveSearch();
725 
726 			} finally {
727 
728 				myIdToSearchTask.remove(mySearch.getUuid());
729 				myInitialCollectionLatch.countDown();
730 				markComplete();
731 
732 			}
733 			return null;
734 		}
735 
736 		private void doSaveSearch() {
737 
738 			Search newSearch;
739 			if (mySearch.getId() == null) {
740 				newSearch = mySearchDao.save(mySearch);
741 				for (SearchInclude next : mySearch.getIncludes()) {
742 					mySearchIncludeDao.save(next);
743 				}
744 			} else {
745 				newSearch = mySearchDao.save(mySearch);
746 			}
747 
748 			// mySearchDao.save is not supposed to return null, but in unit tests
749 			// it can if the mock search dao isn't set up to handle that
750 			if (newSearch != null) {
751 				mySearch = newSearch;
752 			}
753 		}
754 
755 		/**
756 		 * This method actually creates the database query to perform the
757 		 * search, and starts it.
758 		 */
759 		private void doSearch() {
760 
761 			/*
762 			 * If the user has explicitly requested a _count, perform a
763 			 *
764 			 * SELECT COUNT(*) ....
765 			 *
766 			 * before doing anything else.
767 			 */
768 			boolean wantOnlyCount = SummaryEnum.COUNT.equals(myParams.getSummaryMode());
769 			boolean wantCount =
770 				wantOnlyCount ||
771 					SearchTotalModeEnum.ACCURATE.equals(myParams.getSearchTotalMode()) ||
772 					(myParams.getSearchTotalMode() == null && SearchTotalModeEnum.ACCURATE.equals(myDaoConfig.getDefaultTotalMode()));
773 			if (wantCount) {
774 				ourLog.trace("Performing count");
775 				ISearchBuilder sb = newSearchBuilder();
776 				Iterator<Long> countIterator = sb.createCountQuery(myParams, mySearch.getUuid());
777 				Long count = countIterator.next();
778 				ourLog.trace("Got count {}", count);
779 
780 				TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
781 				txTemplate.execute(new TransactionCallbackWithoutResult() {
782 					@Override
783 					protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
784 						mySearch.setTotalCount(count.intValue());
785 						if (wantOnlyCount) {
786 							mySearch.setStatus(SearchStatusEnum.FINISHED);
787 						}
788 						doSaveSearch();
789 						mySearchDao.flush();
790 					}
791 				});
792 				if (wantOnlyCount) {
793 					return;
794 				}
795 			}
796 
797 			ourLog.trace("Done count");
798 			ISearchBuilder sb = newSearchBuilder();
799 
800 			/*
801 			 * Figure out how many results we're actually going to fetch from the
802 			 * database in this pass. This calculation takes into consideration the
803 			 * "pre-fetch thresholds" specified in DaoConfig#getSearchPreFetchThresholds()
804 			 * as well as the value of the _count parameter.
805 			 */
806 			int currentlyLoaded = defaultIfNull(mySearch.getNumFound(), 0);
807 			int minWanted = 0;
808 			if (myParams.getCount() != null) {
809 				minWanted = myParams.getCount();
810 				minWanted = Math.max(minWanted, myPagingProvider.getMaximumPageSize());
811 				minWanted += currentlyLoaded;
812 			}
813 
814 			for (Iterator<Integer> iter = myDaoConfig.getSearchPreFetchThresholds().iterator(); iter.hasNext(); ) {
815 				int next = iter.next();
816 				if (next != -1 && next <= currentlyLoaded) {
817 					continue;
818 				}
819 
820 				if (next == -1) {
821 					sb.setMaxResultsToFetch(null);
822 				} else {
823 					myMaxResultsToFetch = Math.max(next, minWanted);
824 					sb.setMaxResultsToFetch(myMaxResultsToFetch);
825 				}
826 
827 				if (iter.hasNext()) {
828 					myAdditionalPrefetchThresholdsRemaining = true;
829 				}
830 
831 				// If we get here's we've found an appropriate threshold
832 				break;
833 			}
834 
835 			/*
836 			 * Provide any PID we loaded in previous seasrch passes to the
837 			 * SearchBuilder so that we don't get duplicates coming from running
838 			 * the same query again.
839 			 *
840 			 * We could possibly accomplish this in a different way by using sorted
841 			 * results in our SQL query and specifying an offset. I don't actually
842 			 * know if that would be faster or not. At some point should test this
843 			 * idea.
844 			 */
845 			if (myPreviouslyAddedResourcePids != null) {
846 				sb.setPreviouslyAddedResourcePids(myPreviouslyAddedResourcePids);
847 				mySyncedPids.addAll(myPreviouslyAddedResourcePids);
848 			}
849 
850 			/*
851 			 * Construct the SQL query we'll be sending to the database
852 			 */
853 			IResultIterator theResultIterator = sb.createQuery(myParams, mySearch.getUuid());
854 			assert (theResultIterator != null);
855 
856 			/*
857 			 * The following loop actually loads the PIDs of the resources
858 			 * matching the search off of the disk and into memory. After
859 			 * every X results, we commit to the HFJ_SEARCH table.
860 			 */
861 			int syncSize = mySyncSize;
862 			while (theResultIterator.hasNext()) {
863 				myUnsyncedPids.add(theResultIterator.next());
864 				myCountFetchedDuringThisPass++;
865 
866 				boolean shouldSync = myUnsyncedPids.size() >= syncSize;
867 
868 				if (myDaoConfig.getCountSearchResultsUpTo() != null &&
869 					myDaoConfig.getCountSearchResultsUpTo() > 0 &&
870 					myDaoConfig.getCountSearchResultsUpTo() < myUnsyncedPids.size()) {
871 					shouldSync = false;
872 				}
873 
874 				if (myUnsyncedPids.size() > 50000) {
875 					shouldSync = true;
876 				}
877 
878 				// If no abort was requested, bail out
879 				Validate.isTrue(isNotAborted(), "Abort has been requested");
880 
881 				if (shouldSync) {
882 					saveUnsynced(theResultIterator);
883 				}
884 
885 				if (myLoadingThrottleForUnitTests != null) {
886 					try {
887 						Thread.sleep(myLoadingThrottleForUnitTests);
888 					} catch (InterruptedException e) {
889 						// ignore
890 					}
891 				}
892 
893 			}
894 
895 			// If no abort was requested, bail out
896 			Validate.isTrue(isNotAborted(), "Abort has been requested");
897 
898 			saveUnsynced(theResultIterator);
899 		}
900 	}
901 
902 
903 	public class SearchContinuationTask extends BaseTask {
904 
905 		public SearchContinuationTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType) {
906 			super(theSearch, theCallingDao, theParams, theResourceType);
907 		}
908 
909 		@Override
910 		public Void call() {
911 			try {
912 				TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
913 				txTemplate.afterPropertiesSet();
914 				txTemplate.execute(t -> {
915 					List<Long> previouslyAddedResourcePids = mySearchResultDao.findWithSearchUuid(getSearch());
916 					ourLog.debug("Have {} previously added IDs in search: {}", previouslyAddedResourcePids.size(), getSearch().getUuid());
917 					setPreviouslyAddedResourcePids(previouslyAddedResourcePids);
918 					return null;
919 				});
920 			} catch (Throwable e) {
921 				ourLog.error("Failure processing search", e);
922 				getSearch().setFailureMessage(e.toString());
923 				getSearch().setStatus(SearchStatusEnum.FAILED);
924 
925 				saveSearch();
926 				return null;
927 			}
928 
929 			return super.call();
930 		}
931 
932 		@Override
933 		public List<Long> getResourcePids(int theFromIndex, int theToIndex) {
934 			return super.getResourcePids(theFromIndex, theToIndex);
935 		}
936 	}
937 
938 	/**
939 	 * A search task is a Callable task that runs in
940 	 * a thread pool to handle an individual search. One instance
941 	 * is created for any requested search and runs from the
942 	 * beginning to the end of the search.
943 	 * <p>
944 	 * Understand:
945 	 * This class executes in its own thread separate from the
946 	 * web server client thread that made the request. We do that
947 	 * so that we can return to the client as soon as possible,
948 	 * but keep the search going in the background (and have
949 	 * the next page of results ready to go when the client asks).
950 	 */
951 	public class SearchTask extends BaseTask {
952 
953 		/**
954 		 * Constructor
955 		 */
956 		public SearchTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType) {
957 			super(theSearch, theCallingDao, theParams, theResourceType);
958 		}
959 
960 		/**
961 		 * This method is called by the server HTTP thread, and
962 		 * will block until at least one page of results have been
963 		 * fetched from the DB, and will never block after that.
964 		 */
965 		public Integer awaitInitialSync() {
966 			ourLog.trace("Awaiting initial sync");
967 			do {
968 				try {
969 					if (getInitialCollectionLatch().await(250, TimeUnit.MILLISECONDS)) {
970 						break;
971 					}
972 				} catch (InterruptedException e) {
973 					// Shouldn't happen
974 					throw new InternalErrorException(e);
975 				}
976 			} while (getSearch().getStatus() == SearchStatusEnum.LOADING);
977 			ourLog.trace("Initial sync completed");
978 
979 			return getSearch().getTotalCount();
980 		}
981 
982 	}
983 
984 	public static void populateSearchEntity(SearchParameterMap theParams, String theResourceType, String theSearchUuid, String theQueryString, Search theSearch) {
985 		theSearch.setDeleted(false);
986 		theSearch.setUuid(theSearchUuid);
987 		theSearch.setCreated(new Date());
988 		theSearch.setSearchLastReturned(new Date());
989 		theSearch.setTotalCount(null);
990 		theSearch.setNumFound(0);
991 		theSearch.setPreferredPageSize(theParams.getCount());
992 		theSearch.setSearchType(theParams.getEverythingMode() != null ? SearchTypeEnum.EVERYTHING : SearchTypeEnum.SEARCH);
993 		theSearch.setLastUpdated(theParams.getLastUpdated());
994 		theSearch.setResourceType(theResourceType);
995 		theSearch.setStatus(SearchStatusEnum.LOADING);
996 
997 		theSearch.setSearchQueryString(theQueryString);
998 		theSearch.setSearchQueryStringHash(theQueryString.hashCode());
999 
1000 		for (Include next : theParams.getIncludes()) {
1001 			theSearch.addInclude(new SearchInclude(theSearch, next.getValue(), false, next.isRecurse()));
1002 		}
1003 		for (Include next : theParams.getRevIncludes()) {
1004 			theSearch.addInclude(new SearchInclude(theSearch, next.getValue(), true, next.isRecurse()));
1005 		}
1006 	}
1007 
1008 	/**
1009 	 * Creates a {@link Pageable} using a start and end index
1010 	 */
1011 	@SuppressWarnings("WeakerAccess")
1012 	public static @Nullable
1013 	Pageable toPage(final int theFromIndex, int theToIndex) {
1014 		int pageSize = theToIndex - theFromIndex;
1015 		if (pageSize < 1) {
1016 			return null;
1017 		}
1018 
1019 		int pageIndex = theFromIndex / pageSize;
1020 
1021 		Pageable page = new AbstractPageRequest(pageIndex, pageSize) {
1022 			private static final long serialVersionUID = 1L;
1023 
1024 			@Override
1025 			public long getOffset() {
1026 				return theFromIndex;
1027 			}
1028 
1029 			@Override
1030 			public Sort getSort() {
1031 				return Sort.unsorted();
1032 			}
1033 
1034 			@Override
1035 			public Pageable next() {
1036 				return null;
1037 			}
1038 
1039 			@Override
1040 			public Pageable previous() {
1041 				return null;
1042 			}
1043 
1044 			@Override
1045 			public Pageable first() {
1046 				return null;
1047 			}
1048 		};
1049 
1050 		return page;
1051 	}
1052 
1053 	static void verifySearchHasntFailedOrThrowInternalErrorException(Search theSearch) {
1054 		if (theSearch.getStatus() == SearchStatusEnum.FAILED) {
1055 			Integer status = theSearch.getFailureCode();
1056 			status = defaultIfNull(status, 500);
1057 
1058 			String message = theSearch.getFailureMessage();
1059 			throw BaseServerResponseException.newInstance(status, message);
1060 		}
1061 	}
1062 
1063 }