
001package ca.uhn.fhir.jpa.subscription.match.deliver; 002 003/*- 004 * #%L 005 * HAPI FHIR Subscription Server 006 * %% 007 * Copyright (C) 2014 - 2021 Smile CDR, Inc. 008 * %% 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 * #L% 021 */ 022 023import ca.uhn.fhir.context.FhirContext; 024import ca.uhn.fhir.interceptor.api.HookParams; 025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 026import ca.uhn.fhir.interceptor.api.Pointcut; 027import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; 028import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; 029import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; 030import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; 031import com.google.common.annotations.VisibleForTesting; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034import org.springframework.beans.factory.annotation.Autowired; 035import org.springframework.messaging.Message; 036import org.springframework.messaging.MessageHandler; 037import org.springframework.messaging.MessagingException; 038 039public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandler { 040 private static final Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionDeliverySubscriber.class); 041 042 @Autowired 043 protected FhirContext myFhirContext; 044 @Autowired 045 protected SubscriptionRegistry mySubscriptionRegistry; 046 @Autowired 047 private IInterceptorBroadcaster myInterceptorBroadcaster; 048 049 @Override 050 public void handleMessage(Message theMessage) throws MessagingException { 051 if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) { 052 ourLog.warn("Unexpected payload type: {}", theMessage.getPayload()); 053 return; 054 } 055 056 ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload(); 057 String subscriptionId = msg.getSubscriptionId(myFhirContext); 058 if (subscriptionId == null) { 059 ourLog.warn("Subscription has no ID, ignoring"); 060 return; 061 } 062 063 ActiveSubscription updatedSubscription = mySubscriptionRegistry.get(msg.getSubscription().getIdElement(myFhirContext).getIdPart()); 064 if (updatedSubscription != null) { 065 msg.setSubscription(updatedSubscription.getSubscription()); 066 } 067 068 try { 069 070 // Interceptor call: SUBSCRIPTION_BEFORE_DELIVERY 071 HookParams params = new HookParams() 072 .add(ResourceDeliveryMessage.class, msg) 073 .add(CanonicalSubscription.class, msg.getSubscription()); 074 if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY, params)) { 075 return; 076 } 077 078 handleMessage(msg); 079 080 // Interceptor call: SUBSCRIPTION_AFTER_DELIVERY 081 myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY, params); 082 083 } catch (Exception e) { 084 085 String errorMsg = "Failure handling subscription payload for subscription: " + subscriptionId; 086 ourLog.error(errorMsg, e); 087 088 // Interceptor call: SUBSCRIPTION_AFTER_DELIVERY 089 HookParams hookParams = new HookParams() 090 .add(ResourceDeliveryMessage.class, msg) 091 .add(Exception.class, e); 092 if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED, hookParams)) { 093 return; 094 } 095 096 throw new MessagingException(theMessage, errorMsg, e); 097 } 098 } 099 100 public abstract void handleMessage(ResourceDeliveryMessage theMessage) throws Exception; 101 102 @VisibleForTesting 103 public void setFhirContextForUnitTest(FhirContext theCtx) { 104 myFhirContext = theCtx; 105 } 106 107 @VisibleForTesting 108 public void setInterceptorBroadcasterForUnitTest(IInterceptorBroadcaster theInterceptorBroadcaster) { 109 myInterceptorBroadcaster = theInterceptorBroadcaster; 110 } 111 112 @VisibleForTesting 113 public void setSubscriptionRegistryForUnitTest(SubscriptionRegistry theSubscriptionRegistry) { 114 mySubscriptionRegistry = theSubscriptionRegistry; 115 } 116 117 public IInterceptorBroadcaster getInterceptorBroadcaster() { 118 return myInterceptorBroadcaster; 119 } 120}