/*-
 * #%L
 * HAPI FHIR Subscription Server
 * %%
 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.subscription.match.deliver.message;

import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.match.deliver.BaseSubscriptionDeliverySubscriber;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.EncodingEnum;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.messaging.MessagingException;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;

import static org.apache.commons.lang3.StringUtils.isNotBlank;

/**
 * Delivery subscriber that forwards a {@link ResourceDeliveryMessage} to a channel producer
 * obtained from an {@link IChannelFactory}. The channel (queue) name is taken from the
 * scheme-specific part of the subscription's endpoint URL.
 */
@Scope("prototype")
public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDeliverySubscriber {
	private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionDeliveringMessageSubscriber.class);

	private final IChannelFactory myChannelFactory;

	/**
	 * Constructor
	 *
	 * @param theChannelFactory factory used to look up or create the producer for the target queue
	 */
	public SubscriptionDeliveringMessageSubscriber(IChannelFactory theChannelFactory) {
		super();
		myChannelFactory = theChannelFactory;
	}

	/**
	 * Sends the wrapped message through the given producer and logs the delivery at debug level.
	 * <p>
	 * If the subscription has a non-blank payload search criteria, the payload of
	 * {@code theWrappedMessageToSend} is first replaced by one built from the resource returned by
	 * {@code createDeliveryBundleForPayloadSearchCriteria}, and the logged payload id becomes that
	 * resource's unqualified, versionless id.
	 *
	 * @param theSourceMessage        the delivery message being processed
	 * @param theSubscription         the subscription the delivery is made for
	 * @param theChannelProducer      the producer the message is sent through
	 * @param theWrappedMessageToSend the message to send; its payload may be replaced in place
	 */
	protected void doDelivery(
			ResourceDeliveryMessage theSourceMessage,
			CanonicalSubscription theSubscription,
			IChannelProducer theChannelProducer,
			ResourceModifiedJsonMessage theWrappedMessageToSend) {
		String deliveredPayloadId = theSourceMessage.getPayloadId();

		boolean hasPayloadSearchCriteria = isNotBlank(theSubscription.getPayloadSearchCriteria());
		if (hasPayloadSearchCriteria) {
			IBaseResource criteriaResource = createDeliveryBundleForPayloadSearchCriteria(
					theSubscription, theWrappedMessageToSend.getPayload().getPayload(myFhirContext));

			// Swap the outgoing payload for one wrapping the criteria-derived resource
			theWrappedMessageToSend.setPayload(
					convertDeliveryMessageToResourceModifiedJsonMessage(theSourceMessage, criteriaResource)
							.getPayload());

			deliveredPayloadId =
					criteriaResource.getIdElement().toUnqualifiedVersionless().getValue();
		}

		theChannelProducer.send(theWrappedMessageToSend);

		String subscriptionId = theSubscription
				.getIdElement(myFhirContext)
				.toUnqualifiedVersionless()
				.getValue();
		ourLog.debug(
				"Delivering {} message payload {} for {}",
				theSourceMessage.getOperationType(),
				deliveredPayloadId,
				subscriptionId);
	}

	/**
	 * Builds a {@link ResourceModifiedJsonMessage} around the given resource, copying the operation
	 * type, message key (or its default), transaction id and request partition id from the
	 * delivery message.
	 *
	 * @param theMsg             the delivery message supplying the metadata
	 * @param thePayloadResource the resource to carry in the new message
	 * @return the newly created wrapper message
	 */
	private ResourceModifiedJsonMessage convertDeliveryMessageToResourceModifiedJsonMessage(
			ResourceDeliveryMessage theMsg, IBaseResource thePayloadResource) {
		ResourceModifiedMessage modifiedMessage =
				new ResourceModifiedMessage(myFhirContext, thePayloadResource, theMsg.getOperationType());
		modifiedMessage.setMessageKey(theMsg.getMessageKeyOrDefault());
		modifiedMessage.setTransactionId(theMsg.getTransactionId());
		modifiedMessage.setPartitionId(theMsg.getRequestPartitionId());

		return new ResourceModifiedJsonMessage(modifiedMessage);
	}

	/**
	 * Processes one delivery message: resolves the payload resource, invokes the
	 * {@code SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY} hooks, obtains the producer for the queue named
	 * by the subscription endpoint, delivers the message and finally invokes the
	 * {@code SUBSCRIPTION_AFTER_MESSAGE_DELIVERY} hooks.
	 * <p>
	 * Returns without delivering when the message carries no payload and none can be inflated, or
	 * when the before-delivery hooks return {@code false}.
	 *
	 * @param theMessage the delivery message to process
	 * @throws UnsupportedOperationException if the subscription's payload type is not JSON
	 * @throws URISyntaxException            if the subscription's endpoint URL cannot be parsed as a URI
	 */
	@Override
	public void handleMessage(ResourceDeliveryMessage theMessage) throws MessagingException, URISyntaxException {
		CanonicalSubscription subscription = theMessage.getSubscription();

		IBaseResource resourceToDeliver = theMessage.getPayload(myFhirContext);
		if (resourceToDeliver == null) {
			// No inline payload: try to inflate it from the delivery message instead
			Optional<ResourceModifiedMessage> inflated = inflateResourceModifiedMessageFromDeliveryMessage(theMessage);
			if (!inflated.isPresent()) {
				return;
			}
			resourceToDeliver = inflated.get().getPayload(myFhirContext);
		}

		ResourceModifiedJsonMessage outgoingMessage =
				convertDeliveryMessageToResourceModifiedJsonMessage(theMessage, resourceToDeliver);

		// Interceptor call: SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY
		HookParams beforeDeliveryParams = new HookParams()
				.add(CanonicalSubscription.class, subscription)
				.add(ResourceDeliveryMessage.class, theMessage)
				.add(ResourceModifiedJsonMessage.class, outgoingMessage);
		boolean proceedWithDelivery =
				getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_BEFORE_MESSAGE_DELIVERY, beforeDeliveryParams);
		if (!proceedWithDelivery) {
			return;
		}

		// The queue name is derived from the subscription's endpoint
		String queueName = extractQueueNameFromEndpoint(subscription.getEndpointUrl());

		ChannelProducerSettings producerSettings = new ChannelProducerSettings();
		producerSettings.setQualifyChannelName(false);

		IChannelProducer producer =
				myChannelFactory.getOrCreateProducer(queueName, ResourceModifiedJsonMessage.class, producerSettings);

		// The payload type (encoding mimetype) comes from the subscription; a missing one is rejected too
		String payloadMimeType = subscription.getPayloadString();
		boolean isJsonPayload =
				payloadMimeType != null && EncodingEnum.forContentType(payloadMimeType) == EncodingEnum.JSON;
		if (!isJsonPayload) {
			throw new UnsupportedOperationException(
					Msg.code(4) + "Only JSON payload type is currently supported for Message Subscriptions");
		}

		doDelivery(theMessage, subscription, producer, outgoingMessage);

		// Interceptor call: SUBSCRIPTION_AFTER_MESSAGE_DELIVERY
		// Nothing follows this call, so the hook result does not alter the flow.
		HookParams afterDeliveryParams = new HookParams()
				.add(CanonicalSubscription.class, subscription)
				.add(ResourceDeliveryMessage.class, theMessage);
		getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_AFTER_MESSAGE_DELIVERY, afterDeliveryParams);
	}

	/**
	 * Returns the scheme-specific part of the given endpoint URL, which is used as the queue name.
	 *
	 * @param theEndpointUrl the subscription endpoint URL
	 * @return the scheme-specific part of the parsed URI
	 * @throws URISyntaxException if the endpoint URL is not a valid URI
	 */
	private String extractQueueNameFromEndpoint(String theEndpointUrl) throws URISyntaxException {
		return new URI(theEndpointUrl).getSchemeSpecificPart();
	}
}