View Javadoc
1   package ca.uhn.fhir.jpa.subscription;
2   
3   import ca.uhn.fhir.context.FhirContext;
4   import ca.uhn.fhir.interceptor.api.*;
5   import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel;
6   import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
7   import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory;
8   import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
9   import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber;
10  import com.google.common.annotations.VisibleForTesting;
11  import org.apache.commons.lang3.Validate;
12  import org.hl7.fhir.instance.model.api.IBaseResource;
13  import org.slf4j.Logger;
14  import org.slf4j.LoggerFactory;
15  import org.springframework.beans.factory.annotation.Autowired;
16  import org.springframework.context.annotation.Lazy;
17  import org.springframework.messaging.SubscribableChannel;
18  import org.springframework.stereotype.Component;
19  import org.springframework.transaction.support.TransactionSynchronizationAdapter;
20  import org.springframework.transaction.support.TransactionSynchronizationManager;
21  
22  import javax.annotation.PreDestroy;
23  
24  /*-
25   * #%L
26   * HAPI FHIR JPA Server
27   * %%
28   * Copyright (C) 2014 - 2019 University Health Network
29   * %%
30   * Licensed under the Apache License, Version 2.0 (the "License");
31   * you may not use this file except in compliance with the License.
32   * You may obtain a copy of the License at
33   * 
34   *      http://www.apache.org/licenses/LICENSE-2.0
35   * 
36   * Unless required by applicable law or agreed to in writing, software
37   * distributed under the License is distributed on an "AS IS" BASIS,
38   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
39   * See the License for the specific language governing permissions and
40   * limitations under the License.
41   * #L%
42   */
43  
44  @Component
45  @Lazy
46  @Interceptor()
47  public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer {
48  	public static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching";
49  	protected SubscribableChannel myMatchingChannel;
50  	@Autowired
51  	protected SubscriptionChannelFactory mySubscriptionChannelFactory;
52  	private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatcherInterceptor.class);
53  	@Autowired
54  	private FhirContext myFhirContext;
55  	@Autowired
56  	private SubscriptionMatchingSubscriber mySubscriptionMatchingSubscriber;
57  	@Autowired
58  	private IInterceptorBroadcaster myInterceptorBroadcaster;
59  
60  	/**
61  	 * Constructor
62  	 */
63  	public SubscriptionMatcherInterceptor() {
64  		super();
65  	}
66  
67  	public void start() {
68  		if (myMatchingChannel == null) {
69  			myMatchingChannel = mySubscriptionChannelFactory.newMatchingChannel(SUBSCRIPTION_MATCHING_CHANNEL_NAME);
70  		}
71  		myMatchingChannel.subscribe(mySubscriptionMatchingSubscriber);
72  		ourLog.info("Subscription Matching Subscriber subscribed to Matching Channel {} with name {}", myMatchingChannel.getClass().getName(), SUBSCRIPTION_MATCHING_CHANNEL_NAME);
73  
74  	}
75  
76  	@SuppressWarnings("unused")
77  	@PreDestroy
78  	public void preDestroy() {
79  
80  		if (myMatchingChannel != null) {
81  			myMatchingChannel.unsubscribe(mySubscriptionMatchingSubscriber);
82  		}
83  	}
84  
85  	@Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED)
86  	public void resourceCreated(IBaseResource theResource) {
87  		submitResourceModified(theResource, ResourceModifiedMessage.OperationTypeEnum.CREATE);
88  	}
89  
90  	@Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_DELETED)
91  	public void resourceDeleted(IBaseResource theResource) {
92  		submitResourceModified(theResource, ResourceModifiedMessage.OperationTypeEnum.DELETE);
93  	}
94  
95  	@Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_UPDATED)
96  	public void resourceUpdated(IBaseResource theOldResource, IBaseResource theNewResource) {
97  		submitResourceModified(theNewResource, ResourceModifiedMessage.OperationTypeEnum.UPDATE);
98  	}
99  
100 	private void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType) {
101 		ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theNewResource, theOperationType);
102 		// Interceptor call: SUBSCRIPTION_RESOURCE_MODIFIED
103 		HookParams params = new HookParams()
104 			.add(ResourceModifiedMessage.class, msg);
105 		if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED, params)) {
106 			return;
107 		}
108 
109 		submitResourceModified(msg);
110 	}
111 
112 	protected void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
113 		ourLog.trace("Sending resource modified message to processing channel");
114 		Validate.notNull(myMatchingChannel, "A SubscriptionMatcherInterceptor has been registered without calling start() on it.");
115 		myMatchingChannel.send(new ResourceModifiedJsonMessage(theMessage));
116 	}
117 
118 	public void setFhirContext(FhirContext theCtx) {
119 		myFhirContext = theCtx;
120 	}
121 
122 	/**
123 	 * This is an internal API - Use with caution!
124 	 */
125 	@Override
126 	public void submitResourceModified(final ResourceModifiedMessage theMsg) {
127 		/*
128 		 * We only want to submit the message to the processing queue once the
129 		 * transaction is committed. We do this in order to make sure that the
130 		 * data is actually in the DB, in case it's the database matcher.
131 		 */
132 		if (TransactionSynchronizationManager.isSynchronizationActive()) {
133 			TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
134 				@Override
135 				public int getOrder() {
136 					return 0;
137 				}
138 
139 				@Override
140 				public void afterCommit() {
141 					sendToProcessingChannel(theMsg);
142 				}
143 			});
144 		} else {
145 			sendToProcessingChannel(theMsg);
146 		}
147 	}
148 
149 	@VisibleForTesting
150 	LinkedBlockingQueueSubscribableChannel getProcessingChannelForUnitTest() {
151 		return (LinkedBlockingQueueSubscribableChannel) myMatchingChannel;
152 	}
153 }