001package ca.uhn.fhir.jpa.subscription.match.deliver;
002
003/*-
004 * #%L
005 * HAPI FHIR Subscription Server
006 * %%
007 * Copyright (C) 2014 - 2021 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.context.FhirContext;
024import ca.uhn.fhir.interceptor.api.HookParams;
025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
026import ca.uhn.fhir.interceptor.api.Pointcut;
027import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
028import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
029import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
030import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
031import com.google.common.annotations.VisibleForTesting;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034import org.springframework.beans.factory.annotation.Autowired;
035import org.springframework.messaging.Message;
036import org.springframework.messaging.MessageHandler;
037import org.springframework.messaging.MessagingException;
038
039public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandler {
040        private static final Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionDeliverySubscriber.class);
041
042        @Autowired
043        protected FhirContext myFhirContext;
044        @Autowired
045        protected SubscriptionRegistry mySubscriptionRegistry;
046        @Autowired
047        private IInterceptorBroadcaster myInterceptorBroadcaster;
048
049        @Override
050        public void handleMessage(Message theMessage) throws MessagingException {
051                if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) {
052                        ourLog.warn("Unexpected payload type: {}", theMessage.getPayload());
053                        return;
054                }
055
056                ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
057                String subscriptionId = msg.getSubscriptionId(myFhirContext);
058                if (subscriptionId == null) {
059                        ourLog.warn("Subscription has no ID, ignoring");
060                        return;
061                }
062
063                ActiveSubscription updatedSubscription = mySubscriptionRegistry.get(msg.getSubscription().getIdElement(myFhirContext).getIdPart());
064                if (updatedSubscription != null) {
065                        msg.setSubscription(updatedSubscription.getSubscription());
066                }
067
068                try {
069
070                        // Interceptor call: SUBSCRIPTION_BEFORE_DELIVERY
071                        HookParams params = new HookParams()
072                                .add(ResourceDeliveryMessage.class, msg)
073                                .add(CanonicalSubscription.class, msg.getSubscription());
074                        if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY, params)) {
075                                return;
076                        }
077
078                        handleMessage(msg);
079
080                        // Interceptor call: SUBSCRIPTION_AFTER_DELIVERY
081                        myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY, params);
082
083                } catch (Exception e) {
084
085                        String errorMsg = "Failure handling subscription payload for subscription: " + subscriptionId;
086                        ourLog.error(errorMsg, e);
087
088                        // Interceptor call: SUBSCRIPTION_AFTER_DELIVERY
089                        HookParams hookParams = new HookParams()
090                                .add(ResourceDeliveryMessage.class, msg)
091                                .add(Exception.class, e);
092                        if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED, hookParams)) {
093                                return;
094                        }
095
096                        throw new MessagingException(theMessage, errorMsg, e);
097                }
098        }
099
100        public abstract void handleMessage(ResourceDeliveryMessage theMessage) throws Exception;
101
102        @VisibleForTesting
103        public void setFhirContextForUnitTest(FhirContext theCtx) {
104                myFhirContext = theCtx;
105        }
106
107        @VisibleForTesting
108        public void setInterceptorBroadcasterForUnitTest(IInterceptorBroadcaster theInterceptorBroadcaster) {
109                myInterceptorBroadcaster = theInterceptorBroadcaster;
110        }
111
112        @VisibleForTesting
113        public void setSubscriptionRegistryForUnitTest(SubscriptionRegistry theSubscriptionRegistry) {
114                mySubscriptionRegistry = theSubscriptionRegistry;
115        }
116
117        public IInterceptorBroadcaster getInterceptorBroadcaster() {
118                return myInterceptorBroadcaster;
119        }
120}