View Javadoc
1   package ca.uhn.fhir.jpa.subscription;
2   
3   import java.util.List;
4   
5   import org.apache.commons.lang3.StringUtils;
6   import org.hl7.fhir.instance.model.api.IBaseResource;
7   import org.hl7.fhir.instance.model.api.IIdType;
8   import org.hl7.fhir.r4.model.Subscription;
9   import org.slf4j.Logger;
10  import org.slf4j.LoggerFactory;
11  import org.springframework.messaging.Message;
12  import org.springframework.messaging.MessageChannel;
13  import org.springframework.messaging.MessagingException;
14  
15  /*-
16   * #%L
17   * HAPI FHIR JPA Server
18   * %%
19   * Copyright (C) 2014 - 2018 University Health Network
20   * %%
21   * Licensed under the Apache License, Version 2.0 (the "License");
22   * you may not use this file except in compliance with the License.
23   * You may obtain a copy of the License at
24   * 
25   *      http://www.apache.org/licenses/LICENSE-2.0
26   * 
27   * Unless required by applicable law or agreed to in writing, software
28   * distributed under the License is distributed on an "AS IS" BASIS,
29   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
30   * See the License for the specific language governing permissions and
31   * limitations under the License.
32   * #L%
33   */
34  
35  import ca.uhn.fhir.context.RuntimeResourceDefinition;
36  import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
37  import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
38  import ca.uhn.fhir.jpa.dao.SearchParameterMap;
39  import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails;
40  import ca.uhn.fhir.jpa.subscription.matcher.ISubscriptionMatcher;
41  import ca.uhn.fhir.rest.api.server.IBundleProvider;
42  import ca.uhn.fhir.rest.api.server.RequestDetails;
43  
44  import static org.apache.commons.lang3.StringUtils.isNotBlank;
45  
46  public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
47  	private Logger ourLog = LoggerFactory.getLogger(SubscriptionCheckingSubscriber.class);
48  
49  	private final ISubscriptionMatcher mySubscriptionMatcher;
50  	
51  	public SubscriptionCheckingSubscriber(IFhirResourceDao theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor, ISubscriptionMatcher theSubscriptionMatcher) {
52  		super(theSubscriptionDao, theChannelType, theSubscriptionInterceptor);
53  		this.mySubscriptionMatcher = theSubscriptionMatcher;
54  	}
55  
56  	@Override
57  	public void handleMessage(Message<?> theMessage) throws MessagingException {
58  		ourLog.trace("Handling resource modified message: {}", theMessage);
59  
60  		if (!(theMessage instanceof ResourceModifiedJsonMessage)) {
61  			ourLog.warn("Unexpected message payload type: {}", theMessage);
62  			return;
63  		}
64  
65  		ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload();
66  		switch (msg.getOperationType()) {
67  			case CREATE:
68  			case UPDATE:
69  			case MANUALLY_TRIGGERED:
70  				break;
71  			case DELETE:
72  			default:
73  				ourLog.trace("Not processing modified message for {}", msg.getOperationType());
74  				// ignore anything else
75  				return;
76  		}
77  
78  		IIdType id = msg.getId(getContext());
79  		String resourceType = id.getResourceType();
80  		String resourceId = id.getIdPart();
81  
82  		List<CanonicalSubscription> subscriptions = getSubscriptionInterceptor().getRegisteredSubscriptions();
83  
84  		ourLog.trace("Testing {} subscriptions for applicability");
85  
86  		for (CanonicalSubscription nextSubscription : subscriptions) {
87  
88  			String nextSubscriptionId = nextSubscription.getIdElement(getContext()).toUnqualifiedVersionless().getValue();
89  			String nextCriteriaString = nextSubscription.getCriteriaString();
90  
91  			if (isNotBlank(msg.getSubscriptionId())) {
92  				if (!msg.getSubscriptionId().equals(nextSubscriptionId)) {
93  					ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, msg.getSubscriptionId());
94  					continue;
95  				}
96  			}
97  
98  			if (StringUtils.isBlank(nextCriteriaString)) {
99  				continue;
100 			}
101 
102 			// see if the criteria matches the created object
103 			ourLog.trace("Checking subscription {} for {} with criteria {}", nextSubscriptionId, resourceType, nextCriteriaString);
104 			String criteriaResource = nextCriteriaString;
105 			int index = criteriaResource.indexOf("?");
106 			if (index != -1) {
107 				criteriaResource = criteriaResource.substring(0, criteriaResource.indexOf("?"));
108 			}
109 
110 			if (resourceType != null && nextCriteriaString != null && !criteriaResource.equals(resourceType)) {
111 				ourLog.trace("Skipping subscription search for {} because it does not match the criteria {}", resourceType, nextCriteriaString);
112 				continue;
113 			}
114 
115 			if (!mySubscriptionMatcher.match(nextCriteriaString, msg)) {
116 				continue;
117 			}
118 
119 			ourLog.debug("Found match: queueing rest-hook notification for resource: {}", id.toUnqualifiedVersionless().getValue());
120 
121 			ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
122 			deliveryMsg.setPayload(getContext(), msg.getNewPayload(getContext()));
123 			deliveryMsg.setSubscription(nextSubscription);
124 			deliveryMsg.setOperationType(msg.getOperationType());
125 			deliveryMsg.setPayloadId(msg.getId(getContext()));
126 
127 			ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(deliveryMsg);
128 			MessageChannel deliveryChannel = getSubscriptionInterceptor().getDeliveryChannel(nextSubscription);
129 			if (deliveryChannel != null) {
130 				deliveryChannel.send(wrappedMsg);
131 			} else {
132 				ourLog.warn("Do not have deliovery channel for subscription {}", nextSubscription.getIdElement(getContext()));
133 			}
134 		}
135 
136 
137 	}
138 
139 	/**
140 	 * Subclasses may override
141 	 */
142 	protected String massageCriteria(String theCriteria) {
143 		return theCriteria;
144 	}
145 
146 	/**
147 	 * Search based on a query criteria
148 	 */
149 	protected IBundleProvider performSearch(String theCriteria) {
150 		RuntimeResourceDefinition responseResourceDef = getSubscriptionDao().validateCriteriaAndReturnResourceDefinition(theCriteria);
151 		SearchParameterMap responseCriteriaUrl = BaseHapiFhirDao.translateMatchUrl(getSubscriptionDao(), getSubscriptionDao().getContext(), theCriteria, responseResourceDef);
152 
153 		RequestDetails req = new ServletSubRequestDetails();
154 		req.setSubRequest(true);
155 
156 		IFhirResourceDao<? extends IBaseResource> responseDao = getSubscriptionInterceptor().getDao(responseResourceDef.getImplementingClass());
157 		responseCriteriaUrl.setLoadSynchronousUpTo(1);
158 
159 		IBundleProvider responseResults = responseDao.search(responseCriteriaUrl, req);
160 		return responseResults;
161 	}
162 
163 }