View Javadoc
1   package ca.uhn.fhir.jpa.subscription;
2   
3   /*-
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2018 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  import ca.uhn.fhir.context.FhirContext;
24  import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
25  import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
26  import ca.uhn.fhir.util.SubscriptionUtil;
27  import com.google.common.annotations.VisibleForTesting;
28  import org.apache.commons.lang3.Validate;
29  import org.hl7.fhir.instance.model.api.IBaseResource;
30  import org.hl7.fhir.instance.model.api.IIdType;
31  import org.hl7.fhir.instance.model.api.IPrimitiveType;
32  import org.hl7.fhir.r4.model.Subscription;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  import org.springframework.core.task.AsyncTaskExecutor;
36  import org.springframework.messaging.MessagingException;
37  import org.springframework.transaction.PlatformTransactionManager;
38  import org.springframework.transaction.TransactionStatus;
39  import org.springframework.transaction.support.TransactionCallbackWithoutResult;
40  import org.springframework.transaction.support.TransactionSynchronizationAdapter;
41  import org.springframework.transaction.support.TransactionSynchronizationManager;
42  import org.springframework.transaction.support.TransactionTemplate;
43  
44  import java.util.concurrent.Future;
45  import java.util.concurrent.TimeUnit;
46  
47  @SuppressWarnings("unchecked")
48  public class SubscriptionActivatingSubscriber {
49  	private static boolean ourWaitForSubscriptionActivationSynchronouslyForUnitTest;
50  	private final IFhirResourceDao mySubscriptionDao;
51  	private final BaseSubscriptionInterceptor mySubscriptionInterceptor;
52  	private final PlatformTransactionManager myTransactionManager;
53  	private final AsyncTaskExecutor myTaskExecutor;
54  	private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class);
55  	private FhirContext myCtx;
56  	private Subscription.SubscriptionChannelType myChannelType;
57  
58  
59  	/**
60  	 * Constructor
61  	 */
62  	public SubscriptionActivatingSubscriber(IFhirResourceDao<? extends IBaseResource> theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor, PlatformTransactionManager theTransactionManager, AsyncTaskExecutor theTaskExecutor) {
63  		mySubscriptionDao = theSubscriptionDao;
64  		mySubscriptionInterceptor = theSubscriptionInterceptor;
65  		myChannelType = theChannelType;
66  		myCtx = theSubscriptionDao.getContext();
67  		myTransactionManager = theTransactionManager;
68  		myTaskExecutor = theTaskExecutor;
69  		Validate.notNull(theTaskExecutor);
70  	}
71  
72  	public boolean activateOrRegisterSubscriptionIfRequired(final IBaseResource theSubscription) {
73  		// Grab the value for "Subscription.channel.type" so we can see if this
74  		// subscriber applies..
75  		String subscriptionChannelType = myCtx
76  			.newTerser()
77  			.getSingleValueOrNull(theSubscription, BaseSubscriptionInterceptor.SUBSCRIPTION_TYPE, IPrimitiveType.class)
78  			.getValueAsString();
79  		boolean subscriptionTypeApplies = BaseSubscriptionSubscriber.subscriptionTypeApplies(subscriptionChannelType, myChannelType);
80  		if (subscriptionTypeApplies == false) {
81  			return false;
82  		}
83  
84  		final IPrimitiveType<?> status = myCtx.newTerser().getSingleValueOrNull(theSubscription, BaseSubscriptionInterceptor.SUBSCRIPTION_STATUS, IPrimitiveType.class);
85  		String statusString = status.getValueAsString();
86  
87  		final String requestedStatus = Subscription.SubscriptionStatus.REQUESTED.toCode();
88  		final String activeStatus = Subscription.SubscriptionStatus.ACTIVE.toCode();
89  		if (requestedStatus.equals(statusString)) {
90  			if (TransactionSynchronizationManager.isSynchronizationActive()) {
91  				/*
92  				 * If we're in a transaction, we don't want to try and change the status from
93  				 * requested to active within the same transaction because it's too late by
94  				 * the time we get here to make modifications to the payload.
95  				 *
96  				 * So, we register a synchronization, meaning that when the transaction is
97  				 * finished, we'll schedule a task to do this in a separate worker thread
98  				 * to avoid any possibility of conflict.
99  				 */
100 				TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
101 					@Override
102 					public void afterCommit() {
103 						Future<?> activationFuture = myTaskExecutor.submit(new Runnable() {
104 							@Override
105 							public void run() {
106 								activateSubscription(activeStatus, theSubscription, requestedStatus);
107 							}
108 						});
109 
110 						/*
111 						 * If we're running in a unit test, it's nice to be predictable in
112 						 * terms of order... In the real world it's a recipe for deadlocks
113 						 */
114 						if (ourWaitForSubscriptionActivationSynchronouslyForUnitTest) {
115 							try {
116 								activationFuture.get(5, TimeUnit.SECONDS);
117 							} catch (Exception e) {
118 								ourLog.error("Failed to activate subscription", e);
119 							}
120 						}
121 					}
122 				});
123 				return true;
124 			} else {
125 				return activateSubscription(activeStatus, theSubscription, requestedStatus);
126 			}
127 		} else if (activeStatus.equals(statusString)) {
128 			return registerSubscriptionUnlessAlreadyRegistered(theSubscription);
129 		} else {
130 			// Status isn't "active" or "requested"
131 			return unregisterSubscriptionIfRegistered(theSubscription, statusString);
132 		}
133 	}
134 
135 	protected boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) {
136 		if (mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement()) != null) {
137 			ourLog.info("Removing {} subscription {}", theStatusString, theSubscription.getIdElement().toUnqualified().getValue());
138 			mySubscriptionInterceptor.unregisterSubscription(theSubscription.getIdElement());
139 			return true;
140 		}
141 		return false;
142 	}
143 
144 	private boolean activateSubscription(String theActiveStatus, final IBaseResource theSubscription, String theRequestedStatus) {
145 		IBaseResource subscription = mySubscriptionDao.read(theSubscription.getIdElement());
146 
147 		ourLog.info("Activating subscription {} from status {} to {} for channel {}", subscription.getIdElement().toUnqualified().getValue(), theRequestedStatus, theActiveStatus, myChannelType);
148 		try {
149 			SubscriptionUtil.setStatus(myCtx, subscription, theActiveStatus);
150 			subscription = mySubscriptionDao.update(subscription).getResource();
151 			mySubscriptionInterceptor.submitResourceModifiedForUpdate(subscription);
152 			return true;
153 		} catch (final UnprocessableEntityException e) {
154 			ourLog.info("Changing status of {} to ERROR", subscription.getIdElement());
155 			SubscriptionUtil.setStatus(myCtx, subscription, "error");
156 			SubscriptionUtil.setReason(myCtx, subscription, e.getMessage());
157 			mySubscriptionDao.update(subscription);
158 			return false;
159 		}
160 
161 	}
162 
163 	@SuppressWarnings("EnumSwitchStatementWhichMissesCases")
164 	public void handleMessage(ResourceModifiedMessage.OperationTypeEnum theOperationType, IIdType theId, final IBaseResource theSubscription) throws MessagingException {
165 
166 		switch (theOperationType) {
167 			case DELETE:
168 				mySubscriptionInterceptor.unregisterSubscription(theId);
169 				return;
170 			case CREATE:
171 			case UPDATE:
172 				if (!theId.getResourceType().equals("Subscription")) {
173 					return;
174 				}
175 				activateAndRegisterSubscriptionIfRequiredInTransaction(theSubscription);
176 				break;
177 			default:
178 				break;
179 		}
180 
181 	}
182 
183 	private void activateAndRegisterSubscriptionIfRequiredInTransaction(IBaseResource theSubscription) {
184 		TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager);
185 		txTemplate.execute(new TransactionCallbackWithoutResult() {
186 			@Override
187 			protected void doInTransactionWithoutResult(TransactionStatus status) {
188 				activateOrRegisterSubscriptionIfRequired(theSubscription);
189 			}
190 		});
191 	}
192 
193 	protected synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) {
194 		CanonicalSubscription existingSubscription = mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement());
195 		CanonicalSubscription newSubscription = mySubscriptionInterceptor.canonicalize(theSubscription);
196 
197 		if (existingSubscription != null) {
198 			if (newSubscription.equals(existingSubscription)) {
199 				// No changes
200 				return false;
201 			}
202 		}
203 
204 		if (existingSubscription != null) {
205 			ourLog.info("Updating already-registered active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
206 			mySubscriptionInterceptor.unregisterSubscription(theSubscription.getIdElement());
207 		} else {
208 			ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
209 		}
210 		mySubscriptionInterceptor.registerSubscription(theSubscription.getIdElement(), theSubscription);
211 		return true;
212 	}
213 
214 	@VisibleForTesting
215 	public static void setWaitForSubscriptionActivationSynchronouslyForUnitTest(boolean theWaitForSubscriptionActivationSynchronouslyForUnitTest) {
216 		ourWaitForSubscriptionActivationSynchronouslyForUnitTest = theWaitForSubscriptionActivationSynchronouslyForUnitTest;
217 	}
218 
219 }