View Javadoc
1   package ca.uhn.fhir.jpa.dao;
2   
3   import ca.uhn.fhir.jpa.dao.data.IForcedIdDao;
4   import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
5   import ca.uhn.fhir.jpa.dao.data.ITermConceptDao;
6   import ca.uhn.fhir.jpa.entity.ForcedId;
7   import ca.uhn.fhir.jpa.entity.ResourceTable;
8   import ca.uhn.fhir.jpa.util.ExpungeOptions;
9   import ca.uhn.fhir.jpa.util.ExpungeOutcome;
10  import ca.uhn.fhir.jpa.util.ReindexFailureException;
11  import ca.uhn.fhir.jpa.util.ResourceCountCache;
12  import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
13  import ca.uhn.fhir.rest.api.server.IBundleProvider;
14  import ca.uhn.fhir.rest.api.server.RequestDetails;
15  import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
16  import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
17  import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor.ActionRequestDetails;
18  import ca.uhn.fhir.util.StopWatch;
19  import org.apache.commons.lang3.concurrent.BasicThreadFactory;
20  import org.hibernate.search.util.impl.Executors;
21  import org.hl7.fhir.instance.model.api.IBaseResource;
22  import org.springframework.beans.factory.annotation.Autowired;
23  import org.springframework.beans.factory.annotation.Qualifier;
24  import org.springframework.data.domain.PageRequest;
25  import org.springframework.transaction.PlatformTransactionManager;
26  import org.springframework.transaction.TransactionStatus;
27  import org.springframework.transaction.annotation.Propagation;
28  import org.springframework.transaction.annotation.Transactional;
29  import org.springframework.transaction.support.TransactionCallback;
30  import org.springframework.transaction.support.TransactionCallbackWithoutResult;
31  import org.springframework.transaction.support.TransactionTemplate;
32  
33  import javax.annotation.Nonnull;
34  import javax.annotation.Nullable;
35  import javax.persistence.Query;
36  import java.util.*;
37  import java.util.concurrent.*;
38  import java.util.concurrent.locks.ReentrantLock;
39  
40  import static org.apache.commons.lang3.StringUtils.isBlank;
41  
42  /*
43   * #%L
44   * HAPI FHIR JPA Server
45   * %%
46   * Copyright (C) 2014 - 2018 University Health Network
47   * %%
48   * Licensed under the Apache License, Version 2.0 (the "License");
49   * you may not use this file except in compliance with the License.
50   * You may obtain a copy of the License at
51   * 
52   *      http://www.apache.org/licenses/LICENSE-2.0
53   * 
54   * Unless required by applicable law or agreed to in writing, software
55   * distributed under the License is distributed on an "AS IS" BASIS,
56   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
57   * See the License for the specific language governing permissions and
58   * limitations under the License.
59   * #L%
60   */
61  
62  public abstract class BaseHapiFhirSystemDao<T, MT> extends BaseHapiFhirDao<IBaseResource> implements IFhirSystemDao<T, MT> {
63  
64  	private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(BaseHapiFhirSystemDao.class);
65  	@Autowired
66  	@Qualifier("myResourceCountsCache")
67  	public ResourceCountCache myResourceCountsCache;
68  	@Autowired
69  	private IForcedIdDao myForcedIdDao;
70  	private ReentrantLock myReindexLock = new ReentrantLock(false);
71  	@Autowired
72  	private ITermConceptDao myTermConceptDao;
73  	@Autowired
74  	private ISearchParamRegistry mySearchParamRegistry;
75  	@Autowired
76  	private PlatformTransactionManager myTxManager;
77  	@Autowired
78  	private IResourceTableDao myResourceTableDao;
79  	private ThreadFactory myReindexingThreadFactory = new BasicThreadFactory.Builder().namingPattern("ResourceReindex-%d").build();
80  
81  	private int doPerformReindexingPass(final Integer theCount) {
82  		/*
83  		 * If any search parameters have been recently added or changed,
84  		 * this makes sure that the cache has been reloaded to reflect
85  		 * them.
86  		 */
87  		mySearchParamRegistry.refreshCacheIfNecessary();
88  
89  		TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
90  		txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED);
91  		return doPerformReindexingPassForResources(theCount, txTemplate);
92  	}
93  
94  	@SuppressWarnings("ConstantConditions")
95  	private int doPerformReindexingPassForResources(final Integer theCount, TransactionTemplate txTemplate) {
96  
97  		// Determine the IDs needing reindexing
98  		List<Long> idsToReindex = txTemplate.execute(theStatus -> {
99  			int maxResult = 500;
100 			if (theCount != null) {
101 				maxResult = Math.min(theCount, 2000);
102 			}
103 			maxResult = Math.max(maxResult, 10);
104 
105 			ourLog.debug("Beginning indexing query with maximum {}", maxResult);
106 			return myResourceTableDao
107 				.findIdsOfResourcesRequiringReindexing(new PageRequest(0, maxResult))
108 				.getContent();
109 		});
110 
111 		// If no IDs need reindexing, we're good here
112 		if (idsToReindex.isEmpty()) {
113 			return 0;
114 		}
115 
116 		// Reindex
117 		StopWatch sw = new StopWatch();
118 
119 		// Execute each reindex in a task within a threadpool
120 		int threadCount = getConfig().getReindexThreadCount();
121 		RejectedExecutionHandler rejectHandler = new Executors.BlockPolicy();
122 		ThreadPoolExecutor executor = new ThreadPoolExecutor(threadCount, threadCount,
123 			0L, TimeUnit.MILLISECONDS,
124 			new LinkedBlockingQueue<>(),
125 			myReindexingThreadFactory,
126 			rejectHandler
127 			);
128 		List<Future<?>> futures = new ArrayList<>();
129 		for (Long nextId : idsToReindex) {
130 			futures.add(executor.submit(new ResourceReindexingTask(nextId)));
131 		}
132 		for (Future<?> next : futures) {
133 			try {
134 				next.get();
135 			} catch (Exception e) {
136 				throw new InternalErrorException("Failed to reindex: ", e);
137 			}
138 		}
139 		executor.shutdown();
140 
141 		ourLog.info("Reindexed {} resources in {} threads - {}ms/resource", idsToReindex.size(), threadCount, sw.getMillisPerOperation(idsToReindex.size()));
142 		return idsToReindex.size();
143 	}
144 
145 	@Override
146 	@Transactional(propagation = Propagation.REQUIRED)
147 	public ExpungeOutcome expunge(ExpungeOptions theExpungeOptions) {
148 		return doExpunge(null, null, null, theExpungeOptions);
149 	}
150 
151 	@Transactional(propagation = Propagation.REQUIRED)
152 	@Override
153 	public Map<String, Long> getResourceCounts() {
154 		Map<String, Long> retVal = new HashMap<>();
155 
156 		List<Map<?, ?>> counts = myResourceTableDao.getResourceCounts();
157 		for (Map<?, ?> next : counts) {
158 			retVal.put(next.get("type").toString(), Long.parseLong(next.get("count").toString()));
159 		}
160 
161 		return retVal;
162 	}
163 
164 	@Transactional(propagation = Propagation.SUPPORTS)
165 	@Nullable
166 	@Override
167 	public Map<String, Long> getResourceCountsFromCache() {
168 		return myResourceCountsCache.get();
169 	}
170 
171 	@Override
172 	public IBundleProvider history(Date theSince, Date theUntil, RequestDetails theRequestDetails) {
173 		if (theRequestDetails != null) {
174 			// Notify interceptors
175 			ActionRequestDetails requestDetails = new ActionRequestDetails(theRequestDetails);
176 			notifyInterceptors(RestOperationTypeEnum.HISTORY_SYSTEM, requestDetails);
177 		}
178 
179 		StopWatch w = new StopWatch();
180 		IBundleProvider retVal = super.history(null, null, theSince, theUntil);
181 		ourLog.info("Processed global history in {}ms", w.getMillisAndRestart());
182 		return retVal;
183 	}
184 
185 	@Transactional()
186 	@Override
187 	public int markAllResourcesForReindexing() {
188 
189 		ourLog.info("Marking all resources as needing reindexing");
190 		int retVal = myEntityManager.createQuery("UPDATE " + ResourceTable.class.getSimpleName() + " t SET t.myIndexStatus = null").executeUpdate();
191 
192 		ourLog.info("Marking all concepts as needing reindexing");
193 		retVal += myTermConceptDao.markAllForReindexing();
194 
195 		ourLog.info("Done marking reindexing");
196 		return retVal;
197 	}
198 
199 	private void markResourceAsIndexingFailed(final long theId) {
200 		TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
201 		txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
202 		txTemplate.execute(new TransactionCallback<Void>() {
203 			@Override
204 			public Void doInTransaction(@Nonnull TransactionStatus theStatus) {
205 				ourLog.info("Marking resource with PID {} as indexing_failed", new Object[] {theId});
206 				Query q = myEntityManager.createQuery("UPDATE ResourceTable t SET t.myIndexStatus = :status WHERE t.myId = :id");
207 				q.setParameter("status", INDEX_STATUS_INDEXING_FAILED);
208 				q.setParameter("id", theId);
209 				q.executeUpdate();
210 
211 				q = myEntityManager.createQuery("DELETE FROM ResourceTag t WHERE t.myResourceId = :id");
212 				q.setParameter("id", theId);
213 				q.executeUpdate();
214 
215 				q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamCoords t WHERE t.myResourcePid = :id");
216 				q.setParameter("id", theId);
217 				q.executeUpdate();
218 
219 				q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamDate t WHERE t.myResourcePid = :id");
220 				q.setParameter("id", theId);
221 				q.executeUpdate();
222 
223 				q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamNumber t WHERE t.myResourcePid = :id");
224 				q.setParameter("id", theId);
225 				q.executeUpdate();
226 
227 				q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamQuantity t WHERE t.myResourcePid = :id");
228 				q.setParameter("id", theId);
229 				q.executeUpdate();
230 
231 				q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamString t WHERE t.myResourcePid = :id");
232 				q.setParameter("id", theId);
233 				q.executeUpdate();
234 
235 				q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamToken t WHERE t.myResourcePid = :id");
236 				q.setParameter("id", theId);
237 				q.executeUpdate();
238 
239 				q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamUri t WHERE t.myResourcePid = :id");
240 				q.setParameter("id", theId);
241 				q.executeUpdate();
242 
243 				q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.mySourceResourcePid = :id");
244 				q.setParameter("id", theId);
245 				q.executeUpdate();
246 
247 				q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.myTargetResourcePid = :id");
248 				q.setParameter("id", theId);
249 				q.executeUpdate();
250 
251 				return null;
252 			}
253 		});
254 	}
255 
256 	@Override
257 	@Transactional(propagation = Propagation.NEVER)
258 	public Integer performReindexingPass(final Integer theCount) {
259 		if (!myReindexLock.tryLock()) {
260 			return null;
261 		}
262 		try {
263 			return doPerformReindexingPass(theCount);
264 		} catch (ReindexFailureException e) {
265 			ourLog.warn("Reindexing failed for resource {}", e.getResourceId());
266 			markResourceAsIndexingFailed(e.getResourceId());
267 			return -1;
268 		} finally {
269 			myReindexLock.unlock();
270 		}
271 	}
272 
273 	private class ResourceReindexingTask implements Runnable {
274 		private final Long myNextId;
275 
276 		public ResourceReindexingTask(Long theNextId) {
277 			myNextId = theNextId;
278 		}
279 
280 		@SuppressWarnings("unchecked")
281 		@Override
282 		public void run() {
283 			TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
284 			txTemplate.afterPropertiesSet();
285 
286 			Throwable reindexFailure;
287 			try {
288 				reindexFailure = txTemplate.execute(new TransactionCallback<Throwable>() {
289 					@Override
290 					public Throwable doInTransaction(TransactionStatus theStatus) {
291 						ResourceTable resourceTable = myResourceTableDao.findOne(myNextId);
292 
293 						try {
294 							/*
295 							 * This part is because from HAPI 1.5 - 1.6 we changed the format of forced ID to be "type/id" instead of just "id"
296 							 */
297 							ForcedId forcedId = resourceTable.getForcedId();
298 							if (forcedId != null) {
299 								if (isBlank(forcedId.getResourceType())) {
300 									ourLog.info("Updating resource {} forcedId type to {}", forcedId.getForcedId(), resourceTable.getResourceType());
301 									forcedId.setResourceType(resourceTable.getResourceType());
302 									myForcedIdDao.save(forcedId);
303 								}
304 							}
305 
306 							final IBaseResource resource = toResource(resourceTable, false);
307 
308 							@SuppressWarnings("rawtypes") final IFhirResourceDao dao = getDao(resource.getClass());
309 							dao.reindex(resource, resourceTable);
310 							return null;
311 
312 						} catch (Exception e) {
313 							ourLog.error("Failed to index resource {}: {}", resourceTable.getIdDt(), e.toString(), e);
314 							theStatus.setRollbackOnly();
315 							return e;
316 						}
317 					}
318 				});
319 			} catch (ResourceVersionConflictException e) {
320 				/*
321 				 * We reindex in multiple threads, so it's technically possible that two threads try
322 				 * to index resources that cause a constraint error now (i.e. because a unique index has been
323 				 * added that didn't previously exist). In this case, one of the threads would succeed and
324 				 * not get this error, so we'll let the other one fail and try
325 				 * again later.
326 				 */
327 				ourLog.info("Failed to reindex {} because of a version conflict. Leaving in unindexed state: {}", e.getMessage());
328 				reindexFailure = null;
329 			}
330 
331 			if (reindexFailure != null) {
332 				txTemplate.execute(new TransactionCallbackWithoutResult() {
333 					@Override
334 					protected void doInTransactionWithoutResult(TransactionStatus theStatus) {
335 						ourLog.info("Setting resource PID[{}] status to ERRORED", myNextId);
336 						myResourceTableDao.updateStatusToErrored(myNextId);
337 					}
338 				});
339 			}
340 		}
341 	}
342 }