View Javadoc
1   package ca.uhn.fhir.jpa.subscription;
2   
3   /*-
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2019 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   *
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   *
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  import ca.uhn.fhir.context.FhirContext;
24  import ca.uhn.fhir.context.RuntimeResourceDefinition;
25  import ca.uhn.fhir.jpa.dao.DaoConfig;
26  import ca.uhn.fhir.jpa.dao.DaoRegistry;
27  import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
28  import ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider;
29  import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
30  import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
31  import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
32  import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
33  import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
34  import ca.uhn.fhir.rest.annotation.IdParam;
35  import ca.uhn.fhir.rest.api.CacheControlDirective;
36  import ca.uhn.fhir.rest.api.server.IBundleProvider;
37  import ca.uhn.fhir.rest.param.StringParam;
38  import ca.uhn.fhir.rest.param.UriParam;
39  import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
40  import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
41  import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
42  import ca.uhn.fhir.util.ParametersUtil;
43  import ca.uhn.fhir.util.StopWatch;
44  import ca.uhn.fhir.util.UrlUtil;
45  import ca.uhn.fhir.util.ValidateUtil;
46  import org.apache.commons.lang3.ObjectUtils;
47  import org.apache.commons.lang3.Validate;
48  import org.apache.commons.lang3.concurrent.BasicThreadFactory;
49  import org.apache.commons.lang3.time.DateUtils;
50  import org.apache.commons.lang3.tuple.Pair;
51  import org.hl7.fhir.dstu2.model.IdType;
52  import org.hl7.fhir.instance.model.api.IBaseParameters;
53  import org.hl7.fhir.instance.model.api.IBaseResource;
54  import org.hl7.fhir.instance.model.api.IIdType;
55  import org.hl7.fhir.instance.model.api.IPrimitiveType;
56  import org.slf4j.Logger;
57  import org.slf4j.LoggerFactory;
58  import org.springframework.beans.factory.annotation.Autowired;
59  import org.springframework.scheduling.annotation.Scheduled;
60  import org.springframework.stereotype.Service;
61  
62  import javax.annotation.PostConstruct;
63  import java.util.ArrayList;
64  import java.util.Collections;
65  import java.util.List;
66  import java.util.UUID;
67  import java.util.concurrent.*;
68  import java.util.stream.Collectors;
69  
70  import static ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider.RESOURCE_ID;
71  import static org.apache.commons.lang3.StringUtils.isBlank;
72  import static org.apache.commons.lang3.StringUtils.isNotBlank;
73  
74  @Service
75  public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc {
76  	private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringProvider.class);
77  
78  	private static final int DEFAULT_MAX_SUBMIT = 10000;
79  
80  	@Autowired
81  	private FhirContext myFhirContext;
82  	@Autowired
83  	private DaoRegistry myDaoRegistry;
84  	@Autowired
85  	private DaoConfig myDaoConfig;
86  	@Autowired
87  	private ISearchCoordinatorSvc mySearchCoordinatorSvc;
88  	@Autowired
89  	private MatchUrlService myMatchUrlService;
90  	@Autowired
91  	private IResourceModifiedConsumer myResourceModifiedConsumer;
92  
93  	private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<>();
94  	private int myMaxSubmitPerPass = DEFAULT_MAX_SUBMIT;
95  	private ExecutorService myExecutorService;
96  
97  	@Override
98  	public IBaseParameters triggerSubscription(List<UriParam> theResourceIds, List<StringParam> theSearchUrls, @IdParam IIdType theSubscriptionId) {
99  		if (myDaoConfig.getSupportedSubscriptionTypes().isEmpty()) {
100 			throw new PreconditionFailedException("Subscription processing not active on this server");
101 		}
102 
103 		// Throw a 404 if the subscription doesn't exist
104 		if (theSubscriptionId != null) {
105 			IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getSubscriptionDao();
106 			IIdType subscriptionId = theSubscriptionId;
107 			if (!subscriptionId.hasResourceType()) {
108 				subscriptionId = subscriptionId.withResourceType(ResourceTypeEnum.SUBSCRIPTION.getCode());
109 			}
110 			subscriptionDao.read(subscriptionId);
111 		}
112 
113 		List<UriParam> resourceIds = ObjectUtils.defaultIfNull(theResourceIds, Collections.emptyList());
114 		List<StringParam> searchUrls = ObjectUtils.defaultIfNull(theSearchUrls, Collections.emptyList());
115 
116 		// Make sure we have at least one resource ID or search URL
117 		if (resourceIds.size() == 0 && searchUrls.size() == 0) {
118 			throw new InvalidRequestException("No resource IDs or search URLs specified for triggering");
119 		}
120 
121 		// Resource URLs must be compete
122 		for (UriParam next : resourceIds) {
123 			IdType resourceId = new IdType(next.getValue());
124 			ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasResourceType(), RESOURCE_ID + " parameter must have resource type");
125 			ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasIdPart(), RESOURCE_ID + " parameter must have resource ID part");
126 		}
127 
128 		// Search URLs must be valid
129 		for (StringParam next : searchUrls) {
130 			if (!next.getValue().contains("?")) {
131 				throw new InvalidRequestException("Search URL is not valid (must be in the form \"[resource type]?[optional params]\")");
132 			}
133 		}
134 
135 		SubscriptionTriggeringJobDetails jobDetails = new SubscriptionTriggeringJobDetails();
136 		jobDetails.setJobId(UUID.randomUUID().toString());
137 		jobDetails.setRemainingResourceIds(resourceIds.stream().map(UriParam::getValue).collect(Collectors.toList()));
138 		jobDetails.setRemainingSearchUrls(searchUrls.stream().map(StringParam::getValue).collect(Collectors.toList()));
139 		if (theSubscriptionId != null) {
140 			jobDetails.setSubscriptionId(theSubscriptionId.toUnqualifiedVersionless().getValue());
141 		}
142 
143 		// Submit job for processing
144 		synchronized (myActiveJobs) {
145 			myActiveJobs.add(jobDetails);
146 		}
147 		ourLog.info("Subscription triggering requested for {} resource and {} search - Gave job ID: {}", resourceIds.size(), searchUrls.size(), jobDetails.getJobId());
148 
149 		// Create a parameters response
150 		IBaseParameters retVal = ParametersUtil.newInstance(myFhirContext);
151 		IPrimitiveType<?> value = (IPrimitiveType<?>) myFhirContext.getElementDefinition("string").newInstance();
152 		value.setValueAsString("Subscription triggering job submitted as JOB ID: " + jobDetails.myJobId);
153 		ParametersUtil.addParameterToParameters(myFhirContext, retVal, "information", value);
154 		return retVal;
155 	}
156 
157 	@Scheduled(fixedDelay = DateUtils.MILLIS_PER_SECOND)
158 	public void runDeliveryPass() {
159 
160 		synchronized (myActiveJobs) {
161 			if (myActiveJobs.isEmpty()) {
162 				return;
163 			}
164 
165 			String activeJobIds = myActiveJobs.stream().map(SubscriptionTriggeringJobDetails::getJobId).collect(Collectors.joining(", "));
166 			ourLog.info("Starting pass: currently have {} active job IDs: {}", myActiveJobs.size(), activeJobIds);
167 
168 			SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0);
169 
170 			runJob(activeJob);
171 
172 			// If the job is complete, remove it from the queue
173 			if (activeJob.getRemainingResourceIds().isEmpty()) {
174 				if (activeJob.getRemainingSearchUrls().isEmpty()) {
175 					if (isBlank(activeJob.myCurrentSearchUuid)) {
176 						myActiveJobs.remove(0);
177 						String remainingJobsMsg = "";
178 						if (myActiveJobs.size() > 0) {
179 							remainingJobsMsg = "(" + myActiveJobs.size() + " jobs remaining)";
180 						}
181 						ourLog.info("Subscription triggering job {} is complete{}", activeJob.getJobId(), remainingJobsMsg);
182 					}
183 				}
184 			}
185 
186 		}
187 
188 	}
189 
190 	private void runJob(SubscriptionTriggeringJobDetails theJobDetails) {
191 		StopWatch sw = new StopWatch();
192 		ourLog.info("Starting pass of subscription triggering job {}", theJobDetails.getJobId());
193 
194 		// Submit individual resources
195 		int totalSubmitted = 0;
196 		List<Pair<String, Future<Void>>> futures = new ArrayList<>();
197 		while (theJobDetails.getRemainingResourceIds().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
198 			totalSubmitted++;
199 			String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0);
200 			Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResourceId);
201 			futures.add(Pair.of(nextResourceId, future));
202 		}
203 
204 		// Make sure these all succeeded in submitting
205 		if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
206 			return;
207 		}
208 
209 		// If we don't have an active search started, and one needs to be.. start it
210 		if (isBlank(theJobDetails.getCurrentSearchUuid()) && theJobDetails.getRemainingSearchUrls().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
211 			String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0);
212 			RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, nextSearchUrl);
213 			String queryPart = nextSearchUrl.substring(nextSearchUrl.indexOf('?'));
214 			String resourceType = resourceDef.getName();
215 
216 			IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceType);
217 			SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef);
218 
219 			ourLog.info("Triggering job[{}] is starting a search for {}", theJobDetails.getJobId(), nextSearchUrl);
220 
221 			IBundleProvider search = mySearchCoordinatorSvc.registerSearch(callingDao, params, resourceType, new CacheControlDirective(), null);
222 			theJobDetails.setCurrentSearchUuid(search.getUuid());
223 			theJobDetails.setCurrentSearchResourceType(resourceType);
224 			theJobDetails.setCurrentSearchCount(params.getCount());
225 			theJobDetails.setCurrentSearchLastUploadedIndex(-1);
226 		}
227 
228 		// If we have an active search going, submit resources from it
229 		if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) {
230 			int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
231 
232 			IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());
233 
234 			int maxQuerySize = myMaxSubmitPerPass - totalSubmitted;
235 			int toIndex = fromIndex + maxQuerySize;
236 			if (theJobDetails.getCurrentSearchCount() != null) {
237 				toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
238 			}
239 			ourLog.info("Triggering job[{}] search {} requesting resources {} - {}", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
240 			List<Long> resourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, null);
241 
242 			ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), resourceIds.size());
243 			int highestIndexSubmitted = theJobDetails.getCurrentSearchLastUploadedIndex();
244 
245 			for (Long next : resourceIds) {
246 				IBaseResource nextResource = resourceDao.readByPid(next);
247 				Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResource);
248 				futures.add(Pair.of(nextResource.getIdElement().getIdPart(), future));
249 				totalSubmitted++;
250 				highestIndexSubmitted++;
251 			}
252 
253 			if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
254 				return;
255 			}
256 
257 			theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted);
258 
259 			if (resourceIds.size() == 0 || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) {
260 				ourLog.info("Triggering job[{}] search {} has completed ", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid());
261 				theJobDetails.setCurrentSearchResourceType(null);
262 				theJobDetails.setCurrentSearchUuid(null);
263 				theJobDetails.setCurrentSearchLastUploadedIndex(-1);
264 				theJobDetails.setCurrentSearchCount(null);
265 			}
266 		}
267 
268 		ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS));
269 	}
270 
271 	private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Pair<String, Future<Void>>> theIdToFutures) {
272 
273 		for (Pair<String, Future<Void>> next : theIdToFutures) {
274 			String nextDeliveredId = next.getKey();
275 			try {
276 				Future<Void> nextFuture = next.getValue();
277 				nextFuture.get();
278 				ourLog.info("Finished redelivering {}", nextDeliveredId);
279 			} catch (Exception e) {
280 				ourLog.error("Failure triggering resource " + nextDeliveredId, e);
281 				return true;
282 			}
283 		}
284 
285 		// Clear the list since it will potentially get reused
286 		theIdToFutures.clear();
287 		return false;
288 	}
289 
290 	private Future<Void> submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
291 		org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
292 		IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
293 		IBaseResource resourceToTrigger = dao.read(resourceId);
294 
295 		return submitResource(theSubscriptionId, resourceToTrigger);
296 	}
297 
298 	private Future<Void> submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) {
299 
300 		ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId);
301 
302 		ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theResourceToTrigger, ResourceModifiedMessage.OperationTypeEnum.UPDATE);
303 		msg.setSubscriptionId(new IdType(theSubscriptionId).toUnqualifiedVersionless().getValue());
304 
305 		return myExecutorService.submit(() -> {
306 			for (int i = 0; ; i++) {
307 				try {
308 						myResourceModifiedConsumer.submitResourceModified(msg);
309 					break;
310 				} catch (Exception e) {
311 					if (i >= 3) {
312 						throw new InternalErrorException(e);
313 					}
314 
315 					ourLog.warn("Exception while retriggering subscriptions (going to sleep and retry): {}", e.toString());
316 					Thread.sleep(1000);
317 				}
318 			}
319 
320 			return null;
321 		});
322 
323 	}
324 
325 	public void cancelAll() {
326 		synchronized (myActiveJobs) {
327 			myActiveJobs.clear();
328 		}
329 	}
330 
331 	/**
332 	 * Sets the maximum number of resources that will be submitted in a single pass
333 	 */
334 	public void setMaxSubmitPerPass(Integer theMaxSubmitPerPass) {
335 		Integer maxSubmitPerPass = theMaxSubmitPerPass;
336 		if (maxSubmitPerPass == null) {
337 			maxSubmitPerPass = DEFAULT_MAX_SUBMIT;
338 		}
339 		Validate.isTrue(maxSubmitPerPass > 0, "theMaxSubmitPerPass must be > 0");
340 		myMaxSubmitPerPass = maxSubmitPerPass;
341 	}
342 
343 	@PostConstruct
344 	public void start() {
345 		LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000);
346 		BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
347 			.namingPattern("SubscriptionTriggering-%d")
348 			.daemon(false)
349 			.priority(Thread.NORM_PRIORITY)
350 			.build();
351 		RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
352 			@Override
353 			public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
354 				ourLog.info("Note: Subscription triggering queue is full ({} elements), waiting for a slot to become available!", executorQueue.size());
355 				StopWatch sw = new StopWatch();
356 				try {
357 					executorQueue.put(theRunnable);
358 				} catch (InterruptedException theE) {
359 					// Restore interrupted state...
360 					Thread.currentThread().interrupt();
361 					throw new RejectedExecutionException("Task " + theRunnable.toString() +
362 						" rejected from " + theE.toString());
363 				}
364 				ourLog.info("Slot become available after {}ms", sw.getMillis());
365 			}
366 		};
367 		myExecutorService = new ThreadPoolExecutor(
368 			0,
369 			10,
370 			0L,
371 			TimeUnit.MILLISECONDS,
372 			executorQueue,
373 			threadFactory,
374 			rejectedExecutionHandler);
375 
376 	}
377 
378 	private static class SubscriptionTriggeringJobDetails {
379 
380 		private String myJobId;
381 		private String mySubscriptionId;
382 		private List<String> myRemainingResourceIds;
383 		private List<String> myRemainingSearchUrls;
384 		private String myCurrentSearchUuid;
385 		private Integer myCurrentSearchCount;
386 		private String myCurrentSearchResourceType;
387 		private int myCurrentSearchLastUploadedIndex;
388 
389 		Integer getCurrentSearchCount() {
390 			return myCurrentSearchCount;
391 		}
392 
393 		void setCurrentSearchCount(Integer theCurrentSearchCount) {
394 			myCurrentSearchCount = theCurrentSearchCount;
395 		}
396 
397 		String getCurrentSearchResourceType() {
398 			return myCurrentSearchResourceType;
399 		}
400 
401 		void setCurrentSearchResourceType(String theCurrentSearchResourceType) {
402 			myCurrentSearchResourceType = theCurrentSearchResourceType;
403 		}
404 
405 		String getJobId() {
406 			return myJobId;
407 		}
408 
409 		void setJobId(String theJobId) {
410 			myJobId = theJobId;
411 		}
412 
413 		String getSubscriptionId() {
414 			return mySubscriptionId;
415 		}
416 
417 		void setSubscriptionId(String theSubscriptionId) {
418 			mySubscriptionId = theSubscriptionId;
419 		}
420 
421 		List<String> getRemainingResourceIds() {
422 			return myRemainingResourceIds;
423 		}
424 
425 		void setRemainingResourceIds(List<String> theRemainingResourceIds) {
426 			myRemainingResourceIds = theRemainingResourceIds;
427 		}
428 
429 		List<String> getRemainingSearchUrls() {
430 			return myRemainingSearchUrls;
431 		}
432 
433 		void setRemainingSearchUrls(List<String> theRemainingSearchUrls) {
434 			myRemainingSearchUrls = theRemainingSearchUrls;
435 		}
436 
437 		String getCurrentSearchUuid() {
438 			return myCurrentSearchUuid;
439 		}
440 
441 		void setCurrentSearchUuid(String theCurrentSearchUuid) {
442 			myCurrentSearchUuid = theCurrentSearchUuid;
443 		}
444 
445 		int getCurrentSearchLastUploadedIndex() {
446 			return myCurrentSearchLastUploadedIndex;
447 		}
448 
449 		void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) {
450 			myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex;
451 		}
452 	}
453 
454 }