001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.topic; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.interceptor.api.HookParams; 024import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 025import ca.uhn.fhir.interceptor.api.Pointcut; 026import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; 027import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; 028import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; 029import ca.uhn.fhir.jpa.topic.filter.InMemoryTopicFilterMatcher; 030import ca.uhn.fhir.jpa.util.MemoryCacheService; 031import ca.uhn.fhir.rest.api.RestOperationTypeEnum; 032import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; 033import ca.uhn.fhir.util.Logs; 034import jakarta.annotation.Nonnull; 035import org.hl7.fhir.instance.model.api.IBaseResource; 036import org.hl7.fhir.r5.model.SubscriptionTopic; 037import org.slf4j.Logger; 038import org.springframework.beans.factory.annotation.Autowired; 039import org.springframework.messaging.Message; 040import org.springframework.messaging.MessageHandler; 041import org.springframework.messaging.MessagingException; 042 043import java.util.Collection; 044import java.util.Collections; 045import java.util.List; 046import java.util.Optional; 047 048public class SubscriptionTopicMatchingSubscriber implements MessageHandler { 049 private static final Logger ourLog = Logs.getSubscriptionTopicLog(); 050 051 private final FhirContext myFhirContext; 052 053 @Autowired 054 SubscriptionTopicSupport mySubscriptionTopicSupport; 055 056 @Autowired 057 SubscriptionTopicRegistry mySubscriptionTopicRegistry; 058 059 @Autowired 060 private IInterceptorBroadcaster myInterceptorBroadcaster; 061 062 @Autowired 063 private SubscriptionTopicDispatcher mySubscriptionTopicDispatcher; 064 065 @Autowired 066 private InMemoryTopicFilterMatcher myInMemoryTopicFilterMatcher; 067 068 @Autowired 069 private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc; 070 071 private MemoryCacheService myMemoryCacheService; 072 073 public SubscriptionTopicMatchingSubscriber(FhirContext theFhirContext, MemoryCacheService memoryCacheService) { 074 myFhirContext = theFhirContext; 075 this.myMemoryCacheService = memoryCacheService; 076 } 077 078 @Override 079 public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException { 080 ourLog.trace("Handling resource modified message: {}", theMessage); 081 082 if (!(theMessage instanceof ResourceModifiedJsonMessage)) { 083 ourLog.warn("Unexpected message payload type: {}", theMessage); 084 return; 085 } 086 087 ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload(); 088 089 if (msg.getPayload(myFhirContext) == null) { 090 // inflate the message and ignore any resource that cannot be found. 091 Optional<ResourceModifiedMessage> inflatedMsg = 092 myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(msg); 093 if (inflatedMsg.isEmpty()) { 094 return; 095 } 096 msg = inflatedMsg.get(); 097 } 098 099 // Interceptor call: SUBSCRIPTION_TOPIC_BEFORE_PERSISTED_RESOURCE_CHECKED 100 HookParams params = new HookParams().add(ResourceModifiedMessage.class, msg); 101 if (!myInterceptorBroadcaster.callHooks( 102 Pointcut.SUBSCRIPTION_TOPIC_BEFORE_PERSISTED_RESOURCE_CHECKED, params)) { 103 return; 104 } 105 try { 106 matchActiveSubscriptionTopicsAndDeliver(msg); 107 } finally { 108 // Interceptor call: SUBSCRIPTION_TOPIC_AFTER_PERSISTED_RESOURCE_CHECKED 109 myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_TOPIC_AFTER_PERSISTED_RESOURCE_CHECKED, params); 110 } 111 } 112 113 private void matchActiveSubscriptionTopicsAndDeliver(ResourceModifiedMessage theMsg) { 114 115 Collection<SubscriptionTopic> topics = mySubscriptionTopicRegistry.getAll(); 116 for (SubscriptionTopic topic : topics) { 117 SubscriptionTopicMatcher matcher = 118 new SubscriptionTopicMatcher(mySubscriptionTopicSupport, topic, myMemoryCacheService); 119 InMemoryMatchResult result = matcher.match(theMsg); 120 if (result.matched()) { 121 int deliveries = deliverToTopicSubscriptions(theMsg, topic, result); 122 ourLog.info( 123 "Matched topic {} to message {}. Notifications sent to {} subscriptions for delivery.", 124 topic.getUrl(), 125 theMsg, 126 deliveries); 127 } 128 } 129 } 130 131 private int deliverToTopicSubscriptions( 132 ResourceModifiedMessage theMsg, 133 SubscriptionTopic theSubscriptionTopic, 134 InMemoryMatchResult theInMemoryMatchResult) { 135 String topicUrl = theSubscriptionTopic.getUrl(); 136 IBaseResource matchedResource = theMsg.getNewPayload(myFhirContext); 137 List<IBaseResource> matchedResourceList = Collections.singletonList(matchedResource); 138 RestOperationTypeEnum restOperationType = theMsg.getOperationType().asRestOperationType(); 139 140 return mySubscriptionTopicDispatcher.dispatch(new SubscriptionTopicDispatchRequest( 141 topicUrl, 142 matchedResourceList, 143 myInMemoryTopicFilterMatcher, 144 restOperationType, 145 theInMemoryMatchResult, 146 theMsg.getPartitionId(), 147 theMsg.getTransactionId())); 148 } 149}