
/*-
 * #%L
 * HAPI FHIR Subscription Server
 * %%
 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.subscription.match.deliver.message;

import ca.uhn.fhir.broker.api.ChannelProducerSettings;
import ca.uhn.fhir.broker.api.IBrokerClient;
import ca.uhn.fhir.broker.api.IChannelProducer;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.IDefaultPartitionSettings;
import ca.uhn.fhir.jpa.subscription.match.deliver.BaseSubscriptionDeliveryListener;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.util.IoUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.messaging.MessagingException;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;

import static org.apache.commons.lang3.StringUtils.isNotBlank;

048@Scope("prototype") 049public class SubscriptionDeliveringMessageListener extends BaseSubscriptionDeliveryListener implements AutoCloseable { 050 private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionDeliveringMessageListener.class); 051 052 private final IBrokerClient myBrokerClient; 053 private IChannelProducer<ResourceModifiedMessage> myChannelProducer; 054 private final IDefaultPartitionSettings myDefaultPartitionSettings; 055 056 /** 057 * Constructor 058 */ 059 public SubscriptionDeliveringMessageListener( 060 IBrokerClient theBrokerClient, IDefaultPartitionSettings theDefaultPartitionSettings) { 061 super(); 062 myBrokerClient = theBrokerClient; 063 myDefaultPartitionSettings = theDefaultPartitionSettings; 064 } 065 066 public Class<ResourceDeliveryMessage> getPayloadType() { 067 return ResourceDeliveryMessage.class; 068 } 069 070 protected void doDelivery( 071 ResourceDeliveryMessage theSourceMessage, 072 CanonicalSubscription theSubscription, 073 IChannelProducer<ResourceModifiedMessage> theChannelProducer, 074 ResourceModifiedJsonMessage theWrappedMessageToSend) { 075 String payloadId = theSourceMessage.getPayloadId(); 076 if (isNotBlank(theSubscription.getPayloadSearchCriteria())) { 077 IBaseResource payloadResource = createDeliveryBundleForPayloadSearchCriteria( 078 theSubscription, theWrappedMessageToSend.getPayload().getResource(myFhirContext)); 079 ResourceModifiedJsonMessage newWrappedMessageToSend = 080 convertDeliveryMessageToResourceModifiedJsonMessage(theSourceMessage, payloadResource); 081 theWrappedMessageToSend.setPayload(newWrappedMessageToSend.getPayload()); 082 payloadId = 083 payloadResource.getIdElement().toUnqualifiedVersionless().getValue(); 084 } 085 theChannelProducer.send(theWrappedMessageToSend); 086 ourLog.debug( 087 "Delivering {} message payload {} for {}", 088 theSourceMessage.getOperationType(), 089 payloadId, 090 theSubscription 091 .getIdElement(myFhirContext) 092 .toUnqualifiedVersionless() 093 
.getValue()); 094 } 095 096 private ResourceModifiedJsonMessage convertDeliveryMessageToResourceModifiedJsonMessage( 097 ResourceDeliveryMessage theMsg, IBaseResource thePayloadResource) { 098 ResourceModifiedMessage payload = new ResourceModifiedMessage( 099 myFhirContext, 100 thePayloadResource, 101 theMsg.getOperationType(), 102 myDefaultPartitionSettings.getDefaultRequestPartitionId()); 103 payload.setPayloadMessageKey(theMsg.getPayloadMessageKey()); 104 payload.setTransactionId(theMsg.getTransactionId()); 105 payload.setPartitionId(theMsg.getPartitionId()); 106 return new ResourceModifiedJsonMessage(payload); 107 } 108 109 @Override 110 public void handleMessage(ResourceDeliveryMessage theMessage) throws MessagingException, URISyntaxException { 111 CanonicalSubscription subscription = theMessage.getSubscription(); 112 IBaseResource payloadResource = theMessage.getPayload(myFhirContext); 113 if (payloadResource == null) { 114 Optional<ResourceModifiedMessage> inflatedMsg = 115 inflateResourceModifiedMessageFromDeliveryMessage(theMessage); 116 if (inflatedMsg.isEmpty()) { 117 return; 118 } 119 payloadResource = inflatedMsg.get().getResource(myFhirContext); 120 } 121 122 ResourceModifiedJsonMessage messageWrapperToSend = 123 convertDeliveryMessageToResourceModifiedJsonMessage(theMessage, payloadResource); 124 125 // Interceptor call: SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY 126 HookParams params = new HookParams() 127 .add(CanonicalSubscription.class, subscription) 128 .add(ResourceDeliveryMessage.class, theMessage) 129 .add(ResourceModifiedJsonMessage.class, messageWrapperToSend); 130 if (!getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY, params)) { 131 return; 132 } 133 // Grab the endpoint from the subscription 134 String endpointUrl = subscription.getEndpointUrl(); 135 136 String queueName = extractQueueNameFromEndpoint(endpointUrl); 137 138 ChannelProducerSettings channelSettings = new ChannelProducerSettings(); 139 
channelSettings.setQualifyChannelName(false); 140 141 if (myChannelProducer == null) { 142 myChannelProducer = 143 myBrokerClient.getOrCreateProducer(queueName, ResourceModifiedJsonMessage.class, channelSettings); 144 } 145 146 // Grab the payload type (encoding mimetype) from the subscription 147 String payloadString = subscription.getPayloadString(); 148 EncodingEnum payloadType = null; 149 if (payloadString != null) { 150 payloadType = EncodingEnum.forContentType(payloadString); 151 } 152 153 if (payloadType != EncodingEnum.JSON) { 154 throw new UnsupportedOperationException( 155 Msg.code(4) + "Only JSON payload type is currently supported for Message Subscriptions"); 156 } 157 158 doDelivery(theMessage, subscription, myChannelProducer, messageWrapperToSend); 159 160 // Interceptor call: SUBSCRIPTION_AFTER_MESSAGE_DELIVERY 161 params = new HookParams() 162 .add(CanonicalSubscription.class, subscription) 163 .add(ResourceDeliveryMessage.class, theMessage); 164 if (!getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_AFTER_MESSAGE_DELIVERY, params)) { 165 //noinspection UnnecessaryReturnStatement 166 return; 167 } 168 } 169 170 private String extractQueueNameFromEndpoint(String theEndpointUrl) throws URISyntaxException { 171 URI uri = new URI(theEndpointUrl); 172 return uri.getSchemeSpecificPart(); 173 } 174 175 @Override 176 public void close() throws Exception { 177 if (myChannelProducer instanceof AutoCloseable) { 178 IoUtils.closeQuietly((AutoCloseable) myChannelProducer, ourLog); 179 } 180 } 181}