
001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber; 021 022import ca.uhn.fhir.broker.api.IChannelProducer; 023import ca.uhn.fhir.broker.api.ISendResult; 024import ca.uhn.fhir.broker.api.PayloadTooLargeException; 025import ca.uhn.fhir.context.FhirContext; 026import ca.uhn.fhir.i18n.Msg; 027import ca.uhn.fhir.interceptor.api.HookParams; 028import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 029import ca.uhn.fhir.interceptor.api.Pointcut; 030import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; 031import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry; 032import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; 033import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; 034import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage; 035import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; 036import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; 037import ca.uhn.fhir.rest.api.EncodingEnum; 038import jakarta.annotation.Nonnull; 039import jakarta.annotation.Nullable; 040import org.hl7.fhir.instance.model.api.IBaseResource; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; 045 046public class SubscriptionMatchDeliverer { 047 private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchDeliverer.class); 048 private final FhirContext myFhirContext; 049 private final IInterceptorBroadcaster myInterceptorBroadcaster; 050 private final SubscriptionChannelRegistry mySubscriptionChannelRegistry; 051 052 public SubscriptionMatchDeliverer( 053 FhirContext theFhirContext, 054 IInterceptorBroadcaster theInterceptorBroadcaster, 055 SubscriptionChannelRegistry theSubscriptionChannelRegistry) { 056 myFhirContext = theFhirContext; 057 myInterceptorBroadcaster = theInterceptorBroadcaster; 058 mySubscriptionChannelRegistry = theSubscriptionChannelRegistry; 059 } 060 061 public ISendResult deliverPayload( 062 @Nullable IBaseResource thePayload, 063 @Nonnull ResourceModifiedMessage theMsg, 064 @Nonnull ActiveSubscription theActiveSubscription, 065 @Nullable InMemoryMatchResult theInMemoryMatchResult) { 066 SubscriptionDeliveryRequest subscriptionDeliveryRequest; 067 if (thePayload != null) { 068 subscriptionDeliveryRequest = new SubscriptionDeliveryRequest(thePayload, theMsg, theActiveSubscription); 069 } else { 070 subscriptionDeliveryRequest = 071 new SubscriptionDeliveryRequest(theMsg.getPayloadId(myFhirContext), theMsg, theActiveSubscription); 072 } 073 ResourceDeliveryMessage deliveryMsg = buildResourceDeliveryMessage(subscriptionDeliveryRequest); 074 deliveryMsg.copyAdditionalPropertiesFrom(theMsg); 075 076 return sendToDeliveryChannel(theActiveSubscription, theInMemoryMatchResult, deliveryMsg); 077 } 078 079 public ISendResult deliverPayload( 080 @Nonnull SubscriptionDeliveryRequest subscriptionDeliveryRequest, 081 @Nullable InMemoryMatchResult theInMemoryMatchResult) { 082 ResourceDeliveryMessage deliveryMsg = buildResourceDeliveryMessage(subscriptionDeliveryRequest); 083 084 return sendToDeliveryChannel( 085 subscriptionDeliveryRequest.getActiveSubscription(), theInMemoryMatchResult, deliveryMsg); 086 } 087 088 private ISendResult sendToDeliveryChannel( 089 @Nonnull ActiveSubscription theActiveSubscription, 090 @Nullable InMemoryMatchResult theInMemoryMatchResult, 091 @Nonnull ResourceDeliveryMessage deliveryMsg) { 092 if (!callHooks(theActiveSubscription, theInMemoryMatchResult, deliveryMsg)) { 093 return ISendResult.FAILURE; 094 } 095 096 ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(deliveryMsg); 097 IChannelProducer<ResourceDeliveryMessage> deliveryProducer = 098 mySubscriptionChannelRegistry.getDeliveryChannelProducer(theActiveSubscription.getChannelName()); 099 if (deliveryProducer != null) { 100 return trySendToDeliveryChannel(wrappedMsg, deliveryProducer); 101 } else { 102 ourLog.warn("Do not have delivery channel for subscription {}", theActiveSubscription.getId()); 103 } 104 return ISendResult.FAILURE; 105 } 106 107 private ResourceDeliveryMessage buildResourceDeliveryMessage(@Nonnull SubscriptionDeliveryRequest theRequest) { 108 EncodingEnum encoding = null; 109 110 CanonicalSubscription subscription = theRequest.getSubscription(); 111 112 if (subscription != null 113 && subscription.getPayloadString() != null 114 && !subscription.getPayloadString().isEmpty()) { 115 encoding = EncodingEnum.forContentType(subscription.getPayloadString()); 116 } 117 encoding = defaultIfNull(encoding, EncodingEnum.JSON); 118 119 ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage(); 120 deliveryMsg.setPartitionId(theRequest.getRequestPartitionId()); 121 122 if (theRequest.hasPayload()) { 123 deliveryMsg.setPayload(myFhirContext, theRequest.getPayload(), encoding); 124 } else { 125 deliveryMsg.setPayloadId(theRequest.getPayloadId()); 126 } 127 deliveryMsg.setSubscription(subscription); 128 deliveryMsg.setOperationType(theRequest.getOperationType()); 129 deliveryMsg.setTransactionId(theRequest.getTransactionId()); 130 return deliveryMsg; 131 } 132 133 private boolean callHooks( 134 ActiveSubscription theActiveSubscription, 135 InMemoryMatchResult theInMemoryMatchResult, 136 ResourceDeliveryMessage deliveryMsg) { 137 // Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED 138 HookParams params = new HookParams() 139 .add(CanonicalSubscription.class, theActiveSubscription.getSubscription()) 140 .add(ResourceDeliveryMessage.class, deliveryMsg) 141 .add(InMemoryMatchResult.class, theInMemoryMatchResult); 142 if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED, params)) { 143 ourLog.info( 144 "Interceptor has decided to abort processing of subscription {}", theActiveSubscription.getId()); 145 return false; 146 } 147 return true; 148 } 149 150 private ISendResult trySendToDeliveryChannel( 151 ResourceDeliveryJsonMessage theWrappedMsg, IChannelProducer<ResourceDeliveryMessage> theDeliveryProducer) { 152 try { 153 ISendResult retval = theDeliveryProducer.send(theWrappedMsg); 154 if (!retval.isSuccessful()) { 155 ourLog.warn("Failed to send message to Delivery Channel."); 156 } 157 return retval; 158 } catch (RuntimeException e) { 159 if (e instanceof PayloadTooLargeException || e.getCause() instanceof PayloadTooLargeException) { 160 ourLog.warn("Failed to send message to Delivery Channel because the payload size is larger than broker " 161 + "max message size. Retry is about to be performed without payload."); 162 ResourceDeliveryJsonMessage msgPayloadLess = nullOutPayload(theWrappedMsg); 163 return trySendToDeliveryChannel(msgPayloadLess, theDeliveryProducer); 164 } else { 165 ourLog.error("Failed to send message to Delivery Channel", e); 166 throw new RuntimeException(Msg.code(7) + "Failed to send message to Delivery Channel", e); 167 } 168 } 169 } 170 171 private ResourceDeliveryJsonMessage nullOutPayload(ResourceDeliveryJsonMessage theWrappedMsg) { 172 ResourceDeliveryMessage resourceDeliveryMessage = theWrappedMsg.getPayload(); 173 resourceDeliveryMessage.setPayloadToNull(); 174 return new ResourceDeliveryJsonMessage(resourceDeliveryMessage); 175 } 176}