View Javadoc
1   package ca.uhn.fhir.jpa.subscription;
2   
3   /*-
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2018 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  import ca.uhn.fhir.context.ConfigurationException;
24  import ca.uhn.fhir.context.FhirContext;
25  import ca.uhn.fhir.context.FhirVersionEnum;
26  import ca.uhn.fhir.jpa.config.BaseConfig;
27  import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
28  import ca.uhn.fhir.jpa.dao.SearchParameterMap;
29  import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails;
30  import ca.uhn.fhir.jpa.util.JpaConstants;
31  import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
32  import ca.uhn.fhir.rest.api.server.IBundleProvider;
33  import ca.uhn.fhir.rest.api.server.RequestDetails;
34  import ca.uhn.fhir.rest.param.TokenOrListParam;
35  import ca.uhn.fhir.rest.param.TokenParam;
36  import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
37  import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
38  import ca.uhn.fhir.rest.server.interceptor.ServerOperationInterceptorAdapter;
39  import ca.uhn.fhir.util.StopWatch;
40  import com.google.common.annotations.VisibleForTesting;
41  import org.apache.commons.lang3.Validate;
42  import org.apache.commons.lang3.concurrent.BasicThreadFactory;
43  import org.hl7.fhir.exceptions.FHIRException;
44  import org.hl7.fhir.instance.model.api.IBaseReference;
45  import org.hl7.fhir.instance.model.api.IBaseResource;
46  import org.hl7.fhir.instance.model.api.IIdType;
47  import org.hl7.fhir.r4.model.Subscription;
48  import org.slf4j.Logger;
49  import org.slf4j.LoggerFactory;
50  import org.springframework.beans.factory.annotation.Autowired;
51  import org.springframework.beans.factory.annotation.Qualifier;
52  import org.springframework.core.task.AsyncTaskExecutor;
53  import org.springframework.messaging.MessageHandler;
54  import org.springframework.messaging.SubscribableChannel;
55  import org.springframework.messaging.support.ExecutorSubscribableChannel;
56  import org.springframework.scheduling.annotation.Scheduled;
57  import org.springframework.transaction.PlatformTransactionManager;
58  import org.springframework.transaction.TransactionStatus;
59  import org.springframework.transaction.support.TransactionCallbackWithoutResult;
60  import org.springframework.transaction.support.TransactionSynchronizationAdapter;
61  import org.springframework.transaction.support.TransactionSynchronizationManager;
62  import org.springframework.transaction.support.TransactionTemplate;
63  
64  import javax.annotation.PostConstruct;
65  import javax.annotation.PreDestroy;
66  import java.util.*;
67  import java.util.concurrent.*;
68  
69  public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> extends ServerOperationInterceptorAdapter {
70  
71  	static final String SUBSCRIPTION_STATUS = "Subscription.status";
72  	static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
73  	static final String SUBSCRIPTION_CRITERIA = "Subscription.criteria";
74  	static final String SUBSCRIPTION_ENDPOINT = "Subscription.channel.endpoint";
75  	static final String SUBSCRIPTION_PAYLOAD = "Subscription.channel.payload";
76  	static final String SUBSCRIPTION_HEADER = "Subscription.channel.header";
77  	private static final Integer MAX_SUBSCRIPTION_RESULTS = 1000;
78  	private SubscribableChannel myProcessingChannel;
79  	private SubscribableChannel myDeliveryChannel;
80  	private ExecutorService myProcessingExecutor;
81  	private int myExecutorThreadCount;
82  	private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber;
83  	private MessageHandler mySubscriptionCheckingSubscriber;
84  	private ConcurrentHashMap<String, CanonicalSubscription> myIdToSubscription = new ConcurrentHashMap<>();
85  	private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class);
86  	private ThreadPoolExecutor myDeliveryExecutor;
87  	private LinkedBlockingQueue<Runnable> myProcessingExecutorQueue;
88  	private LinkedBlockingQueue<Runnable> myDeliveryExecutorQueue;
89  	private IFhirResourceDao<?> mySubscriptionDao;
90  	@Autowired
91  	private List<IFhirResourceDao<?>> myResourceDaos;
92  	@Autowired
93  	private FhirContext myCtx;
94  	@Autowired(required = false)
95  	@Qualifier("myEventDefinitionDaoR4")
96  	private IFhirResourceDao<org.hl7.fhir.r4.model.EventDefinition> myEventDefinitionDaoR4;
97  	@Autowired()
98  	private PlatformTransactionManager myTxManager;
99  	@Autowired
100 	@Qualifier(BaseConfig.TASK_EXECUTOR_NAME)
101 	private AsyncTaskExecutor myAsyncTaskExecutor;
102 	private Map<Class<? extends IBaseResource>, IFhirResourceDao<?>> myResourceTypeToDao;
103 
104 	/**
105 	 * Constructor
106 	 */
107 	public BaseSubscriptionInterceptor() {
108 		super();
109 		setExecutorThreadCount(5);
110 	}
111 
112 	protected CanonicalSubscription canonicalize(S theSubscription) {
113 		switch (myCtx.getVersion().getVersion()) {
114 			case DSTU2:
115 				return canonicalizeDstu2(theSubscription);
116 			case DSTU3:
117 				return canonicalizeDstu3(theSubscription);
118 			case R4:
119 				return canonicalizeR4(theSubscription);
120 			default:
121 				throw new ConfigurationException("Subscription not supported for version: " + myCtx.getVersion().getVersion());
122 		}
123 	}
124 
125 	protected CanonicalSubscription canonicalizeDstu2(IBaseResource theSubscription) {
126 		ca.uhn.fhir.model.dstu2.resource.Subscription subscription = (ca.uhn.fhir.model.dstu2.resource.Subscription) theSubscription;
127 
128 		CanonicalSubscription retVal = new CanonicalSubscription();
129 		try {
130 			retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus()));
131 			retVal.setBackingSubscription(myCtx, theSubscription);
132 			retVal.setChannelType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.fromCode(subscription.getChannel().getType()));
133 			retVal.setCriteriaString(subscription.getCriteria());
134 			retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
135 			retVal.setHeaders(subscription.getChannel().getHeader());
136 			retVal.setIdElement(subscription.getIdElement());
137 			retVal.setPayloadString(subscription.getChannel().getPayload());
138 		} catch (FHIRException theE) {
139 			throw new InternalErrorException(theE);
140 		}
141 		return retVal;
142 	}
143 
144 	protected CanonicalSubscription canonicalizeDstu3(IBaseResource theSubscription) {
145 		org.hl7.fhir.dstu3.model.Subscription subscription = (org.hl7.fhir.dstu3.model.Subscription) theSubscription;
146 
147 		CanonicalSubscription retVal = new CanonicalSubscription();
148 		try {
149 			retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus().toCode()));
150 			retVal.setBackingSubscription(myCtx, theSubscription);
151 			retVal.setChannelType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.fromCode(subscription.getChannel().getType().toCode()));
152 			retVal.setCriteriaString(subscription.getCriteria());
153 			retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
154 			retVal.setHeaders(subscription.getChannel().getHeader());
155 			retVal.setIdElement(subscription.getIdElement());
156 			retVal.setPayloadString(subscription.getChannel().getPayload());
157 
158 			if (retVal.getChannelType() == Subscription.SubscriptionChannelType.EMAIL) {
159 				String from;
160 				String subjectTemplate;
161 				String bodyTemplate;
162 				try {
163 					from = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_EMAIL_FROM);
164 					subjectTemplate = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_SUBJECT_TEMPLATE);
165 				} catch (FHIRException theE) {
166 					throw new ConfigurationException("Failed to extract subscription extension(s): " + theE.getMessage(), theE);
167 				}
168 				retVal.getEmailDetails().setFrom(from);
169 				retVal.getEmailDetails().setSubjectTemplate(subjectTemplate);
170 			}
171 
172 			if (retVal.getChannelType() == Subscription.SubscriptionChannelType.RESTHOOK) {
173 				String stripVersionIds;
174 				String deliverLatestVersion;
175 				try {
176 					stripVersionIds = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_RESTHOOK_STRIP_VERSION_IDS);
177 					deliverLatestVersion = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_RESTHOOK_DELIVER_LATEST_VERSION);
178 				} catch (FHIRException theE) {
179 					throw new ConfigurationException("Failed to extract subscription extension(s): " + theE.getMessage(), theE);
180 				}
181 				retVal.getRestHookDetails().setStripVersionId(Boolean.parseBoolean(stripVersionIds));
182 				retVal.getRestHookDetails().setDeliverLatestVersion(Boolean.parseBoolean(deliverLatestVersion));
183 			}
184 
185 		} catch (FHIRException theE) {
186 			throw new InternalErrorException(theE);
187 		}
188 		return retVal;
189 	}
190 
191 	protected CanonicalSubscription canonicalizeR4(IBaseResource theSubscription) {
192 		org.hl7.fhir.r4.model.Subscription subscription = (org.hl7.fhir.r4.model.Subscription) theSubscription;
193 
194 		CanonicalSubscription retVal = new CanonicalSubscription();
195 		retVal.setStatus(subscription.getStatus());
196 		retVal.setBackingSubscription(myCtx, theSubscription);
197 		retVal.setChannelType(subscription.getChannel().getType());
198 		retVal.setCriteriaString(subscription.getCriteria());
199 		retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
200 		retVal.setHeaders(subscription.getChannel().getHeader());
201 		retVal.setIdElement(subscription.getIdElement());
202 		retVal.setPayloadString(subscription.getChannel().getPayload());
203 
204 		if (retVal.getChannelType() == Subscription.SubscriptionChannelType.EMAIL) {
205 			String from;
206 			String subjectTemplate;
207 			String bodyTemplate;
208 			try {
209 				from = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_EMAIL_FROM);
210 				subjectTemplate = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_SUBJECT_TEMPLATE);
211 			} catch (FHIRException theE) {
212 				throw new ConfigurationException("Failed to extract subscription extension(s): " + theE.getMessage(), theE);
213 			}
214 			retVal.getEmailDetails().setFrom(from);
215 			retVal.getEmailDetails().setSubjectTemplate(subjectTemplate);
216 		}
217 
218 		if (retVal.getChannelType() == Subscription.SubscriptionChannelType.RESTHOOK) {
219 			String stripVersionIds;
220 			String deliverLatestVersion;
221 			try {
222 				stripVersionIds = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_RESTHOOK_STRIP_VERSION_IDS);
223 				deliverLatestVersion = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_RESTHOOK_DELIVER_LATEST_VERSION);
224 			} catch (FHIRException theE) {
225 				throw new ConfigurationException("Failed to extract subscription extension(s): " + theE.getMessage(), theE);
226 			}
227 			retVal.getRestHookDetails().setStripVersionId(Boolean.parseBoolean(stripVersionIds));
228 			retVal.getRestHookDetails().setDeliverLatestVersion(Boolean.parseBoolean(deliverLatestVersion));
229 		}
230 
231 		List<org.hl7.fhir.r4.model.Extension> topicExts = subscription.getExtensionsByUrl("http://hl7.org/fhir/subscription/topics");
232 		if (topicExts.size() > 0) {
233 			IBaseReference ref = (IBaseReference) topicExts.get(0).getValueAsPrimitive();
234 			if (!"EventDefinition".equals(ref.getReferenceElement().getResourceType())) {
235 				throw new PreconditionFailedException("Topic reference must be an EventDefinition");
236 			}
237 
238 			org.hl7.fhir.r4.model.EventDefinition def = myEventDefinitionDaoR4.read(ref.getReferenceElement());
239 			retVal.addTrigger(new CanonicalSubscription.CanonicalEventDefinition(def));
240 		}
241 
242 		return retVal;
243 	}
244 
245 	public abstract Subscription.SubscriptionChannelType getChannelType();
246 
247 	@SuppressWarnings("unchecked")
248 	public <R extends IBaseResource> IFhirResourceDao<R> getDao(Class<R> theType) {
249 		if (myResourceTypeToDao == null) {
250 			Map<Class<? extends IBaseResource>, IFhirResourceDao<?>> theResourceTypeToDao = new HashMap<>();
251 			for (IFhirResourceDao<?> next : myResourceDaos) {
252 				theResourceTypeToDao.put(next.getResourceType(), next);
253 			}
254 
255 			if (this instanceof IFhirResourceDao<?>) {
256 				IFhirResourceDao<?> thiz = (IFhirResourceDao<?>) this;
257 				theResourceTypeToDao.put(thiz.getResourceType(), thiz);
258 			}
259 
260 			myResourceTypeToDao = theResourceTypeToDao;
261 		}
262 
263 		IFhirResourceDao<R> dao = (IFhirResourceDao<R>) myResourceTypeToDao.get(theType);
264 		return dao;
265 	}
266 
267 	public SubscribableChannel getDeliveryChannel() {
268 		return myDeliveryChannel;
269 	}
270 
271 	public void setDeliveryChannel(SubscribableChannel theDeliveryChannel) {
272 		myDeliveryChannel = theDeliveryChannel;
273 	}
274 
275 	public int getExecutorQueueSizeForUnitTests() {
276 		return myProcessingExecutorQueue.size() + myDeliveryExecutorQueue.size();
277 	}
278 
279 	public int getExecutorThreadCount() {
280 		return myExecutorThreadCount;
281 	}
282 
283 	public void setExecutorThreadCount(int theExecutorThreadCount) {
284 		Validate.inclusiveBetween(1, Integer.MAX_VALUE, theExecutorThreadCount);
285 		myExecutorThreadCount = theExecutorThreadCount;
286 	}
287 
288 	public Map<String, CanonicalSubscription> getIdToSubscription() {
289 		return Collections.unmodifiableMap(myIdToSubscription);
290 	}
291 
292 	public SubscribableChannel getProcessingChannel() {
293 		return myProcessingChannel;
294 	}
295 
296 	public void setProcessingChannel(SubscribableChannel theProcessingChannel) {
297 		myProcessingChannel = theProcessingChannel;
298 	}
299 
300 	protected IFhirResourceDao<?> getSubscriptionDao() {
301 		return mySubscriptionDao;
302 	}
303 
304 	public List<CanonicalSubscription> getSubscriptions() {
305 		return new ArrayList<>(myIdToSubscription.values());
306 	}
307 
308 	public boolean hasSubscription(IIdType theId) {
309 		Validate.notNull(theId);
310 		Validate.notBlank(theId.getIdPart());
311 		return myIdToSubscription.containsKey(theId.getIdPart());
312 	}
313 
314 	/**
315 	 * Read the existing subscriptions from the database
316 	 */
317 	@SuppressWarnings("unused")
318 	@Scheduled(fixedDelay = 10000)
319 	public void initSubscriptions() {
320 		SearchParameterMap map = new SearchParameterMap();
321 		map.add(Subscription.SP_TYPE, new TokenParam(null, getChannelType().toCode()));
322 		map.add(Subscription.SP_STATUS, new TokenOrListParam()
323 			.addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode()))
324 			.addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode())));
325 		map.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS);
326 
327 		RequestDetails req = new ServletSubRequestDetails();
328 		req.setSubRequest(true);
329 
330 		IBundleProvider subscriptionBundleList = getSubscriptionDao().search(map, req);
331 		if (subscriptionBundleList.size() >= MAX_SUBSCRIPTION_RESULTS) {
332 			ourLog.error("Currently over " + MAX_SUBSCRIPTION_RESULTS + " subscriptions.  Some subscriptions have not been loaded.");
333 		}
334 
335 		List<IBaseResource> resourceList = subscriptionBundleList.getResources(0, subscriptionBundleList.size());
336 
337 		Set<String> allIds = new HashSet<>();
338 		for (IBaseResource resource : resourceList) {
339 			String nextId = resource.getIdElement().getIdPart();
340 			allIds.add(nextId);
341 			mySubscriptionActivatingSubscriber.activateAndRegisterSubscriptionIfRequired(resource);
342 		}
343 
344 		for (Enumeration<String> keyEnum = myIdToSubscription.keys(); keyEnum.hasMoreElements(); ) {
345 			String next = keyEnum.nextElement();
346 			if (!allIds.contains(next)) {
347 				ourLog.info("Unregistering Subscription/{} as it no longer exists", next);
348 				myIdToSubscription.remove(next);
349 			}
350 		}
351 	}
352 
353 	@SuppressWarnings("unused")
354 	@PreDestroy
355 	public void preDestroy() {
356 		getProcessingChannel().unsubscribe(mySubscriptionCheckingSubscriber);
357 
358 		unregisterDeliverySubscriber();
359 	}
360 
361 	protected abstract void registerDeliverySubscriber();
362 
363 	public void registerSubscription(IIdType theId, S theSubscription) {
364 		Validate.notNull(theId);
365 		Validate.notBlank(theId.getIdPart());
366 		Validate.notNull(theSubscription);
367 
368 		myIdToSubscription.put(theId.getIdPart(), canonicalize(theSubscription));
369 	}
370 
371 	protected void registerSubscriptionCheckingSubscriber() {
372 		if (mySubscriptionCheckingSubscriber == null) {
373 			mySubscriptionCheckingSubscriber = new SubscriptionCheckingSubscriber(getSubscriptionDao(), getChannelType(), this);
374 		}
375 		getProcessingChannel().subscribe(mySubscriptionCheckingSubscriber);
376 	}
377 
378 	@Override
379 	public void resourceCreated(RequestDetails theRequest, IBaseResource theResource) {
380 		ResourceModifiedMessage msg = new ResourceModifiedMessage();
381 		msg.setId(theResource.getIdElement());
382 		msg.setOperationType(RestOperationTypeEnum.CREATE);
383 		msg.setNewPayload(myCtx, theResource);
384 		submitResourceModified(msg);
385 	}
386 
387 	@Override
388 	public void resourceDeleted(RequestDetails theRequest, IBaseResource theResource) {
389 		ResourceModifiedMessage msg = new ResourceModifiedMessage();
390 		msg.setId(theResource.getIdElement());
391 		msg.setOperationType(RestOperationTypeEnum.DELETE);
392 		submitResourceModified(msg);
393 	}
394 
395 	@Override
396 	public void resourceUpdated(RequestDetails theRequest, IBaseResource theOldResource, IBaseResource theNewResource) {
397 		ResourceModifiedMessage msg = new ResourceModifiedMessage();
398 		msg.setId(theNewResource.getIdElement());
399 		msg.setOperationType(RestOperationTypeEnum.UPDATE);
400 		msg.setNewPayload(myCtx, theNewResource);
401 		submitResourceModified(msg);
402 	}
403 
404 	protected void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
405 		ourLog.trace("Registering synchronization to send resource modified message to processing channel");
406 
407 		/*
408 		 * We only actually submit this item work working after the
409 		 */
410 		TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
411 			@Override
412 			public void afterCommit() {
413 				ourLog.trace("Sending resource modified message to processing channel");
414 				getProcessingChannel().send(new ResourceModifiedJsonMessage(theMessage));
415 			}
416 		});
417 	}
418 
419 	@VisibleForTesting
420 	public void setAsyncTaskExecutorForUnitTest(AsyncTaskExecutor theAsyncTaskExecutor) {
421 		myAsyncTaskExecutor = theAsyncTaskExecutor;
422 	}
423 
424 	public void setFhirContext(FhirContext theCtx) {
425 		myCtx = theCtx;
426 	}
427 
428 	public void setResourceDaos(List<IFhirResourceDao<?>> theResourceDaos) {
429 		myResourceDaos = theResourceDaos;
430 	}
431 
432 	@VisibleForTesting
433 	public void setTxManager(PlatformTransactionManager theTxManager) {
434 		myTxManager = theTxManager;
435 	}
436 
437 	@PostConstruct
438 	public void start() {
439 		for (IFhirResourceDao<?> next : myResourceDaos) {
440 			if (myCtx.getResourceDefinition(next.getResourceType()).getName().equals("Subscription")) {
441 				mySubscriptionDao = next;
442 			}
443 		}
444 		Validate.notNull(mySubscriptionDao);
445 
446 		if (myCtx.getVersion().getVersion() == FhirVersionEnum.R4) {
447 			Validate.notNull(myEventDefinitionDaoR4);
448 		}
449 
450 		if (getProcessingChannel() == null) {
451 			myProcessingExecutorQueue = new LinkedBlockingQueue<>(1000);
452 			RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
453 				@Override
454 				public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
455 					ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", myProcessingExecutorQueue.size());
456 					StopWatch sw = new StopWatch();
457 					try {
458 						myProcessingExecutorQueue.put(theRunnable);
459 					} catch (InterruptedException theE) {
460 						throw new RejectedExecutionException("Task " + theRunnable.toString() +
461 							" rejected from " + theE.toString());
462 					}
463 					ourLog.info("Slot become available after {}ms", sw.getMillis());
464 				}
465 			};
466 			ThreadFactory threadFactory = new BasicThreadFactory.Builder()
467 				.namingPattern("subscription-proc-%d")
468 				.daemon(false)
469 				.priority(Thread.NORM_PRIORITY)
470 				.build();
471 			myProcessingExecutor = new ThreadPoolExecutor(
472 				1,
473 				getExecutorThreadCount(),
474 				0L,
475 				TimeUnit.MILLISECONDS,
476 				myProcessingExecutorQueue,
477 				threadFactory,
478 				rejectedExecutionHandler);
479 			setProcessingChannel(new ExecutorSubscribableChannel(myProcessingExecutor));
480 		}
481 
482 		if (getDeliveryChannel() == null) {
483 			myDeliveryExecutorQueue = new LinkedBlockingQueue<>(1000);
484 			BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
485 				.namingPattern("subscription-delivery-%d")
486 				.daemon(false)
487 				.priority(Thread.NORM_PRIORITY)
488 				.build();
489 			RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
490 				@Override
491 				public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
492 					ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", myDeliveryExecutorQueue.size());
493 					StopWatch sw = new StopWatch();
494 					try {
495 						myDeliveryExecutorQueue.put(theRunnable);
496 					} catch (InterruptedException theE) {
497 						throw new RejectedExecutionException("Task " + theRunnable.toString() +
498 							" rejected from " + theE.toString());
499 					}
500 					ourLog.info("Slot become available after {}ms", sw.getMillis());
501 				}
502 			};
503 			myDeliveryExecutor = new ThreadPoolExecutor(
504 				1,
505 				getExecutorThreadCount(),
506 				0L,
507 				TimeUnit.MILLISECONDS,
508 				myDeliveryExecutorQueue,
509 				threadFactory,
510 				rejectedExecutionHandler);
511 			setDeliveryChannel(new ExecutorSubscribableChannel(myDeliveryExecutor));
512 		}
513 
514 		if (mySubscriptionActivatingSubscriber == null) {
515 			mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), getChannelType(), this, myTxManager, myAsyncTaskExecutor);
516 		}
517 
518 		registerSubscriptionCheckingSubscriber();
519 		registerDeliverySubscriber();
520 
521 		TransactionTemplate transactionTemplate = new TransactionTemplate(myTxManager);
522 		transactionTemplate.execute(new TransactionCallbackWithoutResult() {
523 			@Override
524 			protected void doInTransactionWithoutResult(TransactionStatus status) {
525 				initSubscriptions();
526 			}
527 		});
528 	}
529 
530 	protected void submitResourceModified(final ResourceModifiedMessage theMsg) {
531 		mySubscriptionActivatingSubscriber.handleMessage(theMsg.getOperationType(), theMsg.getId(myCtx), theMsg.getNewPayload(myCtx));
532 		sendToProcessingChannel(theMsg);
533 	}
534 
535 	protected abstract void unregisterDeliverySubscriber();
536 
537 	public void unregisterSubscription(IIdType theId) {
538 		Validate.notNull(theId);
539 		Validate.notBlank(theId.getIdPart());
540 
541 		myIdToSubscription.remove(theId.getIdPart());
542 	}
543 
544 
545 }