View Javadoc
1   package ca.uhn.fhir.jpa.search;
2   
3   /*-
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2018 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%family
21   */
22  
23  import ca.uhn.fhir.context.FhirContext;
24  import ca.uhn.fhir.jpa.dao.DaoConfig;
25  import ca.uhn.fhir.jpa.dao.IDao;
26  import ca.uhn.fhir.jpa.dao.ISearchBuilder;
27  import ca.uhn.fhir.jpa.dao.SearchParameterMap;
28  import ca.uhn.fhir.jpa.dao.data.ISearchDao;
29  import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao;
30  import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
31  import ca.uhn.fhir.jpa.entity.*;
32  import ca.uhn.fhir.util.StopWatch;
33  import ca.uhn.fhir.model.api.Include;
34  import ca.uhn.fhir.rest.api.CacheControlDirective;
35  import ca.uhn.fhir.rest.api.Constants;
36  import ca.uhn.fhir.rest.api.server.IBundleProvider;
37  import ca.uhn.fhir.rest.server.SimpleBundleProvider;
38  import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
39  import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
40  import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
41  import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
42  import ca.uhn.fhir.rest.server.method.PageMethodBinding;
43  import com.google.common.annotations.VisibleForTesting;
44  import com.google.common.collect.Lists;
45  import org.apache.commons.lang3.ObjectUtils;
46  import org.apache.commons.lang3.Validate;
47  import org.apache.commons.lang3.exception.ExceptionUtils;
48  import org.apache.commons.lang3.time.DateUtils;
49  import org.hl7.fhir.instance.model.api.IBaseResource;
50  import org.springframework.beans.factory.annotation.Autowired;
51  import org.springframework.data.domain.Page;
52  import org.springframework.data.domain.PageRequest;
53  import org.springframework.data.domain.Pageable;
54  import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
55  import org.springframework.transaction.PlatformTransactionManager;
56  import org.springframework.transaction.TransactionDefinition;
57  import org.springframework.transaction.TransactionStatus;
58  import org.springframework.transaction.annotation.Propagation;
59  import org.springframework.transaction.annotation.Transactional;
60  import org.springframework.transaction.support.TransactionCallback;
61  import org.springframework.transaction.support.TransactionCallbackWithoutResult;
62  import org.springframework.transaction.support.TransactionTemplate;
63  
64  import javax.persistence.EntityManager;
65  import java.util.*;
66  import java.util.concurrent.*;
67  
68  public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
69  	public static final int DEFAULT_SYNC_SIZE = 250;
70  
71  	private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SearchCoordinatorSvcImpl.class);
72  	private final ConcurrentHashMap<String, SearchTask> myIdToSearchTask = new ConcurrentHashMap<String, SearchTask>();
73  	@Autowired
74  	private FhirContext myContext;
75  	@Autowired
76  	private DaoConfig myDaoConfig;
77  	@Autowired
78  	private EntityManager myEntityManager;
79  	private ExecutorService myExecutor;
80  	private Integer myLoadingThrottleForUnitTests = null;
81  	private long myMaxMillisToWaitForRemoteResults = DateUtils.MILLIS_PER_MINUTE;
82  	private boolean myNeverUseLocalSearchForUnitTests;
83  	@Autowired
84  	private ISearchDao mySearchDao;
85  	@Autowired
86  	private ISearchIncludeDao mySearchIncludeDao;
87  	@Autowired
88  	private ISearchResultDao mySearchResultDao;
89  	@Autowired
90  	private PlatformTransactionManager myManagedTxManager;
91  
92  	private int mySyncSize = DEFAULT_SYNC_SIZE;
93  
94  	/**
95  	 * Constructor
96  	 */
97  	public SearchCoordinatorSvcImpl() {
98  		CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("search_coord_");
99  		myExecutor = Executors.newCachedThreadPool(threadFactory);
100 	}
101 
102 	@Override
103 	public void cancelAllActiveSearches() {
104 		for (SearchTask next : myIdToSearchTask.values()) {
105 			next.requestImmediateAbort();
106 			try {
107 				next.getCompletionLatch().await(30, TimeUnit.SECONDS);
108 			} catch (InterruptedException e) {
109 				ourLog.warn("Failed to wait for completion", e);
110 			}
111 		}
112 	}
113 
114 	@Override
115 	@Transactional(propagation = Propagation.NEVER)
116 	public List<Long> getResources(final String theUuid, int theFrom, int theTo) {
117 		if (myNeverUseLocalSearchForUnitTests == false) {
118 			SearchTask task = myIdToSearchTask.get(theUuid);
119 			if (task != null) {
120 				ourLog.trace("Local search found");
121 				return task.getResourcePids(theFrom, theTo);
122 			} else {
123 				ourLog.trace("No local search found");
124 			}
125 		} else {
126 			ourLog.trace("Forced not using local search");
127 		}
128 
129 		TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
130 		txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
131 
132 		Search search;
133 		StopWatch sw = new StopWatch();
134 		while (true) {
135 
136 			search = txTemplate.execute(new TransactionCallback<Search>() {
137 				@Override
138 				public Search doInTransaction(TransactionStatus theStatus) {
139 					return mySearchDao.findByUuid(theUuid);
140 				}
141 			});
142 
143 			if (search == null) {
144 				ourLog.info("Client requested unknown paging ID[{}]", theUuid);
145 				String msg = myContext.getLocalizer().getMessage(PageMethodBinding.class, "unknownSearchId", theUuid);
146 				throw new ResourceGoneException(msg);
147 			}
148 
149 			verifySearchHasntFailedOrThrowInternalErrorException(search);
150 			if (search.getStatus() == SearchStatusEnum.FINISHED) {
151 				ourLog.info("Search entity marked as finished");
152 				break;
153 			}
154 			if (search.getNumFound() >= theTo) {
155 				ourLog.info("Search entity has {} results so far", search.getNumFound());
156 				break;
157 			}
158 
159 			if (sw.getMillis() > myMaxMillisToWaitForRemoteResults) {
160 				throw new InternalErrorException("Request timed out after " + sw.getMillis() + "ms");
161 			}
162 
163 			try {
164 				Thread.sleep(500);
165 			} catch (InterruptedException e) {
166 				// ignore
167 			}
168 		}
169 
170 		final Pageable page = toPage(theFrom, theTo);
171 		if (page == null) {
172 			return Collections.emptyList();
173 		}
174 
175 		final Search foundSearch = search;
176 
177 		List<Long> retVal = txTemplate.execute(new TransactionCallback<List<Long>>() {
178 			@Override
179 			public List<Long> doInTransaction(TransactionStatus theStatus) {
180 				final List<Long> resultPids = new ArrayList<Long>();
181 				Page<SearchResult> searchResults = mySearchResultDao.findWithSearchUuid(foundSearch, page);
182 				for (SearchResult next : searchResults) {
183 					resultPids.add(next.getResourcePid());
184 				}
185 				return resultPids;
186 			}
187 		});
188 		return retVal;
189 	}
190 
191 	private void populateBundleProvider(PersistedJpaBundleProvider theRetVal) {
192 		theRetVal.setContext(myContext);
193 		theRetVal.setEntityManager(myEntityManager);
194 		theRetVal.setPlatformTransactionManager(myManagedTxManager);
195 		theRetVal.setSearchDao(mySearchDao);
196 		theRetVal.setSearchCoordinatorSvc(this);
197 	}
198 
199 	@Override
200 	public IBundleProvider registerSearch(final IDao theCallingDao, final SearchParameterMap theParams, String theResourceType, CacheControlDirective theCacheControlDirective) {
201 		StopWatch w = new StopWatch();
202 		final String searchUuid = UUID.randomUUID().toString();
203 
204 		ourLog.debug("Registering new search {}", searchUuid);
205 
206 		Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(theResourceType).getImplementingClass();
207 		final ISearchBuilder sb = theCallingDao.newSearchBuilder();
208 		sb.setType(resourceTypeClass, theResourceType);
209 		sb.setFetchSize(mySyncSize);
210 
211 		final Integer loadSynchronousUpTo;
212 		if (theCacheControlDirective != null && theCacheControlDirective.isNoStore()) {
213 			if (theCacheControlDirective.getMaxResults() != null) {
214 				loadSynchronousUpTo = theCacheControlDirective.getMaxResults();
215 				if (loadSynchronousUpTo > myDaoConfig.getCacheControlNoStoreMaxResultsUpperLimit()) {
216 					throw new InvalidRequestException(Constants.HEADER_CACHE_CONTROL + " header " + Constants.CACHE_CONTROL_MAX_RESULTS + " value must not exceed " + myDaoConfig.getCacheControlNoStoreMaxResultsUpperLimit());
217 				}
218 			} else {
219 				loadSynchronousUpTo = 100;
220 			}
221 		} else {
222 			loadSynchronousUpTo = null;
223 		}
224 
225 		if (theParams.isLoadSynchronous() || loadSynchronousUpTo != null) {
226 
227 			ourLog.debug("Search {} is loading in synchronous mode", searchUuid);
228 
229 			// Execute the query and make sure we return distinct results
230 			TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
231 			txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED);
232 			return txTemplate.execute(new TransactionCallback<SimpleBundleProvider>() {
233 				@Override
234 				public SimpleBundleProvider doInTransaction(TransactionStatus theStatus) {
235 
236 					// Load the results synchronously
237 					final List<Long> pids = new ArrayList<>();
238 
239 					Iterator<Long> resultIter = sb.createQuery(theParams, searchUuid);
240 					while (resultIter.hasNext()) {
241 						pids.add(resultIter.next());
242 						if (loadSynchronousUpTo != null && pids.size() >= loadSynchronousUpTo) {
243 							break;
244 						}
245 						if (theParams.getLoadSynchronousUpTo() != null && pids.size() >= theParams.getLoadSynchronousUpTo()) {
246 							break;
247 						}
248 					}
249 
250 					/*
251 					 * For synchronous queries, we load all the includes right away
252 					 * since we're returning a static bundle with all the results
253 					 * pre-loaded. This is ok because syncronous requests are not
254 					 * expected to be paged
255 					 * 
256 					 * On the other hand for async queries we load includes/revincludes
257 					 * individually for pages as we return them to clients
258 					 */
259 					final Set<Long> includedPids = new HashSet<Long>();
260 					includedPids.addAll(sb.loadReverseIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated()));
261 					includedPids.addAll(sb.loadReverseIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated()));
262 
263 					List<IBaseResource> resources = new ArrayList<IBaseResource>();
264 					sb.loadResourcesByPid(pids, resources, includedPids, false, myEntityManager, myContext, theCallingDao);
265 					return new SimpleBundleProvider(resources);
266 				}
267 			});
268 		}
269 
270 		/*
271 		 * See if there are any cached searches whose results we can return
272 		 * instead
273 		 */
274 		boolean useCache = true;
275 		if (theCacheControlDirective != null && theCacheControlDirective.isNoCache() == true) {
276 			useCache = false;
277 		}
278 		final String queryString = theParams.toNormalizedQueryString(myContext);
279 		if (theParams.getEverythingMode() == null) {
280 			if (myDaoConfig.getReuseCachedSearchResultsForMillis() != null && useCache) {
281 
282 				final Date createdCutoff = new Date(System.currentTimeMillis() - myDaoConfig.getReuseCachedSearchResultsForMillis());
283 				final String resourceType = theResourceType;
284 
285 				TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
286 				PersistedJpaBundleProvider foundSearchProvider = txTemplate.execute(new TransactionCallback<PersistedJpaBundleProvider>() {
287 					@Override
288 					public PersistedJpaBundleProvider doInTransaction(TransactionStatus theStatus) {
289 						Search searchToUse = null;
290 
291 						int hashCode = queryString.hashCode();
292 						Collection<Search> candidates = mySearchDao.find(resourceType, hashCode, createdCutoff);
293 						for (Search nextCandidateSearch : candidates) {
294 							if (queryString.equals(nextCandidateSearch.getSearchQueryString())) {
295 								searchToUse = nextCandidateSearch;
296 							}
297 						}
298 
299 						PersistedJpaBundleProvider retVal = null;
300 						if (searchToUse != null) {
301 							ourLog.info("Reusing search {} from cache", searchToUse.getUuid());
302 							searchToUse.setSearchLastReturned(new Date());
303 							mySearchDao.updateSearchLastReturned(searchToUse.getId(), new Date());
304 
305 							retVal = new PersistedJpaBundleProvider(searchToUse.getUuid(), theCallingDao);
306 							retVal.setCacheHit(true);
307 
308 							populateBundleProvider(retVal);
309 						}
310 
311 						return retVal;
312 					}
313 				});
314 
315 				if (foundSearchProvider != null) {
316 					return foundSearchProvider;
317 				}
318 
319 			}
320 		}
321 
322 		Search search = new Search();
323 		search.setUuid(searchUuid);
324 		search.setCreated(new Date());
325 		search.setSearchLastReturned(new Date());
326 		search.setTotalCount(null);
327 		search.setNumFound(0);
328 		search.setPreferredPageSize(theParams.getCount());
329 		search.setSearchType(theParams.getEverythingMode() != null ? SearchTypeEnum.EVERYTHING : SearchTypeEnum.SEARCH);
330 		search.setLastUpdated(theParams.getLastUpdated());
331 		search.setResourceType(theResourceType);
332 		search.setStatus(SearchStatusEnum.LOADING);
333 
334 		search.setSearchQueryString(queryString);
335 		search.setSearchQueryStringHash(queryString.hashCode());
336 
337 		for (Include next : theParams.getIncludes()) {
338 			search.getIncludes().add(new SearchInclude(search, next.getValue(), false, next.isRecurse()));
339 		}
340 		for (Include next : theParams.getRevIncludes()) {
341 			search.getIncludes().add(new SearchInclude(search, next.getValue(), true, next.isRecurse()));
342 		}
343 
344 		SearchTask task = new SearchTask(search, theCallingDao, theParams, theResourceType, searchUuid);
345 		myIdToSearchTask.put(search.getUuid(), task);
346 		myExecutor.submit(task);
347 
348 		PersistedJpaSearchFirstPageBundleProvider retVal = new PersistedJpaSearchFirstPageBundleProvider(search, theCallingDao, task, sb, myManagedTxManager);
349 		populateBundleProvider(retVal);
350 
351 		ourLog.info("Search initial phase completed in {}ms", w.getMillis());
352 		return retVal;
353 
354 	}
355 
356 	@VisibleForTesting
357 	void setContextForUnitTest(FhirContext theCtx) {
358 		myContext = theCtx;
359 	}
360 
361 	@VisibleForTesting
362 	void setDaoConfigForUnitTest(DaoConfig theDaoConfig) {
363 		myDaoConfig = theDaoConfig;
364 	}
365 
366 	@VisibleForTesting
367 	void setEntityManagerForUnitTest(EntityManager theEntityManager) {
368 		myEntityManager = theEntityManager;
369 	}
370 
371 	@VisibleForTesting
372 	public void setLoadingThrottleForUnitTests(Integer theLoadingThrottleForUnitTests) {
373 		myLoadingThrottleForUnitTests = theLoadingThrottleForUnitTests;
374 	}
375 
376 	@VisibleForTesting
377 	void setMaxMillisToWaitForRemoteResultsForUnitTest(long theMaxMillisToWaitForRemoteResults) {
378 		myMaxMillisToWaitForRemoteResults = theMaxMillisToWaitForRemoteResults;
379 	}
380 
381 	@VisibleForTesting
382 	public void setNeverUseLocalSearchForUnitTests(boolean theNeverUseLocalSearchForUnitTests) {
383 		myNeverUseLocalSearchForUnitTests = theNeverUseLocalSearchForUnitTests;
384 	}
385 
386 	@VisibleForTesting
387 	void setSearchDaoForUnitTest(ISearchDao theSearchDao) {
388 		mySearchDao = theSearchDao;
389 	}
390 
391 	@VisibleForTesting
392 	void setSearchDaoIncludeForUnitTest(ISearchIncludeDao theSearchIncludeDao) {
393 		mySearchIncludeDao = theSearchIncludeDao;
394 	}
395 
396 	@VisibleForTesting
397 	void setSearchDaoResultForUnitTest(ISearchResultDao theSearchResultDao) {
398 		mySearchResultDao = theSearchResultDao;
399 	}
400 
401 	@VisibleForTesting
402 	public void setSyncSizeForUnitTests(int theSyncSize) {
403 		mySyncSize = theSyncSize;
404 	}
405 
406 	@VisibleForTesting
407 	void setTransactionManagerForUnitTest(PlatformTransactionManager theTxManager) {
408 		myManagedTxManager = theTxManager;
409 	}
410 
411 	static Pageable toPage(final int theFromIndex, int theToIndex) {
412 		int pageSize = theToIndex - theFromIndex;
413 		if (pageSize < 1) {
414 			return null;
415 		}
416 
417 		int pageIndex = theFromIndex / pageSize;
418 
419 		Pageable page = new PageRequest(pageIndex, pageSize) {
420 			private static final long serialVersionUID = 1L;
421 
422 			@Override
423 			public int getOffset() {
424 				return theFromIndex;
425 			}
426 		};
427 
428 		return page;
429 	}
430 
431 	static void verifySearchHasntFailedOrThrowInternalErrorException(Search theSearch) {
432 		if (theSearch.getStatus() == SearchStatusEnum.FAILED) {
433 			Integer status = theSearch.getFailureCode();
434 			status = ObjectUtils.defaultIfNull(status, 500);
435 
436 			String message = theSearch.getFailureMessage();
437 			throw BaseServerResponseException.newInstance(status, message);
438 		}
439 	}
440 
441 	/**
442 	 * A search task is a Callable task that runs in
443 	 * a thread pool to handle an individual search. One instance
444 	 * is created for any requested search and runs from the
445 	 * beginning to the end of the search.
446 	 *
447 	 * Understand:
448 	 * This class executes in its own thread separate from the
449 	 * web server client thread that made the request. We do that
450 	 * so that we can return to the client as soon as possible,
451 	 * but keep the search going in the background (and have
452 	 * the next page of results ready to go when the client asks).
453 	 */
454 	public class SearchTask implements Callable<Void> {
455 
456 		private final IDao myCallingDao;
457 		private final CountDownLatch myCompletionLatch;
458 		private final CountDownLatch myInitialCollectionLatch = new CountDownLatch(1);
459 		private final SearchParameterMap myParams;
460 		private final String myResourceType;
461 		private final Search mySearch;
462 		private final ArrayList<Long> mySyncedPids = new ArrayList<>();
463 		private final ArrayList<Long> myUnsyncedPids = new ArrayList<>();
464 		private boolean myAbortRequested;
465 		private int myCountSaved = 0;
466 		private String mySearchUuid;
467 
468 		/**
469 		 * Constructor
470 		 */
471 		public SearchTask(Search theSearch, IDao theCallingDao, SearchParameterMap theParams, String theResourceType, String theSearchUuid) {
472 			mySearch = theSearch;
473 			myCallingDao = theCallingDao;
474 			myParams = theParams;
475 			myResourceType = theResourceType;
476 			myCompletionLatch = new CountDownLatch(1);
477 			mySearchUuid = theSearchUuid;
478 		}
479 
480 		/**
481 		 * This method is called by the server HTTP thread, and
482 		 * will block until at least one page of results have been
483 		 * fetched from the DB, and will never block after that.
484 		 */
485 		public Integer awaitInitialSync() {
486 			ourLog.trace("Awaiting initial sync");
487 			do {
488 				try {
489 					if (myInitialCollectionLatch.await(250, TimeUnit.MILLISECONDS)) {
490 						break;
491 					}
492 				} catch (InterruptedException e) {
493 					// Shouldn't happen
494 					throw new InternalErrorException(e);
495 				}
496 			} while (mySearch.getStatus() == SearchStatusEnum.LOADING);
497 			ourLog.trace("Initial sync completed");
498 
499 			return mySearch.getTotalCount();
500 		}
501 
502 		/**
503 		 * This is the method which actually performs the search.
504 		 * It is called automatically by the thread pool.
505 		 */
506 		@Override
507 		public Void call() {
508 			StopWatch sw = new StopWatch();
509 
510 			try {
511 				// Create an initial search in the DB and give it an ID
512 				saveSearch();
513 
514 				TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
515 				txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
516 				txTemplate.execute(new TransactionCallbackWithoutResult() {
517 					@Override
518 					protected void doInTransactionWithoutResult(TransactionStatus theStatus) {
519 						doSearch();
520 					}
521 				});
522 
523 				ourLog.info("Completed search for {} resources in {}ms", mySyncedPids.size(), sw.getMillis());
524 
525 			} catch (Throwable t) {
526 
527 				/*
528 				 * Don't print a stack trace for client errors (i.e. requests that
529 				 * aren't valid because the client screwed up).. that's just noise
530 				 * in the logs and who needs that.
531 				 */
532 				boolean logged = false;
533 				if (t instanceof BaseServerResponseException) {
534 					BaseServerResponseException exception = (BaseServerResponseException) t;
535 					if (exception.getStatusCode() >= 400 && exception.getStatusCode() < 500) {
536 						logged = true;
537 						ourLog.warn("Failed during search due to invalid request: {}", t.toString());
538 					}
539 				}
540 
541 				if (!logged) {
542 					ourLog.error("Failed during search loading after {}ms", sw.getMillis(), t);
543 				}
544 				myUnsyncedPids.clear();
545 
546 				Throwable rootCause = ExceptionUtils.getRootCause(t);
547 				rootCause = ObjectUtils.defaultIfNull(rootCause, t);
548 
549 				String failureMessage = rootCause.getMessage();
550 
551 				int failureCode = InternalErrorException.STATUS_CODE;
552 				if (t instanceof BaseServerResponseException) {
553 					failureCode = ((BaseServerResponseException) t).getStatusCode();
554 				}
555 
556 				mySearch.setFailureMessage(failureMessage);
557 				mySearch.setFailureCode(failureCode);
558 				mySearch.setStatus(SearchStatusEnum.FAILED);
559 
560 				saveSearch();
561 
562 			} finally {
563 
564 				myIdToSearchTask.remove(mySearch.getUuid());
565 				myInitialCollectionLatch.countDown();
566 				myCompletionLatch.countDown();
567 
568 			}
569 			return null;
570 		}
571 
572 		private void doSaveSearch() {
573 			if (mySearch.getId() == null) {
574 				mySearchDao.save(mySearch);
575 				for (SearchInclude next : mySearch.getIncludes()) {
576 					mySearchIncludeDao.save(next);
577 				}
578 			} else {
579 				mySearchDao.save(mySearch);
580 			}
581 		}
582 
583 		private void doSearch() {
584 			Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
585 			ISearchBuilder sb = myCallingDao.newSearchBuilder();
586 			sb.setType(resourceTypeClass, myResourceType);
587 			Iterator<Long> theResultIterator = sb.createQuery(myParams, mySearchUuid);
588 
589 			while (theResultIterator.hasNext()) {
590 				myUnsyncedPids.add(theResultIterator.next());
591 
592 				boolean shouldSync = myUnsyncedPids.size() >= mySyncSize;
593 
594 				if (myDaoConfig.getCountSearchResultsUpTo() != null &&
595 					myDaoConfig.getCountSearchResultsUpTo() > 0 &&
596 					myDaoConfig.getCountSearchResultsUpTo() < myUnsyncedPids.size()) {
597 					shouldSync = false;
598 				}
599 
600 				if (myUnsyncedPids.size() > 50000) {
601 					shouldSync = true;
602 				}
603 
604 				// If no abort was requested, bail out
605 				Validate.isTrue(myAbortRequested == false, "Abort has been requested");
606 
607 				if (shouldSync) {
608 					saveUnsynced(theResultIterator);
609 				}
610 
611 				if (myLoadingThrottleForUnitTests != null) {
612 					try {
613 						Thread.sleep(myLoadingThrottleForUnitTests);
614 					} catch (InterruptedException e) {
615 						// ignore
616 					}
617 				}
618 
619 			}
620 
621 			// If no abort was requested, bail out
622 			Validate.isTrue(myAbortRequested == false, "Abort has been requested");
623 
624 			saveUnsynced(theResultIterator);
625 		}
626 
627 		public CountDownLatch getCompletionLatch() {
628 			return myCompletionLatch;
629 		}
630 
631 		public List<Long> getResourcePids(int theFromIndex, int theToIndex) {
632 			ourLog.info("Requesting search PIDs from {}-{}", theFromIndex, theToIndex);
633 
634 			boolean keepWaiting;
635 			do {
636 				synchronized (mySyncedPids) {
637 					keepWaiting = mySyncedPids.size() < theToIndex && mySearch.getStatus() == SearchStatusEnum.LOADING;
638 				}
639 				if (keepWaiting) {
640 					ourLog.info("Waiting, as we only have {} results", mySyncedPids.size());
641 					try {
642 						Thread.sleep(500);
643 					} catch (InterruptedException theE) {
644 						// ignore
645 					}
646 				}
647 			} while (keepWaiting);
648 
649 			ourLog.info("Proceeding, as we have {} results", mySyncedPids.size());
650 
651 			ArrayList<Long> retVal = new ArrayList<>();
652 			synchronized (mySyncedPids) {
653 				verifySearchHasntFailedOrThrowInternalErrorException(mySearch);
654 
655 				int toIndex = theToIndex;
656 				if (mySyncedPids.size() < toIndex) {
657 					toIndex = mySyncedPids.size();
658 				}
659 				for (int i = theFromIndex; i < toIndex; i++) {
660 					retVal.add(mySyncedPids.get(i));
661 				}
662 			}
663 
664 			ourLog.info("Done syncing results", mySyncedPids.size());
665 
666 			return retVal;
667 		}
668 
669 		/**
670 		 * Request that the task abort as soon as possible
671 		 */
672 		public void requestImmediateAbort() {
673 			myAbortRequested = true;
674 		}
675 
676 		private void saveSearch() {
677 			TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
678 			txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
679 			txTemplate.execute(new TransactionCallbackWithoutResult() {
680 				@Override
681 				protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
682 					doSaveSearch();
683 				}
684 
685 			});
686 		}
687 
688 		private void saveUnsynced(final Iterator<Long> theResultIter) {
689 			TransactionTemplate txTemplate = new TransactionTemplate(myManagedTxManager);
690 			txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED);
691 			txTemplate.execute(new TransactionCallbackWithoutResult() {
692 				@Override
693 				protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
694 					if (mySearch.getId() == null) {
695 						doSaveSearch();
696 					}
697 
698 					List<SearchResult> resultsToSave = Lists.newArrayList();
699 					for (Long nextPid : myUnsyncedPids) {
700 						SearchResult nextResult = new SearchResult(mySearch);
701 						nextResult.setResourcePid(nextPid);
702 						nextResult.setOrder(myCountSaved++);
703 						resultsToSave.add(nextResult);
704 					}
705 					mySearchResultDao.save(resultsToSave);
706 
707 					synchronized (mySyncedPids) {
708 						int numSyncedThisPass = myUnsyncedPids.size();
709 						ourLog.trace("Syncing {} search results", numSyncedThisPass);
710 						mySyncedPids.addAll(myUnsyncedPids);
711 						myUnsyncedPids.clear();
712 
713 						if (theResultIter.hasNext() == false) {
714 							mySearch.setTotalCount(myCountSaved);
715 							mySearch.setStatus(SearchStatusEnum.FINISHED);
716 						}
717 					}
718 
719 					mySearch.setNumFound(myCountSaved);
720 
721 					int numSynced;
722 					synchronized (mySyncedPids) {
723 						numSynced = mySyncedPids.size();
724 					}
725 
726 					if (myDaoConfig.getCountSearchResultsUpTo() == null ||
727 						myDaoConfig.getCountSearchResultsUpTo() <= 0 ||
728 						myDaoConfig.getCountSearchResultsUpTo() <= numSynced) {
729 						myInitialCollectionLatch.countDown();
730 					}
731 
732 					doSaveSearch();
733 
734 				}
735 			});
736 
737 		}
738 
739 	}
740 
741 }