001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.interceptor.api.HookParams;
024import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
025import ca.uhn.fhir.interceptor.api.Pointcut;
026import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
027import ca.uhn.fhir.jpa.subscription.match.matcher.matching.ISubscriptionMatcher;
028import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
029import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
030import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
031import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
032import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
033import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
034import jakarta.annotation.Nonnull;
035import org.hl7.fhir.instance.model.api.IBaseResource;
036import org.hl7.fhir.instance.model.api.IIdType;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039import org.springframework.beans.factory.annotation.Autowired;
040import org.springframework.messaging.Message;
041import org.springframework.messaging.MessageHandler;
042import org.springframework.messaging.MessagingException;
043
044import java.util.Collection;
045import java.util.Optional;
046
047import static ca.uhn.fhir.rest.server.messaging.BaseResourceMessage.OperationTypeEnum.DELETE;
048import static org.apache.commons.lang3.StringUtils.isNotBlank;
049
050public class SubscriptionMatchingSubscriber implements MessageHandler {
051        private final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchingSubscriber.class);
052        public static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching";
053
054        @Autowired
055        private ISubscriptionMatcher mySubscriptionMatcher;
056
057        @Autowired
058        private FhirContext myFhirContext;
059
060        @Autowired
061        private SubscriptionRegistry mySubscriptionRegistry;
062
063        @Autowired
064        private IInterceptorBroadcaster myInterceptorBroadcaster;
065
066        @Autowired
067        private SubscriptionMatchDeliverer mySubscriptionMatchDeliverer;
068
069        @Autowired
070        private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
071
072        /**
073         * Constructor
074         */
075        public SubscriptionMatchingSubscriber() {
076                super();
077        }
078
079        @Override
080        public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException {
081                ourLog.trace("Handling resource modified message: {}", theMessage);
082
083                if (!(theMessage instanceof ResourceModifiedJsonMessage)) {
084                        ourLog.warn("Unexpected message payload type: {}", theMessage);
085                        return;
086                }
087
088                ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload();
089                matchActiveSubscriptionsAndDeliver(msg);
090        }
091
092        public void matchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg) {
093                switch (theMsg.getOperationType()) {
094                        case CREATE:
095                        case UPDATE:
096                        case MANUALLY_TRIGGERED:
097                        case DELETE:
098                                break;
099                        default:
100                                ourLog.trace("Not processing modified message for {}", theMsg.getOperationType());
101                                // ignore anything else
102                                return;
103                }
104
105                if (theMsg.getPayload(myFhirContext) == null) {
106                        // inflate the message and ignore any resource that cannot be found.
107                        Optional<ResourceModifiedMessage> inflatedMsg =
108                                        myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(theMsg);
109                        if (inflatedMsg.isEmpty()) {
110                                return;
111                        }
112                        theMsg = inflatedMsg.get();
113                }
114
115                // Interceptor call: SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED
116                HookParams params = new HookParams().add(ResourceModifiedMessage.class, theMsg);
117                if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED, params)) {
118                        return;
119                }
120
121                try {
122                        doMatchActiveSubscriptionsAndDeliver(theMsg);
123                } finally {
124                        // Interceptor call: SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED
125                        myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, params);
126                }
127        }
128
129        private void doMatchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg) {
130                IIdType resourceId = theMsg.getPayloadId(myFhirContext);
131
132                Collection<ActiveSubscription> subscriptions = mySubscriptionRegistry.getAllNonTopicSubscriptions();
133
134                ourLog.trace("Testing {} subscriptions for applicability", subscriptions.size());
135                boolean anySubscriptionsMatchedResource = false;
136
137                for (ActiveSubscription nextActiveSubscription : subscriptions) {
138                        anySubscriptionsMatchedResource |= processSubscription(theMsg, resourceId, nextActiveSubscription);
139                }
140
141                if (!anySubscriptionsMatchedResource) {
142                        // Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
143                        HookParams params = new HookParams().add(ResourceModifiedMessage.class, theMsg);
144                        myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_DID_NOT_MATCH_ANY_SUBSCRIPTIONS, params);
145                }
146        }
147
148        /**
149         * Returns true if subscription matched, and processing completed successfully, and the message was sent to the delivery channel. False otherwise.
150         *
151         */
152        private boolean processSubscription(
153                        ResourceModifiedMessage theMsg, IIdType theResourceId, ActiveSubscription theActiveSubscription) {
154                // skip if the partitions don't match
155                CanonicalSubscription subscription = theActiveSubscription.getSubscription();
156                if (subscription != null
157                                && theMsg.getPartitionId() != null
158                                && theMsg.getPartitionId().hasPartitionIds()
159                                && !subscription.getCrossPartitionEnabled()
160                                && !theMsg.getPartitionId().hasPartitionId(subscription.getRequestPartitionId())) {
161                        return false;
162                }
163                String nextSubscriptionId = theActiveSubscription.getId();
164
165                if (isNotBlank(theMsg.getSubscriptionId())) {
166                        if (!theMsg.getSubscriptionId().equals(nextSubscriptionId)) {
167                                // TODO KHS we should use a hash to look it up instead of this full table scan
168                                ourLog.debug(
169                                                "Ignoring subscription {} because it is not {}",
170                                                nextSubscriptionId,
171                                                theMsg.getSubscriptionId());
172                                return false;
173                        }
174                }
175
176                if (!resourceTypeIsAppropriateForSubscription(theActiveSubscription, theResourceId)) {
177                        return false;
178                }
179
180                if (theMsg.getOperationType().equals(DELETE)) {
181                        if (!theActiveSubscription.getSubscription().getSendDeleteMessages()) {
182                                ourLog.trace("Not processing modified message for {}", theMsg.getOperationType());
183                                return false;
184                        }
185                }
186
187                InMemoryMatchResult matchResult;
188                if (theActiveSubscription.getCriteria().getType() == SubscriptionCriteriaParser.TypeEnum.SEARCH_EXPRESSION) {
189                        matchResult = mySubscriptionMatcher.match(theActiveSubscription.getSubscription(), theMsg);
190                        if (!matchResult.matched()) {
191                                ourLog.trace(
192                                                "Subscription {} was not matched by resource {} {}",
193                                                theActiveSubscription.getId(),
194                                                theResourceId.toUnqualifiedVersionless().getValue(),
195                                                matchResult.isInMemory() ? "in-memory" : "by querying the repository");
196                                return false;
197                        }
198                        ourLog.debug(
199                                        "Subscription {} was matched by resource {} {}",
200                                        theActiveSubscription.getId(),
201                                        theResourceId.toUnqualifiedVersionless().getValue(),
202                                        matchResult.isInMemory() ? "in-memory" : "by querying the repository");
203                } else {
204                        ourLog.trace(
205                                        "Subscription {} was not matched by resource {} - No search expression",
206                                        theActiveSubscription.getId(),
207                                        theResourceId.toUnqualifiedVersionless().getValue());
208                        matchResult = InMemoryMatchResult.successfulMatch();
209                        matchResult.setInMemory(true);
210                }
211
212                IBaseResource payload = theMsg.getNewPayload(myFhirContext);
213                return mySubscriptionMatchDeliverer.deliverPayload(payload, theMsg, theActiveSubscription, matchResult);
214        }
215
216        private boolean resourceTypeIsAppropriateForSubscription(
217                        ActiveSubscription theActiveSubscription, IIdType theResourceId) {
218                SubscriptionCriteriaParser.SubscriptionCriteria criteria = theActiveSubscription.getCriteria();
219                String subscriptionId = theActiveSubscription.getId();
220                String resourceType = theResourceId.getResourceType();
221
222                // see if the criteria matches the created object
223                ourLog.trace("Checking subscription {} for {} with criteria {}", subscriptionId, resourceType, criteria);
224
225                if (criteria == null) {
226                        ourLog.trace("Subscription {} has no criteria - Not matching", subscriptionId);
227                        return false;
228                }
229
230                switch (criteria.getType()) {
231                        default:
232                        case SEARCH_EXPRESSION:
233                        case MULTITYPE_EXPRESSION:
234                                boolean contains = criteria.getApplicableResourceTypes().contains(resourceType);
235                                ourLog.trace("Subscription {} applicable resource type check: {}", subscriptionId, contains);
236                                return contains;
237                        case STARTYPE_EXPRESSION:
238                                boolean match = !resourceType.equals("Subscription");
239                                ourLog.trace("Subscription {} start resource type check: {}", subscriptionId, match);
240                                return match;
241                }
242        }
243}