View Javadoc
1   package ca.uhn.fhir.jpa.subscription;
2   
3   /*-
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2018 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
24  import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
25  import org.hl7.fhir.instance.model.api.IBaseResource;
26  import org.hl7.fhir.instance.model.api.IIdType;
27  import org.hl7.fhir.r4.model.Subscription;
28  import org.slf4j.Logger;
29  import org.slf4j.LoggerFactory;
30  import org.springframework.messaging.Message;
31  import org.springframework.messaging.MessagingException;
32  
33  public abstract class BaseSubscriptionDeliverySubscriber extends BaseSubscriptionSubscriber {
34  	private static final Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionDeliverySubscriber.class);
35  
36  	public BaseSubscriptionDeliverySubscriber(IFhirResourceDao<?> theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor) {
37  		super(theSubscriptionDao, theChannelType, theSubscriptionInterceptor);
38  	}
39  
40  	@Override
41  	public void handleMessage(Message<?> theMessage) throws MessagingException {
42  		if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) {
43  			ourLog.warn("Unexpected payload type: {}", theMessage.getPayload());
44  			return;
45  		}
46  
47  		String subscriptionId = "(unknown?)";
48  
49  		try {
50  			ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
51  			subscriptionId = msg.getSubscription().getIdElement(getContext()).getValue();
52  
53  			CanonicalSubscription updatedSubscription = (CanonicalSubscription) getSubscriptionInterceptor().getIdToSubscription().get(msg.getSubscription().getIdElement(getContext()).getIdPart());
54  			if (updatedSubscription != null) {
55  				msg.setSubscription(updatedSubscription);
56  			}
57  
58  			if (!subscriptionTypeApplies(msg.getSubscription())) {
59  				return;
60  			}
61  
62  			// Load the resource
63  			IIdType payloadId = msg.getPayloadId(getContext());
64  			Class type = getContext().getResourceDefinition(payloadId.getResourceType()).getImplementingClass();
65  			IFhirResourceDao dao = getSubscriptionInterceptor().getDao(type);
66  			IBaseResource loadedPayload;
67  			try {
68  				loadedPayload = dao.read(payloadId);
69  			} catch (ResourceNotFoundException e) {
70  				// This can happen if a last minute failure happens when saving a resource,
71  				// eg a constraint causes the transaction to roll back on commit
72  				ourLog.warn("Unable to find resource {} - Aborting delivery", payloadId.getValue());
73  				return;
74  			}
75  			msg.setPayload(getContext(), loadedPayload);
76  
77  			handleMessage(msg);
78  		} catch (Exception e) {
79  			String msg = "Failure handling subscription payload for subscription: " + subscriptionId;
80  			ourLog.error(msg, e);
81  			throw new MessagingException(theMessage, msg, e);
82  		}
83  	}
84  
85  	public abstract void handleMessage(ResourceDeliveryMessage theMessage) throws Exception;
86  
87  }