001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.topic;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.interceptor.api.HookParams;
024import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
025import ca.uhn.fhir.interceptor.api.Pointcut;
026import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
027import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
028import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
029import ca.uhn.fhir.jpa.topic.filter.InMemoryTopicFilterMatcher;
030import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
031import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
032import ca.uhn.fhir.util.Logs;
033import jakarta.annotation.Nonnull;
034import org.hl7.fhir.instance.model.api.IBaseResource;
035import org.hl7.fhir.r5.model.SubscriptionTopic;
036import org.slf4j.Logger;
037import org.springframework.beans.factory.annotation.Autowired;
038import org.springframework.messaging.Message;
039import org.springframework.messaging.MessageHandler;
040import org.springframework.messaging.MessagingException;
041
042import java.util.Collection;
043import java.util.Collections;
044import java.util.List;
045import java.util.Optional;
046
047public class SubscriptionTopicMatchingSubscriber implements MessageHandler {
048        private static final Logger ourLog = Logs.getSubscriptionTopicLog();
049
050        private final FhirContext myFhirContext;
051
052        @Autowired
053        SubscriptionTopicSupport mySubscriptionTopicSupport;
054
055        @Autowired
056        SubscriptionTopicRegistry mySubscriptionTopicRegistry;
057
058        @Autowired
059        private IInterceptorBroadcaster myInterceptorBroadcaster;
060
061        @Autowired
062        private SubscriptionTopicDispatcher mySubscriptionTopicDispatcher;
063
064        @Autowired
065        private InMemoryTopicFilterMatcher myInMemoryTopicFilterMatcher;
066
067        @Autowired
068        private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
069
070        public SubscriptionTopicMatchingSubscriber(FhirContext theFhirContext) {
071                myFhirContext = theFhirContext;
072        }
073
074        @Override
075        public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException {
076                ourLog.trace("Handling resource modified message: {}", theMessage);
077
078                if (!(theMessage instanceof ResourceModifiedJsonMessage)) {
079                        ourLog.warn("Unexpected message payload type: {}", theMessage);
080                        return;
081                }
082
083                ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload();
084
085                if (msg.getPayload(myFhirContext) == null) {
086                        // inflate the message and ignore any resource that cannot be found.
087                        Optional<ResourceModifiedMessage> inflatedMsg =
088                                        myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(msg);
089                        if (inflatedMsg.isEmpty()) {
090                                return;
091                        }
092                        msg = inflatedMsg.get();
093                }
094
095                // Interceptor call: SUBSCRIPTION_TOPIC_BEFORE_PERSISTED_RESOURCE_CHECKED
096                HookParams params = new HookParams().add(ResourceModifiedMessage.class, msg);
097                if (!myInterceptorBroadcaster.callHooks(
098                                Pointcut.SUBSCRIPTION_TOPIC_BEFORE_PERSISTED_RESOURCE_CHECKED, params)) {
099                        return;
100                }
101                try {
102                        matchActiveSubscriptionTopicsAndDeliver(msg);
103                } finally {
104                        // Interceptor call: SUBSCRIPTION_TOPIC_AFTER_PERSISTED_RESOURCE_CHECKED
105                        myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_TOPIC_AFTER_PERSISTED_RESOURCE_CHECKED, params);
106                }
107        }
108
109        private void matchActiveSubscriptionTopicsAndDeliver(ResourceModifiedMessage theMsg) {
110
111                Collection<SubscriptionTopic> topics = mySubscriptionTopicRegistry.getAll();
112                for (SubscriptionTopic topic : topics) {
113                        SubscriptionTopicMatcher matcher = new SubscriptionTopicMatcher(mySubscriptionTopicSupport, topic);
114                        InMemoryMatchResult result = matcher.match(theMsg);
115                        if (result.matched()) {
116                                int deliveries = deliverToTopicSubscriptions(theMsg, topic, result);
117                                ourLog.info(
118                                                "Matched topic {} to message {}.  Notifications sent to {} subscriptions for delivery.",
119                                                topic.getUrl(),
120                                                theMsg,
121                                                deliveries);
122                        }
123                }
124        }
125
126        private int deliverToTopicSubscriptions(
127                        ResourceModifiedMessage theMsg,
128                        SubscriptionTopic theSubscriptionTopic,
129                        InMemoryMatchResult theInMemoryMatchResult) {
130                String topicUrl = theSubscriptionTopic.getUrl();
131                IBaseResource matchedResource = theMsg.getNewPayload(myFhirContext);
132                List<IBaseResource> matchedResourceList = Collections.singletonList(matchedResource);
133                RestOperationTypeEnum restOperationType = theMsg.getOperationType().asRestOperationType();
134
135                return mySubscriptionTopicDispatcher.dispatch(new SubscriptionTopicDispatchRequest(
136                                topicUrl,
137                                matchedResourceList,
138                                myInMemoryTopicFilterMatcher,
139                                restOperationType,
140                                theInMemoryMatchResult,
141                                theMsg.getPartitionId(),
142                                theMsg.getTransactionId()));
143        }
144}