001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.topic;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.interceptor.api.HookParams;
024import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
025import ca.uhn.fhir.interceptor.api.Pointcut;
026import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
027import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
028import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
029import ca.uhn.fhir.jpa.topic.filter.InMemoryTopicFilterMatcher;
030import ca.uhn.fhir.jpa.util.MemoryCacheService;
031import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
032import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
033import ca.uhn.fhir.util.Logs;
034import jakarta.annotation.Nonnull;
035import org.hl7.fhir.instance.model.api.IBaseResource;
036import org.hl7.fhir.r5.model.SubscriptionTopic;
037import org.slf4j.Logger;
038import org.springframework.beans.factory.annotation.Autowired;
039import org.springframework.messaging.Message;
040import org.springframework.messaging.MessageHandler;
041import org.springframework.messaging.MessagingException;
042
043import java.util.Collection;
044import java.util.Collections;
045import java.util.List;
046import java.util.Optional;
047
048public class SubscriptionTopicMatchingSubscriber implements MessageHandler {
049        private static final Logger ourLog = Logs.getSubscriptionTopicLog();
050
051        private final FhirContext myFhirContext;
052
053        @Autowired
054        SubscriptionTopicSupport mySubscriptionTopicSupport;
055
056        @Autowired
057        SubscriptionTopicRegistry mySubscriptionTopicRegistry;
058
059        @Autowired
060        private IInterceptorBroadcaster myInterceptorBroadcaster;
061
062        @Autowired
063        private SubscriptionTopicDispatcher mySubscriptionTopicDispatcher;
064
065        @Autowired
066        private InMemoryTopicFilterMatcher myInMemoryTopicFilterMatcher;
067
068        @Autowired
069        private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
070
071        private MemoryCacheService myMemoryCacheService;
072
073        public SubscriptionTopicMatchingSubscriber(FhirContext theFhirContext, MemoryCacheService memoryCacheService) {
074                myFhirContext = theFhirContext;
075                this.myMemoryCacheService = memoryCacheService;
076        }
077
078        @Override
079        public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException {
080                ourLog.trace("Handling resource modified message: {}", theMessage);
081
082                if (!(theMessage instanceof ResourceModifiedJsonMessage)) {
083                        ourLog.warn("Unexpected message payload type: {}", theMessage);
084                        return;
085                }
086
087                ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload();
088
089                if (msg.getPayload(myFhirContext) == null) {
090                        // inflate the message and ignore any resource that cannot be found.
091                        Optional<ResourceModifiedMessage> inflatedMsg =
092                                        myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(msg);
093                        if (inflatedMsg.isEmpty()) {
094                                return;
095                        }
096                        msg = inflatedMsg.get();
097                }
098
099                // Interceptor call: SUBSCRIPTION_TOPIC_BEFORE_PERSISTED_RESOURCE_CHECKED
100                HookParams params = new HookParams().add(ResourceModifiedMessage.class, msg);
101                if (!myInterceptorBroadcaster.callHooks(
102                                Pointcut.SUBSCRIPTION_TOPIC_BEFORE_PERSISTED_RESOURCE_CHECKED, params)) {
103                        return;
104                }
105                try {
106                        matchActiveSubscriptionTopicsAndDeliver(msg);
107                } finally {
108                        // Interceptor call: SUBSCRIPTION_TOPIC_AFTER_PERSISTED_RESOURCE_CHECKED
109                        myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_TOPIC_AFTER_PERSISTED_RESOURCE_CHECKED, params);
110                }
111        }
112
113        private void matchActiveSubscriptionTopicsAndDeliver(ResourceModifiedMessage theMsg) {
114
115                Collection<SubscriptionTopic> topics = mySubscriptionTopicRegistry.getAll();
116                for (SubscriptionTopic topic : topics) {
117                        SubscriptionTopicMatcher matcher =
118                                        new SubscriptionTopicMatcher(mySubscriptionTopicSupport, topic, myMemoryCacheService);
119                        InMemoryMatchResult result = matcher.match(theMsg);
120                        if (result.matched()) {
121                                int deliveries = deliverToTopicSubscriptions(theMsg, topic, result);
122                                ourLog.info(
123                                                "Matched topic {} to message {}.  Notifications sent to {} subscriptions for delivery.",
124                                                topic.getUrl(),
125                                                theMsg,
126                                                deliveries);
127                        }
128                }
129        }
130
131        private int deliverToTopicSubscriptions(
132                        ResourceModifiedMessage theMsg,
133                        SubscriptionTopic theSubscriptionTopic,
134                        InMemoryMatchResult theInMemoryMatchResult) {
135                String topicUrl = theSubscriptionTopic.getUrl();
136                IBaseResource matchedResource = theMsg.getNewPayload(myFhirContext);
137                List<IBaseResource> matchedResourceList = Collections.singletonList(matchedResource);
138                RestOperationTypeEnum restOperationType = theMsg.getOperationType().asRestOperationType();
139
140                return mySubscriptionTopicDispatcher.dispatch(new SubscriptionTopicDispatchRequest(
141                                topicUrl,
142                                matchedResourceList,
143                                myInMemoryTopicFilterMatcher,
144                                restOperationType,
145                                theInMemoryMatchResult,
146                                theMsg.getPartitionId(),
147                                theMsg.getTransactionId()));
148        }
149}