
001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2023 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.match.deliver; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.context.RuntimeResourceDefinition; 024import ca.uhn.fhir.i18n.Msg; 025import ca.uhn.fhir.interceptor.api.HookParams; 026import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 027import ca.uhn.fhir.interceptor.api.Pointcut; 028import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 029import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 030import ca.uhn.fhir.jpa.searchparam.MatchUrlService; 031import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 032import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; 033import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; 034import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; 035import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; 036import ca.uhn.fhir.rest.api.server.IBundleProvider; 037import ca.uhn.fhir.util.BundleBuilder; 038import com.google.common.annotations.VisibleForTesting; 039import org.apache.commons.text.StringSubstitutor; 040import org.hl7.fhir.instance.model.api.IBaseBundle; 041import org.hl7.fhir.instance.model.api.IBaseResource; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044import org.springframework.beans.factory.annotation.Autowired; 045import org.springframework.messaging.Message; 046import org.springframework.messaging.MessageHandler; 047import org.springframework.messaging.MessagingException; 048 049import java.util.HashMap; 050import java.util.Map; 051 052import static ca.uhn.fhir.jpa.subscription.util.SubscriptionUtil.createRequestDetailForPartitionedRequest; 053 054public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandler { 055 private static final Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionDeliverySubscriber.class); 056 057 @Autowired 058 protected FhirContext myFhirContext; 059 060 @Autowired 061 protected SubscriptionRegistry mySubscriptionRegistry; 062 063 @Autowired 064 private IInterceptorBroadcaster myInterceptorBroadcaster; 065 066 @Autowired 067 private DaoRegistry myDaoRegistry; 068 069 @Autowired 070 private MatchUrlService myMatchUrlService; 071 072 @Override 073 public void handleMessage(Message theMessage) throws MessagingException { 074 if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) { 075 ourLog.warn("Unexpected payload type: {}", theMessage.getPayload()); 076 return; 077 } 078 079 ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload(); 080 String subscriptionId = msg.getSubscriptionId(myFhirContext); 081 if (subscriptionId == null) { 082 ourLog.warn("Subscription has no ID, ignoring"); 083 return; 084 } 085 086 ActiveSubscription updatedSubscription = mySubscriptionRegistry.get( 087 msg.getSubscription().getIdElement(myFhirContext).getIdPart()); 088 if (updatedSubscription != null) { 089 msg.setSubscription(updatedSubscription.getSubscription()); 090 } 091 092 try { 093 094 // Interceptor call: SUBSCRIPTION_BEFORE_DELIVERY 095 HookParams params = new HookParams() 096 .add(ResourceDeliveryMessage.class, msg) 097 .add(CanonicalSubscription.class, msg.getSubscription()); 098 if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY, params)) { 099 return; 100 } 101 102 handleMessage(msg); 103 104 // Interceptor call: SUBSCRIPTION_AFTER_DELIVERY 105 myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY, params); 106 107 } catch (Exception e) { 108 109 String errorMsg = "Failure handling subscription payload for subscription: " + subscriptionId; 110 ourLog.error(errorMsg, e); 111 112 // Interceptor call: SUBSCRIPTION_AFTER_DELIVERY 113 HookParams hookParams = 114 new HookParams().add(ResourceDeliveryMessage.class, msg).add(Exception.class, e); 115 if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED, hookParams)) { 116 return; 117 } 118 119 throw new MessagingException(Msg.code(2) + errorMsg, e); 120 } 121 } 122 123 public abstract void handleMessage(ResourceDeliveryMessage theMessage) throws Exception; 124 125 protected IBaseBundle createDeliveryBundleForPayloadSearchCriteria( 126 CanonicalSubscription theSubscription, IBaseResource thePayloadResource) { 127 String resType = theSubscription 128 .getPayloadSearchCriteria() 129 .substring(0, theSubscription.getPayloadSearchCriteria().indexOf('?')); 130 IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resType); 131 RuntimeResourceDefinition resourceDefinition = myFhirContext.getResourceDefinition(resType); 132 133 String payloadUrl = theSubscription.getPayloadSearchCriteria(); 134 Map<String, String> valueMap = new HashMap<>(1); 135 valueMap.put( 136 "matched_resource_id", 137 thePayloadResource.getIdElement().toUnqualifiedVersionless().getValue()); 138 payloadUrl = new StringSubstitutor(valueMap).replace(payloadUrl); 139 SearchParameterMap payloadSearchMap = 140 myMatchUrlService.translateMatchUrl(payloadUrl, resourceDefinition, MatchUrlService.processIncludes()); 141 payloadSearchMap.setLoadSynchronous(true); 142 143 IBundleProvider searchResults = 144 dao.search(payloadSearchMap, createRequestDetailForPartitionedRequest(theSubscription)); 145 BundleBuilder builder = new BundleBuilder(myFhirContext); 146 for (IBaseResource next : searchResults.getAllResources()) { 147 builder.addTransactionUpdateEntry(next); 148 } 149 return builder.getBundle(); 150 } 151 152 @VisibleForTesting 153 public void setFhirContextForUnitTest(FhirContext theCtx) { 154 myFhirContext = theCtx; 155 } 156 157 @VisibleForTesting 158 public void setInterceptorBroadcasterForUnitTest(IInterceptorBroadcaster theInterceptorBroadcaster) { 159 myInterceptorBroadcaster = theInterceptorBroadcaster; 160 } 161 162 @VisibleForTesting 163 public void setSubscriptionRegistryForUnitTest(SubscriptionRegistry theSubscriptionRegistry) { 164 mySubscriptionRegistry = theSubscriptionRegistry; 165 } 166 167 @VisibleForTesting 168 public void setDaoRegistryForUnitTest(DaoRegistry theDaoRegistry) { 169 myDaoRegistry = theDaoRegistry; 170 } 171 172 @VisibleForTesting 173 public void setMatchUrlServiceForUnitTest(MatchUrlService theMatchUrlService) { 174 myMatchUrlService = theMatchUrlService; 175 } 176 177 public IInterceptorBroadcaster getInterceptorBroadcaster() { 178 return myInterceptorBroadcaster; 179 } 180}