View Javadoc
1   package ca.uhn.fhir.jpa.search.reindex;
2   
3   /*-
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2019 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  import ca.uhn.fhir.context.FhirContext;
24  import ca.uhn.fhir.context.RuntimeResourceDefinition;
25  import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
26  import ca.uhn.fhir.jpa.dao.DaoConfig;
27  import ca.uhn.fhir.jpa.dao.DaoRegistry;
28  import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
29  import ca.uhn.fhir.jpa.dao.data.IForcedIdDao;
30  import ca.uhn.fhir.jpa.dao.data.IResourceHistoryTableDao;
31  import ca.uhn.fhir.jpa.dao.data.IResourceReindexJobDao;
32  import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
33  import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity;
34  import ca.uhn.fhir.jpa.model.entity.ForcedId;
35  import ca.uhn.fhir.jpa.model.entity.ResourceTable;
36  import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
37  import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
38  import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
39  import ca.uhn.fhir.util.StopWatch;
40  import com.google.common.annotations.VisibleForTesting;
41  import org.apache.commons.lang3.Validate;
42  import org.apache.commons.lang3.concurrent.BasicThreadFactory;
43  import org.apache.commons.lang3.time.DateUtils;
44  import org.hibernate.search.util.impl.Executors;
45  import org.hl7.fhir.instance.model.api.IBaseResource;
46  import org.slf4j.Logger;
47  import org.slf4j.LoggerFactory;
48  import org.springframework.beans.factory.annotation.Autowired;
49  import org.springframework.data.domain.PageRequest;
50  import org.springframework.data.domain.Slice;
51  import org.springframework.scheduling.annotation.Scheduled;
52  import org.springframework.transaction.PlatformTransactionManager;
53  import org.springframework.transaction.TransactionDefinition;
54  import org.springframework.transaction.support.TransactionCallback;
55  import org.springframework.transaction.support.TransactionTemplate;
56  
57  import javax.annotation.PostConstruct;
58  import javax.persistence.EntityManager;
59  import javax.persistence.PersistenceContext;
60  import javax.persistence.PersistenceContextType;
61  import javax.persistence.Query;
62  import javax.transaction.Transactional;
63  import java.util.Collection;
64  import java.util.Date;
65  import java.util.List;
66  import java.util.concurrent.*;
67  import java.util.concurrent.atomic.AtomicInteger;
68  import java.util.concurrent.locks.ReentrantLock;
69  import java.util.stream.Collectors;
70  
71  import static org.apache.commons.lang3.StringUtils.isBlank;
72  import static org.apache.commons.lang3.StringUtils.isNotBlank;
73  
/**
 * Service which processes {@link ResourceReindexJobEntity} rows: it periodically picks up
 * active reindex jobs, loads the resources falling inside each job's date window, and
 * reindexes them on a dedicated thread pool. {@link #scheduleReindexingPass()} is the
 * scheduled entry point.
 */
public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {

	// Lower bound used when a job has no explicit low threshold yet (the epoch)
	private static final Date BEGINNING_OF_TIME = new Date(0);
	private static final Logger ourLog = LoggerFactory.getLogger(ResourceReindexingSvcImpl.class);
	// Maximum number of resource IDs fetched and processed in one pass of a single job
	private static final int PASS_SIZE = 25000;
	// Ensures at most one reindexing pass runs at a time within this process
	private final ReentrantLock myIndexingLock = new ReentrantLock();
	@Autowired
	private IResourceReindexJobDao myReindexJobDao;
	@Autowired
	private DaoConfig myDaoConfig;
	@Autowired
	private PlatformTransactionManager myTxManager;
	// Initialized from myTxManager in start()
	private TransactionTemplate myTxTemplate;
	private ThreadFactory myReindexingThreadFactory = new BasicThreadFactory.Builder().namingPattern("ResourceReindex-%d").build();
	// Executor running the per-resource ResourceReindexingTask instances; built in initExecutor()
	private ThreadPoolExecutor myTaskExecutor;
	@Autowired
	private IResourceTableDao myResourceTableDao;
	@Autowired
	private IResourceHistoryTableDao myResourceHistoryTableDao;
	@Autowired
	private DaoRegistry myDaoRegistry;
	@Autowired
	private IForcedIdDao myForcedIdDao;
	@Autowired
	private FhirContext myContext;
	@PersistenceContext(type = PersistenceContextType.TRANSACTION)
	private EntityManager myEntityManager;
	@Autowired
	private ISearchParamRegistry mySearchParamRegistry;

	@VisibleForTesting
	void setReindexJobDaoForUnitTest(IResourceReindexJobDao theReindexJobDao) {
		myReindexJobDao = theReindexJobDao;
	}

	@VisibleForTesting
	void setDaoConfigForUnitTest(DaoConfig theDaoConfig) {
		myDaoConfig = theDaoConfig;
	}

	@VisibleForTesting
	void setTxManagerForUnitTest(PlatformTransactionManager theTxManager) {
		myTxManager = theTxManager;
	}

	@VisibleForTesting
	void setResourceTableDaoForUnitTest(IResourceTableDao theResourceTableDao) {
		myResourceTableDao = theResourceTableDao;
	}

	@VisibleForTesting
	void setDaoRegistryForUnitTest(DaoRegistry theDaoRegistry) {
		myDaoRegistry = theDaoRegistry;
	}

	@VisibleForTesting
	void setForcedIdDaoForUnitTest(IForcedIdDao theForcedIdDao) {
		myForcedIdDao = theForcedIdDao;
	}

	@VisibleForTesting
	void setContextForUnitTest(FhirContext theContext) {
		myContext = theContext;
	}
138 
	/**
	 * Post-construction initialization: builds the transaction template and the
	 * reindexing thread pool.
	 */
	@PostConstruct
	public void start() {
		myTxTemplate = new TransactionTemplate(myTxManager);
		initExecutor();
	}

	private void initExecutor() {
		// Create the threadpool executor used for reindex jobs
		int reindexThreadCount = myDaoConfig.getReindexThreadCount();
		// BlockPolicy makes the submitting thread wait (rather than fail) when the queue is full
		RejectedExecutionHandler rejectHandler = new Executors.BlockPolicy();
		// NOTE(review): with corePoolSize=0 and a bounded LinkedBlockingQueue, ThreadPoolExecutor
		// only spawns workers beyond the first once the queue is full - confirm this actually
		// yields the intended parallelism of reindexThreadCount
		myTaskExecutor = new ThreadPoolExecutor(0, reindexThreadCount,
			0L, TimeUnit.MILLISECONDS,
			new LinkedBlockingQueue<>(100),
			myReindexingThreadFactory,
			rejectHandler
		);
	}
156 
157 	@Override
158 	@Transactional(Transactional.TxType.REQUIRED)
159 	public Long markAllResourcesForReindexing() {
160 		return markAllResourcesForReindexing(null);
161 	}
162 
163 	@Override
164 	@Transactional(Transactional.TxType.REQUIRED)
165 	public Long markAllResourcesForReindexing(String theType) {
166 		String typeDesc;
167 		if (isNotBlank(theType)) {
168 			myReindexJobDao.markAllOfTypeAsDeleted(theType);
169 			typeDesc = theType;
170 		} else {
171 			myReindexJobDao.markAllOfTypeAsDeleted();
172 			typeDesc = "(any)";
173 		}
174 
175 		ResourceReindexJobEntity job = new ResourceReindexJobEntity();
176 		job.setResourceType(theType);
177 		job.setThresholdHigh(DateUtils.addMinutes(new Date(), 5));
178 		job = myReindexJobDao.saveAndFlush(job);
179 
180 		ourLog.info("Marking all resources of type {} for reindexing - Got job ID[{}]", typeDesc, job.getId());
181 		return job.getId();
182 	}
183 
184 	@Override
185 	@Transactional(Transactional.TxType.NEVER)
186 	@Scheduled(fixedDelay = 10 * DateUtils.MILLIS_PER_SECOND)
187 	public void scheduleReindexingPass() {
188 		runReindexingPass();
189 	}
190 
191 	@Override
192 	@Transactional(Transactional.TxType.NEVER)
193 	public Integer runReindexingPass() {
194 		if (myDaoConfig.isSchedulingDisabled()) {
195 			return null;
196 		}
197 		if (myIndexingLock.tryLock()) {
198 			try {
199 				return doReindexingPassInsideLock();
200 			} finally {
201 				myIndexingLock.unlock();
202 			}
203 		}
204 		return null;
205 	}
206 
	// Assumes the caller holds myIndexingLock: purges deleted jobs, then runs the active ones
	private int doReindexingPassInsideLock() {
		expungeJobsMarkedAsDeleted();
		return runReindexJobs();
	}

	/**
	 * Runs a reindexing pass unconditionally, blocking until the indexing lock is
	 * available (unlike {@link #runReindexingPass()}, which skips when busy and
	 * honours the scheduling-disabled flag).
	 *
	 * @return the number of resources reindexed in this pass
	 */
	@Override
	public int forceReindexingPass() {
		myIndexingLock.lock();
		try {
			return doReindexingPassInsideLock();
		} finally {
			myIndexingLock.unlock();
		}
	}
221 
	/**
	 * Marks every reindex job as deleted, replaces the task executor (abandoning its
	 * queue), and immediately purges the jobs that were just marked as deleted.
	 */
	@Override
	public void cancelAndPurgeAllJobs() {
		ourLog.info("Cancelling and purging all resource reindexing jobs");
		myTxTemplate.execute(t -> {
			myReindexJobDao.markAllOfTypeAsDeleted();
			return null;
		});

		// NOTE(review): shutdown() does not interrupt in-flight tasks and lets queued tasks
		// run to completion - confirm that is acceptable for "cancel" semantics
		myTaskExecutor.shutdown();
		initExecutor();

		expungeJobsMarkedAsDeleted();
	}
235 
236 	private int runReindexJobs() {
237 		Collection<ResourceReindexJobEntity> jobs = getResourceReindexJobEntities();
238 
239 		if (jobs.size() > 0) {
240 			ourLog.info("Running {} reindex jobs: {}", jobs.size(), jobs);
241 		} else {
242 			ourLog.debug("Running {} reindex jobs: {}", jobs.size(), jobs);
243 			return 0;
244 		}
245 
246 		int count = 0;
247 		for (ResourceReindexJobEntity next : jobs) {
248 
249 			if (next.getThresholdLow() != null && next.getThresholdLow().getTime() >= next.getThresholdHigh().getTime()) {
250 				markJobAsDeleted(next);
251 				continue;
252 			}
253 
254 			count += runReindexJob(next);
255 		}
256 		return count;
257 	}
258 
259 	@Override
260 	public int countReindexJobs() {
261 		return getResourceReindexJobEntities().size();
262 	}
263 
264 	private Collection<ResourceReindexJobEntity> getResourceReindexJobEntities() {
265 		Collection<ResourceReindexJobEntity> jobs = myTxTemplate.execute(t -> myReindexJobDao.findAll(PageRequest.of(0, 10), false));
266 		assert jobs != null;
267 		return jobs;
268 	}
269 
270 	private void markJobAsDeleted(ResourceReindexJobEntity theJob) {
271 		ourLog.info("Marking reindexing job ID[{}] as deleted", theJob.getId());
272 		myTxTemplate.execute(t -> {
273 			myReindexJobDao.markAsDeletedById(theJob.getId());
274 			return null;
275 		});
276 	}
277 
278 	@VisibleForTesting
279 	public void setSearchParamRegistryForUnitTest(ISearchParamRegistry theSearchParamRegistry) {
280 		mySearchParamRegistry = theSearchParamRegistry;
281 	}
282 
	/**
	 * Runs a single pass of one reindex job: loads up to {@link #PASS_SIZE} resource IDs
	 * whose update time falls within the job's [thresholdLow, thresholdHigh] window,
	 * submits each for reindexing on the task executor, then advances the job's low
	 * threshold past the newest update date seen.
	 *
	 * @param theJob the job to advance
	 * @return the number of resources actually reindexed in this pass
	 */
	private int runReindexJob(ResourceReindexJobEntity theJob) {
		// A previous task failure may have temporarily suspended reindexing
		if (theJob.getSuspendedUntil() != null) {
			if (theJob.getSuspendedUntil().getTime() > System.currentTimeMillis()) {
				return 0;
			}
		}

		ourLog.info("Performing reindex pass for JOB[{}]", theJob.getId());
		StopWatch sw = new StopWatch();
		AtomicInteger counter = new AtomicInteger();

		/*
		 * On the first time we run a particular reindex job, let's make sure we
		 * have the latest search parameters loaded. A common reason to
		 * be reindexing is that the search parameters have changed in some way, so
		 * this makes sure we're on the latest versions
		 */
		if (theJob.getThresholdLow() == null) {
			mySearchParamRegistry.forceRefresh();
		}

		// Calculate range
		Date low = theJob.getThresholdLow() != null ? theJob.getThresholdLow() : BEGINNING_OF_TIME;
		Date high = theJob.getThresholdHigh();

		// Query for resources within threshold
		StopWatch pageSw = new StopWatch();
		Slice<Long> range = myTxTemplate.execute(t -> {
			PageRequest page = PageRequest.of(0, PASS_SIZE);
			if (isNotBlank(theJob.getResourceType())) {
				return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(page, theJob.getResourceType(), low, high);
			} else {
				return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(page, low, high);
			}
		});
		Validate.notNull(range);
		int count = range.getNumberOfElements();
		ourLog.info("Loaded {} resources for reindexing in {}", count, pageSw.toString());

		// If we didn't find any results at all, mark as deleted
		if (count == 0) {
			markJobAsDeleted(theJob);
			return 0;
		}

		// Submit each resource requiring reindexing
		List<Future<Date>> futures = range
			.stream()
			.map(t -> myTaskExecutor.submit(new ResourceReindexingTask(t, counter)))
			.collect(Collectors.toList());

		// Each task returns the resource's update date (or null); track the newest so
		// the job's low threshold can be advanced past everything processed in this pass
		Date latestDate = null;
		for (Future<Date> next : futures) {
			Date nextDate;
			try {
				nextDate = next.get();
			} catch (Exception e) {
				// One failed task suspends reindexing for a minute and abandons this pass.
				// NOTE(review): setSuspendedUntil() takes no job ID, so this appears to
				// suspend all jobs, not just this one - confirm that is intended
				ourLog.error("Failure reindexing", e);
				Date suspendedUntil = DateUtils.addMinutes(new Date(), 1);
				myTxTemplate.execute(t -> {
					myReindexJobDao.setSuspendedUntil(suspendedUntil);
					return null;
				});
				return counter.get();
			}

			if (nextDate != null) {
				if (latestDate == null || latestDate.getTime() < nextDate.getTime()) {
					latestDate = new Date(nextDate.getTime());
				}
			}
		}

		// NOTE(review): this throws if every task returned a null update date - confirm
		// at least one non-null date is always expected when count > 0
		Validate.notNull(latestDate);
		Date newLow;
		if (latestDate.getTime() == low.getTime()) {
			if (count == PASS_SIZE) {
				// Just in case we end up in some sort of infinite loop. This shouldn't happen, and couldn't really
				// happen unless there were PASS_SIZE (25000) resources with the exact same update time down to the
				// millisecond.
				ourLog.error("Final pass time for reindex JOB[{}] has same ending low value: {}", theJob.getId(), latestDate);
			}

			// Nudge forward by 1ms so the same resources are not fetched again next pass
			newLow = new Date(latestDate.getTime() + 1);
		} else {
			newLow = latestDate;
		}

		// Persist the advanced threshold and the cumulative reindex count
		myTxTemplate.execute(t -> {
			myReindexJobDao.setThresholdLow(theJob.getId(), newLow);
			Integer existingCount = myReindexJobDao.getReindexCount(theJob.getId()).orElse(0);
			int newCount = existingCount + counter.get();
			myReindexJobDao.setReindexCount(theJob.getId(), newCount);
			return null;
		});

		ourLog.info("Completed pass of reindex JOB[{}] - Indexed {} resources in {} ({} / sec) - Have indexed until: {}", theJob.getId(), count, sw.toString(), sw.formatThroughput(count, TimeUnit.SECONDS), newLow);
		return counter.get();
	}
382 
383 	private void expungeJobsMarkedAsDeleted() {
384 		myTxTemplate.execute(t -> {
385 			Collection<ResourceReindexJobEntity> toDelete = myReindexJobDao.findAll(PageRequest.of(0, 10), true);
386 			toDelete.forEach(job -> {
387 				ourLog.info("Purging deleted job[{}]", job.getId());
388 				myReindexJobDao.deleteById(job.getId());
389 			});
390 			return null;
391 		});
392 	}
393 
394 	private void markResourceAsIndexingFailed(final long theId) {
395 		TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
396 		txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
397 		txTemplate.execute((TransactionCallback<Void>) theStatus -> {
398 			ourLog.info("Marking resource with PID {} as indexing_failed", new Object[]{theId});
399 
400 			myResourceTableDao.updateIndexStatus(theId, BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED);
401 
402 			Query q = myEntityManager.createQuery("DELETE FROM ResourceTag t WHERE t.myResourceId = :id");
403 			q.setParameter("id", theId);
404 			q.executeUpdate();
405 
406 			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamCoords t WHERE t.myResourcePid = :id");
407 			q.setParameter("id", theId);
408 			q.executeUpdate();
409 
410 			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamDate t WHERE t.myResourcePid = :id");
411 			q.setParameter("id", theId);
412 			q.executeUpdate();
413 
414 			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamNumber t WHERE t.myResourcePid = :id");
415 			q.setParameter("id", theId);
416 			q.executeUpdate();
417 
418 			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamQuantity t WHERE t.myResourcePid = :id");
419 			q.setParameter("id", theId);
420 			q.executeUpdate();
421 
422 			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamString t WHERE t.myResourcePid = :id");
423 			q.setParameter("id", theId);
424 			q.executeUpdate();
425 
426 			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamToken t WHERE t.myResourcePid = :id");
427 			q.setParameter("id", theId);
428 			q.executeUpdate();
429 
430 			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamUri t WHERE t.myResourcePid = :id");
431 			q.setParameter("id", theId);
432 			q.executeUpdate();
433 
434 			q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.mySourceResourcePid = :id");
435 			q.setParameter("id", theId);
436 			q.executeUpdate();
437 
438 			q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.myTargetResourcePid = :id");
439 			q.setParameter("id", theId);
440 			q.executeUpdate();
441 
442 			return null;
443 		});
444 	}
445 
	/**
	 * Reindexes a single resource (identified by PID) inside its own transaction.
	 * {@link #call()} returns the resource's last-updated date so the caller can
	 * advance the job threshold, or null if the row could not be processed before
	 * that date was read. Any failure other than a version conflict causes the
	 * resource to be marked as INDEX_STATUS_INDEXING_FAILED.
	 */
	private class ResourceReindexingTask implements Callable<Date> {
		private final Long myNextId;
		private final AtomicInteger myCounter;
		// Captured from the resource table row while running; returned from call()
		private Date myUpdated;

		ResourceReindexingTask(Long theNextId, AtomicInteger theCounter) {
			myNextId = theNextId;
			myCounter = theCounter;
		}


		// Resolves the type-specific DAO for the resource, delegates the actual
		// reindexing to it, then bumps the shared per-pass counter
		@SuppressWarnings("unchecked")
		private <T extends IBaseResource> void doReindex(ResourceTable theResourceTable, T theResource) {
			RuntimeResourceDefinition resourceDefinition = myContext.getResourceDefinition(theResource.getClass());
			Class<T> resourceClass = (Class<T>) resourceDefinition.getImplementingClass();
			final IFhirResourceDao<T> dao = myDaoRegistry.getResourceDao(resourceClass);
			dao.reindex(theResource, theResourceTable);

			myCounter.incrementAndGet();
		}

		@Override
		public Date call() {
			// Non-null means the reindex failed inside the transaction (which was rolled back)
			Throwable reindexFailure;
			try {
				reindexFailure = myTxTemplate.execute(t -> {
					ResourceTable resourceTable = myResourceTableDao.findById(myNextId).orElseThrow(IllegalStateException::new);
					myUpdated = resourceTable.getUpdatedDate();

					try {
						/*
						 * This part is because from HAPI 1.5 - 1.6 we changed the format of forced ID to be "type/id" instead of just "id"
						 */
						ForcedId forcedId = resourceTable.getForcedId();
						if (forcedId != null) {
							if (isBlank(forcedId.getResourceType())) {
								ourLog.info("Updating resource {} forcedId type to {}", forcedId.getForcedId(), resourceTable.getResourceType());
								forcedId.setResourceType(resourceTable.getResourceType());
								myForcedIdDao.save(forcedId);
							}
						}

						IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceTable.getResourceType());
						long expectedVersion = resourceTable.getVersion();
						// NOTE(review): the third argument presumably permits reading deleted
						// resources - verify against the IFhirResourceDao.read signature
						IBaseResource resource = dao.read(resourceTable.getIdDt().toVersionless(), null, true);
						if (resource == null) {
							throw new InternalErrorException("Could not find resource version " + resourceTable.getIdDt().toUnqualified().getValue() + " in database");
						}

						// NOTE(review): getVersionIdPartAsLong() may return null for a versionless
						// ID, which would NPE on the unboxed comparison below - confirm a version
						// is always present after a successful read
						Long actualVersion = resource.getIdElement().getVersionIdPartAsLong();
						if (actualVersion < expectedVersion) {
							// History table is behind the resource table: renumber the existing
							// history row from the actual version up to the expected one
							ourLog.warn("Resource {} version {} does not exist, renumbering version {}", resource.getIdElement().toUnqualifiedVersionless().getValue(), resource.getIdElement().getVersionIdPart(), expectedVersion);
							myResourceHistoryTableDao.updateVersion(resourceTable.getId(), actualVersion, expectedVersion);
						}

						doReindex(resourceTable, resource);
						return null;

					} catch (Exception e) {
						// Roll the transaction back and hand the failure to the outer scope
						ourLog.error("Failed to index resource {}: {}", resourceTable.getIdDt(), e.toString(), e);
						t.setRollbackOnly();
						return e;
					}
				});

			} catch (ResourceVersionConflictException e) {
				/*
				 * We reindex in multiple threads, so it's technically possible that two threads try
				 * to index resources that cause a constraint error now (i.e. because a unique index has been
				 * added that didn't previously exist). In this case, one of the threads would succeed and
				 * not get this error, so we'll let the other one fail and try
				 * again later.
				 */
				ourLog.info("Failed to reindex because of a version conflict. Leaving in unindexed state: {}", e.getMessage());
				reindexFailure = null;
			}

			if (reindexFailure != null) {
				ourLog.info("Setting resource PID[{}] status to ERRORED", myNextId);
				markResourceAsIndexingFailed(myNextId);
			}

			return myUpdated;
		}
	}
531 }