View Javadoc
1   package ca.uhn.fhir.jpa.search;
2   
3   /*-
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2018 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  import ca.uhn.fhir.context.FhirContext;
24  import ca.uhn.fhir.jpa.dao.DaoConfig;
25  import ca.uhn.fhir.jpa.dao.IDao;
26  import ca.uhn.fhir.jpa.dao.ISearchBuilder;
27  import ca.uhn.fhir.jpa.dao.SearchParameterMap;
28  import ca.uhn.fhir.jpa.dao.data.ISearchDao;
29  import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao;
30  import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
31  import ca.uhn.fhir.jpa.entity.*;
32  import ca.uhn.fhir.util.StopWatch;
33  import ca.uhn.fhir.model.api.Include;
34  import ca.uhn.fhir.rest.api.CacheControlDirective;
35  import ca.uhn.fhir.rest.api.Constants;
36  import ca.uhn.fhir.rest.api.server.IBundleProvider;
37  import ca.uhn.fhir.rest.server.SimpleBundleProvider;
38  import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
39  import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
40  import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
41  import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
42  import ca.uhn.fhir.rest.server.method.PageMethodBinding;
43  import com.google.common.annotations.VisibleForTesting;
44  import com.google.common.collect.Lists;
45  import org.apache.commons.lang3.ObjectUtils;
46  import org.apache.commons.lang3.Validate;
47  import org.apache.commons.lang3.exception.ExceptionUtils;
48  import org.apache.commons.lang3.time.DateUtils;
49  import org.hl7.fhir.instance.model.api.IBaseResource;
50  import org.springframework.beans.factory.annotation.Autowired;
51  import org.springframework.data.domain.Page;
52  import org.springframework.data.domain.PageRequest;
53  import org.springframework.data.domain.Pageable;
54  import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
55  import org.springframework.transaction.PlatformTransactionManager;
56  import org.springframework.transaction.TransactionDefinition;
57  import org.springframework.transaction.TransactionStatus;
58  import org.springframework.transaction.annotation.Propagation;
59  import org.springframework.transaction.annotation.Transactional;
60  import org.springframework.transaction.support.TransactionCallback;
61  import org.springframework.transaction.support.TransactionCallbackWithoutResult;
62  import org.springframework.transaction.support.TransactionTemplate;
63  
64  import javax.annotation.Nullable;
65  import javax.persistence.EntityManager;
66  import java.util.*;
67  import java.util.concurrent.*;
68  
69  public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
70  	public static final int DEFAULT_SYNC_SIZE = 250;
71  
72  	private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);
73  	private final ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<String, SearchTask>();
74  	@Autowired
75  	private FhirContext myContext;
76  	@Autowired
77  	private DaoConfig myDaoConfig;
78  	@Autowired
79  	private EntityManager myEntityManager;
80  	private ExecutorService myExecutor;
81  	private Integer myLoadingThrottleForUnitTests = null;
82  	private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE;
83  	private boolean myNeverUseLocalSearchForUnitTests;
84  	@Autowired
85  	private ISearchDao mySearchDao;
86  	@Autowired
87  	private ISearchIncludeDao mySearchIncludeDao;
88  	@Autowired
89  	private ISearchResultDao mySearchResultDao;
90  	@Autowired
91  	private PlatformTransactionManager myManagedTxManager;
92  
93  	private int mySyncSize = DEFAULT_SYNC_SIZE;
94  
95  	/**
96  	 * Constructor
97  	 */
98  	public SearchCoordinatorSvcImpl() {
99  		CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("search_coord_");
100 		myExecutor = Executors.newCachedThreadPool(threadFactory);
101 	}
102 
103 	@Override
104 	public void cancelAllActiveSearches() {
105 		for (SearchTask next : myIdToSearchTask.values()) {
106 			next.requestImmediateAbort();
107 			try {
108 				next.getCompletionLatch().await(30, TimeUnit.SECONDS);
109 			} catch (InterruptedException e) {
110 				ourLog.warn("Failed to wait for completion", e);
111 			}
112 		}
113 	}
114 
115 	@Override
116 	@Transactional(propagation = Propagation.NEVER)
117 	public List<Long> getResources(final String theUuid, int theFrom, int theTo) {
118 		if (myNeverUseLocalSearchForUnitTests == false) {
119 			SearchTask task = myIdToSearchTask.get(theUuid);
120 			if (task != null) {
121 				ourLog.trace("Local search found");
122 				return task.getResourcePids(theFrom, theTo);
123 			} else {
124 				ourLog.trace("No local search found");
125 			}
126 		} else {
127 			ourLog.trace("Forced not using local search");
128 		}
129 
130 		TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
131 		txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
132 
133 		Search search;
134 		StopWatch sw = new StopWatch();
135 		while (true) {
136 
137 			search = txTemplate.execute(new TransactionCallback<Search>() {
138 				@Override
139 				public Search doInTransaction(TransactionStatus theStatus) {
140 					return mySearchDao.findByUuid(theUuid);
141 				}
142 			});
143 
144 			if (search == null) {
145 				ourLog.info("Client requested unknown paging ID[{}]", theUuid);
146 				String msg = myContext.getLocalizer().getMessage(PageMethodBinding.class, "unknownSearchId", theUuid);
147 				throw new ResourceGoneException(msg);
148 			}
149 
150 			verifySearchHasntFailedOrThrowInternalErrorException(search);
151 			if (search.getStatus() == SearchStatusEnum.FINISHED) {
152 				ourLog.info("Search entity marked as finished");
153 				break;
154 			}
155 			if (search.getNumFound() >= theTo) {
156 				ourLog.info("Search entity has {} results so far", search.getNumFound());
157 				break;
158 			}
159 
160 			if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) {
161 				throw new InternalErrorException("Request timed out after " + sw.getMillis() + "ms");
162 			}
163 
164 			try {
165 				Thread.sleep(500);
166 			} catch (InterruptedException e) {
167 				// ignore
168 			}
169 		}
170 
171 		final Pageable page = toPage(theFrom, theTo);
172 		if (page == null) {
173 			return Collections.emptyList();
174 		}
175 
176 		final Search foundSearch = search;
177 
178 		List<Long> retVal = txTemplate.execute(new TransactionCallback<List<Long>>() {
179 			@Override
180 			public List<Long> doInTransaction(TransactionStatus theStatus) {
181 				final List<Long> resultPids = new ArrayList<Long>();
182 				Page<Long> searchResultPids = mySearchResultDao.findWithSearchUuid(foundSearch, page);
183 				for (Long next : searchResultPids) {
184 					resultPids.add(next);
185 				}
186 				return resultPids;
187 			}
188 		});
189 		return retVal;
190 	}
191 
192 	private void populateBundleProvider(PersistedJpaBundleProvider theRetVal) {
193 		theRetVal.setContext(myContext);
194 		theRetVal.setEntityManager(myEntityManager);
195 		theRetVal.setPlatformTransactionManager(myManagedTxManager);
196 		theRetVal.setSearchDao(mySearchDao);
197 		theRetVal.setSearchCoordinatorSvc(this);
198 	}
199 
200 	@Override
201 	public IBundleProvider registerSearch(final IDao theCallingDao, final SearchParameterMap theParams, String theResourceType, CacheControlDirective theCacheControlDirective) {
202 		StopWatch w = new StopWatch();
203 		final String searchUuid = UUID.randomUUID().toString();
204 
205 		ourLog.debug("Registering new search {}", searchUuid);
206 
207 		Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(theResourceType).getImplementingClass();
208 		final ISearchBuilder sb = theCallingDao.newSearchBuilder();
209 		sb.setType(resourceTypeClass, theResourceType);
210 		sb.setFetchSize(mySyncSize);
211 
212 		final Integer loadSynchronousUpTo;
213 		if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) {
214 			if (theCacheControlDirective.getMaxResults() != null) {
215 				loadSynchronousUpTo = theCacheControlDirective.getMaxResults();
216 				if (loadSynchronousUpTo > myDaoConfig.getCacheControlNoStoreMaxResultsUpperLimit()) {
217 					throw new InvalidRequestException(Constants.HEADER_CACHE_CONTROL + " header " + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed " + myDaoConfig.getCacheControlNoStoreMaxResultsUpperLimit());
218 				}
219 			} else {
220 				loadSynchronousUpTo = 100;
221 			}
222 		} else {
223 			loadSynchronousUpTo = null;
224 		}
225 
226 		if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null) {
227 
228 			ourLog.debug("Search {} is loading in synchronous mode", searchUuid);
229 
230 			// Execute the query and make sure we return distinct results
231 			TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
232 			txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED);
233 			return txTemplate.execute(new TransactionCallback<SimpleBundleProvider>() {
234 				@Override
235 				public SimpleBundleProvider doInTransaction(TransactionStatus theStatus) {
236 
237 					// Load the results synchronously
238 					final List<Long> pids = new ArrayList<>();
239 
240 					Iterator<Long> resultIter = sb.createQuery(theParams, searchUuid);
241 					while (resultIter.hasNext()) {
242 						pids.add(resultIter.next());
243 						if (loadSynchronousUpTo != null && pids.size() >= loadSynchronousUpTo) {
244 							break;
245 						}
246 						if (theParams.getLoadSynchronousUpTo() != null && pids.size() >= theParams.getLoadSynchronousUpTo()) {
247 							break;
248 						}
249 					}
250 
251 					/*
252 					 * For synchronous queries, we load all the includes right away
253 					 * since we're returning a static bundle with all the results
254 					 * pre-loaded. This is ok because syncronous requests are not
255 					 * expected to be paged
256 					 * 
257 					 * On the other hand for async queries we load includes/revincludes
258 					 * individually for pages as we return them to clients
259 					 */
260 					final Set<Long> includedPids = new HashSet<Long>();
261 					includedPids.addAll(sb.loadIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated()));
262 					includedPids.addAll(sb.loadIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated()));
263 
264 					List<IBaseResource> resources = new ArrayList<IBaseResource>();
265 					sb.loadResourcesByPid(pids, resources, includedPids, false, myEntityManager, myContext, theCallingDao);
266 					return new SimpleBundleProvider(resources);
267 				}
268 			});
269 		}
270 
271 		/*
272 		 * See if there are any cached searches whose results we can return
273 		 * instead
274 		 */
275 		boolean useCache = true;
276 		if (theCacheControlDirective != null && theCacheControlDirective.isNoCache() == true) {
277 			useCache = false;
278 		}
279 		final String queryString = theParams.toNormalizedQueryString(myContext);
280 		if (theParams.getEverythingMode() == null) {
281 			if (myDaoConfig.getReuseCachedSearchResultsForMillis() != null && useCache) {
282 
283 				final Date createdCutoff = new Date(System.currentTimeMillis() - myDaoConfig.getReuseCachedSearchResultsForMillis());
284 				final String resourceType = theResourceType;
285 
286 				TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
287 				PersistedJpaBundleProvider foundSearchProvider = txTemplate.execute(new TransactionCallback<PersistedJpaBundleProvider>() {
288 					@Override
289 					public PersistedJpaBundleProvider doInTransaction(TransactionStatus theStatus) {
290 						Search searchToUse = null;
291 
292 						int hashCode = queryString.hashCode();
293 						Collection<Search> candidates = mySearchDao.find(resourceType, hashCode, createdCutoff);
294 						for (Search nextCandidateSearch : candidates) {
295 							if (queryString.equals(nextCandidateSearch.getSearchQueryString())) {
296 								searchToUse = nextCandidateSearch;
297 							}
298 						}
299 
300 						PersistedJpaBundleProvider retVal = null;
301 						if (searchToUse != null) {
302 							ourLog.info("Reusing search {} from cache", searchToUse.getUuid());
303 							searchToUse.setSearchLastReturned(new Date());
304 							mySearchDao.updateSearchLastReturned(searchToUse.getId(), new Date());
305 
306 							retVal = new PersistedJpaBundleProvider(searchToUse.getUuid(), theCallingDao);
307 							retVal.setCacheHit(true);
308 
309 							populateBundleProvider(retVal);
310 						}
311 
312 						return retVal;
313 					}
314 				});
315 
316 				if (foundSearchProvider != null) {
317 					return foundSearchProvider;
318 				}
319 
320 			}
321 		}
322 
323 		Search search = new Search();
324 		search.setUuid(searchUuid);
325 		search.setCreated(new Date());
326 		search.setSearchLastReturned(new Date());
327 		search.setTotalCount(null);
328 		search.setNumFound(0);
329 		search.setPreferredPageSize(theParams.getCount());
330 		search.setSearchType(theParams.getEverythingMode() != null ? SearchTypeEnum.EVERYTHING : SearchTypeEnum.SEARCH);
331 		search.setLastUpdated(theParams.getLastUpdated());
332 		search.setResourceType(theResourceType);
333 		search.setStatus(SearchStatusEnum.LOADING);
334 
335 		search.setSearchQueryString(queryString);
336 		search.setSearchQueryStringHash(queryString.hashCode());
337 
338 		for (Include next : theParams.getIncludes()) {
339 			search.addInclude(new SearchInclude(search, next.getValue(), false, next.isRecurse()));
340 		}
341 		for (Include next : theParams.getRevIncludes()) {
342 			search.addInclude(new SearchInclude(search, next.getValue(), true, next.isRecurse()));
343 		}
344 
345 		SearchTask task = new SearchTask(search, theCallingDao, theParams, theResourceType, searchUuid);
346 		myIdToSearchTask.put(search.getUuid(), task);
347 		myExecutor.submit(task);
348 
349 		PersistedJpaSearchFirstPageBundleProvider retVal = new PersistedJpaSearchFirstPageBundleProvider(search, theCallingDao, task, sb, myManagedTxManager);
350 		populateBundleProvider(retVal);
351 
352 		ourLog.info("Search initial phase completed in {}ms", w.getMillis());
353 		return retVal;
354 
355 	}
356 
357 	@VisibleForTesting
358 	void setContextForUnitTest(FhirContext theCtx) {
359 		myContext = theCtx;
360 	}
361 
362 	@VisibleForTesting
363 	void setDaoConfigForUnitTest(DaoConfig theDaoConfig) {
364 		myDaoConfig = theDaoConfig;
365 	}
366 
367 	@VisibleForTesting
368 	void setEntityManagerForUnitTest(EntityManager theEntityManager) {
369 		myEntityManager = theEntityManager;
370 	}
371 
372 	@VisibleForTesting
373 	public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) {
374 		myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests;
375 	}
376 
377 	@VisibleForTesting
378 	void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) {
379 		myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults;
380 	}
381 
382 	@VisibleForTesting
383 	public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) {
384 		myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests;
385 	}
386 
387 	@VisibleForTesting
388 	void setSearchDaoForUnitTest(ISearchDao theSearchDao) {
389 		mySearchDao = theSearchDao;
390 	}
391 
392 	@VisibleForTesting
393 	void setSearchDaoIncludeForUnitTest(ISearchIncludeDao theSearchIncludeDao) {
394 		mySearchIncludeDao = theSearchIncludeDao;
395 	}
396 
397 	@VisibleForTesting
398 	void setSearchDaoResultForUnitTest(ISearchResultDao theSearchResultDao) {
399 		mySearchResultDao = theSearchResultDao;
400 	}
401 
402 	@VisibleForTesting
403 	public void setSyncSizeForUnitTests(int theSyncSize) {
404 		mySyncSize = theSyncSize;
405 	}
406 
407 	@VisibleForTesting
408 	void setTransactionManagerForUnitTest(PlatformTransactionManager theTxManager) {
409 		myManagedTxManager = theTxManager;
410 	}
411 
412 	/**
413 	 * Creates a {@link Pageable} using a start and end index
414 	 */
415 	@SuppressWarnings("WeakerAccess")
416 	public static @Nullable	Pageable toPage(final int theFromIndex, int theToIndex) {
417 		int pageSize = theToIndex - theFromIndex;
418 		if (pageSize < 1) {
419 			return null;
420 		}
421 
422 		int pageIndex = theFromIndex / pageSize;
423 
424 		Pageable page = new PageRequest(pageIndex, pageSize) {
425 			private static final long serialVersionUID = 1L;
426 
427 			@Override
428 			public long getOffset() {
429 				return theFromIndex;
430 			}
431 		};
432 
433 		return page;
434 	}
435 
436 	static void verifySearchHasntFailedOrThrowInternalErrorException(Search theSearch) {
437 		if (theSearch.getStatus() == SearchStatusEnum.FAILED) {
438 			Integer status = theSearch.getFailureCode();
439 			status = ObjectUtils.defaultIfNull(status, 500);
440 
441 			String message = theSearch.getFailureMessage();
442 			throw BaseServerResponseException.newInstance(status, message);
443 		}
444 	}
445 
446 	/**
447 	 * A search task is a Callable task that runs in
448 	 * a thread pool to handle an individual search. One instance
449 	 * is created for any requested search and runs from the
450 	 * beginning to the end of the search.
451 	 *
452 	 * Understand:
453 	 * This class executes in its own thread separate from the
454 	 * web server client thread that made the request. We do that
455 	 * so that we can return to the client as soon as possible,
456 	 * but keep the search going in the background (and have
457 	 * the next page of results ready to go when the client asks).
458 	 */
459 	public class SearchTask implements Callable<Void> {
460 
461 		private final IDao myCallingDao;
462 		private final CountDownLatch myCompletionLatch;
463 		private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1);
464 		private final SearchParameterMap myParams;
465 		private final String myResourceType;
466 		private final Search mySearch;
467 		private final ArrayList<Long> mySyncedPids = new ArrayList<>();
468 		private final ArrayList<Long> myUnsyncedPids = new ArrayList<>();
469 		private boolean myAbortRequested;
470 		private int myCountSaved = 0;
471 		private String mySearchUuid;
472 
473 		/**
474 		 * Constructor
475 		 */
476 		public SearchTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType, String theSearchUuid) {
477 			mySearch = theSearch;
478 			myCallingDao = theCallingDao;
479 			myParams = theParams;
480 			myResourceType = theResourceType;
481 			myCompletionLatch = new CountDownLatch(1);
482 			mySearchUuid = theSearchUuid;
483 		}
484 
485 		/**
486 		 * This method is called by the server HTTP thread, and
487 		 * will block until at least one page of results have been
488 		 * fetched from the DB, and will never block after that.
489 		 */
490 		public Integer awaitInitialSync() {
491 			ourLog.trace("Awaiting initial sync");
492 			do {
493 				try {
494 					if (myInitialCollectionLatch.await(250, TimeUnit.MILLISECONDS)) {
495 						break;
496 					}
497 				} catch (InterruptedException e) {
498 					// Shouldn't happen
499 					throw new InternalErrorException(e);
500 				}
501 			} while (mySearch.getStatus() == SearchStatusEnum.LOADING);
502 			ourLog.trace("Initial sync completed");
503 
504 			return mySearch.getTotalCount();
505 		}
506 
507 		/**
508 		 * This is the method which actually performs the search.
509 		 * It is called automatically by the thread pool.
510 		 */
511 		@Override
512 		public Void call() {
513 			StopWatch sw = new StopWatch();
514 
515 			try {
516 				// Create an initial search in the DB and give it an ID
517 				saveSearch();
518 
519 				TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
520 				txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
521 				txTemplate.execute(new TransactionCallbackWithoutResult() {
522 					@Override
523 					protected void doInTransactionWithoutResult(TransactionStatus theStatus) {
524 						doSearch();
525 					}
526 				});
527 
528 				ourLog.info("Completed search for {} resources in {}ms", mySyncedPids.size(), sw.getMillis());
529 
530 			} catch (Throwable t) {
531 
532 				/*
533 				 * Don't print a stack trace for client errors (i.e. requests that
534 				 * aren't valid because the client screwed up).. that's just noise
535 				 * in the logs and who needs that.
536 				 */
537 				boolean logged = false;
538 				if (t instanceof BaseServerResponseException) {
539 					BaseServerResponseException exception = (BaseServerResponseException) t;
540 					if (exception.getStatusCode() >= 400 && exception.getStatusCode() < 500) {
541 						logged = true;
542 						ourLog.warn("Failed during search due to invalid request: {}", t.toString());
543 					}
544 				}
545 
546 				if (!logged) {
547 					ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t);
548 				}
549 				myUnsyncedPids.clear();
550 
551 				Throwable rootCause = ExceptionUtils.getRootCause(t);
552 				rootCause = ObjectUtils.defaultIfNull(rootCause, t);
553 
554 				String failureMessage = rootCause.getMessage();
555 
556 				int failureCode = InternalErrorException.STATUS_CODE;
557 				if (t instanceof BaseServerResponseException) {
558 					failureCode = ((BaseServerResponseException) t).getStatusCode();
559 				}
560 
561 				mySearch.setFailureMessage(failureMessage);
562 				mySearch.setFailureCode(failureCode);
563 				mySearch.setStatus(SearchStatusEnum.FAILED);
564 
565 				saveSearch();
566 
567 			} finally {
568 
569 				myIdToSearchTask.remove(mySearch.getUuid());
570 				myInitialCollectionLatch.countDown();
571 				myCompletionLatch.countDown();
572 
573 			}
574 			return null;
575 		}
576 
577 		private void doSaveSearch() {
578 			if (mySearch.getId() == null) {
579 				mySearchDao.save(mySearch);
580 				for (SearchInclude next : mySearch.getIncludes()) {
581 					mySearchIncludeDao.save(next);
582 				}
583 			} else {
584 				mySearchDao.save(mySearch);
585 			}
586 		}
587 
588 		private void doSearch() {
589 			Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
590 			ISearchBuilder sb = myCallingDao.newSearchBuilder();
591 			sb.setType(resourceTypeClass, myResourceType);
592 			Iterator<Long> theResultIterator = sb.createQuery(myParams, mySearchUuid);
593 
594 			while (theResultIterator.hasNext()) {
595 				myUnsyncedPids.add(theResultIterator.next());
596 
597 				boolean shouldSync = myUnsyncedPids.size() >= mySyncSize;
598 
599 				if (myDaoConfig.getCountSearchResultsUpTo() != null &&
600 					myDaoConfig.getCountSearchResultsUpTo() > 0 &&
601 					myDaoConfig.getCountSearchResultsUpTo() < myUnsyncedPids.size()) {
602 					shouldSync = false;
603 				}
604 
605 				if (myUnsyncedPids.size() > 50000) {
606 					shouldSync = true;
607 				}
608 
609 				// If no abort was requested, bail out
610 				Validate.isTrue(myAbortRequested == false, "Abort has been requested");
611 
612 				if (shouldSync) {
613 					saveUnsynced(theResultIterator);
614 				}
615 
616 				if (myLoadingThrottleForUnitTests != null) {
617 					try {
618 						Thread.sleep(myLoadingThrottleForUnitTests);
619 					} catch (InterruptedException e) {
620 						// ignore
621 					}
622 				}
623 
624 			}
625 
626 			// If no abort was requested, bail out
627 			Validate.isTrue(myAbortRequested == false, "Abort has been requested");
628 
629 			saveUnsynced(theResultIterator);
630 		}
631 
632 		public CountDownLatch getCompletionLatch() {
633 			return myCompletionLatch;
634 		}
635 
636 		public List<Long> getResourcePids(int theFromIndex, int theToIndex) {
637 			ourLog.info("Requesting search PIDs from {}-{}", theFromIndex, theToIndex);
638 
639 			boolean keepWaiting;
640 			do {
641 				synchronized (mySyncedPids) {
642 					keepWaiting = mySyncedPids.size() < theToIndex && mySearch.getStatus() == SearchStatusEnum.LOADING;
643 				}
644 				if (keepWaiting) {
645 					ourLog.info("Waiting, as we only have {} results", mySyncedPids.size());
646 					try {
647 						Thread.sleep(500);
648 					} catch (InterruptedException theE) {
649 						// ignore
650 					}
651 				}
652 			} while (keepWaiting);
653 
654 			ourLog.info("Proceeding, as we have {} results", mySyncedPids.size());
655 
656 			ArrayList<Long> retVal = new ArrayList<>();
657 			synchronized (mySyncedPids) {
658 				verifySearchHasntFailedOrThrowInternalErrorException(mySearch);
659 
660 				int toIndex = theToIndex;
661 				if (mySyncedPids.size() < toIndex) {
662 					toIndex = mySyncedPids.size();
663 				}
664 				for (int i = theFromIndex; i < toIndex; i++) {
665 					retVal.add(mySyncedPids.get(i));
666 				}
667 			}
668 
669 			ourLog.info("Done syncing results", mySyncedPids.size());
670 
671 			return retVal;
672 		}
673 
674 		/**
675 		 * Request that the task abort as soon as possible
676 		 */
677 		public void requestImmediateAbort() {
678 			myAbortRequested = true;
679 		}
680 
681 		private void saveSearch() {
682 			TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
683 			txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
684 			txTemplate.execute(new TransactionCallbackWithoutResult() {
685 				@Override
686 				protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
687 					doSaveSearch();
688 				}
689 
690 			});
691 		}
692 
693 		private void saveUnsynced(final Iterator<Long> theResultIter) {
694 			TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
695 			txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED);
696 			txTemplate.execute(new TransactionCallbackWithoutResult() {
697 				@Override
698 				protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
699 					if (mySearch.getId() == null) {
700 						doSaveSearch();
701 					}
702 
703 					List<SearchResult> resultsToSave = Lists.newArrayList();
704 					for (Long nextPid : myUnsyncedPids) {
705 						SearchResult nextResult = new SearchResult(mySearch);
706 						nextResult.setResourcePid(nextPid);
707 						nextResult.setOrder(myCountSaved++);
708 						resultsToSave.add(nextResult);
709 					}
710 					mySearchResultDao.saveAll(resultsToSave);
711 
712 					synchronized (mySyncedPids) {
713 						int numSyncedThisPass = myUnsyncedPids.size();
714 						ourLog.trace("Syncing {} search results", numSyncedThisPass);
715 						mySyncedPids.addAll(myUnsyncedPids);
716 						myUnsyncedPids.clear();
717 
718 						if (theResultIter.hasNext() == false) {
719 							mySearch.setTotalCount(myCountSaved);
720 							mySearch.setStatus(SearchStatusEnum.FINISHED);
721 						}
722 					}
723 
724 					mySearch.setNumFound(myCountSaved);
725 
726 					int numSynced;
727 					synchronized (mySyncedPids) {
728 						numSynced = mySyncedPids.size();
729 					}
730 
731 					if (myDaoConfig.getCountSearchResultsUpTo() == null ||
732 						myDaoConfig.getCountSearchResultsUpTo() <= 0 ||
733 						myDaoConfig.getCountSearchResultsUpTo() <= numSynced) {
734 						myInitialCollectionLatch.countDown();
735 					}
736 
737 					doSaveSearch();
738 
739 				}
740 			});
741 
742 		}
743 
744 	}
745 
746 }