View Javadoc
1   package ca.uhn.fhir.jpa.search.reindex;
2   
3   /*-
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2019 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   *
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   *
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  import ca.uhn.fhir.context.FhirContext;
24  import ca.uhn.fhir.context.RuntimeResourceDefinition;
25  import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
26  import ca.uhn.fhir.jpa.dao.DaoConfig;
27  import ca.uhn.fhir.jpa.dao.DaoRegistry;
28  import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
29  import ca.uhn.fhir.jpa.dao.data.IForcedIdDao;
30  import ca.uhn.fhir.jpa.dao.data.IResourceHistoryTableDao;
31  import ca.uhn.fhir.jpa.dao.data.IResourceReindexJobDao;
32  import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
33  import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity;
34  import ca.uhn.fhir.jpa.model.entity.ForcedId;
35  import ca.uhn.fhir.jpa.model.entity.ResourceTable;
36  import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
37  import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
38  import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
39  import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
40  import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
41  import ca.uhn.fhir.util.StopWatch;
42  import com.google.common.annotations.VisibleForTesting;
43  import org.apache.commons.lang3.Validate;
44  import org.apache.commons.lang3.concurrent.BasicThreadFactory;
45  import org.apache.commons.lang3.time.DateUtils;
46  import org.hibernate.search.util.impl.Executors;
47  import org.hl7.fhir.instance.model.api.IBaseResource;
48  import org.hl7.fhir.r4.model.InstantType;
49  import org.quartz.Job;
50  import org.quartz.JobExecutionContext;
51  import org.slf4j.Logger;
52  import org.slf4j.LoggerFactory;
53  import org.springframework.beans.factory.annotation.Autowired;
54  import org.springframework.data.domain.PageRequest;
55  import org.springframework.data.domain.Slice;
56  import org.springframework.transaction.PlatformTransactionManager;
57  import org.springframework.transaction.TransactionDefinition;
58  import org.springframework.transaction.support.TransactionCallback;
59  import org.springframework.transaction.support.TransactionTemplate;
60  
61  import javax.annotation.PostConstruct;
62  import javax.persistence.EntityManager;
63  import javax.persistence.PersistenceContext;
64  import javax.persistence.PersistenceContextType;
65  import javax.persistence.Query;
66  import javax.transaction.Transactional;
67  import java.util.Collection;
68  import java.util.Date;
69  import java.util.List;
70  import java.util.concurrent.*;
71  import java.util.concurrent.atomic.AtomicInteger;
72  import java.util.concurrent.locks.ReentrantLock;
73  import java.util.stream.Collectors;
74  
75  import static org.apache.commons.lang3.StringUtils.isBlank;
76  import static org.apache.commons.lang3.StringUtils.isNotBlank;
77  
78  public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
79  
80  	private static final Date BEGINNING_OF_TIME = new Date(0);
81  	private static final Logger ourLog = LoggerFactory.getLogger(ResourceReindexingSvcImpl.class);
82  	private static final int PASS_SIZE = 25000;
83  	private final ReentrantLock myIndexingLock = new ReentrantLock();
84  	@Autowired
85  	private IResourceReindexJobDao myReindexJobDao;
86  	@Autowired
87  	private DaoConfig myDaoConfig;
88  	@Autowired
89  	private PlatformTransactionManager myTxManager;
90  	private TransactionTemplate myTxTemplate;
91  	private ThreadFactory myReindexingThreadFactory = new BasicThreadFactory.Builder().namingPattern("ResourceReindex-%d").build();
92  	private ThreadPoolExecutor myTaskExecutor;
93  	@Autowired
94  	private IResourceTableDao myResourceTableDao;
95  	@Autowired
96  	private IResourceHistoryTableDao myResourceHistoryTableDao;
97  	@Autowired
98  	private DaoRegistry myDaoRegistry;
99  	@Autowired
100 	private IForcedIdDao myForcedIdDao;
101 	@Autowired
102 	private FhirContext myContext;
103 	@PersistenceContext(type = PersistenceContextType.TRANSACTION)
104 	private EntityManager myEntityManager;
105 	@Autowired
106 	private ISearchParamRegistry mySearchParamRegistry;
107 	@Autowired
108 	private ISchedulerService mySchedulerService;
109 
110 	@VisibleForTesting
111 	void setReindexJobDaoForUnitTest(IResourceReindexJobDao theReindexJobDao) {
112 		myReindexJobDao = theReindexJobDao;
113 	}
114 
115 	@VisibleForTesting
116 	void setDaoConfigForUnitTest(DaoConfig theDaoConfig) {
117 		myDaoConfig = theDaoConfig;
118 	}
119 
120 	@VisibleForTesting
121 	void setTxManagerForUnitTest(PlatformTransactionManager theTxManager) {
122 		myTxManager = theTxManager;
123 	}
124 
125 	@VisibleForTesting
126 	void setResourceTableDaoForUnitTest(IResourceTableDao theResourceTableDao) {
127 		myResourceTableDao = theResourceTableDao;
128 	}
129 
130 	@VisibleForTesting
131 	void setDaoRegistryForUnitTest(DaoRegistry theDaoRegistry) {
132 		myDaoRegistry = theDaoRegistry;
133 	}
134 
135 	@VisibleForTesting
136 	void setForcedIdDaoForUnitTest(IForcedIdDao theForcedIdDao) {
137 		myForcedIdDao = theForcedIdDao;
138 	}
139 
140 	@VisibleForTesting
141 	void setContextForUnitTest(FhirContext theContext) {
142 		myContext = theContext;
143 	}
144 
145 	@PostConstruct
146 	public void start() {
147 		myTxTemplate = new TransactionTemplate(myTxManager);
148 		initExecutor();
149 	}
150 
151 	public void initExecutor() {
152 		// Create the threadpool executor used for reindex jobs
153 		int reindexThreadCount = myDaoConfig.getReindexThreadCount();
154 		RejectedExecutionHandler rejectHandler = new Executors.BlockPolicy();
155 		myTaskExecutor = new ThreadPoolExecutor(0, reindexThreadCount,
156 			0L, TimeUnit.MILLISECONDS,
157 			new LinkedBlockingQueue<>(100),
158 			myReindexingThreadFactory,
159 			rejectHandler
160 		);
161 	}
162 
163 	@Override
164 	@Transactional(Transactional.TxType.REQUIRED)
165 	public Long markAllResourcesForReindexing() {
166 		return markAllResourcesForReindexing(null);
167 	}
168 
169 	@Override
170 	@Transactional(Transactional.TxType.REQUIRED)
171 	public Long markAllResourcesForReindexing(String theType) {
172 		String typeDesc;
173 		if (isNotBlank(theType)) {
174 			myReindexJobDao.markAllOfTypeAsDeleted(theType);
175 			typeDesc = theType;
176 		} else {
177 			myReindexJobDao.markAllOfTypeAsDeleted();
178 			typeDesc = "(any)";
179 		}
180 
181 		ResourceReindexJobEntity job = new ResourceReindexJobEntity();
182 		job.setResourceType(theType);
183 		job.setThresholdHigh(DateUtils.addMinutes(new Date(), 5));
184 		job = myReindexJobDao.saveAndFlush(job);
185 
186 		ourLog.info("Marking all resources of type {} for reindexing - Got job ID[{}]", typeDesc, job.getId());
187 		return job.getId();
188 	}
189 
190 	@PostConstruct
191 	public void registerScheduledJob() {
192 		ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
193 		jobDetail.setId(ResourceReindexingSvcImpl.class.getName());
194 		jobDetail.setJobClass(ResourceReindexingSvcImpl.SubmitJob.class);
195 		mySchedulerService.scheduleFixedDelay(10 * DateUtils.MILLIS_PER_SECOND, true, jobDetail);
196 	}
197 
198 	@VisibleForTesting
199 	ReentrantLock getIndexingLockForUnitTest() {
200 		return myIndexingLock;
201 	}
202 
203 	@Override
204 	@Transactional(Transactional.TxType.NEVER)
205 	public Integer runReindexingPass() {
206 		if (myDaoConfig.isSchedulingDisabled()) {
207 			return null;
208 		}
209 		if (myIndexingLock.tryLock()) {
210 			try {
211 				return doReindexingPassInsideLock();
212 			} finally {
213 				myIndexingLock.unlock();
214 			}
215 		}
216 		return null;
217 	}
218 
219 	private int doReindexingPassInsideLock() {
220 		expungeJobsMarkedAsDeleted();
221 		return runReindexJobs();
222 	}
223 
224 	@Override
225 	public int forceReindexingPass() {
226 		myIndexingLock.lock();
227 		try {
228 			return doReindexingPassInsideLock();
229 		} finally {
230 			myIndexingLock.unlock();
231 		}
232 	}
233 
234 	@Override
235 	public void cancelAndPurgeAllJobs() {
236 		ourLog.info("Cancelling and purging all resource reindexing jobs");
237 		myIndexingLock.lock();
238 		try {
239 		myTxTemplate.execute(t -> {
240 			myReindexJobDao.markAllOfTypeAsDeleted();
241 			return null;
242 		});
243 
244 		myTaskExecutor.shutdown();
245 		initExecutor();
246 
247 		expungeJobsMarkedAsDeleted();
248 		} finally {
249 			myIndexingLock.unlock();
250 		}
251 	}
252 
253 	private int runReindexJobs() {
254 		Collection<ResourceReindexJobEntity> jobs = getResourceReindexJobEntities();
255 
256 		if (jobs.size() > 0) {
257 			ourLog.info("Running {} reindex jobs: {}", jobs.size(), jobs);
258 		} else {
259 			ourLog.debug("Running {} reindex jobs: {}", jobs.size(), jobs);
260 			return 0;
261 		}
262 
263 		int count = 0;
264 		for (ResourceReindexJobEntity next : jobs) {
265 
266 			if (next.getThresholdLow() != null && next.getThresholdLow().getTime() >= next.getThresholdHigh().getTime()) {
267 				markJobAsDeleted(next);
268 				continue;
269 			}
270 
271 			count += runReindexJob(next);
272 		}
273 		return count;
274 	}
275 
276 	@Override
277 	public int countReindexJobs() {
278 		return getResourceReindexJobEntities().size();
279 	}
280 
281 	private Collection<ResourceReindexJobEntity> getResourceReindexJobEntities() {
282 		Collection<ResourceReindexJobEntity> jobs = myTxTemplate.execute(t -> myReindexJobDao.findAll(PageRequest.of(0, 10), false));
283 		assert jobs != null;
284 		return jobs;
285 	}
286 
287 	private void markJobAsDeleted(ResourceReindexJobEntity theJob) {
288 		ourLog.info("Marking reindexing job ID[{}] as deleted", theJob.getId());
289 		myTxTemplate.execute(t -> {
290 			myReindexJobDao.markAsDeletedById(theJob.getId());
291 			return null;
292 		});
293 	}
294 
295 	@VisibleForTesting
296 	void setSearchParamRegistryForUnitTest(ISearchParamRegistry theSearchParamRegistry) {
297 		mySearchParamRegistry = theSearchParamRegistry;
298 	}
299 
300 	private int runReindexJob(ResourceReindexJobEntity theJob) {
301 		if (theJob.getSuspendedUntil() != null) {
302 			if (theJob.getSuspendedUntil().getTime() > System.currentTimeMillis()) {
303 				return 0;
304 			}
305 		}
306 
307 		ourLog.info("Performing reindex pass for JOB[{}]", theJob.getId());
308 		StopWatch sw = new StopWatch();
309 		AtomicInteger counter = new AtomicInteger();
310 
311 		/*
312 		 * On the first time we run a particular reindex job, let's make sure we
313 		 * have the latest search parameters loaded. A common reason to
314 		 * be reindexing is that the search parameters have changed in some way, so
315 		 * this makes sure we're on the latest versions
316 		 */
317 		if (theJob.getThresholdLow() == null) {
318 			mySearchParamRegistry.forceRefresh();
319 		}
320 
321 		// Calculate range
322 		Date low = theJob.getThresholdLow() != null ? theJob.getThresholdLow() : BEGINNING_OF_TIME;
323 		Date high = theJob.getThresholdHigh();
324 
325 		// Query for resources within threshold
326 		StopWatch pageSw = new StopWatch();
327 		Slice<Long> range = myTxTemplate.execute(t -> {
328 			PageRequest page = PageRequest.of(0, PASS_SIZE);
329 			if (isNotBlank(theJob.getResourceType())) {
330 				return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(page, theJob.getResourceType(), low, high);
331 			} else {
332 				return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(page, low, high);
333 			}
334 		});
335 		Validate.notNull(range);
336 		int count = range.getNumberOfElements();
337 		ourLog.info("Loaded {} resources for reindexing in {}", count, pageSw.toString());
338 
339 		// If we didn't find any results at all, mark as deleted
340 		if (count == 0) {
341 			markJobAsDeleted(theJob);
342 			return 0;
343 		}
344 
345 		// Submit each resource requiring reindexing
346 		List<Future<Date>> futures = range
347 			.stream()
348 			.map(t -> myTaskExecutor.submit(new ResourceReindexingTask(t, counter)))
349 			.collect(Collectors.toList());
350 
351 		Date latestDate = null;
352 		for (Future<Date> next : futures) {
353 			Date nextDate;
354 			try {
355 				nextDate = next.get();
356 			} catch (Exception e) {
357 				ourLog.error("Failure reindexing", e);
358 				Date suspendedUntil = DateUtils.addMinutes(new Date(), 1);
359 				myTxTemplate.execute(t -> {
360 					myReindexJobDao.setSuspendedUntil(suspendedUntil);
361 					return null;
362 				});
363 				return counter.get();
364 			}
365 
366 			if (nextDate != null) {
367 				if (latestDate == null || latestDate.getTime() < nextDate.getTime()) {
368 					latestDate = new Date(nextDate.getTime());
369 				}
370 			}
371 		}
372 
373 		Validate.notNull(latestDate);
374 		Date newLow;
375 		if (latestDate.getTime() == low.getTime()) {
376 			if (count == PASS_SIZE) {
377 				// Just in case we end up in some sort of infinite loop. This shouldn't happen, and couldn't really
378 				// happen unless there were 10000 resources with the exact same update time down to the
379 				// millisecond.
380 				ourLog.error("Final pass time for reindex JOB[{}] has same ending low value: {}", theJob.getId(), latestDate);
381 			}
382 
383 			newLow = new Date(latestDate.getTime() + 1);
384 		} else {
385 			newLow = latestDate;
386 		}
387 
388 		myTxTemplate.execute(t -> {
389 			myReindexJobDao.setThresholdLow(theJob.getId(), newLow);
390 			Integer existingCount = myReindexJobDao.getReindexCount(theJob.getId()).orElse(0);
391 			int newCount = existingCount + counter.get();
392 			myReindexJobDao.setReindexCount(theJob.getId(), newCount);
393 			return null;
394 		});
395 
396 		ourLog.info("Completed pass of reindex JOB[{}] - Indexed {} resources in {} ({} / sec) - Have indexed until: {}", theJob.getId(), count, sw.toString(), sw.formatThroughput(count, TimeUnit.SECONDS), new InstantType(newLow));
397 		return counter.get();
398 	}
399 
400 	private void expungeJobsMarkedAsDeleted() {
401 		myTxTemplate.execute(t -> {
402 			Collection<ResourceReindexJobEntity> toDelete = myReindexJobDao.findAll(PageRequest.of(0, 10), true);
403 			toDelete.forEach(job -> {
404 				ourLog.info("Purging deleted job[{}]", job.getId());
405 				myReindexJobDao.deleteById(job.getId());
406 			});
407 			return null;
408 		});
409 	}
410 
411 	private void markResourceAsIndexingFailed(final long theId) {
412 		TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
413 		txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
414 		txTemplate.execute((TransactionCallback<Void>) theStatus -> {
415 			ourLog.info("Marking resource with PID {} as indexing_failed", new Object[]{theId});
416 
417 			myResourceTableDao.updateIndexStatus(theId, BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED);
418 
419 			Query q = myEntityManager.createQuery("DELETE FROM ResourceTag t WHERE t.myResourceId = :id");
420 			q.setParameter("id", theId);
421 			q.executeUpdate();
422 
423 			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamCoords t WHERE t.myResourcePid = :id");
424 			q.setParameter("id", theId);
425 			q.executeUpdate();
426 
427 			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamDate t WHERE t.myResourcePid = :id");
428 			q.setParameter("id", theId);
429 			q.executeUpdate();
430 
431 			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamNumber t WHERE t.myResourcePid = :id");
432 			q.setParameter("id", theId);
433 			q.executeUpdate();
434 
435 			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamQuantity t WHERE t.myResourcePid = :id");
436 			q.setParameter("id", theId);
437 			q.executeUpdate();
438 
439 			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamString t WHERE t.myResourcePid = :id");
440 			q.setParameter("id", theId);
441 			q.executeUpdate();
442 
443 			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamToken t WHERE t.myResourcePid = :id");
444 			q.setParameter("id", theId);
445 			q.executeUpdate();
446 
447 			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamUri t WHERE t.myResourcePid = :id");
448 			q.setParameter("id", theId);
449 			q.executeUpdate();
450 
451 			q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.mySourceResourcePid = :id");
452 			q.setParameter("id", theId);
453 			q.executeUpdate();
454 
455 			q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.myTargetResourcePid = :id");
456 			q.setParameter("id", theId);
457 			q.executeUpdate();
458 
459 			return null;
460 		});
461 	}
462 
463 	private class ResourceReindexingTask implements Callable<Date> {
464 		private final Long myNextId;
465 		private final AtomicInteger myCounter;
466 		private Date myUpdated;
467 
468 		ResourceReindexingTask(Long theNextId, AtomicInteger theCounter) {
469 			myNextId = theNextId;
470 			myCounter = theCounter;
471 		}
472 
473 
474 		@SuppressWarnings("unchecked")
475 		private <T extends IBaseResource> void doReindex(ResourceTable theResourceTable, T theResource) {
476 			RuntimeResourceDefinition resourceDefinition = myContext.getResourceDefinition(theResource.getClass());
477 			Class<T> resourceClass = (Class<T>) resourceDefinition.getImplementingClass();
478 			final IFhirResourceDao<T> dao = myDaoRegistry.getResourceDao(resourceClass);
479 			dao.reindex(theResource, theResourceTable);
480 
481 			myCounter.incrementAndGet();
482 		}
483 
484 		@Override
485 		public Date call() {
486 			Throwable reindexFailure;
487 			try {
488 				reindexFailure = myTxTemplate.execute(t -> {
489 					ResourceTable resourceTable = myResourceTableDao.findById(myNextId).orElseThrow(IllegalStateException::new);
490 					myUpdated = resourceTable.getUpdatedDate();
491 
492 					try {
493 						/*
494 						 * This part is because from HAPI 1.5 - 1.6 we changed the format of forced ID to be "type/id" instead of just "id"
495 						 */
496 						ForcedId forcedId = resourceTable.getForcedId();
497 						if (forcedId != null) {
498 							if (isBlank(forcedId.getResourceType())) {
499 								ourLog.info("Updating resource {} forcedId type to {}", forcedId.getForcedId(), resourceTable.getResourceType());
500 								forcedId.setResourceType(resourceTable.getResourceType());
501 								myForcedIdDao.save(forcedId);
502 							}
503 						}
504 
505 						IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceTable.getResourceType());
506 						long expectedVersion = resourceTable.getVersion();
507 						IBaseResource resource = dao.read(resourceTable.getIdDt().toVersionless(), null, true);
508 						if (resource == null) {
509 							throw new InternalErrorException("Could not find resource version " + resourceTable.getIdDt().toUnqualified().getValue() + " in database");
510 						}
511 
512 						Long actualVersion = resource.getIdElement().getVersionIdPartAsLong();
513 						if (actualVersion < expectedVersion) {
514 							ourLog.warn("Resource {} version {} does not exist, renumbering version {}", resource.getIdElement().toUnqualifiedVersionless().getValue(), resource.getIdElement().getVersionIdPart(), expectedVersion);
515 							myResourceHistoryTableDao.updateVersion(resourceTable.getId(), actualVersion, expectedVersion);
516 						}
517 
518 						doReindex(resourceTable, resource);
519 						return null;
520 
521 					} catch (Exception e) {
522 						ourLog.error("Failed to index resource {}: {}", resourceTable.getIdDt(), e.toString(), e);
523 						t.setRollbackOnly();
524 						return e;
525 					}
526 				});
527 
528 			} catch (ResourceVersionConflictException e) {
529 				/*
530 				 * We reindex in multiple threads, so it's technically possible that two threads try
531 				 * to index resources that cause a constraint error now (i.e. because a unique index has been
532 				 * added that didn't previously exist). In this case, one of the threads would succeed and
533 				 * not get this error, so we'll let the other one fail and try
534 				 * again later.
535 				 */
536 				ourLog.info("Failed to reindex because of a version conflict. Leaving in unindexed state: {}", e.getMessage());
537 				reindexFailure = null;
538 			}
539 
540 			if (reindexFailure != null) {
541 				ourLog.info("Setting resource PID[{}] status to ERRORED", myNextId);
542 				markResourceAsIndexingFailed(myNextId);
543 			}
544 
545 			return myUpdated;
546 		}
547 	}
548 
549 	public static class SubmitJob implements Job {
550 		@Autowired
551 		private IResourceReindexingSvc myTarget;
552 
553 		@Override
554 		public void execute(JobExecutionContext theContext) {
555 			myTarget.runReindexingPass();
556 		}
557 	}
558 }