001package ca.uhn.fhir.jpa.subscription.match.deliver.message;
002
003/*-
004 * #%L
005 * HAPI FHIR Subscription Server
006 * %%
007 * Copyright (C) 2014 - 2021 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.interceptor.api.HookParams;
024import ca.uhn.fhir.interceptor.api.Pointcut;
025import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
026import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
027import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
028import ca.uhn.fhir.jpa.subscription.match.deliver.BaseSubscriptionDeliverySubscriber;
029import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
030import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
031import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
032import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
033import ca.uhn.fhir.rest.api.EncodingEnum;
034import org.hl7.fhir.instance.model.api.IBaseResource;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037import org.springframework.beans.factory.annotation.Autowired;
038import org.springframework.context.annotation.Scope;
039import org.springframework.messaging.MessagingException;
040
041import java.net.URI;
042import java.net.URISyntaxException;
043
044@Scope("prototype")
045public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDeliverySubscriber {
046        private static Logger ourLog = LoggerFactory.getLogger(SubscriptionDeliveringMessageSubscriber.class);
047
048        @Autowired
049        private IChannelFactory myChannelFactory;
050
051        /**
052         * Constructor
053         */
054        public SubscriptionDeliveringMessageSubscriber() {
055                super();
056        }
057
058        protected void deliverPayload(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer) {
059                IBaseResource payloadResource = theMsg.getPayload(myFhirContext);
060
061                // Regardless of whether we have a payload, the message should be sent.
062                doDelivery(theMsg, theSubscription, theChannelProducer, payloadResource);
063        }
064
065        protected void doDelivery(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer, IBaseResource thePayloadResource) {
066                ResourceModifiedMessage payload = new ResourceModifiedMessage(myFhirContext, thePayloadResource, theMsg.getOperationType());
067                payload.setTransactionId(theMsg.getTransactionId());
068                ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(payload);
069                theChannelProducer.send(message);
070                ourLog.debug("Delivering {} message payload {} for {}", theMsg.getOperationType(), theMsg.getPayloadId(), theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue());
071        }
072
073        @Override
074        public void handleMessage(ResourceDeliveryMessage theMessage) throws MessagingException, URISyntaxException {
075                CanonicalSubscription subscription = theMessage.getSubscription();
076
077                // Interceptor call: SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY
078                HookParams params = new HookParams()
079                        .add(CanonicalSubscription.class, subscription)
080                        .add(ResourceDeliveryMessage.class, theMessage);
081                if (!getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY, params)) {
082                        return;
083                }
084                // Grab the endpoint from the subscription
085                String endpointUrl = subscription.getEndpointUrl();
086
087                String queueName = extractQueueNameFromEndpoint(endpointUrl);
088
089                ChannelProducerSettings channelSettings = new ChannelProducerSettings();
090                channelSettings.setQualifyChannelName(false);
091
092                IChannelProducer channelProducer = myChannelFactory.getOrCreateProducer(queueName, ResourceModifiedJsonMessage.class, channelSettings);
093
094                // Grab the payload type (encoding mimetype) from the subscription
095                String payloadString = subscription.getPayloadString();
096                EncodingEnum payloadType = null;
097                if (payloadString != null) {
098                        payloadType = EncodingEnum.forContentType(payloadString);
099                }
100
101                if (payloadType != EncodingEnum.JSON) {
102                        throw new UnsupportedOperationException("Only JSON payload type is currently supported for Message Subscriptions");
103                }
104
105                deliverPayload(theMessage, subscription, channelProducer);
106
107                // Interceptor call: SUBSCRIPTION_AFTER_MESSAGE_DELIVERY
108                params = new HookParams()
109                        .add(CanonicalSubscription.class, subscription)
110                        .add(ResourceDeliveryMessage.class, theMessage);
111                if (!getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_AFTER_MESSAGE_DELIVERY, params)) {
112                        //noinspection UnnecessaryReturnStatement
113                        return;
114                }
115        }
116
117        private String extractQueueNameFromEndpoint(String theEndpointUrl) throws URISyntaxException {
118                URI uri = new URI(theEndpointUrl);
119                return uri.getSchemeSpecificPart();
120        }
121}