View Javadoc
1   package ca.uhn.fhir.jpa.subscription;
2   
3   /*-
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2018 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  import ca.uhn.fhir.context.ConfigurationException;
24  import ca.uhn.fhir.context.FhirContext;
25  import ca.uhn.fhir.context.FhirVersionEnum;
26  import ca.uhn.fhir.jpa.config.BaseConfig;
27  import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
28  import ca.uhn.fhir.jpa.dao.SearchParameterMap;
29  import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails;
30  import ca.uhn.fhir.jpa.subscription.matcher.ISubscriptionMatcher;
31  import ca.uhn.fhir.jpa.subscription.matcher.SubscriptionMatcherDatabase;
32  import ca.uhn.fhir.jpa.util.JpaConstants;
33  import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
34  import ca.uhn.fhir.rest.api.server.IBundleProvider;
35  import ca.uhn.fhir.rest.api.server.RequestDetails;
36  import ca.uhn.fhir.rest.param.TokenOrListParam;
37  import ca.uhn.fhir.rest.param.TokenParam;
38  import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
39  import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
40  import ca.uhn.fhir.rest.server.interceptor.ServerOperationInterceptorAdapter;
41  import ca.uhn.fhir.util.StopWatch;
42  import com.google.common.annotations.VisibleForTesting;
43  import com.google.common.collect.ArrayListMultimap;
44  import com.google.common.collect.Multimap;
45  import com.google.common.collect.Multimaps;
46  import org.apache.commons.lang3.Validate;
47  import org.apache.commons.lang3.concurrent.BasicThreadFactory;
48  import org.hl7.fhir.exceptions.FHIRException;
49  import org.hl7.fhir.instance.model.api.IBaseReference;
50  import org.hl7.fhir.instance.model.api.IBaseResource;
51  import org.hl7.fhir.instance.model.api.IIdType;
52  import org.hl7.fhir.r4.model.Subscription;
53  import org.slf4j.Logger;
54  import org.slf4j.LoggerFactory;
55  import org.springframework.beans.factory.DisposableBean;
56  import org.springframework.beans.factory.annotation.Autowired;
57  import org.springframework.beans.factory.annotation.Qualifier;
58  import org.springframework.core.task.AsyncTaskExecutor;
59  import org.springframework.messaging.MessageChannel;
60  import org.springframework.messaging.MessageHandler;
61  import org.springframework.messaging.SubscribableChannel;
62  import org.springframework.messaging.support.ExecutorSubscribableChannel;
63  import org.springframework.scheduling.annotation.Scheduled;
64  import org.springframework.transaction.PlatformTransactionManager;
65  import org.springframework.transaction.TransactionStatus;
66  import org.springframework.transaction.support.TransactionCallbackWithoutResult;
67  import org.springframework.transaction.support.TransactionSynchronizationAdapter;
68  import org.springframework.transaction.support.TransactionSynchronizationManager;
69  import org.springframework.transaction.support.TransactionTemplate;
70  
71  import javax.annotation.PostConstruct;
72  import javax.annotation.PreDestroy;
73  import java.util.*;
74  import java.util.concurrent.*;
75  
76  public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> extends ServerOperationInterceptorAdapter {
77  
78  	static final String SUBSCRIPTION_STATUS = "Subscription.status";
79  	static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
80  	private static final Integer MAX_SUBSCRIPTION_RESULTS = 1000;
81  	private final Object myInitSubscriptionsLock = new Object();
82  	private SubscribableChannel myProcessingChannel;
83  	private Map<String, SubscribableChannel> myDeliveryChannel;
84  	private ExecutorService myProcessingExecutor;
85  	private int myExecutorThreadCount;
86  	private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber;
87  	private MessageHandler mySubscriptionCheckingSubscriber;
88  	private ConcurrentHashMap<String, CanonicalSubscription> myIdToSubscription = new ConcurrentHashMap<>();
89  	private ConcurrentHashMap<String, SubscribableChannel> mySubscribableChannel = new ConcurrentHashMap<>();
90  	private Multimap<String, MessageHandler> myIdToDeliveryHandler = Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
91  	private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class);
92  	private ThreadPoolExecutor myDeliveryExecutor;
93  	private LinkedBlockingQueue<Runnable> myProcessingExecutorQueue;
94  	private IFhirResourceDao<?> mySubscriptionDao;
95  	@Autowired
96  	private List<IFhirResourceDao<?>> myResourceDaos;
97  	@Autowired
98  	private FhirContext myCtx;
99  	@Autowired(required = false)
100 	@Qualifier("myEventDefinitionDaoR4")
101 	private IFhirResourceDao<org.hl7.fhir.r4.model.EventDefinition> myEventDefinitionDaoR4;
102 	@Autowired()
103 	private PlatformTransactionManager myTxManager;
104 	@Autowired
105 	@Qualifier(BaseConfig.TASK_EXECUTOR_NAME)
106 	private AsyncTaskExecutor myAsyncTaskExecutor;
107 	private Map<Class<? extends IBaseResource>, IFhirResourceDao<?>> myResourceTypeToDao;
108 	private Semaphore myInitSubscriptionsSemaphore = new Semaphore(1);
109 
110 	/**
111 	 * Constructor
112 	 */
113 	public BaseSubscriptionInterceptor() {
114 		super();
115 		setExecutorThreadCount(5);
116 	}
117 
118 	protected CanonicalSubscription canonicalize(S theSubscription) {
119 		switch (myCtx.getVersion().getVersion()) {
120 			case DSTU2:
121 				return canonicalizeDstu2(theSubscription);
122 			case DSTU3:
123 				return canonicalizeDstu3(theSubscription);
124 			case R4:
125 				return canonicalizeR4(theSubscription);
126 			default:
127 				throw new ConfigurationException("Subscription not supported for version: " + myCtx.getVersion().getVersion());
128 		}
129 	}
130 
131 	protected CanonicalSubscription canonicalizeDstu2(IBaseResource theSubscription) {
132 		ca.uhn.fhir.model.dstu2.resource.Subscription subscription = (ca.uhn.fhir.model.dstu2.resource.Subscription) theSubscription;
133 
134 		CanonicalSubscription retVal = new CanonicalSubscription();
135 		try {
136 			retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus()));
137 			retVal.setChannelType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.fromCode(subscription.getChannel().getType()));
138 			retVal.setCriteriaString(subscription.getCriteria());
139 			retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
140 			retVal.setHeaders(subscription.getChannel().getHeader());
141 			retVal.setIdElement(subscription.getIdElement());
142 			retVal.setPayloadString(subscription.getChannel().getPayload());
143 		} catch (FHIRException theE) {
144 			throw new InternalErrorException(theE);
145 		}
146 		return retVal;
147 	}
148 
149 	protected CanonicalSubscription canonicalizeDstu3(IBaseResource theSubscription) {
150 		org.hl7.fhir.dstu3.model.Subscription subscription = (org.hl7.fhir.dstu3.model.Subscription) theSubscription;
151 
152 		CanonicalSubscription retVal = new CanonicalSubscription();
153 		try {
154 			retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus().toCode()));
155 			retVal.setChannelType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.fromCode(subscription.getChannel().getType().toCode()));
156 			retVal.setCriteriaString(subscription.getCriteria());
157 			retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
158 			retVal.setHeaders(subscription.getChannel().getHeader());
159 			retVal.setIdElement(subscription.getIdElement());
160 			retVal.setPayloadString(subscription.getChannel().getPayload());
161 
162 			if (retVal.getChannelType() == Subscription.SubscriptionChannelType.EMAIL) {
163 				String from;
164 				String subjectTemplate;
165 				String bodyTemplate;
166 				try {
167 					from = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_EMAIL_FROM);
168 					subjectTemplate = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_SUBJECT_TEMPLATE);
169 				} catch (FHIRException theE) {
170 					throw new ConfigurationException("Failed to extract subscription extension(s): " + theE.getMessage(), theE);
171 				}
172 				retVal.getEmailDetails().setFrom(from);
173 				retVal.getEmailDetails().setSubjectTemplate(subjectTemplate);
174 			}
175 
176 			if (retVal.getChannelType() == Subscription.SubscriptionChannelType.RESTHOOK) {
177 				String stripVersionIds;
178 				String deliverLatestVersion;
179 				try {
180 					stripVersionIds = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_RESTHOOK_STRIP_VERSION_IDS);
181 					deliverLatestVersion = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_RESTHOOK_DELIVER_LATEST_VERSION);
182 				} catch (FHIRException theE) {
183 					throw new ConfigurationException("Failed to extract subscription extension(s): " + theE.getMessage(), theE);
184 				}
185 				retVal.getRestHookDetails().setStripVersionId(Boolean.parseBoolean(stripVersionIds));
186 				retVal.getRestHookDetails().setDeliverLatestVersion(Boolean.parseBoolean(deliverLatestVersion));
187 			}
188 
189 		} catch (FHIRException theE) {
190 			throw new InternalErrorException(theE);
191 		}
192 		return retVal;
193 	}
194 
195 	protected CanonicalSubscription canonicalizeR4(IBaseResource theSubscription) {
196 		org.hl7.fhir.r4.model.Subscription subscription = (org.hl7.fhir.r4.model.Subscription) theSubscription;
197 
198 		CanonicalSubscription retVal = new CanonicalSubscription();
199 		retVal.setStatus(subscription.getStatus());
200 		retVal.setChannelType(subscription.getChannel().getType());
201 		retVal.setCriteriaString(subscription.getCriteria());
202 		retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
203 		retVal.setHeaders(subscription.getChannel().getHeader());
204 		retVal.setIdElement(subscription.getIdElement());
205 		retVal.setPayloadString(subscription.getChannel().getPayload());
206 
207 		if (retVal.getChannelType() == Subscription.SubscriptionChannelType.EMAIL) {
208 			String from;
209 			String subjectTemplate;
210 			try {
211 				from = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_EMAIL_FROM);
212 				subjectTemplate = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_SUBJECT_TEMPLATE);
213 			} catch (FHIRException theE) {
214 				throw new ConfigurationException("Failed to extract subscription extension(s): " + theE.getMessage(), theE);
215 			}
216 			retVal.getEmailDetails().setFrom(from);
217 			retVal.getEmailDetails().setSubjectTemplate(subjectTemplate);
218 		}
219 
220 		if (retVal.getChannelType() == Subscription.SubscriptionChannelType.RESTHOOK) {
221 			String stripVersionIds;
222 			String deliverLatestVersion;
223 			try {
224 				stripVersionIds = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_RESTHOOK_STRIP_VERSION_IDS);
225 				deliverLatestVersion = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_RESTHOOK_DELIVER_LATEST_VERSION);
226 			} catch (FHIRException theE) {
227 				throw new ConfigurationException("Failed to extract subscription extension(s): " + theE.getMessage(), theE);
228 			}
229 			retVal.getRestHookDetails().setStripVersionId(Boolean.parseBoolean(stripVersionIds));
230 			retVal.getRestHookDetails().setDeliverLatestVersion(Boolean.parseBoolean(deliverLatestVersion));
231 		}
232 
233 		List<org.hl7.fhir.r4.model.Extension> topicExts = subscription.getExtensionsByUrl("http://hl7.org/fhir/subscription/topics");
234 		if (topicExts.size() > 0) {
235 			IBaseReference ref = (IBaseReference) topicExts.get(0).getValueAsPrimitive();
236 			if (!"EventDefinition".equals(ref.getReferenceElement().getResourceType())) {
237 				throw new PreconditionFailedException("Topic reference must be an EventDefinition");
238 			}
239 
240 			org.hl7.fhir.r4.model.EventDefinition def = myEventDefinitionDaoR4.read(ref.getReferenceElement());
241 			retVal.addTrigger(new CanonicalSubscription.CanonicalEventDefinition(def));
242 		}
243 
244 		return retVal;
245 	}
246 
247 	protected SubscribableChannel createDeliveryChannel(CanonicalSubscription theSubscription) {
248 		String subscriptionId = theSubscription.getIdElement(myCtx).getIdPart();
249 
250 		LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000);
251 		BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
252 			.namingPattern("subscription-delivery-" + subscriptionId + "-%d")
253 			.daemon(false)
254 			.priority(Thread.NORM_PRIORITY)
255 			.build();
256 		RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
257 			@Override
258 			public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
259 				ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", executorQueue.size());
260 				StopWatch sw = new StopWatch();
261 				try {
262 					executorQueue.put(theRunnable);
263 				} catch (InterruptedException theE) {
264 					throw new RejectedExecutionException("Task " + theRunnable.toString() +
265 						" rejected from " + theE.toString());
266 				}
267 				ourLog.info("Slot become available after {}ms", sw.getMillis());
268 			}
269 		};
270 		ThreadPoolExecutor deliveryExecutor = new ThreadPoolExecutor(
271 			1,
272 			getExecutorThreadCount(),
273 			0L,
274 			TimeUnit.MILLISECONDS,
275 			executorQueue,
276 			threadFactory,
277 			rejectedExecutionHandler);
278 
279 		return new ExecutorSubscribableChannel(deliveryExecutor);
280 	}
281 
282 	/**
283 	 * Returns an empty handler if the interceptor will manually handle registration and unregistration
284 	 */
285 	protected abstract Optional<MessageHandler> createDeliveryHandler(CanonicalSubscription theSubscription);
286 
287 	public abstract Subscription.SubscriptionChannelType getChannelType();
288 
289 	// TODO KHS move out
290 	@SuppressWarnings("unchecked")
291 	public <R extends IBaseResource> IFhirResourceDao<R> getDao(Class<R> theType) {
292 		if (myResourceTypeToDao == null) {
293 			Map<Class<? extends IBaseResource>, IFhirResourceDao<?>> theResourceTypeToDao = new HashMap<>();
294 			for (IFhirResourceDao<?> next : myResourceDaos) {
295 				theResourceTypeToDao.put(next.getResourceType(), next);
296 			}
297 
298 			if (this instanceof IFhirResourceDao<?>) {
299 				IFhirResourceDao<?> thiz = (IFhirResourceDao<?>) this;
300 				theResourceTypeToDao.put(thiz.getResourceType(), thiz);
301 			}
302 
303 			myResourceTypeToDao = theResourceTypeToDao;
304 		}
305 
306 		return (IFhirResourceDao<R>) myResourceTypeToDao.get(theType);
307 	}
308 
309 	protected MessageChannel getDeliveryChannel(CanonicalSubscription theSubscription) {
310 		return mySubscribableChannel.get(theSubscription.getIdElement(myCtx).getIdPart());
311 	}
312 
313 	public int getExecutorQueueSizeForUnitTests() {
314 		return myProcessingExecutorQueue.size();
315 	}
316 
317 	public int getExecutorThreadCount() {
318 		return myExecutorThreadCount;
319 	}
320 
321 	public void setExecutorThreadCount(int theExecutorThreadCount) {
322 		Validate.inclusiveBetween(1, Integer.MAX_VALUE, theExecutorThreadCount);
323 		myExecutorThreadCount = theExecutorThreadCount;
324 	}
325 
326 	public Map<String, CanonicalSubscription> getIdToSubscription() {
327 		return Collections.unmodifiableMap(myIdToSubscription);
328 	}
329 
330 	public SubscribableChannel getProcessingChannel() {
331 		return myProcessingChannel;
332 	}
333 
334 	public void setProcessingChannel(SubscribableChannel theProcessingChannel) {
335 		myProcessingChannel = theProcessingChannel;
336 	}
337 
338 	protected IFhirResourceDao<?> getSubscriptionDao() {
339 		return mySubscriptionDao;
340 	}
341 
342 	public List<CanonicalSubscription> getRegisteredSubscriptions() {
343 		return new ArrayList<>(myIdToSubscription.values());
344 	}
345 
346 	public CanonicalSubscription hasSubscription(IIdType theId) {
347 		Validate.notNull(theId);
348 		Validate.notBlank(theId.getIdPart());
349 		return myIdToSubscription.get(theId.getIdPart());
350 	}
351 
352 	/**
353 	 * Read the existing subscriptions from the database
354 	 */
355 	@SuppressWarnings("unused")
356 	@Scheduled(fixedDelay = 60000)
357 	public void initSubscriptions() {
358 		if (!myInitSubscriptionsSemaphore.tryAcquire()) {
359 			return;
360 		}
361 		try {
362 			doInitSubscriptions();
363 		} finally {
364 			myInitSubscriptionsSemaphore.release();
365 		}
366 	}
367 
368 	public Integer doInitSubscriptions() {
369 		synchronized (myInitSubscriptionsLock) {
370 			ourLog.debug("Starting init subscriptions");
371 			SearchParameterMap map = new SearchParameterMap();
372 			map.add(Subscription.SP_TYPE, new TokenParam(null, getChannelType().toCode()));
373 			map.add(Subscription.SP_STATUS, new TokenOrListParam()
374 				.addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode()))
375 				.addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode())));
376 			map.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS);
377 
378 			RequestDetails req = new ServletSubRequestDetails();
379 			req.setSubRequest(true);
380 
381 			IBundleProvider subscriptionBundleList = getSubscriptionDao().search(map, req);
382 			if (subscriptionBundleList.size() >= MAX_SUBSCRIPTION_RESULTS) {
383 				ourLog.error("Currently over " + MAX_SUBSCRIPTION_RESULTS + " subscriptions.  Some subscriptions have not been loaded.");
384 			}
385 
386 			List<IBaseResource> resourceList = subscriptionBundleList.getResources(0, subscriptionBundleList.size());
387 
388 			Set<String> allIds = new HashSet<>();
389 			int changesCount = 0;
390 			for (IBaseResource resource : resourceList) {
391 				String nextId = resource.getIdElement().getIdPart();
392 				allIds.add(nextId);
393 				boolean changed = mySubscriptionActivatingSubscriber.activateOrRegisterSubscriptionIfRequired(resource);
394 				if (changed) {
395 					changesCount++;
396 				}
397 			}
398 
399 			unregisterAllSubscriptionsNotInCollection(allIds);
400 			ourLog.trace("Finished init subscriptions - found {}", resourceList.size());
401 
402 			return changesCount;
403 		}
404 	}
405 
406 	@SuppressWarnings("unused")
407 	@PreDestroy
408 	public void preDestroy() {
409 		getProcessingChannel().unsubscribe(mySubscriptionCheckingSubscriber);
410 		unregisterAllSubscriptionsNotInCollection(Collections.emptyList());
411 	}
412 
413 	public void registerHandler(String theSubscriptionId, MessageHandler theHandler) {
414 		mySubscribableChannel.get(theSubscriptionId).subscribe(theHandler);
415 		myIdToDeliveryHandler.put(theSubscriptionId, theHandler);
416 	}
417 
418 	@SuppressWarnings("UnusedReturnValue")
419 	public CanonicalSubscription registerSubscription(IIdType theId, S theSubscription) {
420 		Validate.notNull(theId);
421 		String subscriptionId = theId.getIdPart();
422 		Validate.notBlank(subscriptionId);
423 		Validate.notNull(theSubscription);
424 
425 		CanonicalSubscription canonicalized = canonicalize(theSubscription);
426 		SubscribableChannel deliveryChannel = createDeliveryChannel(canonicalized);
427 		Optional<MessageHandler> deliveryHandler = createDeliveryHandler(canonicalized);
428 
429 		mySubscribableChannel.put(subscriptionId, deliveryChannel);
430 		myIdToSubscription.put(subscriptionId, canonicalized);
431 
432 		deliveryHandler.ifPresent(handler -> registerHandler(subscriptionId, handler));
433 
434 		return canonicalized;
435 	}
436 
437 	protected void registerSubscriptionCheckingSubscriber() {
438 		if (mySubscriptionCheckingSubscriber == null) {
439 			ISubscriptionMatcher subscriptionMatcher = new SubscriptionMatcherDatabase(getSubscriptionDao(), this);
440 			mySubscriptionCheckingSubscriber = new SubscriptionCheckingSubscriber(getSubscriptionDao(), getChannelType(), this, subscriptionMatcher );
441 		}
442 		getProcessingChannel().subscribe(mySubscriptionCheckingSubscriber);
443 	}
444 
445 	@Override
446 	public void resourceCreated(RequestDetails theRequest, IBaseResource theResource) {
447 		ResourceModifiedMessage msg = new ResourceModifiedMessage();
448 		msg.setId(theResource.getIdElement());
449 		msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.CREATE);
450 		msg.setNewPayload(myCtx, theResource);
451 		submitResourceModified(msg);
452 	}
453 
454 	@Override
455 	public void resourceDeleted(RequestDetails theRequest, IBaseResource theResource) {
456 		ResourceModifiedMessage msg = new ResourceModifiedMessage();
457 		msg.setId(theResource.getIdElement());
458 		msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.DELETE);
459 		submitResourceModified(msg);
460 	}
461 
462 	@Override
463 	public void resourceUpdated(RequestDetails theRequest, IBaseResource theOldResource, IBaseResource theNewResource) {
464 		submitResourceModifiedForUpdate(theNewResource);
465 	}
466 
467 	void submitResourceModifiedForUpdate(IBaseResource theNewResource) {
468 		ResourceModifiedMessage msg = new ResourceModifiedMessage();
469 		msg.setId(theNewResource.getIdElement());
470 		msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.UPDATE);
471 		msg.setNewPayload(myCtx, theNewResource);
472 		submitResourceModified(msg);
473 	}
474 
475 	protected void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
476 		ourLog.trace("Registering synchronization to send resource modified message to processing channel");
477 
478 		/*
479 		 * We only actually submit this item work working after the
480 		 */
481 		if (TransactionSynchronizationManager.isSynchronizationActive()) {
482 			TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
483 				@Override
484 				public void afterCommit() {
485 					ourLog.trace("Sending resource modified message to processing channel");
486 					getProcessingChannel().send(new ResourceModifiedJsonMessage(theMessage));
487 				}
488 			});
489 		} else {
490 			ourLog.trace("Sending resource modified message to processing channel");
491 			getProcessingChannel().send(new ResourceModifiedJsonMessage(theMessage));
492 		}
493 	}
494 
495 	@VisibleForTesting
496 	public void setAsyncTaskExecutorForUnitTest(AsyncTaskExecutor theAsyncTaskExecutor) {
497 		myAsyncTaskExecutor = theAsyncTaskExecutor;
498 	}
499 
500 	public void setFhirContext(FhirContext theCtx) {
501 		myCtx = theCtx;
502 	}
503 
504 	public void setResourceDaos(List<IFhirResourceDao<?>> theResourceDaos) {
505 		myResourceDaos = theResourceDaos;
506 	}
507 
508 	@VisibleForTesting
509 	public void setTxManager(PlatformTransactionManager theTxManager) {
510 		myTxManager = theTxManager;
511 	}
512 
513 	@PostConstruct
514 	public void start() {
515 		for (IFhirResourceDao<?> next : myResourceDaos) {
516 			if (next.getResourceType() != null) {
517 				if (myCtx.getResourceDefinition(next.getResourceType()).getName().equals("Subscription")) {
518 					mySubscriptionDao = next;
519 				}
520 			}
521 		}
522 		Validate.notNull(mySubscriptionDao);
523 
524 		if (myCtx.getVersion().getVersion() == FhirVersionEnum.R4) {
525 			Validate.notNull(myEventDefinitionDaoR4);
526 		}
527 
528 		if (getProcessingChannel() == null) {
529 			myProcessingExecutorQueue = new LinkedBlockingQueue<>(1000);
530 			RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
531 				ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", myProcessingExecutorQueue.size());
532 				StopWatch sw = new StopWatch();
533 				try {
534 					myProcessingExecutorQueue.put(theRunnable);
535 				} catch (InterruptedException theE) {
536 					throw new RejectedExecutionException("Task " + theRunnable.toString() +
537 						" rejected from " + theE.toString());
538 				}
539 				ourLog.info("Slot become available after {}ms", sw.getMillis());
540 			};
541 			ThreadFactory threadFactory = new BasicThreadFactory.Builder()
542 				.namingPattern("subscription-proc-%d")
543 				.daemon(false)
544 				.priority(Thread.NORM_PRIORITY)
545 				.build();
546 			myProcessingExecutor = new ThreadPoolExecutor(
547 				1,
548 				getExecutorThreadCount(),
549 				0L,
550 				TimeUnit.MILLISECONDS,
551 				myProcessingExecutorQueue,
552 				threadFactory,
553 				rejectedExecutionHandler);
554 			setProcessingChannel(new ExecutorSubscribableChannel(myProcessingExecutor));
555 		}
556 
557 		if (mySubscriptionActivatingSubscriber == null) {
558 			mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), getChannelType(), this, myTxManager, myAsyncTaskExecutor);
559 		}
560 
561 		registerSubscriptionCheckingSubscriber();
562 
563 		TransactionTemplate transactionTemplate = new TransactionTemplate(myTxManager);
564 		transactionTemplate.execute(new TransactionCallbackWithoutResult() {
565 			@Override
566 			protected void doInTransactionWithoutResult(TransactionStatus status) {
567 				initSubscriptions();
568 			}
569 		});
570 	}
571 
572 	/**
573 	 * This is an internal API - Use with caution!
574 	 */
575 	public void submitResourceModified(final ResourceModifiedMessage theMsg) {
576 		mySubscriptionActivatingSubscriber.handleMessage(theMsg.getOperationType(), theMsg.getId(myCtx), theMsg.getNewPayload(myCtx));
577 		sendToProcessingChannel(theMsg);
578 	}
579 
580 	private void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) {
581 		for (String next : new ArrayList<>(myIdToSubscription.keySet())) {
582 			if (!theAllIds.contains(next)) {
583 				ourLog.info("Unregistering Subscription/{}", next);
584 				CanonicalSubscription subscription = myIdToSubscription.get(next);
585 				unregisterSubscription(subscription.getIdElement(myCtx));
586 			}
587 		}
588 	}
589 
590 	public void unregisterHandler(String theSubscriptionId, MessageHandler theMessageHandler) {
591 		SubscribableChannel channel = mySubscribableChannel.get(theSubscriptionId);
592 		if (channel != null) {
593 			channel.unsubscribe(theMessageHandler);
594 			if (channel instanceof DisposableBean) {
595 				try {
596 					((DisposableBean) channel).destroy();
597 				} catch (Exception e) {
598 					ourLog.error("Failed to destroy channel bean", e);
599 				}
600 			}
601 		}
602 
603 		mySubscribableChannel.remove(theSubscriptionId);
604 	}
605 
606 	@SuppressWarnings("UnusedReturnValue")
607 	public CanonicalSubscription unregisterSubscription(IIdType theId) {
608 		Validate.notNull(theId);
609 
610 		String subscriptionId = theId.getIdPart();
611 		Validate.notBlank(subscriptionId);
612 
613 		for (MessageHandler next : new ArrayList<>(myIdToDeliveryHandler.get(subscriptionId))) {
614 			unregisterHandler(subscriptionId, next);
615 		}
616 
617 		mySubscribableChannel.remove(subscriptionId);
618 
619 		return myIdToSubscription.remove(subscriptionId);
620 	}
621 
622 
623 }