
/*-
 * #%L
 * HAPI FHIR Subscription Server
 * %%
 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.subscription.channel.subscription;

import ca.uhn.fhir.broker.api.ChannelConsumerSettings;
import ca.uhn.fhir.broker.api.ChannelProducerSettings;
import ca.uhn.fhir.broker.api.IChannelConsumer;
import ca.uhn.fhir.broker.api.IChannelProducer;
import ca.uhn.fhir.broker.api.IMessageListener;
import ca.uhn.fhir.broker.impl.MultiplexingListener;
import ca.uhn.fhir.jpa.subscription.api.ISubscriptionDeliveryValidator;
import ca.uhn.fhir.jpa.subscription.channel.models.ProducingChannelParameters;
import ca.uhn.fhir.jpa.subscription.channel.models.ReceivingChannelParameters;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
import com.google.common.collect.Multimaps;
import jakarta.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Optional;

/**
 * Keeps track of which active subscriptions use which delivery channel, and caches the
 * consumer and producer that were created for each channel name.
 *
 * <p>A channel's consumer and producer are created the first time a subscription is added
 * for that channel name, and both caches are asked to close and remove the channel once the
 * last subscription registered under that name has been removed.
 *
 * <p>{@link #add}, {@link #remove}, the two lookup methods and {@link #size()} are all
 * synchronized on this instance, so the "register subscription, then create channel if
 * missing" and "unregister subscription, then close channel if unused" sequences cannot
 * interleave with each other.
 */
public class SubscriptionChannelRegistry {
    private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionChannelRegistry.class);

    private final SubscriptionConsumerCache myDeliveryConsumerCache = new SubscriptionConsumerCache();
    private final SubscriptionProducerCache myDeliveryProducerCache = new SubscriptionProducerCache();

    // This map is a reference count so we know to destroy the channel when there are no more
    // active subscriptions using it.
    // Key: channel name, Value: subscription id
    private final Multimap<String, String> myActiveSubscriptionByChannelName =
            Multimaps.synchronizedMultimap(
                    MultimapBuilder.hashKeys().arrayListValues().build());

    @Autowired
    private SubscriptionDeliveryListenerFactory mySubscriptionDeliveryListenerFactory;

    @Autowired
    private SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory;

    @Autowired
    private ISubscriptionDeliveryValidator mySubscriptionDeliveryValidator;

    public SubscriptionChannelRegistry() {}

    /**
     * Registers the subscription against its channel name and, if no consumer is cached yet
     * for that channel name, creates and caches both the receiving consumer and the producer.
     *
     * @param theActiveSubscription the subscription to register; its channel name, id, channel
     *                              type and retry configuration are read
     */
    public synchronized void add(ActiveSubscription theActiveSubscription) {
        String channelName = theActiveSubscription.getChannelName();
        ourLog.info("Adding subscription {} to channel {}", theActiveSubscription.getId(), channelName);
        // The subscription is always counted, even when the channel already exists, so that
        // remove() can tell when the last user of the channel has gone.
        myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription.getId());

        if (myDeliveryConsumerCache.containsKey(channelName)) {
            ourLog.info("Channel {} already exists.  Not creating.", channelName);
            return;
        }

        // we get the retry configurations from the canonicalized subscriber
        // these will be provided to both the producer and receiver channel
        ChannelRetryConfiguration retryConfigParameters = theActiveSubscription.getRetryConfigurationParameters();

        /*
         * When we create a subscription, we create both
         * a producing/sending channel and
         * a receiving channel.
         *
         * Matched subscriptions are sent to the Sending channel
         * and the sending channel sends to subscription matching service.
         *
         * Receiving channel will send it out to
         * the subscriber hook (REST, email, etc).
         */

        // the receiving channel
        // this sends to the hook (resthook/message/email/whatever)
        ReceivingChannelParameters receivingParameters = new ReceivingChannelParameters(channelName);
        receivingParameters.setRetryConfiguration(retryConfigParameters);

        SubscriptionResourceDeliveryMessageConsumer subscriptionResourceDeliveryMessageConsumer =
                buildSubscriptionResourceDeliveryMessageConsumer(theActiveSubscription, receivingParameters);
        myDeliveryConsumerCache.put(channelName, subscriptionResourceDeliveryMessageConsumer);

        // create the producing channel.
        // channel used for sending to subscription matcher
        ProducingChannelParameters producingChannelParameters = new ProducingChannelParameters(channelName);
        producingChannelParameters.setRetryConfiguration(retryConfigParameters);

        IChannelProducer<ResourceDeliveryMessage> producer = newProducer(producingChannelParameters);
        myDeliveryProducerCache.put(channelName, producer);
    }

    /**
     * We package the consumer together with its listeners so it can be reused when possible without expensive teardown/rebuild.
     *
     * <p>A {@link SubscriptionValidatingListener} is always added first; the delivery listener
     * for the subscription's channel type is added after it when the factory supplies one.
     *
     * @param theActiveSubscription The active subscription being delivered to
     * @param receivingParameters channel name and retry parameters for the receiving channel
     * @return a SubscriptionResourceDeliveryMessageConsumer that packages the consumer with listeners and an api to add/remove listeners
     */
    @Nonnull
    private SubscriptionResourceDeliveryMessageConsumer buildSubscriptionResourceDeliveryMessageConsumer(
            ActiveSubscription theActiveSubscription, ReceivingChannelParameters receivingParameters) {
        MultiplexingListener<ResourceDeliveryMessage> multiplexingListener =
                new MultiplexingListener<>(ResourceDeliveryMessage.class);
        IChannelConsumer<ResourceDeliveryMessage> deliveryConsumer =
                newDeliveryConsumer(multiplexingListener, receivingParameters);
        Optional<IMessageListener<ResourceDeliveryMessage>> oDeliveryListener =
                mySubscriptionDeliveryListenerFactory.createDeliveryListener(theActiveSubscription.getChannelType());

        SubscriptionResourceDeliveryMessageConsumer subscriptionResourceDeliveryMessageConsumer =
                new SubscriptionResourceDeliveryMessageConsumer(deliveryConsumer);
        SubscriptionValidatingListener subscriptionValidatingListener =
                new SubscriptionValidatingListener(mySubscriptionDeliveryValidator, theActiveSubscription.getIdDt());
        subscriptionResourceDeliveryMessageConsumer.addListener(subscriptionValidatingListener);
        oDeliveryListener.ifPresent(subscriptionResourceDeliveryMessageConsumer::addListener);
        return subscriptionResourceDeliveryMessageConsumer;
    }

    /**
     * Creates the delivery consumer for a channel via the channel factory.
     *
     * @param theListener   the listener handed to the factory for the new consumer
     * @param theParameters supplies the channel name and the retry configuration
     * @return the consumer returned by the channel factory
     */
    protected IChannelConsumer<ResourceDeliveryMessage> newDeliveryConsumer(
            IMessageListener<ResourceDeliveryMessage> theListener, ReceivingChannelParameters theParameters) {
        ChannelConsumerSettings settings = new ChannelConsumerSettings();
        settings.setRetryConfiguration(theParameters.getRetryConfiguration());
        return mySubscriptionDeliveryChannelFactory.newDeliveryConsumer(
                theParameters.getChannelName(), theListener, settings);
    }

    /**
     * Creates the delivery producer for a channel via the channel factory.
     *
     * @param theParameters supplies the channel name and the retry configuration
     * @return the producer returned by the channel factory
     */
    protected IChannelProducer<ResourceDeliveryMessage> newProducer(ProducingChannelParameters theParameters) {
        ChannelProducerSettings settings = new ChannelProducerSettings();
        settings.setRetryConfiguration(theParameters.getRetryConfiguration());
        return mySubscriptionDeliveryChannelFactory.newDeliveryProducer(theParameters.getChannelName(), settings);
    }

    /**
     * Unregisters the subscription from its channel name and, when no subscription remains
     * registered under that name, asks both caches to close and remove the channel.
     *
     * <p>This method is synchronized for the same reason {@link #add} is: the
     * "is anyone still using this channel?" check and the subsequent close must not
     * interleave with a concurrent {@code add} for the same channel name, otherwise a newly
     * registered subscription could be left pointing at a channel that has just been closed.
     *
     * @param theActiveSubscription the subscription to unregister; its channel name and id are read
     */
    public synchronized void remove(ActiveSubscription theActiveSubscription) {
        String channelName = theActiveSubscription.getChannelName();
        ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId(), channelName);
        boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId());

        if (!removed) {
            // Not fatal: we still fall through and clean up the channel if nothing references it.
            ourLog.warn("Failed to remove subscription {} from channel {}", theActiveSubscription.getId(), channelName);
        }

        // This was the last one.  Close and remove the channel
        if (!myActiveSubscriptionByChannelName.containsKey(channelName)) {
            myDeliveryConsumerCache.closeAndRemove(channelName);
            myDeliveryProducerCache.closeAndRemove(channelName);
        }
    }

    /**
     * Looks up the cached consumer (with its listeners) for a channel.
     *
     * @param theChannelName the channel name used as the cache key
     * @return whatever the consumer cache holds for that name
     *         (NOTE(review): presumably {@code null} when absent - confirm against SubscriptionConsumerCache)
     */
    public synchronized SubscriptionResourceDeliveryMessageConsumer getDeliveryConsumerWithListeners(
            String theChannelName) {
        return myDeliveryConsumerCache.get(theChannelName);
    }

    /**
     * Looks up the cached producer for a channel.
     *
     * @param theChannelName the channel name used as the cache key
     * @return whatever the producer cache holds for that name
     *         (NOTE(review): presumably {@code null} when absent - confirm against SubscriptionProducerCache)
     */
    public synchronized IChannelProducer<ResourceDeliveryMessage> getDeliveryChannelProducer(String theChannelName) {
        return myDeliveryProducerCache.get(theChannelName);
    }

    /**
     * @return the size reported by the consumer cache
     */
    public synchronized int size() {
        return myDeliveryConsumerCache.size();
    }

    /**
     * Delegates to the consumer cache's own unit-test logging.
     */
    @VisibleForTesting
    public void logForUnitTest() {
        myDeliveryConsumerCache.logForUnitTest();
    }
}