001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.interceptor.api.HookParams; 024import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 025import ca.uhn.fhir.interceptor.api.Pointcut; 026import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; 027import ca.uhn.fhir.jpa.subscription.match.matcher.matching.ISubscriptionMatcher; 028import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; 029import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; 030import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; 031import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; 032import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; 033import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; 034import jakarta.annotation.Nonnull; 035import org.hl7.fhir.instance.model.api.IBaseResource; 036import org.hl7.fhir.instance.model.api.IIdType; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039import org.springframework.beans.factory.annotation.Autowired; 040import org.springframework.messaging.Message; 041import org.springframework.messaging.MessageHandler; 042import org.springframework.messaging.MessagingException; 043 044import java.util.Collection; 045import java.util.Optional; 046 047import static ca.uhn.fhir.rest.server.messaging.BaseResourceMessage.OperationTypeEnum.DELETE; 048import static org.apache.commons.lang3.StringUtils.isNotBlank; 049 050public class SubscriptionMatchingSubscriber implements MessageHandler { 051 private final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchingSubscriber.class); 052 public static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching"; 053 054 @Autowired 055 private ISubscriptionMatcher mySubscriptionMatcher; 056 057 @Autowired 058 private FhirContext myFhirContext; 059 060 @Autowired 061 private SubscriptionRegistry mySubscriptionRegistry; 062 063 @Autowired 064 private IInterceptorBroadcaster myInterceptorBroadcaster; 065 066 @Autowired 067 private SubscriptionMatchDeliverer mySubscriptionMatchDeliverer; 068 069 @Autowired 070 private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc; 071 072 /** 073 * Constructor 074 */ 075 public SubscriptionMatchingSubscriber() { 076 super(); 077 } 078 079 @Override 080 public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException { 081 ourLog.trace("Handling resource modified message: {}", theMessage); 082 083 if (!(theMessage instanceof ResourceModifiedJsonMessage)) { 084 ourLog.warn("Unexpected message payload type: {}", theMessage); 085 return; 086 } 087 088 ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload(); 089 matchActiveSubscriptionsAndDeliver(msg); 090 } 091 092 public void matchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg) { 093 switch (theMsg.getOperationType()) { 094 case CREATE: 095 case UPDATE: 096 case MANUALLY_TRIGGERED: 097 case DELETE: 098 break; 099 default: 100 ourLog.trace("Not processing modified message for {}", theMsg.getOperationType()); 101 // ignore anything else 102 return; 103 } 104 105 if (theMsg.getPayload(myFhirContext) == null) { 106 // inflate the message and ignore any resource that cannot be found. 107 Optional<ResourceModifiedMessage> inflatedMsg = 108 myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(theMsg); 109 if (inflatedMsg.isEmpty()) { 110 return; 111 } 112 theMsg = inflatedMsg.get(); 113 } 114 115 // Interceptor call: SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED 116 HookParams params = new HookParams().add(ResourceModifiedMessage.class, theMsg); 117 if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED, params)) { 118 return; 119 } 120 121 try { 122 doMatchActiveSubscriptionsAndDeliver(theMsg); 123 } finally { 124 // Interceptor call: SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED 125 myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, params); 126 } 127 } 128 129 private void doMatchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg) { 130 IIdType resourceId = theMsg.getPayloadId(myFhirContext); 131 132 Collection<ActiveSubscription> subscriptions = mySubscriptionRegistry.getAllNonTopicSubscriptions(); 133 134 ourLog.trace("Testing {} subscriptions for applicability", subscriptions.size()); 135 boolean anySubscriptionsMatchedResource = false; 136 137 for (ActiveSubscription nextActiveSubscription : subscriptions) { 138 anySubscriptionsMatchedResource |= processSubscription(theMsg, resourceId, nextActiveSubscription); 139 } 140 141 if (!anySubscriptionsMatchedResource) { 142 // Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED 143 HookParams params = new HookParams().add(ResourceModifiedMessage.class, theMsg); 144 myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_DID_NOT_MATCH_ANY_SUBSCRIPTIONS, params); 145 } 146 } 147 148 /** 149 * Returns true if subscription matched, and processing completed successfully, and the message was sent to the delivery channel. False otherwise. 150 * 151 */ 152 private boolean processSubscription( 153 ResourceModifiedMessage theMsg, IIdType theResourceId, ActiveSubscription theActiveSubscription) { 154 155 CanonicalSubscription subscription = theActiveSubscription.getSubscription(); 156 157 if (subscription != null 158 && theMsg.getPartitionId() != null 159 && theMsg.getPartitionId().hasPartitionIds() 160 && !subscription.isCrossPartitionEnabled() 161 && !theMsg.getPartitionId().hasPartitionId(subscription.getRequestPartitionId())) { 162 return false; 163 } 164 165 String nextSubscriptionId = theActiveSubscription.getId(); 166 167 if (isNotBlank(theMsg.getSubscriptionId())) { 168 if (!theMsg.getSubscriptionId().equals(nextSubscriptionId)) { 169 // TODO KHS we should use a hash to look it up instead of this full table scan 170 ourLog.debug( 171 "Ignoring subscription {} because it is not {}", 172 nextSubscriptionId, 173 theMsg.getSubscriptionId()); 174 return false; 175 } 176 } 177 178 if (!resourceTypeIsAppropriateForSubscription(theActiveSubscription, theResourceId)) { 179 return false; 180 } 181 182 if (theMsg.getOperationType().equals(DELETE)) { 183 if (!theActiveSubscription.getSubscription().getSendDeleteMessages()) { 184 ourLog.trace("Not processing modified message for {}", theMsg.getOperationType()); 185 return false; 186 } 187 } 188 189 InMemoryMatchResult matchResult; 190 if (theActiveSubscription.getCriteria().getType() == SubscriptionCriteriaParser.TypeEnum.SEARCH_EXPRESSION) { 191 matchResult = mySubscriptionMatcher.match(theActiveSubscription.getSubscription(), theMsg); 192 if (!matchResult.matched()) { 193 ourLog.trace( 194 "Subscription {} was not matched by resource {} {}", 195 theActiveSubscription.getId(), 196 theResourceId.toUnqualifiedVersionless().getValue(), 197 matchResult.isInMemory() ? "in-memory" : "by querying the repository"); 198 return false; 199 } 200 ourLog.debug( 201 "Subscription {} was matched by resource {} {}", 202 theActiveSubscription.getId(), 203 theResourceId.toUnqualifiedVersionless().getValue(), 204 matchResult.isInMemory() ? "in-memory" : "by querying the repository"); 205 } else { 206 ourLog.trace( 207 "Subscription {} was not matched by resource {} - No search expression", 208 theActiveSubscription.getId(), 209 theResourceId.toUnqualifiedVersionless().getValue()); 210 matchResult = InMemoryMatchResult.successfulMatch(); 211 matchResult.setInMemory(true); 212 } 213 214 IBaseResource payload = theMsg.getNewPayload(myFhirContext); 215 return mySubscriptionMatchDeliverer.deliverPayload(payload, theMsg, theActiveSubscription, matchResult); 216 } 217 218 private boolean resourceTypeIsAppropriateForSubscription( 219 ActiveSubscription theActiveSubscription, IIdType theResourceId) { 220 SubscriptionCriteriaParser.SubscriptionCriteria criteria = theActiveSubscription.getCriteria(); 221 String subscriptionId = theActiveSubscription.getId(); 222 String resourceType = theResourceId.getResourceType(); 223 224 // see if the criteria matches the created object 225 ourLog.trace("Checking subscription {} for {} with criteria {}", subscriptionId, resourceType, criteria); 226 227 if (criteria == null) { 228 ourLog.trace("Subscription {} has no criteria - Not matching", subscriptionId); 229 return false; 230 } 231 232 switch (criteria.getType()) { 233 default: 234 case SEARCH_EXPRESSION: 235 case MULTITYPE_EXPRESSION: 236 boolean contains = criteria.getApplicableResourceTypes().contains(resourceType); 237 ourLog.trace("Subscription {} applicable resource type check: {}", subscriptionId, contains); 238 return contains; 239 case STARTYPE_EXPRESSION: 240 boolean match = !resourceType.equals("Subscription"); 241 ourLog.trace("Subscription {} start resource type check: {}", subscriptionId, match); 242 return match; 243 } 244 } 245}