/*-
 * #%L
 * HAPI FHIR Subscription Server
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.subscription.match.deliver;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
import ca.uhn.fhir.util.BundleBuilder;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.text.StringSubstitutor;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static ca.uhn.fhir.jpa.subscription.util.SubscriptionUtil.createRequestDetailForPartitionedRequest;

/**
 * Base class for message handlers that deliver {@link ResourceDeliveryMessage} payloads.
 * <p>
 * This class wraps the concrete delivery step, {@link #handleMessage(ResourceDeliveryMessage)},
 * with the {@code SUBSCRIPTION_BEFORE_DELIVERY}, {@code SUBSCRIPTION_AFTER_DELIVERY} and
 * {@code SUBSCRIPTION_AFTER_DELIVERY_FAILED} interceptor pointcuts, and offers helpers that
 * subclasses can use while building what they deliver.
 */
public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandler {
	private static final Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionDeliverySubscriber.class);

	@Autowired
	protected FhirContext myFhirContext;

	@Autowired
	protected SubscriptionRegistry mySubscriptionRegistry;

	@Autowired
	protected IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;

	@Autowired
	private IInterceptorBroadcaster myInterceptorBroadcaster;

	@Autowired
	private DaoRegistry myDaoRegistry;

	@Autowired
	private MatchUrlService myMatchUrlService;

	/**
	 * Entry point invoked for every incoming message.
	 * <p>
	 * The method returns without delivering anything when the payload is not a
	 * {@link ResourceDeliveryMessage}, when the message carries no subscription ID, or when the
	 * {@code SUBSCRIPTION_BEFORE_DELIVERY} hooks return {@code false}.
	 * <p>
	 * If delivery (or one of the surrounding hook calls) throws, the failure is logged and the
	 * {@code SUBSCRIPTION_AFTER_DELIVERY_FAILED} hooks are called. When those hooks return
	 * {@code false} the exception is swallowed; otherwise it is rethrown wrapped in a
	 * {@link MessagingException}.
	 *
	 * @param theMessage the incoming message
	 * @throws MessagingException if delivery fails and the failure hooks do not suppress the error
	 */
	@Override
	public void handleMessage(Message<?> theMessage) throws MessagingException {
		Object payload = theMessage.getPayload();
		if (!(payload instanceof ResourceDeliveryMessage)) {
			ourLog.warn("Unexpected payload type: {}", payload);
			return;
		}

		ResourceDeliveryMessage msg = (ResourceDeliveryMessage) payload;
		String subscriptionId = msg.getSubscriptionId(myFhirContext);
		if (subscriptionId == null) {
			ourLog.warn("Subscription has no ID, ignoring");
			return;
		}

		// If the registry holds a subscription under this ID, deliver against that copy
		// rather than the one that was carried inside the message.
		ActiveSubscription updatedSubscription = mySubscriptionRegistry.get(
				msg.getSubscription().getIdElement(myFhirContext).getIdPart());
		if (updatedSubscription != null) {
			msg.setSubscription(updatedSubscription.getSubscription());
		}

		try {

			// Interceptor call: SUBSCRIPTION_BEFORE_DELIVERY
			HookParams params = new HookParams()
					.add(ResourceDeliveryMessage.class, msg)
					.add(CanonicalSubscription.class, msg.getSubscription());
			if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY, params)) {
				return;
			}

			handleMessage(msg);

			// Interceptor call: SUBSCRIPTION_AFTER_DELIVERY
			myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY, params);

		} catch (Exception e) {

			String errorMsg = "Failure handling subscription payload for subscription: " + subscriptionId;
			ourLog.error(errorMsg, e);

			// Interceptor call: SUBSCRIPTION_AFTER_DELIVERY_FAILED
			HookParams hookParams =
					new HookParams().add(ResourceDeliveryMessage.class, msg).add(Exception.class, e);
			if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED, hookParams)) {
				// A hook returned false: the failure is treated as handled and not rethrown.
				return;
			}

			throw new MessagingException(Msg.code(2) + errorMsg, e);
		}
	}

	/**
	 * Performs the actual delivery of a single message. Called by
	 * {@link #handleMessage(Message)} between the before- and after-delivery hooks.
	 *
	 * @param theMessage the delivery message, with its subscription already refreshed from the
	 *                   registry when the registry had an entry for it
	 * @throws Exception on any delivery failure; the caller logs it and fires the failure hooks
	 */
	public abstract void handleMessage(ResourceDeliveryMessage theMessage) throws Exception;

	/**
	 * Runs the subscription's payload search criteria and collects the results into a bundle.
	 * <p>
	 * The resource type is the part of the criteria before the first {@code '?'}. The
	 * {@code matched_resource_id} variable in the criteria is replaced (via
	 * {@link StringSubstitutor}) with the unqualified, versionless ID of the payload resource
	 * before the criteria is translated into a search. Every resource returned by the search is
	 * added to the bundle as a transaction update entry.
	 *
	 * @param theSubscription    the subscription whose payload search criteria is executed
	 * @param thePayloadResource the resource whose ID is substituted into the criteria
	 * @return the bundle built from the search results
	 * @throws StringIndexOutOfBoundsException if the payload search criteria contains no
	 *                                         {@code '?'}
	 */
	protected IBaseBundle createDeliveryBundleForPayloadSearchCriteria(
			CanonicalSubscription theSubscription, IBaseResource thePayloadResource) {
		String payloadCriteria = theSubscription.getPayloadSearchCriteria();
		// indexOf returns -1 when there is no '?', which makes substring throw.
		String resType = payloadCriteria.substring(0, payloadCriteria.indexOf('?'));
		IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resType);
		RuntimeResourceDefinition resourceDefinition = myFhirContext.getResourceDefinition(resType);

		Map<String, String> valueMap = new HashMap<>(1);
		valueMap.put(
				"matched_resource_id",
				thePayloadResource.getIdElement().toUnqualifiedVersionless().getValue());
		String payloadUrl = new StringSubstitutor(valueMap).replace(payloadCriteria);
		SearchParameterMap payloadSearchMap =
				myMatchUrlService.translateMatchUrl(payloadUrl, resourceDefinition, MatchUrlService.processIncludes());
		payloadSearchMap.setLoadSynchronous(true);

		IBundleProvider searchResults =
				dao.search(payloadSearchMap, createRequestDetailForPartitionedRequest(theSubscription));
		BundleBuilder builder = new BundleBuilder(myFhirContext);
		for (IBaseResource next : searchResults.getAllResources()) {
			builder.addTransactionUpdateEntry(next);
		}
		return builder.getBundle();
	}

	/**
	 * Builds a payload-less {@link ResourceModifiedMessage} from the delivery message's payload ID
	 * and operation type, and asks the persistence service to inflate it.
	 *
	 * @param theMsg the delivery message identifying the resource and operation
	 * @return whatever the persistence service returns for the payload-less message
	 */
	protected Optional<ResourceModifiedMessage> inflateResourceModifiedMessageFromDeliveryMessage(
			ResourceDeliveryMessage theMsg) {
		ResourceModifiedMessage payloadLess =
				new ResourceModifiedMessage(theMsg.getPayloadId(myFhirContext), theMsg.getOperationType());
		return myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(payloadLess);
	}

	/**
	 * Replaces the FHIR context; intended for unit tests.
	 *
	 * @param theCtx the context to use
	 */
	@VisibleForTesting
	public void setFhirContextForUnitTest(FhirContext theCtx) {
		myFhirContext = theCtx;
	}

	/**
	 * Replaces the interceptor broadcaster; intended for unit tests.
	 *
	 * @param theInterceptorBroadcaster the broadcaster to use
	 */
	@VisibleForTesting
	public void setInterceptorBroadcasterForUnitTest(IInterceptorBroadcaster theInterceptorBroadcaster) {
		myInterceptorBroadcaster = theInterceptorBroadcaster;
	}

	/**
	 * Replaces the subscription registry; intended for unit tests.
	 *
	 * @param theSubscriptionRegistry the registry to use
	 */
	@VisibleForTesting
	public void setSubscriptionRegistryForUnitTest(SubscriptionRegistry theSubscriptionRegistry) {
		mySubscriptionRegistry = theSubscriptionRegistry;
	}

	/**
	 * Replaces the DAO registry; intended for unit tests.
	 *
	 * @param theDaoRegistry the registry to use
	 */
	@VisibleForTesting
	public void setDaoRegistryForUnitTest(DaoRegistry theDaoRegistry) {
		myDaoRegistry = theDaoRegistry;
	}

	/**
	 * Replaces the match URL service; intended for unit tests.
	 *
	 * @param theMatchUrlService the service to use
	 */
	@VisibleForTesting
	public void setMatchUrlServiceForUnitTest(MatchUrlService theMatchUrlService) {
		myMatchUrlService = theMatchUrlService;
	}

	/**
	 * Replaces the resource-modified-message persistence service; intended for unit tests.
	 *
	 * @param theResourceModifiedMessagePersistenceSvc the service to use
	 */
	@VisibleForTesting
	public void setResourceModifiedMessagePersistenceSvcForUnitTest(
			IResourceModifiedMessagePersistenceSvc theResourceModifiedMessagePersistenceSvc) {
		myResourceModifiedMessagePersistenceSvc = theResourceModifiedMessagePersistenceSvc;
	}

	/**
	 * @return the interceptor broadcaster used for the delivery pointcuts
	 */
	public IInterceptorBroadcaster getInterceptorBroadcaster() {
		return myInterceptorBroadcaster;
	}
}