View Javadoc
1   package ca.uhn.fhir.jpa.dao;
2   
3   import ca.uhn.fhir.jpa.dao.data.IForcedIdDao;
4   import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
5   import ca.uhn.fhir.jpa.dao.data.ITermConceptDao;
6   import ca.uhn.fhir.jpa.entity.ForcedId;
7   import ca.uhn.fhir.jpa.entity.ResourceTable;
8   import ca.uhn.fhir.jpa.util.ExpungeOptions;
9   import ca.uhn.fhir.jpa.util.ExpungeOutcome;
10  import ca.uhn.fhir.jpa.util.ReindexFailureException;
11  import ca.uhn.fhir.jpa.util.ResourceCountCache;
12  import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
13  import ca.uhn.fhir.rest.api.server.IBundleProvider;
14  import ca.uhn.fhir.rest.api.server.RequestDetails;
15  import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
16  import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
17  import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor.ActionRequestDetails;
18  import ca.uhn.fhir.util.StopWatch;
19  import org.apache.commons.lang3.concurrent.BasicThreadFactory;
20  import org.hibernate.search.util.impl.Executors;
21  import org.hl7.fhir.instance.model.api.IBaseResource;
22  import org.springframework.beans.factory.annotation.Autowired;
23  import org.springframework.beans.factory.annotation.Qualifier;
24  import org.springframework.data.domain.PageRequest;
25  import org.springframework.transaction.PlatformTransactionManager;
26  import org.springframework.transaction.TransactionStatus;
27  import org.springframework.transaction.annotation.Propagation;
28  import org.springframework.transaction.annotation.Transactional;
29  import org.springframework.transaction.support.TransactionCallback;
30  import org.springframework.transaction.support.TransactionCallbackWithoutResult;
31  import org.springframework.transaction.support.TransactionTemplate;
32  
33  import javax.annotation.Nonnull;
34  import javax.annotation.Nullable;
35  import javax.persistence.Query;
36  import java.util.*;
37  import java.util.concurrent.*;
38  import java.util.concurrent.locks.ReentrantLock;
39  
40  import static org.apache.commons.lang3.StringUtils.isBlank;
41  
42  /*
43   * #%L
44   * HAPI FHIR JPA Server
45   * %%
46   * Copyright (C) 2014 - 2018 University Health Network
47   * %%
48   * Licensed under the Apache License, Version 2.0 (the "License");
49   * you may not use this file except in compliance with the License.
50   * You may obtain a copy of the License at
51   * 
52   *      http://www.apache.org/licenses/LICENSE-2.0
53   * 
54   * Unless required by applicable law or agreed to in writing, software
55   * distributed under the License is distributed on an "AS IS" BASIS,
56   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
57   * See the License for the specific language governing permissions and
58   * limitations under the License.
59   * #L%
60   */
61  
62  public abstract class BaseHapiFhirSystemDao<T, MT> extends BaseHapiFhirDao<IBaseResource> implements IFhirSystemDao<T, MT> {
63  
64  	private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(BaseHapiFhirSystemDao.class);
65  	@Autowired
66  	@Qualifier("myResourceCountsCache")
67  	public ResourceCountCache myResourceCountsCache;
68  	@Autowired
69  	private IForcedIdDao myForcedIdDao;
70  	private ReentrantLock myReindexLock = new ReentrantLock(false);
71  	@Autowired
72  	private ITermConceptDao myTermConceptDao;
73  	@Autowired
74  	private ISearchParamRegistry mySearchParamRegistry;
75  	@Autowired
76  	private PlatformTransactionManager myTxManager;
77  	@Autowired
78  	private IResourceTableDao myResourceTableDao;
79  	private ThreadFactory myReindexingThreadFactory = new BasicThreadFactory.Builder().namingPattern("ResourceReindex-%d").build();
80  
81  	private int doPerformReindexingPass(final Integer theCount) {
82  		/*
83  		 * If any search parameters have been recently added or changed,
84  		 * this makes sure that the cache has been reloaded to reflect
85  		 * them.
86  		 */
87  		mySearchParamRegistry.refreshCacheIfNecessary();
88  
89  		TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
90  		txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED);
91  		return doPerformReindexingPassForResources(theCount, txTemplate);
92  	}
93  
94  	@SuppressWarnings("ConstantConditions")
95  	private int doPerformReindexingPassForResources(final Integer theCount, TransactionTemplate txTemplate) {
96  
97  		// Determine the IDs needing reindexing
98  		List<Long> idsToReindex = txTemplate.execute(theStatus -> {
99  			int maxResult = 500;
100 			if (theCount != null) {
101 				maxResult = Math.min(theCount, 2000);
102 			}
103 			maxResult = Math.max(maxResult, 10);
104 
105 			ourLog.debug("Beginning indexing query with maximum {}", maxResult);
106 			return myResourceTableDao
107 				.findIdsOfResourcesRequiringReindexing(new PageRequest(0, maxResult))
108 				.getContent();
109 		});
110 
111 		// If no IDs need reindexing, we're good here
112 		if (idsToReindex.isEmpty()) {
113 			return 0;
114 		}
115 
116 		// Reindex
117 		StopWatch sw = new StopWatch();
118 
119 		// Execute each reindex in a task within a threadpool
120 		int threadCount = getConfig().getReindexThreadCount();
121 		RejectedExecutionHandler rejectHandler = new Executors.BlockPolicy();
122 		ThreadPoolExecutor executor = new ThreadPoolExecutor(threadCount, threadCount,
123 			0L, TimeUnit.MILLISECONDS,
124 			new LinkedBlockingQueue<>(),
125 			myReindexingThreadFactory,
126 			rejectHandler
127 			);
128 		List<Future<?>> futures = new ArrayList<>();
129 		for (Long nextId : idsToReindex) {
130 			futures.add(executor.submit(new ResourceReindexingTask(nextId)));
131 		}
132 		for (Future<?> next : futures) {
133 			try {
134 				next.get();
135 			} catch (Exception e) {
136 				throw new InternalErrorException("Failed to reindex: ", e);
137 			}
138 		}
139 		executor.shutdown();
140 
141 		ourLog.info("Reindexed {} resources in {} threads - {}ms/resource", idsToReindex.size(), threadCount, sw.getMillisPerOperation(idsToReindex.size()));
142 		return idsToReindex.size();
143 	}
144 
145 	@Override
146 	@Transactional(propagation = Propagation.REQUIRED)
147 	public ExpungeOutcome expunge(ExpungeOptions theExpungeOptions) {
148 		return doExpunge(null, null, null, theExpungeOptions);
149 	}
150 
151 	@Transactional(propagation = Propagation.REQUIRED)
152 	@Override
153 	public Map<String, Long> getResourceCounts() {
154 		Map<String, Long> retVal = new HashMap<>();
155 
156 		List<Map<?, ?>> counts = myResourceTableDao.getResourceCounts();
157 		for (Map<?, ?> next : counts) {
158 			retVal.put(next.get("type").toString(), Long.parseLong(next.get("count").toString()));
159 		}
160 
161 		return retVal;
162 	}
163 
164 	@Transactional(propagation = Propagation.SUPPORTS)
165 	@Nullable
166 	@Override
167 	public Map<String, Long> getResourceCountsFromCache() {
168 		return myResourceCountsCache.get();
169 	}
170 
171 	@Override
172 	public IBundleProvider history(Date theSince, Date theUntil, RequestDetails theRequestDetails) {
173 		if (theRequestDetails != null) {
174 			// Notify interceptors
175 			ActionRequestDetails requestDetails = new ActionRequestDetails(theRequestDetails);
176 			notifyInterceptors(RestOperationTypeEnum.HISTORY_SYSTEM, requestDetails);
177 		}
178 
179 		StopWatch w = new StopWatch();
180 		IBundleProvider retVal = super.history(null, null, theSince, theUntil);
181 		ourLog.info("Processed global history in {}ms", w.getMillisAndRestart());
182 		return retVal;
183 	}
184 
185 	@Transactional()
186 	@Override
187 	public int markAllResourcesForReindexing() {
188 
189 		ourLog.info("Marking all resources as needing reindexing");
190 		int retVal = myEntityManager.createQuery("UPDATE " + ResourceTable.class.getSimpleName() + " t SET t.myIndexStatus = null").executeUpdate();
191 
192 		ourLog.info("Marking all concepts as needing reindexing");
193 		retVal += myTermConceptDao.markAllForReindexing();
194 
195 		ourLog.info("Done marking reindexing");
196 		return retVal;
197 	}
198 
199 	private void markResourceAsIndexingFailed(final long theId) {
200 		TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
201 		txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
202 		txTemplate.execute(new TransactionCallback<Void>() {
203 			@Override
204 			public Void doInTransaction(@Nonnull TransactionStatus theStatus) {
205 				ourLog.info("Marking resource with PID {} as indexing_failed", new Object[] {theId});
206 				Query q = myEntityManager.createQuery("UPDATE ResourceTable t SET t.myIndexStatus = :status WHERE t.myId = :id");
207 				q.setParameter("status", INDEX_STATUS_INDEXING_FAILED);
208 				q.setParameter("id", theId);
209 				q.executeUpdate();
210 
211 				q = myEntityManager.createQuery("DELETE FROM ResourceTag t WHERE t.myResourceId = :id");
212 				q.setParameter("id", theId);
213 				q.executeUpdate();
214 
215 				q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamCoords t WHERE t.myResourcePid = :id");
216 				q.setParameter("id", theId);
217 				q.executeUpdate();
218 
219 				q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamDate t WHERE t.myResourcePid = :id");
220 				q.setParameter("id", theId);
221 				q.executeUpdate();
222 
223 				q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamNumber t WHERE t.myResourcePid = :id");
224 				q.setParameter("id", theId);
225 				q.executeUpdate();
226 
227 				q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamQuantity t WHERE t.myResourcePid = :id");
228 				q.setParameter("id", theId);
229 				q.executeUpdate();
230 
231 				q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamString t WHERE t.myResourcePid = :id");
232 				q.setParameter("id", theId);
233 				q.executeUpdate();
234 
235 				q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamToken t WHERE t.myResourcePid = :id");
236 				q.setParameter("id", theId);
237 				q.executeUpdate();
238 
239 				q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamUri t WHERE t.myResourcePid = :id");
240 				q.setParameter("id", theId);
241 				q.executeUpdate();
242 
243 				q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.mySourceResourcePid = :id");
244 				q.setParameter("id", theId);
245 				q.executeUpdate();
246 
247 				q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.myTargetResourcePid = :id");
248 				q.setParameter("id", theId);
249 				q.executeUpdate();
250 
251 				return null;
252 			}
253 		});
254 	}
255 
256 	@Override
257 	@Transactional(propagation = Propagation.NEVER)
258 	public Integer performReindexingPass(final Integer theCount) {
259 		if (getConfig().isStatusBasedReindexingDisabled()) {
260 			return -1;
261 		}
262 		if (!myReindexLock.tryLock()) {
263 			return -1;
264 		}
265 		try {
266 			return doPerformReindexingPass(theCount);
267 		} catch (ReindexFailureException e) {
268 			ourLog.warn("Reindexing failed for resource {}", e.getResourceId());
269 			markResourceAsIndexingFailed(e.getResourceId());
270 			return -1;
271 		} finally {
272 			myReindexLock.unlock();
273 		}
274 	}
275 
276 	private class ResourceReindexingTask implements Runnable {
277 		private final Long myNextId;
278 
279 		public ResourceReindexingTask(Long theNextId) {
280 			myNextId = theNextId;
281 		}
282 
283 		@SuppressWarnings("unchecked")
284 		@Override
285 		public void run() {
286 			TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
287 			txTemplate.afterPropertiesSet();
288 
289 			Throwable reindexFailure;
290 			try {
291 				reindexFailure = txTemplate.execute(new TransactionCallback<Throwable>() {
292 					@Override
293 					public Throwable doInTransaction(TransactionStatus theStatus) {
294 						ResourceTable resourceTable = myResourceTableDao.findById(myNextId).orElseThrow(IllegalStateException::new);
295 
296 						try {
297 							/*
298 							 * This part is because from HAPI 1.5 - 1.6 we changed the format of forced ID to be "type/id" instead of just "id"
299 							 */
300 							ForcedId forcedId = resourceTable.getForcedId();
301 							if (forcedId != null) {
302 								if (isBlank(forcedId.getResourceType())) {
303 									ourLog.info("Updating resource {} forcedId type to {}", forcedId.getForcedId(), resourceTable.getResourceType());
304 									forcedId.setResourceType(resourceTable.getResourceType());
305 									myForcedIdDao.save(forcedId);
306 								}
307 							}
308 
309 							final IBaseResource resource = toResource(resourceTable, false);
310 
311 							Class<? extends IBaseResource> resourceClass = getContext().getResourceDefinition(resourceTable.getResourceType()).getImplementingClass();
312 							@SuppressWarnings("rawtypes") final IFhirResourceDao dao = getDaoOrThrowException(resourceClass);
313 							dao.reindex(resource, resourceTable);
314 							return null;
315 
316 						} catch (Exception e) {
317 							ourLog.error("Failed to index resource {}: {}", resourceTable.getIdDt(), e.toString(), e);
318 							theStatus.setRollbackOnly();
319 							return e;
320 						}
321 					}
322 				});
323 			} catch (ResourceVersionConflictException e) {
324 				/*
325 				 * We reindex in multiple threads, so it's technically possible that two threads try
326 				 * to index resources that cause a constraint error now (i.e. because a unique index has been
327 				 * added that didn't previously exist). In this case, one of the threads would succeed and
328 				 * not get this error, so we'll let the other one fail and try
329 				 * again later.
330 				 */
331 				ourLog.info("Failed to reindex {} because of a version conflict. Leaving in unindexed state: {}", e.getMessage());
332 				reindexFailure = null;
333 			}
334 
335 			if (reindexFailure != null) {
336 				txTemplate.execute(new TransactionCallbackWithoutResult() {
337 					@Override
338 					protected void doInTransactionWithoutResult(TransactionStatus theStatus) {
339 						ourLog.info("Setting resource PID[{}] status to ERRORED", myNextId);
340 						myResourceTableDao.updateStatusToErrored(myNextId);
341 					}
342 				});
343 			}
344 		}
345 	}
346 }