View Javadoc
1   package ca.uhn.fhir.jpa.subscription;
2   
3   /*-
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2018 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  import ca.uhn.fhir.context.RuntimeResourceDefinition;
24  import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
25  import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
26  import ca.uhn.fhir.jpa.dao.SearchParameterMap;
27  import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails;
28  import ca.uhn.fhir.rest.api.server.IBundleProvider;
29  import ca.uhn.fhir.rest.api.server.RequestDetails;
30  import org.apache.commons.lang3.StringUtils;
31  import org.hl7.fhir.instance.model.api.IBaseResource;
32  import org.hl7.fhir.instance.model.api.IIdType;
33  import org.hl7.fhir.r4.model.Subscription;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  import org.springframework.messaging.Message;
37  import org.springframework.messaging.MessageChannel;
38  import org.springframework.messaging.MessagingException;
39  
40  import java.util.List;
41  
42  public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
43  	private Logger ourLog = LoggerFactory.getLogger(SubscriptionCheckingSubscriber.class);
44  
45  	public SubscriptionCheckingSubscriber(IFhirResourceDao theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor) {
46  		super(theSubscriptionDao, theChannelType, theSubscriptionInterceptor);
47  	}
48  
49  	@Override
50  	public void handleMessage(Message<?> theMessage) throws MessagingException {
51  		ourLog.trace("Handling resource modified message: {}", theMessage);
52  
53  		if (!(theMessage instanceof ResourceModifiedJsonMessage)) {
54  			ourLog.warn("Unexpected message payload type: {}", theMessage);
55  			return;
56  		}
57  
58  		ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload();
59  		switch (msg.getOperationType()) {
60  			case CREATE:
61  			case UPDATE:
62  				break;
63  			default:
64  				ourLog.trace("Not processing modified message for {}", msg.getOperationType());
65  				// ignore anything else
66  				return;
67  		}
68  
69  		IIdType id = msg.getId(getContext());
70  		String resourceType = id.getResourceType();
71  		String resourceId = id.getIdPart();
72  
73  		List<CanonicalSubscription> subscriptions = getSubscriptionInterceptor().getRegisteredSubscriptions();
74  
75  		ourLog.trace("Testing {} subscriptions for applicability");
76  
77  		for (CanonicalSubscription nextSubscription : subscriptions) {
78  
79  			String nextSubscriptionId = nextSubscription.getIdElement(getContext()).toUnqualifiedVersionless().getValue();
80  			String nextCriteriaString = nextSubscription.getCriteriaString();
81  
82  			if (StringUtils.isBlank(nextCriteriaString)) {
83  				continue;
84  			}
85  
86  			// see if the criteria matches the created object
87  			ourLog.trace("Checking subscription {} for {} with criteria {}", nextSubscriptionId, resourceType, nextCriteriaString);
88  			String criteriaResource = nextCriteriaString;
89  			int index = criteriaResource.indexOf("?");
90  			if (index != -1) {
91  				criteriaResource = criteriaResource.substring(0, criteriaResource.indexOf("?"));
92  			}
93  
94  			if (resourceType != null && nextCriteriaString != null && !criteriaResource.equals(resourceType)) {
95  				ourLog.trace("Skipping subscription search for {} because it does not match the criteria {}", resourceType, nextCriteriaString);
96  				continue;
97  			}
98  
99  			// run the subscriptions query and look for matches, add the id as part of the criteria to avoid getting matches of previous resources rather than the recent resource
100 			String criteria = nextCriteriaString;
101 			criteria += "&_id=" + resourceType + "/" + resourceId;
102 			criteria = massageCriteria(criteria);
103 
104 			IBundleProvider results = performSearch(criteria);
105 
106 			ourLog.debug("Subscription check found {} results for query: {}", results.size(), criteria);
107 
108 			if (results.size() == 0) {
109 				continue;
110 			}
111 
112 			ourLog.debug("Found match: queueing rest-hook notification for resource: {}", id.toUnqualifiedVersionless().getValue());
113 
114 			ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
115 			deliveryMsg.setPayload(getContext(), msg.getNewPayload(getContext()));
116 			deliveryMsg.setSubscription(nextSubscription);
117 			deliveryMsg.setOperationType(msg.getOperationType());
118 			deliveryMsg.setPayloadId(msg.getId(getContext()));
119 
120 			ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(deliveryMsg);
121 			MessageChannel deliveryChannel = getSubscriptionInterceptor().getDeliveryChannel(nextSubscription);
122 			if (deliveryChannel != null) {
123 				deliveryChannel.send(wrappedMsg);
124 			} else {
125 				ourLog.warn("Do not have deliovery channel for subscription {}", nextSubscription.getIdElement(getContext()));
126 			}
127 		}
128 
129 
130 	}
131 
132 	/**
133 	 * Subclasses may override
134 	 */
135 	protected String massageCriteria(String theCriteria) {
136 		return theCriteria;
137 	}
138 
139 	/**
140 	 * Search based on a query criteria
141 	 */
142 	protected IBundleProvider performSearch(String theCriteria) {
143 		RuntimeResourceDefinition responseResourceDef = getSubscriptionDao().validateCriteriaAndReturnResourceDefinition(theCriteria);
144 		SearchParameterMap responseCriteriaUrl = BaseHapiFhirDao.translateMatchUrl(getSubscriptionDao(), getSubscriptionDao().getContext(), theCriteria, responseResourceDef);
145 
146 		RequestDetails req = new ServletSubRequestDetails();
147 		req.setSubRequest(true);
148 
149 		IFhirResourceDao<? extends IBaseResource> responseDao = getSubscriptionInterceptor().getDao(responseResourceDef.getImplementingClass());
150 		responseCriteriaUrl.setLoadSynchronousUpTo(1);
151 
152 		IBundleProvider responseResults = responseDao.search(responseCriteriaUrl, req);
153 		return responseResults;
154 	}
155 
156 }