001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
021
022import ca.uhn.fhir.broker.api.IChannelProducer;
023import ca.uhn.fhir.broker.api.ISendResult;
024import ca.uhn.fhir.broker.api.PayloadTooLargeException;
025import ca.uhn.fhir.context.FhirContext;
026import ca.uhn.fhir.i18n.Msg;
027import ca.uhn.fhir.interceptor.api.HookParams;
028import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
029import ca.uhn.fhir.interceptor.api.Pointcut;
030import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
031import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
032import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
033import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
034import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
035import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
036import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
037import ca.uhn.fhir.rest.api.EncodingEnum;
038import jakarta.annotation.Nonnull;
039import jakarta.annotation.Nullable;
040import org.hl7.fhir.instance.model.api.IBaseResource;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
045
046public class SubscriptionMatchDeliverer {
047        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchDeliverer.class);
048        private final FhirContext myFhirContext;
049        private final IInterceptorBroadcaster myInterceptorBroadcaster;
050        private final SubscriptionChannelRegistry mySubscriptionChannelRegistry;
051
052        public SubscriptionMatchDeliverer(
053                        FhirContext theFhirContext,
054                        IInterceptorBroadcaster theInterceptorBroadcaster,
055                        SubscriptionChannelRegistry theSubscriptionChannelRegistry) {
056                myFhirContext = theFhirContext;
057                myInterceptorBroadcaster = theInterceptorBroadcaster;
058                mySubscriptionChannelRegistry = theSubscriptionChannelRegistry;
059        }
060
061        public ISendResult deliverPayload(
062                        @Nullable IBaseResource thePayload,
063                        @Nonnull ResourceModifiedMessage theMsg,
064                        @Nonnull ActiveSubscription theActiveSubscription,
065                        @Nullable InMemoryMatchResult theInMemoryMatchResult) {
066                SubscriptionDeliveryRequest subscriptionDeliveryRequest;
067                if (thePayload != null) {
068                        subscriptionDeliveryRequest = new SubscriptionDeliveryRequest(thePayload, theMsg, theActiveSubscription);
069                } else {
070                        subscriptionDeliveryRequest =
071                                        new SubscriptionDeliveryRequest(theMsg.getPayloadId(myFhirContext), theMsg, theActiveSubscription);
072                }
073                ResourceDeliveryMessage deliveryMsg = buildResourceDeliveryMessage(subscriptionDeliveryRequest);
074                deliveryMsg.copyAdditionalPropertiesFrom(theMsg);
075
076                return sendToDeliveryChannel(theActiveSubscription, theInMemoryMatchResult, deliveryMsg);
077        }
078
079        public ISendResult deliverPayload(
080                        @Nonnull SubscriptionDeliveryRequest subscriptionDeliveryRequest,
081                        @Nullable InMemoryMatchResult theInMemoryMatchResult) {
082                ResourceDeliveryMessage deliveryMsg = buildResourceDeliveryMessage(subscriptionDeliveryRequest);
083
084                return sendToDeliveryChannel(
085                                subscriptionDeliveryRequest.getActiveSubscription(), theInMemoryMatchResult, deliveryMsg);
086        }
087
088        private ISendResult sendToDeliveryChannel(
089                        @Nonnull ActiveSubscription theActiveSubscription,
090                        @Nullable InMemoryMatchResult theInMemoryMatchResult,
091                        @Nonnull ResourceDeliveryMessage deliveryMsg) {
092                if (!callHooks(theActiveSubscription, theInMemoryMatchResult, deliveryMsg)) {
093                        return ISendResult.FAILURE;
094                }
095
096                ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(deliveryMsg);
097                IChannelProducer<ResourceDeliveryMessage> deliveryProducer =
098                                mySubscriptionChannelRegistry.getDeliveryChannelProducer(theActiveSubscription.getChannelName());
099                if (deliveryProducer != null) {
100                        return trySendToDeliveryChannel(wrappedMsg, deliveryProducer);
101                } else {
102                        ourLog.warn("Do not have delivery channel for subscription {}", theActiveSubscription.getId());
103                }
104                return ISendResult.FAILURE;
105        }
106
107        private ResourceDeliveryMessage buildResourceDeliveryMessage(@Nonnull SubscriptionDeliveryRequest theRequest) {
108                EncodingEnum encoding = null;
109
110                CanonicalSubscription subscription = theRequest.getSubscription();
111
112                if (subscription != null
113                                && subscription.getPayloadString() != null
114                                && !subscription.getPayloadString().isEmpty()) {
115                        encoding = EncodingEnum.forContentType(subscription.getPayloadString());
116                }
117                encoding = defaultIfNull(encoding, EncodingEnum.JSON);
118
119                ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
120                deliveryMsg.setPartitionId(theRequest.getRequestPartitionId());
121
122                if (theRequest.hasPayload()) {
123                        deliveryMsg.setPayload(myFhirContext, theRequest.getPayload(), encoding);
124                } else {
125                        deliveryMsg.setPayloadId(theRequest.getPayloadId());
126                }
127                deliveryMsg.setSubscription(subscription);
128                deliveryMsg.setOperationType(theRequest.getOperationType());
129                deliveryMsg.setTransactionId(theRequest.getTransactionId());
130                return deliveryMsg;
131        }
132
133        private boolean callHooks(
134                        ActiveSubscription theActiveSubscription,
135                        InMemoryMatchResult theInMemoryMatchResult,
136                        ResourceDeliveryMessage deliveryMsg) {
137                // Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
138                HookParams params = new HookParams()
139                                .add(CanonicalSubscription.class, theActiveSubscription.getSubscription())
140                                .add(ResourceDeliveryMessage.class, deliveryMsg)
141                                .add(InMemoryMatchResult.class, theInMemoryMatchResult);
142                if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED, params)) {
143                        ourLog.info(
144                                        "Interceptor has decided to abort processing of subscription {}", theActiveSubscription.getId());
145                        return false;
146                }
147                return true;
148        }
149
150        private ISendResult trySendToDeliveryChannel(
151                        ResourceDeliveryJsonMessage theWrappedMsg, IChannelProducer<ResourceDeliveryMessage> theDeliveryProducer) {
152                try {
153                        ISendResult retval = theDeliveryProducer.send(theWrappedMsg);
154                        if (!retval.isSuccessful()) {
155                                ourLog.warn("Failed to send message to Delivery Channel.");
156                        }
157                        return retval;
158                } catch (RuntimeException e) {
159                        if (e instanceof PayloadTooLargeException || e.getCause() instanceof PayloadTooLargeException) {
160                                ourLog.warn("Failed to send message to Delivery Channel because the payload size is larger than broker "
161                                                + "max message size. Retry is about to be performed without payload.");
162                                ResourceDeliveryJsonMessage msgPayloadLess = nullOutPayload(theWrappedMsg);
163                                return trySendToDeliveryChannel(msgPayloadLess, theDeliveryProducer);
164                        } else {
165                                ourLog.error("Failed to send message to Delivery Channel", e);
166                                throw new RuntimeException(Msg.code(7) + "Failed to send message to Delivery Channel", e);
167                        }
168                }
169        }
170
171        private ResourceDeliveryJsonMessage nullOutPayload(ResourceDeliveryJsonMessage theWrappedMsg) {
172                ResourceDeliveryMessage resourceDeliveryMessage = theWrappedMsg.getPayload();
173                resourceDeliveryMessage.setPayloadToNull();
174                return new ResourceDeliveryJsonMessage(resourceDeliveryMessage);
175        }
176}