001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.topic; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.interceptor.api.HookParams; 024import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 025import ca.uhn.fhir.interceptor.api.Pointcut; 026import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; 027import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; 028import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; 029import ca.uhn.fhir.jpa.topic.filter.InMemoryTopicFilterMatcher; 030import ca.uhn.fhir.rest.api.RestOperationTypeEnum; 031import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; 032import ca.uhn.fhir.util.Logs; 033import jakarta.annotation.Nonnull; 034import org.hl7.fhir.instance.model.api.IBaseResource; 035import org.hl7.fhir.r5.model.SubscriptionTopic; 036import org.slf4j.Logger; 037import org.springframework.beans.factory.annotation.Autowired; 038import org.springframework.messaging.Message; 039import org.springframework.messaging.MessageHandler; 040import org.springframework.messaging.MessagingException; 041 042import java.util.Collection; 043import java.util.Collections; 044import java.util.List; 045import java.util.Optional; 046 047public class SubscriptionTopicMatchingSubscriber implements MessageHandler { 048 private static final Logger ourLog = Logs.getSubscriptionTopicLog(); 049 050 private final FhirContext myFhirContext; 051 052 @Autowired 053 SubscriptionTopicSupport mySubscriptionTopicSupport; 054 055 @Autowired 056 SubscriptionTopicRegistry mySubscriptionTopicRegistry; 057 058 @Autowired 059 private IInterceptorBroadcaster myInterceptorBroadcaster; 060 061 @Autowired 062 private SubscriptionTopicDispatcher mySubscriptionTopicDispatcher; 063 064 @Autowired 065 private InMemoryTopicFilterMatcher myInMemoryTopicFilterMatcher; 066 067 @Autowired 068 private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc; 069 070 public SubscriptionTopicMatchingSubscriber(FhirContext theFhirContext) { 071 myFhirContext = theFhirContext; 072 } 073 074 @Override 075 public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException { 076 ourLog.trace("Handling resource modified message: {}", theMessage); 077 078 if (!(theMessage instanceof ResourceModifiedJsonMessage)) { 079 ourLog.warn("Unexpected message payload type: {}", theMessage); 080 return; 081 } 082 083 ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload(); 084 085 if (msg.getPayload(myFhirContext) == null) { 086 // inflate the message and ignore any resource that cannot be found. 087 Optional<ResourceModifiedMessage> inflatedMsg = 088 myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(msg); 089 if (inflatedMsg.isEmpty()) { 090 return; 091 } 092 msg = inflatedMsg.get(); 093 } 094 095 // Interceptor call: SUBSCRIPTION_TOPIC_BEFORE_PERSISTED_RESOURCE_CHECKED 096 HookParams params = new HookParams().add(ResourceModifiedMessage.class, msg); 097 if (!myInterceptorBroadcaster.callHooks( 098 Pointcut.SUBSCRIPTION_TOPIC_BEFORE_PERSISTED_RESOURCE_CHECKED, params)) { 099 return; 100 } 101 try { 102 matchActiveSubscriptionTopicsAndDeliver(msg); 103 } finally { 104 // Interceptor call: SUBSCRIPTION_TOPIC_AFTER_PERSISTED_RESOURCE_CHECKED 105 myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_TOPIC_AFTER_PERSISTED_RESOURCE_CHECKED, params); 106 } 107 } 108 109 private void matchActiveSubscriptionTopicsAndDeliver(ResourceModifiedMessage theMsg) { 110 111 Collection<SubscriptionTopic> topics = mySubscriptionTopicRegistry.getAll(); 112 for (SubscriptionTopic topic : topics) { 113 SubscriptionTopicMatcher matcher = new SubscriptionTopicMatcher(mySubscriptionTopicSupport, topic); 114 InMemoryMatchResult result = matcher.match(theMsg); 115 if (result.matched()) { 116 int deliveries = deliverToTopicSubscriptions(theMsg, topic, result); 117 ourLog.info( 118 "Matched topic {} to message {}. Notifications sent to {} subscriptions for delivery.", 119 topic.getUrl(), 120 theMsg, 121 deliveries); 122 } 123 } 124 } 125 126 private int deliverToTopicSubscriptions( 127 ResourceModifiedMessage theMsg, 128 SubscriptionTopic theSubscriptionTopic, 129 InMemoryMatchResult theInMemoryMatchResult) { 130 String topicUrl = theSubscriptionTopic.getUrl(); 131 IBaseResource matchedResource = theMsg.getNewPayload(myFhirContext); 132 List<IBaseResource> matchedResourceList = Collections.singletonList(matchedResource); 133 RestOperationTypeEnum restOperationType = theMsg.getOperationType().asRestOperationType(); 134 135 return mySubscriptionTopicDispatcher.dispatch(new SubscriptionTopicDispatchRequest( 136 topicUrl, 137 matchedResourceList, 138 myInMemoryTopicFilterMatcher, 139 restOperationType, 140 theInMemoryMatchResult, 141 theMsg.getPartitionId(), 142 theMsg.getTransactionId())); 143 } 144}