View Javadoc
1   package ca.uhn.fhir.jpa.subscription;
2   
3   /*-
4    * #%L
5    * HAPI FHIR JPA Server
6    * %%
7    * Copyright (C) 2014 - 2018 University Health Network
8    * %%
9    * Licensed under the Apache License, Version 2.0 (the "License");
10   * you may not use this file except in compliance with the License.
11   * You may obtain a copy of the License at
12   * 
13   *      http://www.apache.org/licenses/LICENSE-2.0
14   * 
15   * Unless required by applicable law or agreed to in writing, software
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   * #L%
21   */
22  
23  import ca.uhn.fhir.context.RuntimeResourceDefinition;
24  import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
25  import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
26  import ca.uhn.fhir.jpa.dao.SearchParameterMap;
27  import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails;
28  import ca.uhn.fhir.rest.api.server.IBundleProvider;
29  import ca.uhn.fhir.rest.api.server.RequestDetails;
30  import org.apache.commons.lang3.StringUtils;
31  import org.hl7.fhir.instance.model.api.IBaseResource;
32  import org.hl7.fhir.instance.model.api.IIdType;
33  import org.hl7.fhir.r4.model.Subscription;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  import org.springframework.messaging.Message;
37  import org.springframework.messaging.MessagingException;
38  
39  import java.util.List;
40  
41  public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
42  	private Logger ourLog = LoggerFactory.getLogger(SubscriptionCheckingSubscriber.class);
43  
44  	public SubscriptionCheckingSubscriber(IFhirResourceDao theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor) {
45  		super(theSubscriptionDao, theChannelType, theSubscriptionInterceptor);
46  	}
47  
48  	@Override
49  	public void handleMessage(Message<?> theMessage) throws MessagingException {
50  		ourLog.trace("Handling resource modified message: {}", theMessage);
51  
52  		if (!(theMessage instanceof ResourceModifiedJsonMessage)) {
53  			ourLog.warn("Unexpected message payload type: {}", theMessage);
54  			return;
55  		}
56  
57  		ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload();
58  		switch (msg.getOperationType()) {
59  			case CREATE:
60  			case UPDATE:
61  				break;
62  			default:
63  				ourLog.trace("Not processing modified message for {}", msg.getOperationType());
64  				// ignore anything else
65  				return;
66  		}
67  
68  		IIdType id = msg.getId(getContext());
69  		String resourceType = id.getResourceType();
70  		String resourceId = id.getIdPart();
71  
72  		List<CanonicalSubscription> subscriptions = getSubscriptionInterceptor().getSubscriptions();
73  
74  		ourLog.trace("Testing {} subscriptions for applicability");
75  
76  		for (CanonicalSubscription nextSubscription : subscriptions) {
77  
78  			String nextSubscriptionId = nextSubscription.getIdElement(getContext()).toUnqualifiedVersionless().getValue();
79  			String nextCriteriaString = nextSubscription.getCriteriaString();
80  
81  			if (StringUtils.isBlank(nextCriteriaString)) {
82  				continue;
83  			}
84  
85  			// see if the criteria matches the created object
86  			ourLog.trace("Checking subscription {} for {} with criteria {}", nextSubscriptionId, resourceType, nextCriteriaString);
87  			String criteriaResource = nextCriteriaString;
88  			int index = criteriaResource.indexOf("?");
89  			if (index != -1) {
90  				criteriaResource = criteriaResource.substring(0, criteriaResource.indexOf("?"));
91  			}
92  
93  			if (resourceType != null && nextCriteriaString != null && !criteriaResource.equals(resourceType)) {
94  				ourLog.trace("Skipping subscription search for {} because it does not match the criteria {}", resourceType, nextCriteriaString);
95  				continue;
96  			}
97  
98  			// run the subscriptions query and look for matches, add the id as part of the criteria to avoid getting matches of previous resources rather than the recent resource
99  			String criteria = nextCriteriaString;
100 			criteria += "&_id=" + resourceType + "/" + resourceId;
101 			criteria = massageCriteria(criteria);
102 
103 			IBundleProvider results = performSearch(criteria);
104 
105 			ourLog.debug("Subscription check found {} results for query: {}", results.size(), criteria);
106 
107 			if (results.size() == 0) {
108 				continue;
109 			}
110 
111 			ourLog.debug("Found match: queueing rest-hook notification for resource: {}", id.toUnqualifiedVersionless().getValue());
112 
113 			ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
114 			deliveryMsg.setPayload(getContext(), msg.getNewPayload(getContext()));
115 			deliveryMsg.setSubscription(nextSubscription);
116 			deliveryMsg.setOperationType(msg.getOperationType());
117 			deliveryMsg.setPayloadId(msg.getId(getContext()));
118 
119 			ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(deliveryMsg);
120 			getSubscriptionInterceptor().getDeliveryChannel().send(wrappedMsg);
121 		}
122 
123 
124 	}
125 
126 	/**
127 	 * Subclasses may override
128 	 */
129 	protected String massageCriteria(String theCriteria) {
130 		return theCriteria;
131 	}
132 
133 	/**
134 	 * Search based on a query criteria
135 	 */
136 	protected IBundleProvider performSearch(String theCriteria) {
137 		RuntimeResourceDefinition responseResourceDef = getSubscriptionDao().validateCriteriaAndReturnResourceDefinition(theCriteria);
138 		SearchParameterMap responseCriteriaUrl = BaseHapiFhirDao.translateMatchUrl(getSubscriptionDao(), getSubscriptionDao().getContext(), theCriteria, responseResourceDef);
139 
140 		RequestDetails req = new ServletSubRequestDetails();
141 		req.setSubRequest(true);
142 
143 		IFhirResourceDao<? extends IBaseResource> responseDao = getSubscriptionInterceptor().getDao(responseResourceDef.getImplementingClass());
144 		responseCriteriaUrl.setLoadSynchronousUpTo(1);
145 
146 		IBundleProvider responseResults = responseDao.search(responseCriteriaUrl, req);
147 		return responseResults;
148 	}
149 
150 }