001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.i18n.Msg; 024import ca.uhn.fhir.interceptor.api.HookParams; 025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 026import ca.uhn.fhir.interceptor.api.Pointcut; 027import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; 028import ca.uhn.fhir.jpa.subscription.channel.api.PayloadTooLargeException; 029import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry; 030import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; 031import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; 032import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage; 033import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; 034import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; 035import ca.uhn.fhir.rest.api.EncodingEnum; 036import jakarta.annotation.Nonnull; 037import jakarta.annotation.Nullable; 038import org.hl7.fhir.instance.model.api.IBaseResource; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041import org.springframework.messaging.MessageChannel; 042 043import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; 044 045public class SubscriptionMatchDeliverer { 046 private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchDeliverer.class); 047 private final FhirContext myFhirContext; 048 private final IInterceptorBroadcaster myInterceptorBroadcaster; 049 private final SubscriptionChannelRegistry mySubscriptionChannelRegistry; 050 051 public SubscriptionMatchDeliverer( 052 FhirContext theFhirContext, 053 IInterceptorBroadcaster theInterceptorBroadcaster, 054 SubscriptionChannelRegistry theSubscriptionChannelRegistry) { 055 myFhirContext = theFhirContext; 056 myInterceptorBroadcaster = theInterceptorBroadcaster; 057 mySubscriptionChannelRegistry = theSubscriptionChannelRegistry; 058 } 059 060 public boolean deliverPayload( 061 @Nullable IBaseResource thePayload, 062 @Nonnull ResourceModifiedMessage theMsg, 063 @Nonnull ActiveSubscription theActiveSubscription, 064 @Nullable InMemoryMatchResult theInMemoryMatchResult) { 065 SubscriptionDeliveryRequest subscriptionDeliveryRequest; 066 if (thePayload != null) { 067 subscriptionDeliveryRequest = new SubscriptionDeliveryRequest(thePayload, theMsg, theActiveSubscription); 068 } else { 069 subscriptionDeliveryRequest = 070 new SubscriptionDeliveryRequest(theMsg.getPayloadId(myFhirContext), theMsg, theActiveSubscription); 071 } 072 ResourceDeliveryMessage deliveryMsg = buildResourceDeliveryMessage(subscriptionDeliveryRequest); 073 deliveryMsg.copyAdditionalPropertiesFrom(theMsg); 074 075 return sendToDeliveryChannel(theActiveSubscription, theInMemoryMatchResult, deliveryMsg); 076 } 077 078 public boolean deliverPayload( 079 @Nonnull SubscriptionDeliveryRequest subscriptionDeliveryRequest, 080 @Nullable InMemoryMatchResult theInMemoryMatchResult) { 081 ResourceDeliveryMessage deliveryMsg = buildResourceDeliveryMessage(subscriptionDeliveryRequest); 082 083 return sendToDeliveryChannel( 084 subscriptionDeliveryRequest.getActiveSubscription(), theInMemoryMatchResult, deliveryMsg); 085 } 086 087 private boolean sendToDeliveryChannel( 088 @Nonnull ActiveSubscription theActiveSubscription, 089 @Nullable InMemoryMatchResult theInMemoryMatchResult, 090 @Nonnull ResourceDeliveryMessage deliveryMsg) { 091 if (!callHooks(theActiveSubscription, theInMemoryMatchResult, deliveryMsg)) { 092 return false; 093 } 094 095 boolean retVal = false; 096 ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(deliveryMsg); 097 MessageChannel deliveryChannel = 098 mySubscriptionChannelRegistry.getDeliverySenderChannel(theActiveSubscription.getChannelName()); 099 if (deliveryChannel != null) { 100 retVal = true; 101 trySendToDeliveryChannel(wrappedMsg, deliveryChannel); 102 } else { 103 ourLog.warn("Do not have delivery channel for subscription {}", theActiveSubscription.getId()); 104 } 105 return retVal; 106 } 107 108 private ResourceDeliveryMessage buildResourceDeliveryMessage(@Nonnull SubscriptionDeliveryRequest theRequest) { 109 EncodingEnum encoding = null; 110 111 CanonicalSubscription subscription = theRequest.getSubscription(); 112 113 if (subscription != null 114 && subscription.getPayloadString() != null 115 && !subscription.getPayloadString().isEmpty()) { 116 encoding = EncodingEnum.forContentType(subscription.getPayloadString()); 117 } 118 encoding = defaultIfNull(encoding, EncodingEnum.JSON); 119 120 ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage(); 121 deliveryMsg.setPartitionId(theRequest.getRequestPartitionId()); 122 123 if (theRequest.hasPayload()) { 124 deliveryMsg.setPayload(myFhirContext, theRequest.getPayload(), encoding); 125 } else { 126 deliveryMsg.setPayloadId(theRequest.getPayloadId()); 127 } 128 deliveryMsg.setSubscription(subscription); 129 deliveryMsg.setOperationType(theRequest.getOperationType()); 130 deliveryMsg.setTransactionId(theRequest.getTransactionId()); 131 return deliveryMsg; 132 } 133 134 private boolean callHooks( 135 ActiveSubscription theActiveSubscription, 136 InMemoryMatchResult theInMemoryMatchResult, 137 ResourceDeliveryMessage deliveryMsg) { 138 // Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED 139 HookParams params = new HookParams() 140 .add(CanonicalSubscription.class, theActiveSubscription.getSubscription()) 141 .add(ResourceDeliveryMessage.class, deliveryMsg) 142 .add(InMemoryMatchResult.class, theInMemoryMatchResult); 143 if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED, params)) { 144 ourLog.info( 145 "Interceptor has decided to abort processing of subscription {}", theActiveSubscription.getId()); 146 return false; 147 } 148 return true; 149 } 150 151 private void trySendToDeliveryChannel( 152 ResourceDeliveryJsonMessage theWrappedMsg, MessageChannel theDeliveryChannel) { 153 try { 154 boolean success = theDeliveryChannel.send(theWrappedMsg); 155 if (!success) { 156 ourLog.warn("Failed to send message to Delivery Channel."); 157 } 158 } catch (RuntimeException e) { 159 if (e.getCause() instanceof PayloadTooLargeException) { 160 ourLog.warn("Failed to send message to Delivery Channel because the payload size is larger than broker " 161 + "max message size. Retry is about to be performed without payload."); 162 ResourceDeliveryJsonMessage msgPayloadLess = nullOutPayload(theWrappedMsg); 163 trySendToDeliveryChannel(msgPayloadLess, theDeliveryChannel); 164 } else { 165 ourLog.error("Failed to send message to Delivery Channel", e); 166 throw new RuntimeException(Msg.code(7) + "Failed to send message to Delivery Channel", e); 167 } 168 } 169 } 170 171 private ResourceDeliveryJsonMessage nullOutPayload(ResourceDeliveryJsonMessage theWrappedMsg) { 172 ResourceDeliveryMessage resourceDeliveryMessage = theWrappedMsg.getPayload(); 173 resourceDeliveryMessage.setPayloadToNull(); 174 return new ResourceDeliveryJsonMessage(resourceDeliveryMessage); 175 } 176}