001package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
002
003import ca.uhn.fhir.context.FhirContext;
004import ca.uhn.fhir.interceptor.api.HookParams;
005import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
006import ca.uhn.fhir.interceptor.api.Pointcut;
007import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
008import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
009import ca.uhn.fhir.jpa.subscription.match.matcher.matching.ISubscriptionMatcher;
010import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
011import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
012import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
013import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
014import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
015import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
016import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
017import ca.uhn.fhir.rest.api.EncodingEnum;
018import org.apache.commons.lang3.StringUtils;
019import org.hl7.fhir.instance.model.api.IBaseResource;
020import org.hl7.fhir.instance.model.api.IIdType;
021import org.slf4j.Logger;
022import org.slf4j.LoggerFactory;
023import org.springframework.beans.factory.annotation.Autowired;
024import org.springframework.messaging.Message;
025import org.springframework.messaging.MessageChannel;
026import org.springframework.messaging.MessageHandler;
027import org.springframework.messaging.MessagingException;
028
029import javax.annotation.Nonnull;
030import java.util.Collection;
031
032import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
033import static org.apache.commons.lang3.StringUtils.isNotBlank;
034
035/*-
036 * #%L
037 * HAPI FHIR Subscription Server
038 * %%
039 * Copyright (C) 2014 - 2021 Smile CDR, Inc.
040 * %%
041 * Licensed under the Apache License, Version 2.0 (the "License");
042 * you may not use this file except in compliance with the License.
043 * You may obtain a copy of the License at
044 *
045 *      http://www.apache.org/licenses/LICENSE-2.0
046 *
047 * Unless required by applicable law or agreed to in writing, software
048 * distributed under the License is distributed on an "AS IS" BASIS,
049 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
050 * See the License for the specific language governing permissions and
051 * limitations under the License.
052 * #L%
053 */
054
055public class SubscriptionMatchingSubscriber implements MessageHandler {
056        private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchingSubscriber.class);
057        public static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching";
058
059        @Autowired
060        private ISubscriptionMatcher mySubscriptionMatcher;
061        @Autowired
062        private FhirContext myFhirContext;
063        @Autowired
064        private SubscriptionRegistry mySubscriptionRegistry;
065        @Autowired
066        private IInterceptorBroadcaster myInterceptorBroadcaster;
067        @Autowired
068        private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
069
070        /**
071         * Constructor
072         */
073        public SubscriptionMatchingSubscriber() {
074                super();
075        }
076
077
078        @Override
079        public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException {
080                ourLog.trace("Handling resource modified message: {}", theMessage);
081
082                if (!(theMessage instanceof ResourceModifiedJsonMessage)) {
083                        ourLog.warn("Unexpected message payload type: {}", theMessage);
084                        return;
085                }
086
087                ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload();
088                matchActiveSubscriptionsAndDeliver(msg);
089        }
090
091        public void matchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg) {
092                switch (theMsg.getOperationType()) {
093                        case CREATE:
094                        case UPDATE:
095                        case MANUALLY_TRIGGERED:
096                                break;
097                        case DELETE:
098                        default:
099                                ourLog.trace("Not processing modified message for {}", theMsg.getOperationType());
100                                // ignore anything else
101                                return;
102                }
103
104                // Interceptor call: SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED
105                HookParams params = new HookParams()
106                        .add(ResourceModifiedMessage.class, theMsg);
107                if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED, params)) {
108                        return;
109                }
110
111                try {
112                        doMatchActiveSubscriptionsAndDeliver(theMsg);
113                } finally {
114                        // Interceptor call: SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED
115                        myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, params);
116                }
117        }
118
119        private void doMatchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg) {
120                IIdType resourceId = theMsg.getId(myFhirContext);
121
122                Collection<ActiveSubscription> subscriptions = mySubscriptionRegistry.getAll();
123
124                ourLog.trace("Testing {} subscriptions for applicability", subscriptions.size());
125                boolean resourceMatched = false;
126
127                for (ActiveSubscription nextActiveSubscription : subscriptions) {
128
129                        String nextSubscriptionId = getId(nextActiveSubscription);
130
131                        if (isNotBlank(theMsg.getSubscriptionId())) {
132                                if (!theMsg.getSubscriptionId().equals(nextSubscriptionId)) {
133                                        // TODO KHS we should use a hash to look it up instead of this full table scan
134                                        ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, theMsg.getSubscriptionId());
135                                        continue;
136                                }
137                        }
138
139                        if (!validCriteria(nextActiveSubscription, resourceId)) {
140                                continue;
141                        }
142
143                        InMemoryMatchResult matchResult = mySubscriptionMatcher.match(nextActiveSubscription.getSubscription(), theMsg);
144                        if (!matchResult.matched()) {
145                                continue;
146                        }
147                        ourLog.debug("Subscription {} was matched by resource {} {}",
148                                nextActiveSubscription.getId(),
149                                resourceId.toUnqualifiedVersionless().getValue(),
150                                matchResult.isInMemory() ? "in-memory" : "by querying the repository");
151
152                        IBaseResource payload = theMsg.getNewPayload(myFhirContext);
153                        CanonicalSubscription subscription = nextActiveSubscription.getSubscription();
154
155                        EncodingEnum encoding = null;
156                        if (subscription.getPayloadString() != null && !subscription.getPayloadString().isEmpty()) {
157                                encoding = EncodingEnum.forContentType(subscription.getPayloadString());
158                        }
159                        encoding = defaultIfNull(encoding, EncodingEnum.JSON);
160
161                        ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
162
163                        deliveryMsg.setPayload(myFhirContext, payload, encoding);
164                        deliveryMsg.setSubscription(subscription);
165                        deliveryMsg.setOperationType(theMsg.getOperationType());
166                        deliveryMsg.setTransactionId(theMsg.getTransactionId());
167                        deliveryMsg.copyAdditionalPropertiesFrom(theMsg);
168
169                        // Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
170                        HookParams params = new HookParams()
171                                .add(CanonicalSubscription.class, nextActiveSubscription.getSubscription())
172                                .add(ResourceDeliveryMessage.class, deliveryMsg)
173                                .add(InMemoryMatchResult.class, matchResult);
174                        if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED, params)) {
175                                return;
176                        }
177
178                        resourceMatched |= sendToDeliveryChannel(nextActiveSubscription, deliveryMsg);
179                }
180
181                if (!resourceMatched) {
182                        // Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
183                        HookParams params = new HookParams()
184                                .add(ResourceModifiedMessage.class, theMsg);
185                        myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_DID_NOT_MATCH_ANY_SUBSCRIPTIONS, params);
186                }
187        }
188
189        private boolean sendToDeliveryChannel(ActiveSubscription nextActiveSubscription, ResourceDeliveryMessage theDeliveryMsg) {
190                boolean retVal = false;
191                ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(theDeliveryMsg);
192                MessageChannel deliveryChannel = mySubscriptionChannelRegistry.getDeliverySenderChannel(nextActiveSubscription.getChannelName());
193                if (deliveryChannel != null) {
194                        retVal = true;
195                        trySendToDeliveryChannel(wrappedMsg, deliveryChannel);
196                } else {
197                        ourLog.warn("Do not have delivery channel for subscription {}", nextActiveSubscription.getId());
198                }
199                return retVal;
200        }
201
202        private void trySendToDeliveryChannel(ResourceDeliveryJsonMessage theWrappedMsg, MessageChannel theDeliveryChannel) {
203                try {
204                        boolean success = theDeliveryChannel.send(theWrappedMsg);
205                        if (!success) {
206                                ourLog.warn("Failed to send message to Delivery Channel.");
207                        }
208                } catch (RuntimeException e) {
209                        ourLog.error("Failed to send message to Delivery Channel", e);
210                        throw new RuntimeException("Failed to send message to Delivery Channel", e);
211                }
212        }
213
214        private String getId(ActiveSubscription theActiveSubscription) {
215                return theActiveSubscription.getId();
216        }
217
218        private boolean validCriteria(ActiveSubscription theActiveSubscription, IIdType theResourceId) {
219                String criteriaString = theActiveSubscription.getCriteriaString();
220                String subscriptionId = getId(theActiveSubscription);
221                String resourceType = theResourceId.getResourceType();
222
223                if (StringUtils.isBlank(criteriaString)) {
224                        return false;
225                }
226
227                // see if the criteria matches the created object
228                ourLog.trace("Checking subscription {} for {} with criteria {}", subscriptionId, resourceType, criteriaString);
229                String criteriaResource = criteriaString;
230                int index = criteriaResource.indexOf("?");
231                if (index != -1) {
232                        criteriaResource = criteriaResource.substring(0, criteriaResource.indexOf("?"));
233                }
234
235                if (resourceType != null && !criteriaResource.equals(resourceType)) {
236                        ourLog.trace("Skipping subscription search for {} because it does not match the criteria {}", resourceType, criteriaString);
237                        return false;
238                }
239
240                return true;
241        }
242}