View Javadoc
1   package ca.uhn.fhir.jpa.subscription;
2   
3   /*-
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2018 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  import ca.uhn.fhir.context.FhirContext;
24  import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
25  import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
26  import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
27  import ca.uhn.fhir.util.SubscriptionUtil;
28  import com.google.common.annotations.VisibleForTesting;
29  import org.apache.commons.lang3.Validate;
30  import org.hl7.fhir.instance.model.api.IBaseResource;
31  import org.hl7.fhir.instance.model.api.IIdType;
32  import org.hl7.fhir.instance.model.api.IPrimitiveType;
33  import org.hl7.fhir.r4.model.Subscription;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  import org.springframework.core.task.AsyncTaskExecutor;
37  import org.springframework.messaging.MessagingException;
38  import org.springframework.scheduling.TaskScheduler;
39  import org.springframework.transaction.PlatformTransactionManager;
40  import org.springframework.transaction.TransactionStatus;
41  import org.springframework.transaction.support.TransactionCallbackWithoutResult;
42  import org.springframework.transaction.support.TransactionSynchronizationAdapter;
43  import org.springframework.transaction.support.TransactionSynchronizationManager;
44  import org.springframework.transaction.support.TransactionTemplate;
45  
46  import java.util.Date;
47  import java.util.concurrent.Future;
48  import java.util.concurrent.TimeUnit;
49  
50  @SuppressWarnings("unchecked")
51  public class SubscriptionActivatingSubscriber {
52  	private static boolean ourWaitForSubscriptionActivationSynchronouslyForUnitTest;
53  	private final IFhirResourceDao mySubscriptionDao;
54  	private final BaseSubscriptionInterceptor mySubscriptionInterceptor;
55  	private final PlatformTransactionManager myTransactionManager;
56  	private final AsyncTaskExecutor myTaskExecutor;
57  	private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class);
58  	private FhirContext myCtx;
59  	private Subscription.SubscriptionChannelType myChannelType;
60  
61  
62  	/**
63  	 * Constructor
64  	 */
65  	public SubscriptionActivatingSubscriber(IFhirResourceDao<? extends IBaseResource> theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor, PlatformTransactionManager theTransactionManager, AsyncTaskExecutor theTaskExecutor) {
66  		mySubscriptionDao = theSubscriptionDao;
67  		mySubscriptionInterceptor = theSubscriptionInterceptor;
68  		myChannelType = theChannelType;
69  		myCtx = theSubscriptionDao.getContext();
70  		myTransactionManager = theTransactionManager;
71  		myTaskExecutor = theTaskExecutor;
72  		Validate.notNull(theTaskExecutor);
73  	}
74  
75  	public void activateAndRegisterSubscriptionIfRequired(final IBaseResource theSubscription) {
76  		boolean subscriptionTypeApplies = BaseSubscriptionSubscriber.subscriptionTypeApplies(myCtx, theSubscription, myChannelType);
77  		if (subscriptionTypeApplies == false) {
78  			return;
79  		}
80  
81  		final IPrimitiveType<?> status = myCtx.newTerser().getSingleValueOrNull(theSubscription, BaseSubscriptionInterceptor.SUBSCRIPTION_STATUS, IPrimitiveType.class);
82  		String statusString = status.getValueAsString();
83  
84  		final String requestedStatus = Subscription.SubscriptionStatus.REQUESTED.toCode();
85  		final String activeStatus = Subscription.SubscriptionStatus.ACTIVE.toCode();
86  		if (requestedStatus.equals(statusString)) {
87  			if (TransactionSynchronizationManager.isSynchronizationActive()) {
88  				/*
89  				 * If we're in a transaction, we don't want to try and change the status from
90  				 * requested to active within the same transaction because it's too late by
91  				 * the time we get here to make modifications to the payload.
92  				 *
93  				 * So, we register a synchronization, meaning that when the transaction is
94  				 * finished, we'll schedule a task to do this in a separate worker thread
95  				 * to avoid any possibility of conflict.
96  				 */
97  				TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
98  					@Override
99  					public void afterCommit() {
100 						Future<?> activationFuture = myTaskExecutor.submit(new Runnable() {
101 							@Override
102 							public void run() {
103 								activateSubscription(activeStatus, theSubscription, requestedStatus);
104 							}
105 						});
106 
107 						/*
108 						 * If we're running in a unit test, it's nice to be predictable in
109 						 * terms of order... In the real world it's a recipe for deadlocks
110 						 */
111 						if (ourWaitForSubscriptionActivationSynchronouslyForUnitTest) {
112 							try {
113 								activationFuture.get(5, TimeUnit.SECONDS);
114 							} catch (Exception e) {
115 								ourLog.error("Failed to activate subscription", e);
116 							}
117 						}
118 					}
119 				});
120 			} else {
121 				activateSubscription(activeStatus, theSubscription, requestedStatus);
122 			}
123 		} else if (activeStatus.equals(statusString)) {
124 			if (!mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement())) {
125 				ourLog.info("Registering active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
126 			}
127 			mySubscriptionInterceptor.registerSubscription(theSubscription.getIdElement(), theSubscription);
128 		} else {
129 			if (mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement())) {
130 				ourLog.info("Removing {} subscription {}", statusString, theSubscription.getIdElement().toUnqualified().getValue());
131 			}
132 			mySubscriptionInterceptor.unregisterSubscription(theSubscription.getIdElement());
133 		}
134 	}
135 
136 	private void activateSubscription(String theActiveStatus, final IBaseResource theSubscription, String theRequestedStatus) {
137 		IBaseResource subscription = mySubscriptionDao.read(theSubscription.getIdElement());
138 
139 		ourLog.info("Activating and registering subscription {} from status {} to {}", subscription.getIdElement().toUnqualified().getValue(), theRequestedStatus, theActiveStatus);
140 		try {
141 			SubscriptionUtil.setStatus(myCtx, subscription, theActiveStatus);
142 			mySubscriptionDao.update(subscription);
143 			mySubscriptionInterceptor.registerSubscription(subscription.getIdElement(), subscription);
144 		} catch (final UnprocessableEntityException e) {
145 			ourLog.info("Changing status of {} to ERROR", subscription.getIdElement());
146 			SubscriptionUtil.setStatus(myCtx, subscription, "error");
147 			SubscriptionUtil.setReason(myCtx, subscription, e.getMessage());
148 			mySubscriptionDao.update(subscription);
149 		}
150 	}
151 
152 	public void handleMessage(RestOperationTypeEnum theOperationType, IIdType theId, final IBaseResource theSubscription) throws MessagingException {
153 
154 		switch (theOperationType) {
155 			case DELETE:
156 				mySubscriptionInterceptor.unregisterSubscription(theId);
157 				return;
158 			case CREATE:
159 			case UPDATE:
160 				if (!theId.getResourceType().equals("Subscription")) {
161 					return;
162 				}
163 				TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager);
164 				txTemplate.execute(new TransactionCallbackWithoutResult() {
165 					@Override
166 					protected void doInTransactionWithoutResult(TransactionStatus status) {
167 						activateAndRegisterSubscriptionIfRequired(theSubscription);
168 					}
169 				});
170 				break;
171 			default:
172 				break;
173 		}
174 
175 	}
176 
177 	@VisibleForTesting
178 	public static void setWaitForSubscriptionActivationSynchronouslyForUnitTest(boolean theWaitForSubscriptionActivationSynchronouslyForUnitTest) {
179 		ourWaitForSubscriptionActivationSynchronouslyForUnitTest = theWaitForSubscriptionActivationSynchronouslyForUnitTest;
180 	}
181 
182 }