package ca.uhn.fhir.jpa.search.reindex;

/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2019 University Health Network
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.data.IForcedIdDao;
import ca.uhn.fhir.jpa.dao.data.IResourceHistoryTableDao;
import ca.uhn.fhir.jpa.dao.data.IResourceReindexJobDao;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity;
import ca.uhn.fhir.jpa.model.entity.ForcedId;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.time.DateUtils;
import org.hibernate.search.util.impl.Executors;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.InstantType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Slice;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;

import javax.annotation.PostConstruct;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.PersistenceContextType;
import javax.persistence.Query;
import javax.transaction.Transactional;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;

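/**
 * Scheduled service that drives background reindexing of stored resources.
 * <p>
 * Reindexing work is tracked in {@link ResourceReindexJobEntity} rows. Each pass loads a batch of
 * resource PIDs whose last-updated date falls between the job's low and high thresholds, reindexes
 * them on a small thread pool, and then advances the low threshold so the next pass resumes where
 * this one left off.
 * <p>
 * Illustrative usage only; the bean wiring and the {@code resourceReindexingSvc} variable below are
 * assumptions for the example, not part of this class:
 * <pre>
 * // Queue a reindex of every Patient resource; the scheduled passes will work through it
 * Long jobId = resourceReindexingSvc.markAllResourcesForReindexing("Patient");
 * </pre>
 */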
public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {

	private static final Date BEGINNING_OF_TIME = new Date(0);
	private static final Logger ourLog = LoggerFactory.getLogger(ResourceReindexingSvcImpl.class);
	private static final int PASS_SIZE = 25000;
	private final ReentrantLock myIndexingLock = new ReentrantLock();
	@Autowired
	private IResourceReindexJobDao myReindexJobDao;
	@Autowired
	private DaoConfig myDaoConfig;
	@Autowired
	private PlatformTransactionManager myTxManager;
	private TransactionTemplate myTxTemplate;
	private ThreadFactory myReindexingThreadFactory = new BasicThreadFactory.Builder().namingPattern("ResourceReindex-%d").build();
	private ThreadPoolExecutor myTaskExecutor;
	@Autowired
	private IResourceTableDao myResourceTableDao;
	@Autowired
	private IResourceHistoryTableDao myResourceHistoryTableDao;
	@Autowired
	private DaoRegistry myDaoRegistry;
	@Autowired
	private IForcedIdDao myForcedIdDao;
	@Autowired
	private FhirContext myContext;
	@PersistenceContext(type = PersistenceContextType.TRANSACTION)
	private EntityManager myEntityManager;
	@Autowired
	private ISearchParamRegistry mySearchParamRegistry;

	@VisibleForTesting
	void setReindexJobDaoForUnitTest(IResourceReindexJobDao theReindexJobDao) {
		myReindexJobDao = theReindexJobDao;
	}

	@VisibleForTesting
	void setDaoConfigForUnitTest(DaoConfig theDaoConfig) {
		myDaoConfig = theDaoConfig;
	}

	@VisibleForTesting
	void setTxManagerForUnitTest(PlatformTransactionManager theTxManager) {
		myTxManager = theTxManager;
	}

	@VisibleForTesting
	void setResourceTableDaoForUnitTest(IResourceTableDao theResourceTableDao) {
		myResourceTableDao = theResourceTableDao;
	}

	@VisibleForTesting
	void setDaoRegistryForUnitTest(DaoRegistry theDaoRegistry) {
		myDaoRegistry = theDaoRegistry;
	}

	@VisibleForTesting
	void setForcedIdDaoForUnitTest(IForcedIdDao theForcedIdDao) {
		myForcedIdDao = theForcedIdDao;
	}

	@VisibleForTesting
	void setContextForUnitTest(FhirContext theContext) {
		myContext = theContext;
	}

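	/**
	 * Called by the container once dependencies have been injected: builds the shared
	 * {@link TransactionTemplate} and the thread pool used to run reindexing tasks.
	 */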
	@PostConstruct
	public void start() {
		myTxTemplate = new TransactionTemplate(myTxManager);
		initExecutor();
	}

	private void initExecutor() {
		// Create the threadpool executor used for reindex jobs
		int reindexThreadCount = myDaoConfig.getReindexThreadCount();
		RejectedExecutionHandler rejectHandler = new Executors.BlockPolicy();
		myTaskExecutor = new ThreadPoolExecutor(0, reindexThreadCount,
			0L, TimeUnit.MILLISECONDS,
			new LinkedBlockingQueue<>(100),
			myReindexingThreadFactory,
			rejectHandler
		);
	}

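	/**
	 * Marks every resource (of any type) for reindexing. Equivalent to calling
	 * {@link #markAllResourcesForReindexing(String)} with a {@code null} type.
	 */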
	@Override
	@Transactional(Transactional.TxType.REQUIRED)
	public Long markAllResourcesForReindexing() {
		return markAllResourcesForReindexing(null);
	}

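	/**
	 * Marks all resources of the given type (or all resources if the type is blank) for reindexing.
	 * Any existing jobs for the same scope are first marked as deleted, then a new job is created
	 * with its high threshold set five minutes into the future (presumably so that resources written
	 * while the job is being created are still covered).
	 *
	 * @return the ID of the newly created reindex job
	 */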
	@Override
	@Transactional(Transactional.TxType.REQUIRED)
	public Long markAllResourcesForReindexing(String theType) {
		String typeDesc;
		if (isNotBlank(theType)) {
			myReindexJobDao.markAllOfTypeAsDeleted(theType);
			typeDesc = theType;
		} else {
			myReindexJobDao.markAllOfTypeAsDeleted();
			typeDesc = "(any)";
		}

		ResourceReindexJobEntity job = new ResourceReindexJobEntity();
		job.setResourceType(theType);
		job.setThresholdHigh(DateUtils.addMinutes(new Date(), 5));
		job = myReindexJobDao.saveAndFlush(job);

		ourLog.info("Marking all resources of type {} for reindexing - Got job ID[{}]", typeDesc, job.getId());
		return job.getId();
	}

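	/**
	 * Invoked by the Spring scheduler every ten seconds; simply delegates to {@link #runReindexingPass()}.
	 */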
	@Override
	@Transactional(Transactional.TxType.NEVER)
	@Scheduled(fixedDelay = 10 * DateUtils.MILLIS_PER_SECOND)
	public void scheduleReindexingPass() {
		runReindexingPass();
	}

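	/**
	 * Runs a single reindexing pass if scheduling is enabled and no other pass is currently running
	 * on this instance. Returns {@code null} when the pass was skipped, otherwise the number of
	 * resources reindexed.
	 */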
	@Override
	@Transactional(Transactional.TxType.NEVER)
	public Integer runReindexingPass() {
		if (myDaoConfig.isSchedulingDisabled()) {
			return null;
		}
		if (myIndexingLock.tryLock()) {
			try {
				return doReindexingPassInsideLock();
			} finally {
				myIndexingLock.unlock();
			}
		}
		return null;
	}

	private int doReindexingPassInsideLock() {
		expungeJobsMarkedAsDeleted();
		return runReindexJobs();
	}

	@Override
	public int forceReindexingPass() {
		myIndexingLock.lock();
		try {
			return doReindexingPassInsideLock();
		} finally {
			myIndexingLock.unlock();
		}
	}

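	/**
	 * Marks every reindex job as deleted, shuts down and recreates the task executor, and then
	 * purges the job rows that were marked as deleted.
	 */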
	@Override
	public void cancelAndPurgeAllJobs() {
		ourLog.info("Cancelling and purging all resource reindexing jobs");
		myTxTemplate.execute(t -> {
			myReindexJobDao.markAllOfTypeAsDeleted();
			return null;
		});

		myTaskExecutor.shutdown();
		initExecutor();

		expungeJobsMarkedAsDeleted();
	}

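	/**
	 * Fetches the active reindex jobs (up to ten per pass), deletes any job whose low threshold has
	 * caught up with its high threshold, and runs a single pass for each remaining job.
	 *
	 * @return the total number of resources reindexed across all jobs in this pass
	 */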
	private int runReindexJobs() {
		Collection<ResourceReindexJobEntity> jobs = getResourceReindexJobEntities();

		if (jobs.size() > 0) {
			ourLog.info("Running {} reindex jobs: {}", jobs.size(), jobs);
		} else {
			ourLog.debug("Running {} reindex jobs: {}", jobs.size(), jobs);
			return 0;
		}

		int count = 0;
		for (ResourceReindexJobEntity next : jobs) {

			if (next.getThresholdLow() != null && next.getThresholdLow().getTime() >= next.getThresholdHigh().getTime()) {
				markJobAsDeleted(next);
				continue;
			}

			count += runReindexJob(next);
		}
		return count;
	}

	@Override
	public int countReindexJobs() {
		return getResourceReindexJobEntities().size();
	}

	private Collection<ResourceReindexJobEntity> getResourceReindexJobEntities() {
		Collection<ResourceReindexJobEntity> jobs = myTxTemplate.execute(t -> myReindexJobDao.findAll(PageRequest.of(0, 10), false));
		assert jobs != null;
		return jobs;
	}

	private void markJobAsDeleted(ResourceReindexJobEntity theJob) {
		ourLog.info("Marking reindexing job ID[{}] as deleted", theJob.getId());
		myTxTemplate.execute(t -> {
			myReindexJobDao.markAsDeletedById(theJob.getId());
			return null;
		});
	}

	@VisibleForTesting
	public void setSearchParamRegistryForUnitTest(ISearchParamRegistry theSearchParamRegistry) {
		mySearchParamRegistry = theSearchParamRegistry;
	}

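	/**
	 * Runs one pass of a single reindex job:
	 * <ol>
	 * <li>On the first pass (no low threshold yet), force a refresh of the search parameter registry.</li>
	 * <li>Load up to {@code PASS_SIZE} (25,000) resource PIDs whose updated date lies between the
	 * job's low and high thresholds.</li>
	 * <li>Submit each PID to the task executor and wait for all tasks to complete.</li>
	 * <li>Advance the job's low threshold to the latest updated date seen in this batch, so the next
	 * pass picks up where this one left off.</li>
	 * </ol>
	 * If a task fails, reindexing is suspended for one minute before it is attempted again.
	 */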
	private int runReindexJob(ResourceReindexJobEntity theJob) {
		if (theJob.getSuspendedUntil() != null) {
			if (theJob.getSuspendedUntil().getTime() > System.currentTimeMillis()) {
				return 0;
			}
		}

		ourLog.info("Performing reindex pass for JOB[{}]", theJob.getId());
		StopWatch sw = new StopWatch();
		AtomicInteger counter = new AtomicInteger();

		/*
		 * The first time we run a particular reindex job, make sure we have the latest
		 * search parameters loaded. A common reason for reindexing is that the search
		 * parameters have changed in some way, so this ensures we're working with the
		 * latest versions.
		 */
		if (theJob.getThresholdLow() == null) {
			mySearchParamRegistry.forceRefresh();
		}

		// Calculate range
		Date low = theJob.getThresholdLow() != null ? theJob.getThresholdLow() : BEGINNING_OF_TIME;
		Date high = theJob.getThresholdHigh();

		// Query for resources within threshold
		StopWatch pageSw = new StopWatch();
		Slice<Long> range = myTxTemplate.execute(t -> {
			PageRequest page = PageRequest.of(0, PASS_SIZE);
			if (isNotBlank(theJob.getResourceType())) {
				return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(page, theJob.getResourceType(), low, high);
			} else {
				return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(page, low, high);
			}
		});
		Validate.notNull(range);
		int count = range.getNumberOfElements();
		ourLog.info("Loaded {} resources for reindexing in {}", count, pageSw.toString());

		// If we didn't find any results at all, mark as deleted
		if (count == 0) {
			markJobAsDeleted(theJob);
			return 0;
		}

		// Submit each resource requiring reindexing
		List<Future<Date>> futures = range
			.stream()
			.map(t -> myTaskExecutor.submit(new ResourceReindexingTask(t, counter)))
			.collect(Collectors.toList());

		Date latestDate = null;
		for (Future<Date> next : futures) {
			Date nextDate;
			try {
				nextDate = next.get();
			} catch (Exception e) {
				ourLog.error("Failure reindexing", e);
				Date suspendedUntil = DateUtils.addMinutes(new Date(), 1);
				myTxTemplate.execute(t -> {
					myReindexJobDao.setSuspendedUntil(suspendedUntil);
					return null;
				});
				return counter.get();
			}

			if (nextDate != null) {
				if (latestDate == null || latestDate.getTime() < nextDate.getTime()) {
					latestDate = new Date(nextDate.getTime());
				}
			}
		}

		Validate.notNull(latestDate);
		Date newLow;
		if (latestDate.getTime() == low.getTime()) {
			if (count == PASS_SIZE) {
				// Just in case we end up in some sort of infinite loop. This shouldn't happen, and couldn't really
				// happen unless there were PASS_SIZE (25,000) resources with the exact same update time down to the
				// millisecond.
				ourLog.error("Final pass time for reindex JOB[{}] has same ending low value: {}", theJob.getId(), latestDate);
			}

			newLow = new Date(latestDate.getTime() + 1);
		} else {
			newLow = latestDate;
		}

		myTxTemplate.execute(t -> {
			myReindexJobDao.setThresholdLow(theJob.getId(), newLow);
			Integer existingCount = myReindexJobDao.getReindexCount(theJob.getId()).orElse(0);
			int newCount = existingCount + counter.get();
			myReindexJobDao.setReindexCount(theJob.getId(), newCount);
			return null;
		});

		ourLog.info("Completed pass of reindex JOB[{}] - Indexed {} resources in {} ({} / sec) - Have indexed until: {}", theJob.getId(), count, sw.toString(), sw.formatThroughput(count, TimeUnit.SECONDS), new InstantType(newLow));
		return counter.get();
	}

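	/**
	 * Physically deletes reindex job rows that have previously been marked as deleted.
	 */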
	private void expungeJobsMarkedAsDeleted() {
		myTxTemplate.execute(t -> {
			Collection<ResourceReindexJobEntity> toDelete = myReindexJobDao.findAll(PageRequest.of(0, 10), true);
			toDelete.forEach(job -> {
				ourLog.info("Purging deleted job[{}]", job.getId());
				myReindexJobDao.deleteById(job.getId());
			});
			return null;
		});
	}

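	/**
	 * Flags a resource whose reindexing failed: in a new transaction its index status is set to
	 * {@link BaseHapiFhirDao#INDEX_STATUS_INDEXING_FAILED} and its existing tag, search parameter
	 * index, and resource link rows are removed so they do not reflect stale data.
	 */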
	private void markResourceAsIndexingFailed(final long theId) {
		TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
		txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
		txTemplate.execute((TransactionCallback<Void>) theStatus -> {
			ourLog.info("Marking resource with PID {} as indexing_failed", theId);

			myResourceTableDao.updateIndexStatus(theId, BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED);

			Query q = myEntityManager.createQuery("DELETE FROM ResourceTag t WHERE t.myResourceId = :id");
			q.setParameter("id", theId);
			q.executeUpdate();

			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamCoords t WHERE t.myResourcePid = :id");
			q.setParameter("id", theId);
			q.executeUpdate();

			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamDate t WHERE t.myResourcePid = :id");
			q.setParameter("id", theId);
			q.executeUpdate();

			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamNumber t WHERE t.myResourcePid = :id");
			q.setParameter("id", theId);
			q.executeUpdate();

			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamQuantity t WHERE t.myResourcePid = :id");
			q.setParameter("id", theId);
			q.executeUpdate();

			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamString t WHERE t.myResourcePid = :id");
			q.setParameter("id", theId);
			q.executeUpdate();

			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamToken t WHERE t.myResourcePid = :id");
			q.setParameter("id", theId);
			q.executeUpdate();

			q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamUri t WHERE t.myResourcePid = :id");
			q.setParameter("id", theId);
			q.executeUpdate();

			q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.mySourceResourcePid = :id");
			q.setParameter("id", theId);
			q.executeUpdate();

			q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.myTargetResourcePid = :id");
			q.setParameter("id", theId);
			q.executeUpdate();

			return null;
		});
	}

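	/**
	 * Reindexes a single resource, identified by its PID, and returns that resource's last-updated
	 * date so the caller can advance the job's low threshold. Along the way it also repairs legacy
	 * forced IDs that are missing a resource type and history rows whose version number has drifted
	 * from the current resource version.
	 */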
	private class ResourceReindexingTask implements Callable<Date> {
		private final Long myNextId;
		private final AtomicInteger myCounter;
		private Date myUpdated;

		ResourceReindexingTask(Long theNextId, AtomicInteger theCounter) {
			myNextId = theNextId;
			myCounter = theCounter;
		}

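		/**
		 * Performs the actual reindex by dispatching to the type-specific resource DAO, then
		 * increments the shared counter used for progress reporting.
		 */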
		@SuppressWarnings("unchecked")
		private <T extends IBaseResource> void doReindex(ResourceTable theResourceTable, T theResource) {
			RuntimeResourceDefinition resourceDefinition = myContext.getResourceDefinition(theResource.getClass());
			Class<T> resourceClass = (Class<T>) resourceDefinition.getImplementingClass();
			final IFhirResourceDao<T> dao = myDaoRegistry.getResourceDao(resourceClass);
			dao.reindex(theResource, theResourceTable);

			myCounter.incrementAndGet();
		}

		@Override
		public Date call() {
			Throwable reindexFailure;
			try {
				reindexFailure = myTxTemplate.execute(t -> {
					ResourceTable resourceTable = myResourceTableDao.findById(myNextId).orElseThrow(IllegalStateException::new);
					myUpdated = resourceTable.getUpdatedDate();

					try {
						/*
						 * Between HAPI FHIR 1.5 and 1.6 the forced ID format changed from just "id" to "type/id",
						 * so older rows may be missing the resource type and are repaired here.
						 */
						ForcedId forcedId = resourceTable.getForcedId();
						if (forcedId != null) {
							if (isBlank(forcedId.getResourceType())) {
								ourLog.info("Updating resource {} forcedId type to {}", forcedId.getForcedId(), resourceTable.getResourceType());
								forcedId.setResourceType(resourceTable.getResourceType());
								myForcedIdDao.save(forcedId);
							}
						}

						IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceTable.getResourceType());
						long expectedVersion = resourceTable.getVersion();
						IBaseResource resource = dao.read(resourceTable.getIdDt().toVersionless(), null, true);
						if (resource == null) {
							throw new InternalErrorException("Could not find resource version " + resourceTable.getIdDt().toUnqualified().getValue() + " in database");
						}

						Long actualVersion = resource.getIdElement().getVersionIdPartAsLong();
						if (actualVersion < expectedVersion) {
							ourLog.warn("Resource {} version {} does not exist, renumbering to version {}", resource.getIdElement().toUnqualifiedVersionless().getValue(), resource.getIdElement().getVersionIdPart(), expectedVersion);
							myResourceHistoryTableDao.updateVersion(resourceTable.getId(), actualVersion, expectedVersion);
						}

						doReindex(resourceTable, resource);
						return null;

					} catch (Exception e) {
						ourLog.error("Failed to index resource {}: {}", resourceTable.getIdDt(), e.toString(), e);
						t.setRollbackOnly();
						return e;
					}
				});

			} catch (ResourceVersionConflictException e) {
				/*
				 * We reindex in multiple threads, so it's technically possible that two threads try
				 * to index resources that now cause a constraint error (e.g. because a unique index
				 * has been added that didn't previously exist). In that case one of the threads
				 * succeeds and the other fails with this error, so we let the failing one try
				 * again later.
				 */
				ourLog.info("Failed to reindex because of a version conflict. Leaving in unindexed state: {}", e.getMessage());
				reindexFailure = null;
			}

			if (reindexFailure != null) {
				ourLog.info("Setting resource PID[{}] status to ERRORED", myNextId);
				markResourceAsIndexingFailed(myNextId);
			}

			return myUpdated;
		}
	}
}