View Javadoc
1   package ca.uhn.fhir.jpa.subscription;
2   
3   /*-
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2018 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  import ca.uhn.fhir.context.FhirContext;
24  import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
25  import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
26  import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
27  import ca.uhn.fhir.util.SubscriptionUtil;
28  import com.google.common.annotations.VisibleForTesting;
29  import org.apache.commons.lang3.Validate;
30  import org.hl7.fhir.instance.model.api.IBaseResource;
31  import org.hl7.fhir.instance.model.api.IIdType;
32  import org.hl7.fhir.instance.model.api.IPrimitiveType;
33  import org.hl7.fhir.r4.model.Subscription;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  import org.springframework.core.task.AsyncTaskExecutor;
37  import org.springframework.messaging.MessagingException;
38  import org.springframework.transaction.PlatformTransactionManager;
39  import org.springframework.transaction.TransactionStatus;
40  import org.springframework.transaction.support.TransactionCallbackWithoutResult;
41  import org.springframework.transaction.support.TransactionSynchronizationAdapter;
42  import org.springframework.transaction.support.TransactionSynchronizationManager;
43  import org.springframework.transaction.support.TransactionTemplate;
44  
45  import java.util.concurrent.Future;
46  import java.util.concurrent.TimeUnit;
47  
48  @SuppressWarnings("unchecked")
49  public class SubscriptionActivatingSubscriber {
50  	private static boolean ourWaitForSubscriptionActivationSynchronouslyForUnitTest;
51  	private final IFhirResourceDao mySubscriptionDao;
52  	private final BaseSubscriptionInterceptor mySubscriptionInterceptor;
53  	private final PlatformTransactionManager myTransactionManager;
54  	private final AsyncTaskExecutor myTaskExecutor;
55  	private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class);
56  	private FhirContext myCtx;
57  	private Subscription.SubscriptionChannelType myChannelType;
58  
59  
60  	/**
61  	 * Constructor
62  	 */
63  	public SubscriptionActivatingSubscriber(IFhirResourceDao<? extends IBaseResource> theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor, PlatformTransactionManager theTransactionManager, AsyncTaskExecutor theTaskExecutor) {
64  		mySubscriptionDao = theSubscriptionDao;
65  		mySubscriptionInterceptor = theSubscriptionInterceptor;
66  		myChannelType = theChannelType;
67  		myCtx = theSubscriptionDao.getContext();
68  		myTransactionManager = theTransactionManager;
69  		myTaskExecutor = theTaskExecutor;
70  		Validate.notNull(theTaskExecutor);
71  	}
72  
73  	public synchronized boolean activateOrRegisterSubscriptionIfRequired(final IBaseResource theSubscription) {
74  		// Grab the value for "Subscription.channel.type" so we can see if this
75  		// subscriber applies..
76  		String subscriptionChannelType = myCtx
77  			.newTerser()
78  			.getSingleValueOrNull(theSubscription, BaseSubscriptionInterceptor.SUBSCRIPTION_TYPE, IPrimitiveType.class)
79  			.getValueAsString();
80  		boolean subscriptionTypeApplies = BaseSubscriptionSubscriber.subscriptionTypeApplies(subscriptionChannelType, myChannelType);
81  		if (subscriptionTypeApplies == false) {
82  			return false;
83  		}
84  
85  		final IPrimitiveType<?> status = myCtx.newTerser().getSingleValueOrNull(theSubscription, BaseSubscriptionInterceptor.SUBSCRIPTION_STATUS, IPrimitiveType.class);
86  		String statusString = status.getValueAsString();
87  
88  		final String requestedStatus = Subscription.SubscriptionStatus.REQUESTED.toCode();
89  		final String activeStatus = Subscription.SubscriptionStatus.ACTIVE.toCode();
90  		if (requestedStatus.equals(statusString)) {
91  			if (TransactionSynchronizationManager.isSynchronizationActive()) {
92  				/*
93  				 * If we're in a transaction, we don't want to try and change the status from
94  				 * requested to active within the same transaction because it's too late by
95  				 * the time we get here to make modifications to the payload.
96  				 *
97  				 * So, we register a synchronization, meaning that when the transaction is
98  				 * finished, we'll schedule a task to do this in a separate worker thread
99  				 * to avoid any possibility of conflict.
100 				 */
101 				TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
102 					@Override
103 					public void afterCommit() {
104 						Future<?> activationFuture = myTaskExecutor.submit(new Runnable() {
105 							@Override
106 							public void run() {
107 								activateSubscription(activeStatus, theSubscription, requestedStatus);
108 							}
109 						});
110 
111 						/*
112 						 * If we're running in a unit test, it's nice to be predictable in
113 						 * terms of order... In the real world it's a recipe for deadlocks
114 						 */
115 						if (ourWaitForSubscriptionActivationSynchronouslyForUnitTest) {
116 							try {
117 								activationFuture.get(5, TimeUnit.SECONDS);
118 							} catch (Exception e) {
119 								ourLog.error("Failed to activate subscription", e);
120 							}
121 						}
122 					}
123 				});
124 				return true;
125 			} else {
126 				return activateSubscription(activeStatus, theSubscription, requestedStatus);
127 			}
128 		} else if (activeStatus.equals(statusString)) {
129 			return registerSubscriptionUnlessAlreadyRegistered(theSubscription);
130 		} else {
131 			// Status isn't "active" or "requested"
132 			return unregisterSubscriptionIfRegistered(theSubscription, statusString);
133 		}
134 	}
135 
136 	protected boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) {
137 		if (mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement()) != null) {
138 			ourLog.info("Removing {} subscription {}", theStatusString, theSubscription.getIdElement().toUnqualified().getValue());
139 			mySubscriptionInterceptor.unregisterSubscription(theSubscription.getIdElement());
140 			return true;
141 		}
142 		return false;
143 	}
144 
145 	private boolean activateSubscription(String theActiveStatus, final IBaseResource theSubscription, String theRequestedStatus) {
146 		IBaseResource subscription = mySubscriptionDao.read(theSubscription.getIdElement());
147 
148 		ourLog.info("Activating subscription {} from status {} to {} for channel {}", subscription.getIdElement().toUnqualified().getValue(), theRequestedStatus, theActiveStatus, myChannelType);
149 		try {
150 			SubscriptionUtil.setStatus(myCtx, subscription, theActiveStatus);
151 			subscription = mySubscriptionDao.update(subscription).getResource();
152 			mySubscriptionInterceptor.submitResourceModifiedForUpdate(subscription);
153 			return true;
154 		} catch (final UnprocessableEntityException e) {
155 			ourLog.info("Changing status of {} to ERROR", subscription.getIdElement());
156 			SubscriptionUtil.setStatus(myCtx, subscription, "error");
157 			SubscriptionUtil.setReason(myCtx, subscription, e.getMessage());
158 			mySubscriptionDao.update(subscription);
159 			return false;
160 		}
161 
162 	}
163 
164 	@SuppressWarnings("EnumSwitchStatementWhichMissesCases")
165 	public void handleMessage(RestOperationTypeEnum theOperationType, IIdType theId, final IBaseResource theSubscription) throws MessagingException {
166 
167 		switch (theOperationType) {
168 			case DELETE:
169 				mySubscriptionInterceptor.unregisterSubscription(theId);
170 				return;
171 			case CREATE:
172 			case UPDATE:
173 				if (!theId.getResourceType().equals("Subscription")) {
174 					return;
175 				}
176 				activateAndRegisterSubscriptionIfRequiredInTransaction(theSubscription);
177 				break;
178 			default:
179 				break;
180 		}
181 
182 	}
183 
184 	private synchronized void activateAndRegisterSubscriptionIfRequiredInTransaction(IBaseResource theSubscription) {
185 		TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager);
186 		txTemplate.execute(new TransactionCallbackWithoutResult() {
187 			@Override
188 			protected void doInTransactionWithoutResult(TransactionStatus status) {
189 				activateOrRegisterSubscriptionIfRequired(theSubscription);
190 			}
191 		});
192 	}
193 
194 	protected boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) {
195 		CanonicalSubscription existingSubscription = mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement());
196 		CanonicalSubscription newSubscription = mySubscriptionInterceptor.canonicalize(theSubscription);
197 
198 		if (existingSubscription != null) {
199 			if (newSubscription.equals(existingSubscription)) {
200 				// No changes
201 				return false;
202 			}
203 		}
204 
205 		if (existingSubscription != null) {
206 			ourLog.info("Updating already-registered active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
207 			mySubscriptionInterceptor.unregisterSubscription(theSubscription.getIdElement());
208 		} else {
209 			ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
210 		}
211 		mySubscriptionInterceptor.registerSubscription(theSubscription.getIdElement(), theSubscription);
212 		return true;
213 	}
214 
215 	@VisibleForTesting
216 	public static void setWaitForSubscriptionActivationSynchronouslyForUnitTest(boolean theWaitForSubscriptionActivationSynchronouslyForUnitTest) {
217 		ourWaitForSubscriptionActivationSynchronouslyForUnitTest = theWaitForSubscriptionActivationSynchronouslyForUnitTest;
218 	}
219 
220 }