001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.i18n.Msg;
024import ca.uhn.fhir.interceptor.api.HookParams;
025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
026import ca.uhn.fhir.interceptor.api.Pointcut;
027import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
028import ca.uhn.fhir.jpa.subscription.channel.api.PayloadTooLargeException;
029import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
030import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
031import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
032import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
033import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
034import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
035import ca.uhn.fhir.rest.api.EncodingEnum;
036import jakarta.annotation.Nonnull;
037import jakarta.annotation.Nullable;
038import org.hl7.fhir.instance.model.api.IBaseResource;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041import org.springframework.messaging.MessageChannel;
042
043import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
044
045public class SubscriptionMatchDeliverer {
046        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchDeliverer.class);
047        private final FhirContext myFhirContext;
048        private final IInterceptorBroadcaster myInterceptorBroadcaster;
049        private final SubscriptionChannelRegistry mySubscriptionChannelRegistry;
050
051        public SubscriptionMatchDeliverer(
052                        FhirContext theFhirContext,
053                        IInterceptorBroadcaster theInterceptorBroadcaster,
054                        SubscriptionChannelRegistry theSubscriptionChannelRegistry) {
055                myFhirContext = theFhirContext;
056                myInterceptorBroadcaster = theInterceptorBroadcaster;
057                mySubscriptionChannelRegistry = theSubscriptionChannelRegistry;
058        }
059
060        public boolean deliverPayload(
061                        @Nullable IBaseResource thePayload,
062                        @Nonnull ResourceModifiedMessage theMsg,
063                        @Nonnull ActiveSubscription theActiveSubscription,
064                        @Nullable InMemoryMatchResult theInMemoryMatchResult) {
065                SubscriptionDeliveryRequest subscriptionDeliveryRequest;
066                if (thePayload != null) {
067                        subscriptionDeliveryRequest = new SubscriptionDeliveryRequest(thePayload, theMsg, theActiveSubscription);
068                } else {
069                        subscriptionDeliveryRequest =
070                                        new SubscriptionDeliveryRequest(theMsg.getPayloadId(myFhirContext), theMsg, theActiveSubscription);
071                }
072                ResourceDeliveryMessage deliveryMsg = buildResourceDeliveryMessage(subscriptionDeliveryRequest);
073                deliveryMsg.copyAdditionalPropertiesFrom(theMsg);
074
075                return sendToDeliveryChannel(theActiveSubscription, theInMemoryMatchResult, deliveryMsg);
076        }
077
078        public boolean deliverPayload(
079                        @Nonnull SubscriptionDeliveryRequest subscriptionDeliveryRequest,
080                        @Nullable InMemoryMatchResult theInMemoryMatchResult) {
081                ResourceDeliveryMessage deliveryMsg = buildResourceDeliveryMessage(subscriptionDeliveryRequest);
082
083                return sendToDeliveryChannel(
084                                subscriptionDeliveryRequest.getActiveSubscription(), theInMemoryMatchResult, deliveryMsg);
085        }
086
087        private boolean sendToDeliveryChannel(
088                        @Nonnull ActiveSubscription theActiveSubscription,
089                        @Nullable InMemoryMatchResult theInMemoryMatchResult,
090                        @Nonnull ResourceDeliveryMessage deliveryMsg) {
091                if (!callHooks(theActiveSubscription, theInMemoryMatchResult, deliveryMsg)) {
092                        return false;
093                }
094
095                boolean retVal = false;
096                ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(deliveryMsg);
097                MessageChannel deliveryChannel =
098                                mySubscriptionChannelRegistry.getDeliverySenderChannel(theActiveSubscription.getChannelName());
099                if (deliveryChannel != null) {
100                        retVal = true;
101                        trySendToDeliveryChannel(wrappedMsg, deliveryChannel);
102                } else {
103                        ourLog.warn("Do not have delivery channel for subscription {}", theActiveSubscription.getId());
104                }
105                return retVal;
106        }
107
108        private ResourceDeliveryMessage buildResourceDeliveryMessage(@Nonnull SubscriptionDeliveryRequest theRequest) {
109                EncodingEnum encoding = null;
110
111                CanonicalSubscription subscription = theRequest.getSubscription();
112
113                if (subscription != null
114                                && subscription.getPayloadString() != null
115                                && !subscription.getPayloadString().isEmpty()) {
116                        encoding = EncodingEnum.forContentType(subscription.getPayloadString());
117                }
118                encoding = defaultIfNull(encoding, EncodingEnum.JSON);
119
120                ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
121                deliveryMsg.setPartitionId(theRequest.getRequestPartitionId());
122
123                if (theRequest.hasPayload()) {
124                        deliveryMsg.setPayload(myFhirContext, theRequest.getPayload(), encoding);
125                } else {
126                        deliveryMsg.setPayloadId(theRequest.getPayloadId());
127                }
128                deliveryMsg.setSubscription(subscription);
129                deliveryMsg.setOperationType(theRequest.getOperationType());
130                deliveryMsg.setTransactionId(theRequest.getTransactionId());
131                return deliveryMsg;
132        }
133
134        private boolean callHooks(
135                        ActiveSubscription theActiveSubscription,
136                        InMemoryMatchResult theInMemoryMatchResult,
137                        ResourceDeliveryMessage deliveryMsg) {
138                // Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
139                HookParams params = new HookParams()
140                                .add(CanonicalSubscription.class, theActiveSubscription.getSubscription())
141                                .add(ResourceDeliveryMessage.class, deliveryMsg)
142                                .add(InMemoryMatchResult.class, theInMemoryMatchResult);
143                if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED, params)) {
144                        ourLog.info(
145                                        "Interceptor has decided to abort processing of subscription {}", theActiveSubscription.getId());
146                        return false;
147                }
148                return true;
149        }
150
151        private void trySendToDeliveryChannel(
152                        ResourceDeliveryJsonMessage theWrappedMsg, MessageChannel theDeliveryChannel) {
153                try {
154                        boolean success = theDeliveryChannel.send(theWrappedMsg);
155                        if (!success) {
156                                ourLog.warn("Failed to send message to Delivery Channel.");
157                        }
158                } catch (RuntimeException e) {
159                        if (e.getCause() instanceof PayloadTooLargeException) {
160                                ourLog.warn("Failed to send message to Delivery Channel because the payload size is larger than broker "
161                                                + "max message size. Retry is about to be performed without payload.");
162                                ResourceDeliveryJsonMessage msgPayloadLess = nullOutPayload(theWrappedMsg);
163                                trySendToDeliveryChannel(msgPayloadLess, theDeliveryChannel);
164                        } else {
165                                ourLog.error("Failed to send message to Delivery Channel", e);
166                                throw new RuntimeException(Msg.code(7) + "Failed to send message to Delivery Channel", e);
167                        }
168                }
169        }
170
171        private ResourceDeliveryJsonMessage nullOutPayload(ResourceDeliveryJsonMessage theWrappedMsg) {
172                ResourceDeliveryMessage resourceDeliveryMessage = theWrappedMsg.getPayload();
173                resourceDeliveryMessage.setPayloadToNull();
174                return new ResourceDeliveryJsonMessage(resourceDeliveryMessage);
175        }
176}