View Javadoc
1   package ca.uhn.fhir.jpa.subscription;
2   
3   /*-
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2018 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  import ca.uhn.fhir.context.ConfigurationException;
24  import ca.uhn.fhir.context.FhirContext;
25  import ca.uhn.fhir.context.FhirVersionEnum;
26  import ca.uhn.fhir.jpa.config.BaseConfig;
27  import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
28  import ca.uhn.fhir.jpa.dao.SearchParameterMap;
29  import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails;
30  import ca.uhn.fhir.jpa.util.JpaConstants;
31  import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
32  import ca.uhn.fhir.rest.api.server.IBundleProvider;
33  import ca.uhn.fhir.rest.api.server.RequestDetails;
34  import ca.uhn.fhir.rest.param.TokenOrListParam;
35  import ca.uhn.fhir.rest.param.TokenParam;
36  import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
37  import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
38  import ca.uhn.fhir.rest.server.interceptor.ServerOperationInterceptorAdapter;
39  import ca.uhn.fhir.util.StopWatch;
40  import com.google.common.annotations.VisibleForTesting;
41  import com.google.common.collect.ArrayListMultimap;
42  import com.google.common.collect.Multimap;
43  import com.google.common.collect.Multimaps;
44  import org.apache.commons.lang3.Validate;
45  import org.apache.commons.lang3.concurrent.BasicThreadFactory;
46  import org.hl7.fhir.exceptions.FHIRException;
47  import org.hl7.fhir.instance.model.api.IBaseReference;
48  import org.hl7.fhir.instance.model.api.IBaseResource;
49  import org.hl7.fhir.instance.model.api.IIdType;
50  import org.hl7.fhir.r4.model.Subscription;
51  import org.slf4j.Logger;
52  import org.slf4j.LoggerFactory;
53  import org.springframework.beans.factory.DisposableBean;
54  import org.springframework.beans.factory.annotation.Autowired;
55  import org.springframework.beans.factory.annotation.Qualifier;
56  import org.springframework.core.task.AsyncTaskExecutor;
57  import org.springframework.messaging.MessageChannel;
58  import org.springframework.messaging.MessageHandler;
59  import org.springframework.messaging.SubscribableChannel;
60  import org.springframework.messaging.support.ExecutorSubscribableChannel;
61  import org.springframework.scheduling.annotation.Scheduled;
62  import org.springframework.transaction.PlatformTransactionManager;
63  import org.springframework.transaction.TransactionStatus;
64  import org.springframework.transaction.support.TransactionCallbackWithoutResult;
65  import org.springframework.transaction.support.TransactionSynchronizationAdapter;
66  import org.springframework.transaction.support.TransactionSynchronizationManager;
67  import org.springframework.transaction.support.TransactionTemplate;
68  
69  import javax.annotation.PostConstruct;
70  import javax.annotation.PreDestroy;
71  import java.util.*;
72  import java.util.concurrent.*;
73  
74  public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> extends ServerOperationInterceptorAdapter {
75  
76  	static final String SUBSCRIPTION_STATUS = "Subscription.status";
77  	static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
78  	private static final Integer MAX_SUBSCRIPTION_RESULTS = 1000;
79  	private final Object myInitSubscriptionsLock = new Object();
80  	private SubscribableChannel myProcessingChannel;
81  	private Map<String, SubscribableChannel> myDeliveryChannel;
82  	private ExecutorService myProcessingExecutor;
83  	private int myExecutorThreadCount;
84  	private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber;
85  	private MessageHandler mySubscriptionCheckingSubscriber;
86  	private ConcurrentHashMap<String, CanonicalSubscription> myIdToSubscription = new ConcurrentHashMap<>();
87  	private ConcurrentHashMap<String, SubscribableChannel> mySubscribableChannel = new ConcurrentHashMap<>();
88  	private Multimap<String, MessageHandler> myIdToDeliveryHandler = Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
89  	private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class);
90  	private ThreadPoolExecutor myDeliveryExecutor;
91  	private LinkedBlockingQueue<Runnable> myProcessingExecutorQueue;
92  	private IFhirResourceDao<?> mySubscriptionDao;
93  	@Autowired
94  	private List<IFhirResourceDao<?>> myResourceDaos;
95  	@Autowired
96  	private FhirContext myCtx;
97  	@Autowired(required = false)
98  	@Qualifier("myEventDefinitionDaoR4")
99  	private IFhirResourceDao<org.hl7.fhir.r4.model.EventDefinition> myEventDefinitionDaoR4;
100 	@Autowired()
101 	private PlatformTransactionManager myTxManager;
102 	@Autowired
103 	@Qualifier(BaseConfig.TASK_EXECUTOR_NAME)
104 	private AsyncTaskExecutor myAsyncTaskExecutor;
105 	private Map<Class<? extends IBaseResource>, IFhirResourceDao<?>> myResourceTypeToDao;
106 	private Semaphore myInitSubscriptionsSemaphore = new Semaphore(1);
107 
108 	/**
109 	 * Constructor
110 	 */
111 	public BaseSubscriptionInterceptor() {
112 		super();
113 		setExecutorThreadCount(5);
114 	}
115 
116 	protected CanonicalSubscription canonicalize(S theSubscription) {
117 		switch (myCtx.getVersion().getVersion()) {
118 			case DSTU2:
119 				return canonicalizeDstu2(theSubscription);
120 			case DSTU3:
121 				return canonicalizeDstu3(theSubscription);
122 			case R4:
123 				return canonicalizeR4(theSubscription);
124 			default:
125 				throw new ConfigurationException("Subscription not supported for version: " + myCtx.getVersion().getVersion());
126 		}
127 	}
128 
129 	protected CanonicalSubscription canonicalizeDstu2(IBaseResource theSubscription) {
130 		ca.uhn.fhir.model.dstu2.resource.Subscription subscription = (ca.uhn.fhir.model.dstu2.resource.Subscription) theSubscription;
131 
132 		CanonicalSubscription retVal = new CanonicalSubscription();
133 		try {
134 			retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus()));
135 			retVal.setChannelType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.fromCode(subscription.getChannel().getType()));
136 			retVal.setCriteriaString(subscription.getCriteria());
137 			retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
138 			retVal.setHeaders(subscription.getChannel().getHeader());
139 			retVal.setIdElement(subscription.getIdElement());
140 			retVal.setPayloadString(subscription.getChannel().getPayload());
141 		} catch (FHIRException theE) {
142 			throw new InternalErrorException(theE);
143 		}
144 		return retVal;
145 	}
146 
147 	protected CanonicalSubscription canonicalizeDstu3(IBaseResource theSubscription) {
148 		org.hl7.fhir.dstu3.model.Subscription subscription = (org.hl7.fhir.dstu3.model.Subscription) theSubscription;
149 
150 		CanonicalSubscription retVal = new CanonicalSubscription();
151 		try {
152 			retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus().toCode()));
153 			retVal.setChannelType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.fromCode(subscription.getChannel().getType().toCode()));
154 			retVal.setCriteriaString(subscription.getCriteria());
155 			retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
156 			retVal.setHeaders(subscription.getChannel().getHeader());
157 			retVal.setIdElement(subscription.getIdElement());
158 			retVal.setPayloadString(subscription.getChannel().getPayload());
159 
160 			if (retVal.getChannelType() == Subscription.SubscriptionChannelType.EMAIL) {
161 				String from;
162 				String subjectTemplate;
163 				String bodyTemplate;
164 				try {
165 					from = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_EMAIL_FROM);
166 					subjectTemplate = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_SUBJECT_TEMPLATE);
167 				} catch (FHIRException theE) {
168 					throw new ConfigurationException("Failed to extract subscription extension(s): " + theE.getMessage(), theE);
169 				}
170 				retVal.getEmailDetails().setFrom(from);
171 				retVal.getEmailDetails().setSubjectTemplate(subjectTemplate);
172 			}
173 
174 			if (retVal.getChannelType() == Subscription.SubscriptionChannelType.RESTHOOK) {
175 				String stripVersionIds;
176 				String deliverLatestVersion;
177 				try {
178 					stripVersionIds = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_RESTHOOK_STRIP_VERSION_IDS);
179 					deliverLatestVersion = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_RESTHOOK_DELIVER_LATEST_VERSION);
180 				} catch (FHIRException theE) {
181 					throw new ConfigurationException("Failed to extract subscription extension(s): " + theE.getMessage(), theE);
182 				}
183 				retVal.getRestHookDetails().setStripVersionId(Boolean.parseBoolean(stripVersionIds));
184 				retVal.getRestHookDetails().setDeliverLatestVersion(Boolean.parseBoolean(deliverLatestVersion));
185 			}
186 
187 		} catch (FHIRException theE) {
188 			throw new InternalErrorException(theE);
189 		}
190 		return retVal;
191 	}
192 
193 	protected CanonicalSubscription canonicalizeR4(IBaseResource theSubscription) {
194 		org.hl7.fhir.r4.model.Subscription subscription = (org.hl7.fhir.r4.model.Subscription) theSubscription;
195 
196 		CanonicalSubscription retVal = new CanonicalSubscription();
197 		retVal.setStatus(subscription.getStatus());
198 		retVal.setChannelType(subscription.getChannel().getType());
199 		retVal.setCriteriaString(subscription.getCriteria());
200 		retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
201 		retVal.setHeaders(subscription.getChannel().getHeader());
202 		retVal.setIdElement(subscription.getIdElement());
203 		retVal.setPayloadString(subscription.getChannel().getPayload());
204 
205 		if (retVal.getChannelType() == Subscription.SubscriptionChannelType.EMAIL) {
206 			String from;
207 			String subjectTemplate;
208 			try {
209 				from = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_EMAIL_FROM);
210 				subjectTemplate = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_SUBJECT_TEMPLATE);
211 			} catch (FHIRException theE) {
212 				throw new ConfigurationException("Failed to extract subscription extension(s): " + theE.getMessage(), theE);
213 			}
214 			retVal.getEmailDetails().setFrom(from);
215 			retVal.getEmailDetails().setSubjectTemplate(subjectTemplate);
216 		}
217 
218 		if (retVal.getChannelType() == Subscription.SubscriptionChannelType.RESTHOOK) {
219 			String stripVersionIds;
220 			String deliverLatestVersion;
221 			try {
222 				stripVersionIds = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_RESTHOOK_STRIP_VERSION_IDS);
223 				deliverLatestVersion = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_RESTHOOK_DELIVER_LATEST_VERSION);
224 			} catch (FHIRException theE) {
225 				throw new ConfigurationException("Failed to extract subscription extension(s): " + theE.getMessage(), theE);
226 			}
227 			retVal.getRestHookDetails().setStripVersionId(Boolean.parseBoolean(stripVersionIds));
228 			retVal.getRestHookDetails().setDeliverLatestVersion(Boolean.parseBoolean(deliverLatestVersion));
229 		}
230 
231 		List<org.hl7.fhir.r4.model.Extension> topicExts = subscription.getExtensionsByUrl("http://hl7.org/fhir/subscription/topics");
232 		if (topicExts.size() > 0) {
233 			IBaseReference ref = (IBaseReference) topicExts.get(0).getValueAsPrimitive();
234 			if (!"EventDefinition".equals(ref.getReferenceElement().getResourceType())) {
235 				throw new PreconditionFailedException("Topic reference must be an EventDefinition");
236 			}
237 
238 			org.hl7.fhir.r4.model.EventDefinition def = myEventDefinitionDaoR4.read(ref.getReferenceElement());
239 			retVal.addTrigger(new CanonicalSubscription.CanonicalEventDefinition(def));
240 		}
241 
242 		return retVal;
243 	}
244 
245 	protected SubscribableChannel createDeliveryChannel(CanonicalSubscription theSubscription) {
246 		String subscriptionId = theSubscription.getIdElement(myCtx).getIdPart();
247 
248 		LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000);
249 		BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
250 			.namingPattern("subscription-delivery-" + subscriptionId + "-%d")
251 			.daemon(false)
252 			.priority(Thread.NORM_PRIORITY)
253 			.build();
254 		RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
255 			@Override
256 			public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
257 				ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", executorQueue.size());
258 				StopWatch sw = new StopWatch();
259 				try {
260 					executorQueue.put(theRunnable);
261 				} catch (InterruptedException theE) {
262 					throw new RejectedExecutionException("Task " + theRunnable.toString() +
263 						" rejected from " + theE.toString());
264 				}
265 				ourLog.info("Slot become available after {}ms", sw.getMillis());
266 			}
267 		};
268 		ThreadPoolExecutor deliveryExecutor = new ThreadPoolExecutor(
269 			1,
270 			getExecutorThreadCount(),
271 			0L,
272 			TimeUnit.MILLISECONDS,
273 			executorQueue,
274 			threadFactory,
275 			rejectedExecutionHandler);
276 
277 		return new ExecutorSubscribableChannel(deliveryExecutor);
278 	}
279 
280 	/**
281 	 * Returns an empty handler if the interceptor will manually handle registration and unregistration
282 	 */
283 	protected abstract Optional<MessageHandler> createDeliveryHandler(CanonicalSubscription theSubscription);
284 
285 	public abstract Subscription.SubscriptionChannelType getChannelType();
286 
287 	@SuppressWarnings("unchecked")
288 	public <R extends IBaseResource> IFhirResourceDao<R> getDao(Class<R> theType) {
289 		if (myResourceTypeToDao == null) {
290 			Map<Class<? extends IBaseResource>, IFhirResourceDao<?>> theResourceTypeToDao = new HashMap<>();
291 			for (IFhirResourceDao<?> next : myResourceDaos) {
292 				theResourceTypeToDao.put(next.getResourceType(), next);
293 			}
294 
295 			if (this instanceof IFhirResourceDao<?>) {
296 				IFhirResourceDao<?> thiz = (IFhirResourceDao<?>) this;
297 				theResourceTypeToDao.put(thiz.getResourceType(), thiz);
298 			}
299 
300 			myResourceTypeToDao = theResourceTypeToDao;
301 		}
302 
303 		return (IFhirResourceDao<R>) myResourceTypeToDao.get(theType);
304 	}
305 
306 	protected MessageChannel getDeliveryChannel(CanonicalSubscription theSubscription) {
307 		return mySubscribableChannel.get(theSubscription.getIdElement(myCtx).getIdPart());
308 	}
309 
310 	public int getExecutorQueueSizeForUnitTests() {
311 		return myProcessingExecutorQueue.size();
312 	}
313 
314 	public int getExecutorThreadCount() {
315 		return myExecutorThreadCount;
316 	}
317 
318 	public void setExecutorThreadCount(int theExecutorThreadCount) {
319 		Validate.inclusiveBetween(1, Integer.MAX_VALUE, theExecutorThreadCount);
320 		myExecutorThreadCount = theExecutorThreadCount;
321 	}
322 
323 	public Map<String, CanonicalSubscription> getIdToSubscription() {
324 		return Collections.unmodifiableMap(myIdToSubscription);
325 	}
326 
327 	public SubscribableChannel getProcessingChannel() {
328 		return myProcessingChannel;
329 	}
330 
331 	public void setProcessingChannel(SubscribableChannel theProcessingChannel) {
332 		myProcessingChannel = theProcessingChannel;
333 	}
334 
335 	protected IFhirResourceDao<?> getSubscriptionDao() {
336 		return mySubscriptionDao;
337 	}
338 
339 	public List<CanonicalSubscription> getRegisteredSubscriptions() {
340 		return new ArrayList<>(myIdToSubscription.values());
341 	}
342 
343 	public CanonicalSubscription hasSubscription(IIdType theId) {
344 		Validate.notNull(theId);
345 		Validate.notBlank(theId.getIdPart());
346 		return myIdToSubscription.get(theId.getIdPart());
347 	}
348 
349 	/**
350 	 * Read the existing subscriptions from the database
351 	 */
352 	@SuppressWarnings("unused")
353 	@Scheduled(fixedDelay = 60000)
354 	public void initSubscriptions() {
355 		if (!myInitSubscriptionsSemaphore.tryAcquire()) {
356 			return;
357 		}
358 		try {
359 			doInitSubscriptions();
360 		} finally {
361 			myInitSubscriptionsSemaphore.release();
362 		}
363 	}
364 
365 	public Integer doInitSubscriptions() {
366 		synchronized (myInitSubscriptionsLock) {
367 			ourLog.debug("Starting init subscriptions");
368 			SearchParameterMap map = new SearchParameterMap();
369 			map.add(Subscription.SP_TYPE, new TokenParam(null, getChannelType().toCode()));
370 			map.add(Subscription.SP_STATUS, new TokenOrListParam()
371 				.addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode()))
372 				.addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode())));
373 			map.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS);
374 
375 			RequestDetails req = new ServletSubRequestDetails();
376 			req.setSubRequest(true);
377 
378 			IBundleProvider subscriptionBundleList = getSubscriptionDao().search(map, req);
379 			if (subscriptionBundleList.size() >= MAX_SUBSCRIPTION_RESULTS) {
380 				ourLog.error("Currently over " + MAX_SUBSCRIPTION_RESULTS + " subscriptions.  Some subscriptions have not been loaded.");
381 			}
382 
383 			List<IBaseResource> resourceList = subscriptionBundleList.getResources(0, subscriptionBundleList.size());
384 
385 			Set<String> allIds = new HashSet<>();
386 			int changesCount = 0;
387 			for (IBaseResource resource : resourceList) {
388 				String nextId = resource.getIdElement().getIdPart();
389 				allIds.add(nextId);
390 				boolean changed = mySubscriptionActivatingSubscriber.activateOrRegisterSubscriptionIfRequired(resource);
391 				if (changed) {
392 					changesCount++;
393 				}
394 			}
395 
396 			unregisterAllSubscriptionsNotInCollection(allIds);
397 			ourLog.trace("Finished init subscriptions - found {}", resourceList.size());
398 
399 			return changesCount;
400 		}
401 	}
402 
403 	@SuppressWarnings("unused")
404 	@PreDestroy
405 	public void preDestroy() {
406 		getProcessingChannel().unsubscribe(mySubscriptionCheckingSubscriber);
407 		unregisterAllSubscriptionsNotInCollection(Collections.emptyList());
408 	}
409 
410 	public void registerHandler(String theSubscriptionId, MessageHandler theHandler) {
411 		mySubscribableChannel.get(theSubscriptionId).subscribe(theHandler);
412 		myIdToDeliveryHandler.put(theSubscriptionId, theHandler);
413 	}
414 
415 	@SuppressWarnings("UnusedReturnValue")
416 	public CanonicalSubscription registerSubscription(IIdType theId, S theSubscription) {
417 		Validate.notNull(theId);
418 		String subscriptionId = theId.getIdPart();
419 		Validate.notBlank(subscriptionId);
420 		Validate.notNull(theSubscription);
421 
422 		CanonicalSubscription canonicalized = canonicalize(theSubscription);
423 		SubscribableChannel deliveryChannel = createDeliveryChannel(canonicalized);
424 		Optional<MessageHandler> deliveryHandler = createDeliveryHandler(canonicalized);
425 
426 		mySubscribableChannel.put(subscriptionId, deliveryChannel);
427 		myIdToSubscription.put(subscriptionId, canonicalized);
428 
429 		deliveryHandler.ifPresent(handler -> registerHandler(subscriptionId, handler));
430 
431 		return canonicalized;
432 	}
433 
434 	protected void registerSubscriptionCheckingSubscriber() {
435 		if (mySubscriptionCheckingSubscriber == null) {
436 			mySubscriptionCheckingSubscriber = new SubscriptionCheckingSubscriber(getSubscriptionDao(), getChannelType(), this);
437 		}
438 		getProcessingChannel().subscribe(mySubscriptionCheckingSubscriber);
439 	}
440 
441 	@Override
442 	public void resourceCreated(RequestDetails theRequest, IBaseResource theResource) {
443 		ResourceModifiedMessage msg = new ResourceModifiedMessage();
444 		msg.setId(theResource.getIdElement());
445 		msg.setOperationType(RestOperationTypeEnum.CREATE);
446 		msg.setNewPayload(myCtx, theResource);
447 		submitResourceModified(msg);
448 	}
449 
450 	@Override
451 	public void resourceDeleted(RequestDetails theRequest, IBaseResource theResource) {
452 		ResourceModifiedMessage msg = new ResourceModifiedMessage();
453 		msg.setId(theResource.getIdElement());
454 		msg.setOperationType(RestOperationTypeEnum.DELETE);
455 		submitResourceModified(msg);
456 	}
457 
458 	@Override
459 	public void resourceUpdated(RequestDetails theRequest, IBaseResource theOldResource, IBaseResource theNewResource) {
460 		submitResourceModifiedForUpdate(theNewResource);
461 	}
462 
463 	void submitResourceModifiedForUpdate(IBaseResource theNewResource) {
464 		ResourceModifiedMessage msg = new ResourceModifiedMessage();
465 		msg.setId(theNewResource.getIdElement());
466 		msg.setOperationType(RestOperationTypeEnum.UPDATE);
467 		msg.setNewPayload(myCtx, theNewResource);
468 		submitResourceModified(msg);
469 	}
470 
471 	protected void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
472 		ourLog.trace("Registering synchronization to send resource modified message to processing channel");
473 
474 		/*
475 		 * We only actually submit this item work working after the
476 		 */
477 		if (TransactionSynchronizationManager.isSynchronizationActive()) {
478 			TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
479 				@Override
480 				public void afterCommit() {
481 					ourLog.trace("Sending resource modified message to processing channel");
482 					getProcessingChannel().send(new ResourceModifiedJsonMessage(theMessage));
483 				}
484 			});
485 		} else {
486 			ourLog.trace("Sending resource modified message to processing channel");
487 			getProcessingChannel().send(new ResourceModifiedJsonMessage(theMessage));
488 		}
489 	}
490 
491 	@VisibleForTesting
492 	public void setAsyncTaskExecutorForUnitTest(AsyncTaskExecutor theAsyncTaskExecutor) {
493 		myAsyncTaskExecutor = theAsyncTaskExecutor;
494 	}
495 
496 	public void setFhirContext(FhirContext theCtx) {
497 		myCtx = theCtx;
498 	}
499 
500 	public void setResourceDaos(List<IFhirResourceDao<?>> theResourceDaos) {
501 		myResourceDaos = theResourceDaos;
502 	}
503 
504 	@VisibleForTesting
505 	public void setTxManager(PlatformTransactionManager theTxManager) {
506 		myTxManager = theTxManager;
507 	}
508 
509 	@PostConstruct
510 	public void start() {
511 		for (IFhirResourceDao<?> next : myResourceDaos) {
512 			if (myCtx.getResourceDefinition(next.getResourceType()).getName().equals("Subscription")) {
513 				mySubscriptionDao = next;
514 			}
515 		}
516 		Validate.notNull(mySubscriptionDao);
517 
518 		if (myCtx.getVersion().getVersion() == FhirVersionEnum.R4) {
519 			Validate.notNull(myEventDefinitionDaoR4);
520 		}
521 
522 		if (getProcessingChannel() == null) {
523 			myProcessingExecutorQueue = new LinkedBlockingQueue<>(1000);
524 			RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
525 				ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", myProcessingExecutorQueue.size());
526 				StopWatch sw = new StopWatch();
527 				try {
528 					myProcessingExecutorQueue.put(theRunnable);
529 				} catch (InterruptedException theE) {
530 					throw new RejectedExecutionException("Task " + theRunnable.toString() +
531 						" rejected from " + theE.toString());
532 				}
533 				ourLog.info("Slot become available after {}ms", sw.getMillis());
534 			};
535 			ThreadFactory threadFactory = new BasicThreadFactory.Builder()
536 				.namingPattern("subscription-proc-%d")
537 				.daemon(false)
538 				.priority(Thread.NORM_PRIORITY)
539 				.build();
540 			myProcessingExecutor = new ThreadPoolExecutor(
541 				1,
542 				getExecutorThreadCount(),
543 				0L,
544 				TimeUnit.MILLISECONDS,
545 				myProcessingExecutorQueue,
546 				threadFactory,
547 				rejectedExecutionHandler);
548 			setProcessingChannel(new ExecutorSubscribableChannel(myProcessingExecutor));
549 		}
550 
551 		if (mySubscriptionActivatingSubscriber == null) {
552 			mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), getChannelType(), this, myTxManager, myAsyncTaskExecutor);
553 		}
554 
555 		registerSubscriptionCheckingSubscriber();
556 
557 		TransactionTemplate transactionTemplate = new TransactionTemplate(myTxManager);
558 		transactionTemplate.execute(new TransactionCallbackWithoutResult() {
559 			@Override
560 			protected void doInTransactionWithoutResult(TransactionStatus status) {
561 				initSubscriptions();
562 			}
563 		});
564 	}
565 
566 	protected void submitResourceModified(final ResourceModifiedMessage theMsg) {
567 		mySubscriptionActivatingSubscriber.handleMessage(theMsg.getOperationType(), theMsg.getId(myCtx), theMsg.getNewPayload(myCtx));
568 		sendToProcessingChannel(theMsg);
569 	}
570 
571 	private void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) {
572 		for (String next : new ArrayList<>(myIdToSubscription.keySet())) {
573 			if (!theAllIds.contains(next)) {
574 				ourLog.info("Unregistering Subscription/{}", next);
575 				CanonicalSubscription subscription = myIdToSubscription.get(next);
576 				unregisterSubscription(subscription.getIdElement(myCtx));
577 			}
578 		}
579 	}
580 
581 	public void unregisterHandler(String theSubscriptionId, MessageHandler theMessageHandler) {
582 		SubscribableChannel channel = mySubscribableChannel.get(theSubscriptionId);
583 		if (channel != null) {
584 			channel.unsubscribe(theMessageHandler);
585 			if (channel instanceof DisposableBean) {
586 				try {
587 					((DisposableBean) channel).destroy();
588 				} catch (Exception e) {
589 					ourLog.error("Failed to destroy channel bean", e);
590 				}
591 			}
592 		}
593 
594 		mySubscribableChannel.remove(theSubscriptionId);
595 	}
596 
597 	@SuppressWarnings("UnusedReturnValue")
598 	public CanonicalSubscription unregisterSubscription(IIdType theId) {
599 		Validate.notNull(theId);
600 
601 		String subscriptionId = theId.getIdPart();
602 		Validate.notBlank(subscriptionId);
603 
604 		for (MessageHandler next : new ArrayList<>(myIdToDeliveryHandler.get(subscriptionId))) {
605 			unregisterHandler(subscriptionId, next);
606 		}
607 
608 		mySubscribableChannel.remove(subscriptionId);
609 
610 		return myIdToSubscription.remove(subscriptionId);
611 	}
612 
613 
614 }