001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.match.deliver.message;
021
022import ca.uhn.fhir.broker.api.ChannelProducerSettings;
023import ca.uhn.fhir.broker.api.IBrokerClient;
024import ca.uhn.fhir.broker.api.IChannelProducer;
025import ca.uhn.fhir.i18n.Msg;
026import ca.uhn.fhir.interceptor.api.HookParams;
027import ca.uhn.fhir.interceptor.api.Pointcut;
028import ca.uhn.fhir.interceptor.model.IDefaultPartitionSettings;
029import ca.uhn.fhir.jpa.subscription.match.deliver.BaseSubscriptionDeliveryListener;
030import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
031import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
032import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
033import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
034import ca.uhn.fhir.rest.api.EncodingEnum;
035import ca.uhn.fhir.util.IoUtils;
036import org.hl7.fhir.instance.model.api.IBaseResource;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039import org.springframework.context.annotation.Scope;
040import org.springframework.messaging.MessagingException;
041
042import java.net.URI;
043import java.net.URISyntaxException;
044import java.util.Optional;
045
046import static org.apache.commons.lang3.StringUtils.isNotBlank;
047
048@Scope("prototype")
049public class SubscriptionDeliveringMessageListener extends BaseSubscriptionDeliveryListener implements AutoCloseable {
050        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionDeliveringMessageListener.class);
051
052        private final IBrokerClient myBrokerClient;
053        private IChannelProducer<ResourceModifiedMessage> myChannelProducer;
054        private final IDefaultPartitionSettings myDefaultPartitionSettings;
055
056        /**
057         * Constructor
058         */
059        public SubscriptionDeliveringMessageListener(
060                        IBrokerClient theBrokerClient, IDefaultPartitionSettings theDefaultPartitionSettings) {
061                super();
062                myBrokerClient = theBrokerClient;
063                myDefaultPartitionSettings = theDefaultPartitionSettings;
064        }
065
066        public Class<ResourceDeliveryMessage> getPayloadType() {
067                return ResourceDeliveryMessage.class;
068        }
069
070        protected void doDelivery(
071                        ResourceDeliveryMessage theSourceMessage,
072                        CanonicalSubscription theSubscription,
073                        IChannelProducer<ResourceModifiedMessage> theChannelProducer,
074                        ResourceModifiedJsonMessage theWrappedMessageToSend) {
075                String payloadId = theSourceMessage.getPayloadId();
076                if (isNotBlank(theSubscription.getPayloadSearchCriteria())) {
077                        IBaseResource payloadResource = createDeliveryBundleForPayloadSearchCriteria(
078                                        theSubscription, theWrappedMessageToSend.getPayload().getResource(myFhirContext));
079                        ResourceModifiedJsonMessage newWrappedMessageToSend =
080                                        convertDeliveryMessageToResourceModifiedJsonMessage(theSourceMessage, payloadResource);
081                        theWrappedMessageToSend.setPayload(newWrappedMessageToSend.getPayload());
082                        payloadId =
083                                        payloadResource.getIdElement().toUnqualifiedVersionless().getValue();
084                }
085                theChannelProducer.send(theWrappedMessageToSend);
086                ourLog.debug(
087                                "Delivering {} message payload {} for {}",
088                                theSourceMessage.getOperationType(),
089                                payloadId,
090                                theSubscription
091                                                .getIdElement(myFhirContext)
092                                                .toUnqualifiedVersionless()
093                                                .getValue());
094        }
095
096        private ResourceModifiedJsonMessage convertDeliveryMessageToResourceModifiedJsonMessage(
097                        ResourceDeliveryMessage theMsg, IBaseResource thePayloadResource) {
098                ResourceModifiedMessage payload = new ResourceModifiedMessage(
099                                myFhirContext,
100                                thePayloadResource,
101                                theMsg.getOperationType(),
102                                myDefaultPartitionSettings.getDefaultRequestPartitionId());
103                payload.setPayloadMessageKey(theMsg.getPayloadMessageKey());
104                payload.setTransactionId(theMsg.getTransactionId());
105                payload.setPartitionId(theMsg.getPartitionId());
106                return new ResourceModifiedJsonMessage(payload);
107        }
108
109        @Override
110        public void handleMessage(ResourceDeliveryMessage theMessage) throws MessagingException, URISyntaxException {
111                CanonicalSubscription subscription = theMessage.getSubscription();
112                IBaseResource payloadResource = theMessage.getPayload(myFhirContext);
113                if (payloadResource == null) {
114                        Optional<ResourceModifiedMessage> inflatedMsg =
115                                        inflateResourceModifiedMessageFromDeliveryMessage(theMessage);
116                        if (inflatedMsg.isEmpty()) {
117                                return;
118                        }
119                        payloadResource = inflatedMsg.get().getResource(myFhirContext);
120                }
121
122                ResourceModifiedJsonMessage messageWrapperToSend =
123                                convertDeliveryMessageToResourceModifiedJsonMessage(theMessage, payloadResource);
124
125                // Interceptor call: SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY
126                HookParams params = new HookParams()
127                                .add(CanonicalSubscription.class, subscription)
128                                .add(ResourceDeliveryMessage.class, theMessage)
129                                .add(ResourceModifiedJsonMessage.class, messageWrapperToSend);
130                if (!getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY, params)) {
131                        return;
132                }
133                // Grab the endpoint from the subscription
134                String endpointUrl = subscription.getEndpointUrl();
135
136                String queueName = extractQueueNameFromEndpoint(endpointUrl);
137
138                ChannelProducerSettings channelSettings = new ChannelProducerSettings();
139                channelSettings.setQualifyChannelName(false);
140
141                if (myChannelProducer == null) {
142                        myChannelProducer =
143                                        myBrokerClient.getOrCreateProducer(queueName, ResourceModifiedJsonMessage.class, channelSettings);
144                }
145
146                // Grab the payload type (encoding mimetype) from the subscription
147                String payloadString = subscription.getPayloadString();
148                EncodingEnum payloadType = null;
149                if (payloadString != null) {
150                        payloadType = EncodingEnum.forContentType(payloadString);
151                }
152
153                if (payloadType != EncodingEnum.JSON) {
154                        throw new UnsupportedOperationException(
155                                        Msg.code(4) + "Only JSON payload type is currently supported for Message Subscriptions");
156                }
157
158                doDelivery(theMessage, subscription, myChannelProducer, messageWrapperToSend);
159
160                // Interceptor call: SUBSCRIPTION_AFTER_MESSAGE_DELIVERY
161                params = new HookParams()
162                                .add(CanonicalSubscription.class, subscription)
163                                .add(ResourceDeliveryMessage.class, theMessage);
164                if (!getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_AFTER_MESSAGE_DELIVERY, params)) {
165                        //noinspection UnnecessaryReturnStatement
166                        return;
167                }
168        }
169
170        private String extractQueueNameFromEndpoint(String theEndpointUrl) throws URISyntaxException {
171                URI uri = new URI(theEndpointUrl);
172                return uri.getSchemeSpecificPart();
173        }
174
175        @Override
176        public void close() throws Exception {
177                if (myChannelProducer instanceof AutoCloseable) {
178                        IoUtils.closeQuietly((AutoCloseable) myChannelProducer, ourLog);
179                }
180        }
181}