View Javadoc
1   package ca.uhn.fhir.jpa.subscription;
2   
3   import ca.uhn.fhir.context.FhirContext;
4   import ca.uhn.fhir.interceptor.api.*;
5   import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel;
6   import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
7   import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionChannelFactory;
8   import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessage;
9   import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber;
10  import ca.uhn.fhir.jpa.util.JpaInterceptorBroadcaster;
11  import ca.uhn.fhir.rest.api.server.RequestDetails;
12  import com.google.common.annotations.VisibleForTesting;
13  import org.apache.commons.lang3.Validate;
14  import org.hl7.fhir.instance.model.api.IBaseResource;
15  import org.slf4j.Logger;
16  import org.slf4j.LoggerFactory;
17  import org.springframework.beans.factory.annotation.Autowired;
18  import org.springframework.context.annotation.Lazy;
19  import org.springframework.messaging.SubscribableChannel;
20  import org.springframework.stereotype.Component;
21  import org.springframework.transaction.support.TransactionSynchronizationAdapter;
22  import org.springframework.transaction.support.TransactionSynchronizationManager;
23  
24  import javax.annotation.PreDestroy;
25  
26  /*-
27   * #%L
28   * HAPI FHIR JPA Server
29   * %%
30   * Copyright (C) 2014 - 2019 University Health Network
31   * %%
32   * Licensed under the Apache License, Version 2.0 (the "License");
33   * you may not use this file except in compliance with the License.
34   * You may obtain a copy of the License at
35   *
36   *      http://www.apache.org/licenses/LICENSE-2.0
37   *
38   * Unless required by applicable law or agreed to in writing, software
39   * distributed under the License is distributed on an "AS IS" BASIS,
40   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
41   * See the License for the specific language governing permissions and
42   * limitations under the License.
43   * #L%
44   */
45  
46  @Component
47  @Lazy
48  @Interceptor()
49  public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer {
50  	public static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching";
51  	protected SubscribableChannel myMatchingChannel;
52  	@Autowired
53  	protected SubscriptionChannelFactory mySubscriptionChannelFactory;
54  	private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatcherInterceptor.class);
55  	@Autowired
56  	private FhirContext myFhirContext;
57  	@Autowired
58  	private SubscriptionMatchingSubscriber mySubscriptionMatchingSubscriber;
59  	@Autowired
60  	private IInterceptorBroadcaster myInterceptorBroadcaster;
61  
62  	/**
63  	 * Constructor
64  	 */
65  	public SubscriptionMatcherInterceptor() {
66  		super();
67  	}
68  
69  	public void start() {
70  		if (myMatchingChannel == null) {
71  			myMatchingChannel = mySubscriptionChannelFactory.newMatchingChannel(SUBSCRIPTION_MATCHING_CHANNEL_NAME);
72  		}
73  		myMatchingChannel.subscribe(mySubscriptionMatchingSubscriber);
74  		ourLog.info("Subscription Matching Subscriber subscribed to Matching Channel {} with name {}", myMatchingChannel.getClass().getName(), SUBSCRIPTION_MATCHING_CHANNEL_NAME);
75  
76  	}
77  
78  	@SuppressWarnings("unused")
79  	@PreDestroy
80  	public void preDestroy() {
81  
82  		if (myMatchingChannel != null) {
83  			myMatchingChannel.unsubscribe(mySubscriptionMatchingSubscriber);
84  		}
85  	}
86  
87  	@Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED)
88  	public void resourceCreated(IBaseResource theResource, RequestDetails theRequest) {
89  		submitResourceModified(theResource, ResourceModifiedMessage.OperationTypeEnum.CREATE, theRequest);
90  	}
91  
92  	@Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_DELETED)
93  	public void resourceDeleted(IBaseResource theResource, RequestDetails theRequest) {
94  		submitResourceModified(theResource, ResourceModifiedMessage.OperationTypeEnum.DELETE, theRequest);
95  	}
96  
97  	@Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_UPDATED)
98  	public void resourceUpdated(IBaseResource theOldResource, IBaseResource theNewResource, RequestDetails theRequest) {
99  		submitResourceModified(theNewResource, ResourceModifiedMessage.OperationTypeEnum.UPDATE, theRequest);
100 	}
101 
102 	private void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType, RequestDetails theRequest) {
103 		ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theNewResource, theOperationType);
104 		// Interceptor call: SUBSCRIPTION_RESOURCE_MODIFIED
105 		HookParams params = new HookParams()
106 			.add(ResourceModifiedMessage.class, msg);
107 		boolean outcome = JpaInterceptorBroadcaster.doCallHooks(myInterceptorBroadcaster, theRequest, Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED, params);
108 		if (!outcome) {
109 			return;
110 		}
111 
112 		submitResourceModified(msg);
113 	}
114 
115 	protected void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
116 		ourLog.trace("Sending resource modified message to processing channel");
117 		Validate.notNull(myMatchingChannel, "A SubscriptionMatcherInterceptor has been registered without calling start() on it.");
118 		myMatchingChannel.send(new ResourceModifiedJsonMessage(theMessage));
119 	}
120 
121 	public void setFhirContext(FhirContext theCtx) {
122 		myFhirContext = theCtx;
123 	}
124 
125 	/**
126 	 * This is an internal API - Use with caution!
127 	 */
128 	@Override
129 	public void submitResourceModified(final ResourceModifiedMessage theMsg) {
130 		/*
131 		 * We only want to submit the message to the processing queue once the
132 		 * transaction is committed. We do this in order to make sure that the
133 		 * data is actually in the DB, in case it's the database matcher.
134 		 */
135 		if (TransactionSynchronizationManager.isSynchronizationActive()) {
136 			TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
137 				@Override
138 				public int getOrder() {
139 					return 0;
140 				}
141 
142 				@Override
143 				public void afterCommit() {
144 					sendToProcessingChannel(theMsg);
145 				}
146 			});
147 		} else {
148 			sendToProcessingChannel(theMsg);
149 		}
150 	}
151 
152 	@VisibleForTesting
153 	LinkedBlockingQueueSubscribableChannel getProcessingChannelForUnitTest() {
154 		return (LinkedBlockingQueueSubscribableChannel) myMatchingChannel;
155 	}
156 }