001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.match.deliver.message;
021
022import ca.uhn.fhir.i18n.Msg;
023import ca.uhn.fhir.interceptor.api.HookParams;
024import ca.uhn.fhir.interceptor.api.Pointcut;
025import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
026import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
027import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
028import ca.uhn.fhir.jpa.subscription.match.deliver.BaseSubscriptionDeliverySubscriber;
029import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
030import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
031import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
032import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
033import ca.uhn.fhir.rest.api.EncodingEnum;
034import org.hl7.fhir.instance.model.api.IBaseResource;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037import org.springframework.context.annotation.Scope;
038import org.springframework.messaging.MessagingException;
039
040import java.net.URI;
041import java.net.URISyntaxException;
042import java.util.Optional;
043
044import static org.apache.commons.lang3.StringUtils.isNotBlank;
045
046@Scope("prototype")
047public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDeliverySubscriber {
048        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionDeliveringMessageSubscriber.class);
049
050        private final IChannelFactory myChannelFactory;
051
052        /**
053         * Constructor
054         */
055        public SubscriptionDeliveringMessageSubscriber(IChannelFactory theChannelFactory) {
056                super();
057                myChannelFactory = theChannelFactory;
058        }
059
060        protected void doDelivery(
061                        ResourceDeliveryMessage theSourceMessage,
062                        CanonicalSubscription theSubscription,
063                        IChannelProducer theChannelProducer,
064                        ResourceModifiedJsonMessage theWrappedMessageToSend) {
065                String payloadId = theSourceMessage.getPayloadId();
066                if (isNotBlank(theSubscription.getPayloadSearchCriteria())) {
067                        IBaseResource payloadResource = createDeliveryBundleForPayloadSearchCriteria(
068                                        theSubscription, theWrappedMessageToSend.getPayload().getPayload(myFhirContext));
069                        ResourceModifiedJsonMessage newWrappedMessageToSend =
070                                        convertDeliveryMessageToResourceModifiedJsonMessage(theSourceMessage, payloadResource);
071                        theWrappedMessageToSend.setPayload(newWrappedMessageToSend.getPayload());
072                        payloadId =
073                                        payloadResource.getIdElement().toUnqualifiedVersionless().getValue();
074                }
075                theChannelProducer.send(theWrappedMessageToSend);
076                ourLog.debug(
077                                "Delivering {} message payload {} for {}",
078                                theSourceMessage.getOperationType(),
079                                payloadId,
080                                theSubscription
081                                                .getIdElement(myFhirContext)
082                                                .toUnqualifiedVersionless()
083                                                .getValue());
084        }
085
086        private ResourceModifiedJsonMessage convertDeliveryMessageToResourceModifiedJsonMessage(
087                        ResourceDeliveryMessage theMsg, IBaseResource thePayloadResource) {
088                ResourceModifiedMessage payload =
089                                new ResourceModifiedMessage(myFhirContext, thePayloadResource, theMsg.getOperationType());
090                payload.setMessageKey(theMsg.getMessageKeyOrDefault());
091                payload.setTransactionId(theMsg.getTransactionId());
092                payload.setPartitionId(theMsg.getRequestPartitionId());
093                return new ResourceModifiedJsonMessage(payload);
094        }
095
096        @Override
097        public void handleMessage(ResourceDeliveryMessage theMessage) throws MessagingException, URISyntaxException {
098                CanonicalSubscription subscription = theMessage.getSubscription();
099                IBaseResource payloadResource = theMessage.getPayload(myFhirContext);
100                if (payloadResource == null) {
101                        Optional<ResourceModifiedMessage> inflatedMsg =
102                                        inflateResourceModifiedMessageFromDeliveryMessage(theMessage);
103                        if (inflatedMsg.isEmpty()) {
104                                return;
105                        }
106                        payloadResource = inflatedMsg.get().getPayload(myFhirContext);
107                }
108
109                ResourceModifiedJsonMessage messageWrapperToSend =
110                                convertDeliveryMessageToResourceModifiedJsonMessage(theMessage, payloadResource);
111
112                // Interceptor call: SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY
113                HookParams params = new HookParams()
114                                .add(CanonicalSubscription.class, subscription)
115                                .add(ResourceDeliveryMessage.class, theMessage)
116                                .add(ResourceModifiedJsonMessage.class, messageWrapperToSend);
117                if (!getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY, params)) {
118                        return;
119                }
120                // Grab the endpoint from the subscription
121                String endpointUrl = subscription.getEndpointUrl();
122
123                String queueName = extractQueueNameFromEndpoint(endpointUrl);
124
125                ChannelProducerSettings channelSettings = new ChannelProducerSettings();
126                channelSettings.setQualifyChannelName(false);
127
128                IChannelProducer channelProducer =
129                                myChannelFactory.getOrCreateProducer(queueName, ResourceModifiedJsonMessage.class, channelSettings);
130
131                // Grab the payload type (encoding mimetype) from the subscription
132                String payloadString = subscription.getPayloadString();
133                EncodingEnum payloadType = null;
134                if (payloadString != null) {
135                        payloadType = EncodingEnum.forContentType(payloadString);
136                }
137
138                if (payloadType != EncodingEnum.JSON) {
139                        throw new UnsupportedOperationException(
140                                        Msg.code(4) + "Only JSON payload type is currently supported for Message Subscriptions");
141                }
142
143                doDelivery(theMessage, subscription, channelProducer, messageWrapperToSend);
144
145                // Interceptor call: SUBSCRIPTION_AFTER_MESSAGE_DELIVERY
146                params = new HookParams()
147                                .add(CanonicalSubscription.class, subscription)
148                                .add(ResourceDeliveryMessage.class, theMessage);
149                if (!getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_AFTER_MESSAGE_DELIVERY, params)) {
150                        //noinspection UnnecessaryReturnStatement
151                        return;
152                }
153        }
154
155        private String extractQueueNameFromEndpoint(String theEndpointUrl) throws URISyntaxException {
156                URI uri = new URI(theEndpointUrl);
157                return uri.getSchemeSpecificPart();
158        }
159}