View Javadoc
1   package ca.uhn.fhir.jpa.subscription;
2   
3   /*-
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2018 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
24  import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
25  import org.hl7.fhir.instance.model.api.IBaseResource;
26  import org.hl7.fhir.instance.model.api.IIdType;
27  import org.hl7.fhir.r4.model.Subscription;
28  import org.slf4j.Logger;
29  import org.slf4j.LoggerFactory;
30  import org.springframework.messaging.Message;
31  import org.springframework.messaging.MessagingException;
32  
33  public abstract class BaseSubscriptionDeliverySubscriber extends BaseSubscriptionSubscriber {
34  	private static final Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionDeliverySubscriber.class);
35  	private boolean myReloadResourceBeforeDelivery = true;
36  
37  	public BaseSubscriptionDeliverySubscriber(IFhirResourceDao<?> theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor) {
38  		super(theSubscriptionDao, theChannelType, theSubscriptionInterceptor);
39  	}
40  
41  	@Override
42  	public void handleMessage(Message<?> theMessage) throws MessagingException {
43  		if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) {
44  			ourLog.warn("Unexpected payload type: {}", theMessage.getPayload());
45  			return;
46  		}
47  
48  		String subscriptionId = "(unknown?)";
49  
50  		try {
51  			ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
52  			subscriptionId = msg.getSubscription().getIdElement(getContext()).getValue();
53  
54  			if (!subscriptionTypeApplies(getContext(), msg.getSubscription().getBackingSubscription(getContext()))) {
55  				return;
56  			}
57  
58  			CanonicalSubscription updatedSubscription = (CanonicalSubscription) getSubscriptionInterceptor().getIdToSubscription().get(msg.getSubscription().getIdElement(getContext()).getIdPart());
59  			if (updatedSubscription != null) {
60  				msg.setSubscription(updatedSubscription);
61  			}
62  
63  			if (myReloadResourceBeforeDelivery) {
64  				// Reload the payload just in case any interceptors modified
65  				// it before it was saved to the database. This is also
66  				// useful for resources created in a transaction, since they
67  				// can have placeholder IDs in them.
68  				IIdType payloadId = msg.getPayloadId(getContext());
69  				Class type = getContext().getResourceDefinition(payloadId.getResourceType()).getImplementingClass();
70  				IFhirResourceDao dao = getSubscriptionInterceptor().getDao(type);
71  				IBaseResource loadedPayload;
72  				try {
73  					loadedPayload = dao.read(payloadId);
74  				} catch (ResourceNotFoundException e) {
75  					// This can happen if a last minute failure happens when saving a resource,
76  					// eg a constraint causes the transaction to roll back on commit
77  					ourLog.warn("Unable to find resource {} - Aborting delivery", payloadId.getValue());
78  					return;
79  				}
80  				msg.setPayload(getContext(), loadedPayload);
81  			}
82  
83  			handleMessage(msg);
84  		} catch (Exception e) {
85  			String msg = "Failure handling subscription payload for subscription: " + subscriptionId;
86  			ourLog.error(msg, e);
87  			throw new MessagingException(theMessage, msg, e);
88  		}
89  	}
90  
91  	public abstract void handleMessage(ResourceDeliveryMessage theMessage) throws Exception;
92  
93  	public void setReloadResourceBeforeDelivery(boolean theReloadResourceBeforeDelivery) {
94  		myReloadResourceBeforeDelivery = theReloadResourceBeforeDelivery;
95  	}
96  
97  }