View Javadoc
1   package ca.uhn.fhir.jpa.provider;
2   
3   /*-
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2018 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  import ca.uhn.fhir.context.FhirContext;
24  import ca.uhn.fhir.context.RuntimeResourceDefinition;
25  import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
26  import ca.uhn.fhir.jpa.dao.DaoRegistry;
27  import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
28  import ca.uhn.fhir.jpa.dao.SearchParameterMap;
29  import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
30  import ca.uhn.fhir.jpa.search.warm.CacheWarmingSvcImpl;
31  import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor;
32  import ca.uhn.fhir.jpa.subscription.ResourceModifiedMessage;
33  import ca.uhn.fhir.jpa.util.JpaConstants;
34  import ca.uhn.fhir.rest.annotation.IdParam;
35  import ca.uhn.fhir.rest.annotation.Operation;
36  import ca.uhn.fhir.rest.annotation.OperationParam;
37  import ca.uhn.fhir.rest.api.CacheControlDirective;
38  import ca.uhn.fhir.rest.api.server.IBundleProvider;
39  import ca.uhn.fhir.rest.param.StringParam;
40  import ca.uhn.fhir.rest.param.UriParam;
41  import ca.uhn.fhir.rest.server.IResourceProvider;
42  import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
43  import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
44  import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
45  import ca.uhn.fhir.util.ParametersUtil;
46  import ca.uhn.fhir.util.StopWatch;
47  import ca.uhn.fhir.util.ValidateUtil;
48  import org.apache.commons.lang3.ObjectUtils;
49  import org.apache.commons.lang3.Validate;
50  import org.apache.commons.lang3.concurrent.BasicThreadFactory;
51  import org.apache.commons.lang3.time.DateUtils;
52  import org.apache.commons.lang3.tuple.Pair;
53  import org.hl7.fhir.instance.model.IdType;
54  import org.hl7.fhir.instance.model.api.IBaseParameters;
55  import org.hl7.fhir.instance.model.api.IBaseResource;
56  import org.hl7.fhir.instance.model.api.IIdType;
57  import org.hl7.fhir.instance.model.api.IPrimitiveType;
58  import org.slf4j.Logger;
59  import org.slf4j.LoggerFactory;
60  import org.springframework.beans.BeansException;
61  import org.springframework.beans.factory.annotation.Autowired;
62  import org.springframework.context.ApplicationContext;
63  import org.springframework.context.ApplicationContextAware;
64  import org.springframework.scheduling.annotation.Scheduled;
65  
66  import javax.annotation.PostConstruct;
67  import java.util.*;
68  import java.util.concurrent.*;
69  import java.util.stream.Collectors;
70  
71  import static org.apache.commons.lang3.StringUtils.isBlank;
72  import static org.apache.commons.lang3.StringUtils.isNotBlank;
73  
74  public class SubscriptionTriggeringProvider implements IResourceProvider, ApplicationContextAware {
75  
76  	public static final String RESOURCE_ID = "resourceId";
77  	public static final int DEFAULT_MAX_SUBMIT = 10000;
78  	public static final String SEARCH_URL = "searchUrl";
79  	private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringProvider.class);
80  	private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<>();
81  	@Autowired
82  	private FhirContext myFhirContext;
83  	@Autowired
84  	private DaoRegistry myDaoRegistry;
85  	private List<BaseSubscriptionInterceptor<?>> mySubscriptionInterceptorList;
86  	private int myMaxSubmitPerPass = DEFAULT_MAX_SUBMIT;
87  	@Autowired
88  	private ISearchCoordinatorSvc mySearchCoordinatorSvc;
89  	private ApplicationContext myAppCtx;
90  	private ExecutorService myExecutorService;
91  
92  	/**
93  	 * Sets the maximum number of resources that will be submitted in a single pass
94  	 */
95  	public void setMaxSubmitPerPass(Integer theMaxSubmitPerPass) {
96  		Integer maxSubmitPerPass = theMaxSubmitPerPass;
97  		if (maxSubmitPerPass == null) {
98  			maxSubmitPerPass = DEFAULT_MAX_SUBMIT;
99  		}
100 		Validate.isTrue(maxSubmitPerPass > 0, "theMaxSubmitPerPass must be > 0");
101 		myMaxSubmitPerPass = maxSubmitPerPass;
102 	}
103 
104 	@SuppressWarnings("unchecked")
105 	@PostConstruct
106 	public void start() {
107 		mySubscriptionInterceptorList = ObjectUtils.defaultIfNull(mySubscriptionInterceptorList, Collections.emptyList());
108 		mySubscriptionInterceptorList = new ArrayList<>();
109 		Collection values1 = myAppCtx.getBeansOfType(BaseSubscriptionInterceptor.class).values();
110 		Collection<BaseSubscriptionInterceptor<?>> values = (Collection<BaseSubscriptionInterceptor<?>>) values1;
111 		mySubscriptionInterceptorList.addAll(values);
112 
113 
114 		LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000);
115 		BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
116 			.namingPattern("SubscriptionTriggering-%d")
117 			.daemon(false)
118 			.priority(Thread.NORM_PRIORITY)
119 			.build();
120 		RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
121 			@Override
122 			public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
123 				ourLog.info("Note: Subscription triggering queue is full ({} elements), waiting for a slot to become available!", executorQueue.size());
124 				StopWatch sw = new StopWatch();
125 				try {
126 					executorQueue.put(theRunnable);
127 				} catch (InterruptedException theE) {
128 					throw new RejectedExecutionException("Task " + theRunnable.toString() +
129 						" rejected from " + theE.toString());
130 				}
131 				ourLog.info("Slot become available after {}ms", sw.getMillis());
132 			}
133 		};
134 		myExecutorService = new ThreadPoolExecutor(
135 			0,
136 			10,
137 			0L,
138 			TimeUnit.MILLISECONDS,
139 			executorQueue,
140 			threadFactory,
141 			rejectedExecutionHandler);
142 
143 	}
144 
145 	@Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
146 	public IBaseParameters triggerSubscription(
147 		@OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List<UriParam> theResourceIds,
148 		@OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List<StringParam> theSearchUrls
149 	) {
150 		return doTriggerSubscription(theResourceIds, theSearchUrls, null);
151 	}
152 
153 	@Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
154 	public IBaseParameters triggerSubscription(
155 		@IdParam IIdType theSubscriptionId,
156 		@OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List<UriParam> theResourceIds,
157 		@OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List<StringParam> theSearchUrls
158 	) {
159 
160 		// Throw a 404 if the subscription doesn't exist
161 		IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getResourceDao("Subscription");
162 		IIdType subscriptionId = theSubscriptionId;
163 		if (subscriptionId.hasResourceType() == false) {
164 			subscriptionId = subscriptionId.withResourceType("Subscription");
165 		}
166 		subscriptionDao.read(subscriptionId);
167 
168 		return doTriggerSubscription(theResourceIds, theSearchUrls, subscriptionId);
169 
170 	}
171 
172 	private IBaseParameters doTriggerSubscription(@OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List<UriParam> theResourceIds, @OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List<StringParam> theSearchUrls, @IdParam IIdType theSubscriptionId) {
173 		if (mySubscriptionInterceptorList.isEmpty()) {
174 			throw new PreconditionFailedException("Subscription processing not active on this server");
175 		}
176 
177 		List<UriParam> resourceIds = ObjectUtils.defaultIfNull(theResourceIds, Collections.emptyList());
178 		List<StringParam> searchUrls = ObjectUtils.defaultIfNull(theSearchUrls, Collections.emptyList());
179 
180 		// Make sure we have at least one resource ID or search URL
181 		if (resourceIds.size() == 0 && searchUrls.size() == 0) {
182 			throw new InvalidRequestException("No resource IDs or search URLs specified for triggering");
183 		}
184 
185 		// Resource URLs must be compete
186 		for (UriParam next : resourceIds) {
187 			IdType resourceId = new IdType(next.getValue());
188 			ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasResourceType(), RESOURCE_ID + " parameter must have resource type");
189 			ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasIdPart(), RESOURCE_ID + " parameter must have resource ID part");
190 		}
191 
192 		// Search URLs must be valid
193 		for (StringParam next : searchUrls) {
194 			if (next.getValue().contains("?") == false) {
195 				throw new InvalidRequestException("Search URL is not valid (must be in the form \"[resource type]?[optional params]\")");
196 			}
197 		}
198 
199 		SubscriptionTriggeringJobDetails jobDetails = new SubscriptionTriggeringJobDetails();
200 		jobDetails.setJobId(UUID.randomUUID().toString());
201 		jobDetails.setRemainingResourceIds(resourceIds.stream().map(UriParam::getValue).collect(Collectors.toList()));
202 		jobDetails.setRemainingSearchUrls(searchUrls.stream().map(StringParam::getValue).collect(Collectors.toList()));
203 		if (theSubscriptionId != null) {
204 			jobDetails.setSubscriptionId(theSubscriptionId.toUnqualifiedVersionless().getValue());
205 		}
206 
207 		// Submit job for processing
208 		synchronized (myActiveJobs) {
209 			myActiveJobs.add(jobDetails);
210 		}
211 		ourLog.info("Subscription triggering requested for {} resource and {} search - Gave job ID: {}", resourceIds.size(), searchUrls.size(), jobDetails.getJobId());
212 
213 		// Create a parameters response
214 		IBaseParameters retVal = ParametersUtil.newInstance(myFhirContext);
215 		IPrimitiveType<?> value = (IPrimitiveType<?>) myFhirContext.getElementDefinition("string").newInstance();
216 		value.setValueAsString("Subscription triggering job submitted as JOB ID: " + jobDetails.myJobId);
217 		ParametersUtil.addParameterToParameters(myFhirContext, retVal, "information", value);
218 		return retVal;
219 	}
220 
221 	@Override
222 	public Class<? extends IBaseResource> getResourceType() {
223 		return myFhirContext.getResourceDefinition("Subscription").getImplementingClass();
224 	}
225 
226 	@Scheduled(fixedDelay = DateUtils.MILLIS_PER_SECOND)
227 	public void runDeliveryPass() {
228 
229 		synchronized (myActiveJobs) {
230 			if (myActiveJobs.isEmpty()) {
231 				return;
232 			}
233 
234 			String activeJobIds = myActiveJobs.stream().map(t->t.getJobId()).collect(Collectors.joining(", "));
235 			ourLog.info("Starting pass: currently have {} active job IDs: {}", myActiveJobs.size(), activeJobIds);
236 
237 			SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0);
238 
239 			runJob(activeJob);
240 
241 			// If the job is complete, remove it from the queue
242 			if (activeJob.getRemainingResourceIds().isEmpty()) {
243 				if (activeJob.getRemainingSearchUrls().isEmpty()) {
244 					if (isBlank(activeJob.myCurrentSearchUuid)) {
245 						myActiveJobs.remove(0);
246 						String remainingJobsMsg = "";
247 						if (myActiveJobs.size() > 0) {
248 							remainingJobsMsg = "(" + myActiveJobs.size() + " jobs remaining)";
249 						}
250 						ourLog.info("Subscription triggering job {} is complete{}", activeJob.getJobId(), remainingJobsMsg);
251 					}
252 				}
253 			}
254 
255 		}
256 
257 	}
258 
259 	private void runJob(SubscriptionTriggeringJobDetails theJobDetails) {
260 		StopWatch sw = new StopWatch();
261 		ourLog.info("Starting pass of subscription triggering job {}", theJobDetails.getJobId());
262 
263 		// Submit individual resources
264 		int totalSubmitted = 0;
265 		List<Pair<String, Future<Void>>> futures = new ArrayList<>();
266 		while (theJobDetails.getRemainingResourceIds().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
267 			totalSubmitted++;
268 			String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0);
269 			Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResourceId);
270 			futures.add(Pair.of(nextResourceId, future));
271 		}
272 
273 		// Make sure these all succeeded in submitting
274 		if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
275 			return;
276 		}
277 
278 		// If we don't have an active search started, and one needs to be.. start it
279 		if (isBlank(theJobDetails.getCurrentSearchUuid()) && theJobDetails.getRemainingSearchUrls().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
280 			String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0);
281 			RuntimeResourceDefinition resourceDef = CacheWarmingSvcImpl.parseUrlResourceType(myFhirContext, nextSearchUrl);
282 			String queryPart = nextSearchUrl.substring(nextSearchUrl.indexOf('?'));
283 			String resourceType = resourceDef.getName();
284 
285 			IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceType);
286 			SearchParameterMap params = BaseHapiFhirDao.translateMatchUrl(callingDao, myFhirContext, queryPart, resourceDef);
287 
288 			ourLog.info("Triggering job[{}] is starting a search for {}", theJobDetails.getJobId(), nextSearchUrl);
289 
290 			IBundleProvider search = mySearchCoordinatorSvc.registerSearch(callingDao, params, resourceType, new CacheControlDirective());
291 			theJobDetails.setCurrentSearchUuid(search.getUuid());
292 			theJobDetails.setCurrentSearchResourceType(resourceType);
293 			theJobDetails.setCurrentSearchCount(params.getCount());
294 			theJobDetails.setCurrentSearchLastUploadedIndex(-1);
295 		}
296 
297 		// If we have an active search going, submit resources from it
298 		if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) {
299 			int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
300 
301 			IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());
302 
303 			int maxQuerySize = myMaxSubmitPerPass - totalSubmitted;
304 			int toIndex = fromIndex + maxQuerySize;
305 			if (theJobDetails.getCurrentSearchCount() != null) {
306 				toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
307 			}
308 			ourLog.info("Triggering job[{}] search {} requesting resources {} - {}", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
309 			List<Long> resourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
310 
311 			ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), resourceIds.size());
312 			int highestIndexSubmitted = theJobDetails.getCurrentSearchLastUploadedIndex();
313 
314 			for (Long next : resourceIds) {
315 				IBaseResource nextResource = resourceDao.readByPid(next);
316 				Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResource);
317 				futures.add(Pair.of(nextResource.getIdElement().getIdPart(), future));
318 				totalSubmitted++;
319 				highestIndexSubmitted++;
320 			}
321 
322 			if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
323 				return;
324 			}
325 
326 			theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted);
327 
328 			if (resourceIds.size() == 0 || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) {
329 				ourLog.info("Triggering job[{}] search {} has completed ", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid());
330 				theJobDetails.setCurrentSearchResourceType(null);
331 				theJobDetails.setCurrentSearchUuid(null);
332 				theJobDetails.setCurrentSearchLastUploadedIndex(-1);
333 				theJobDetails.setCurrentSearchCount(null);
334 			}
335 		}
336 
337 		ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS));
338 	}
339 
340 	private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Pair<String, Future<Void>>> theIdToFutures) {
341 
342 		for (Pair<String, Future<Void>> next : theIdToFutures) {
343 			String nextDeliveredId = next.getKey();
344 			try {
345 				Future<Void> nextFuture = next.getValue();
346 				nextFuture.get();
347 				ourLog.info("Finished redelivering {}", nextDeliveredId);
348 			} catch (Exception e) {
349 				ourLog.error("Failure triggering resource " + nextDeliveredId, e);
350 				return true;
351 			}
352 		}
353 
354 		// Clear the list since it will potentially get reused
355 		theIdToFutures.clear();
356 		return false;
357 	}
358 
359 	private Future<Void> submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
360 		org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
361 		IFhirResourceDao<? extends IBaseResource> dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
362 		IBaseResource resourceToTrigger = dao.read(resourceId);
363 
364 		return submitResource(theSubscriptionId, resourceToTrigger);
365 	}
366 
367 	private Future<Void> submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) {
368 
369 		ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId);
370 
371 		ResourceModifiedMessage msg = new ResourceModifiedMessage();
372 		msg.setId(theResourceToTrigger.getIdElement());
373 		msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.UPDATE);
374 		msg.setSubscriptionId(new IdType(theSubscriptionId).toUnqualifiedVersionless().getValue());
375 		msg.setNewPayload(myFhirContext, theResourceToTrigger);
376 
377 		return myExecutorService.submit(()->{
378 			for (int i = 0; ; i++) {
379 				try {
380 					for (BaseSubscriptionInterceptor<?> next : mySubscriptionInterceptorList) {
381 						next.submitResourceModified(msg);
382 					}
383 					break;
384 				} catch (Exception e) {
385 					if (i >= 3) {
386 						throw new InternalErrorException(e);
387 					}
388 
389 					ourLog.warn("Exception while retriggering subscriptions (going to sleep and retry): {}", e.toString());
390 					Thread.sleep(1000);
391 				}
392 			}
393 
394 			return null;
395 		});
396 
397 	}
398 
399 	public void cancelAll() {
400 		synchronized (myActiveJobs) {
401 			myActiveJobs.clear();
402 		}
403 	}
404 
405 	@Override
406 	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
407 		myAppCtx = applicationContext;
408 	}
409 
410 	private static class SubscriptionTriggeringJobDetails {
411 
412 		private String myJobId;
413 		private String mySubscriptionId;
414 		private List<String> myRemainingResourceIds;
415 		private List<String> myRemainingSearchUrls;
416 		private String myCurrentSearchUuid;
417 		private Integer myCurrentSearchCount;
418 		private String myCurrentSearchResourceType;
419 		private int myCurrentSearchLastUploadedIndex;
420 
421 		public Integer getCurrentSearchCount() {
422 			return myCurrentSearchCount;
423 		}
424 
425 		public void setCurrentSearchCount(Integer theCurrentSearchCount) {
426 			myCurrentSearchCount = theCurrentSearchCount;
427 		}
428 
429 		public String getCurrentSearchResourceType() {
430 			return myCurrentSearchResourceType;
431 		}
432 
433 		public void setCurrentSearchResourceType(String theCurrentSearchResourceType) {
434 			myCurrentSearchResourceType = theCurrentSearchResourceType;
435 		}
436 
437 		public String getJobId() {
438 			return myJobId;
439 		}
440 
441 		public void setJobId(String theJobId) {
442 			myJobId = theJobId;
443 		}
444 
445 		public String getSubscriptionId() {
446 			return mySubscriptionId;
447 		}
448 
449 		public void setSubscriptionId(String theSubscriptionId) {
450 			mySubscriptionId = theSubscriptionId;
451 		}
452 
453 		public List<String> getRemainingResourceIds() {
454 			return myRemainingResourceIds;
455 		}
456 
457 		public void setRemainingResourceIds(List<String> theRemainingResourceIds) {
458 			myRemainingResourceIds = theRemainingResourceIds;
459 		}
460 
461 		public List<String> getRemainingSearchUrls() {
462 			return myRemainingSearchUrls;
463 		}
464 
465 		public void setRemainingSearchUrls(List<String> theRemainingSearchUrls) {
466 			myRemainingSearchUrls = theRemainingSearchUrls;
467 		}
468 
469 		public String getCurrentSearchUuid() {
470 			return myCurrentSearchUuid;
471 		}
472 
473 		public void setCurrentSearchUuid(String theCurrentSearchUuid) {
474 			myCurrentSearchUuid = theCurrentSearchUuid;
475 		}
476 
477 		public int getCurrentSearchLastUploadedIndex() {
478 			return myCurrentSearchLastUploadedIndex;
479 		}
480 
481 		public void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) {
482 			myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex;
483 		}
484 	}
485 
486 }