001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.channel.subscription; 021 022import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings; 023import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings; 024import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer; 025import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; 026import ca.uhn.fhir.jpa.subscription.channel.models.ProducingChannelParameters; 027import ca.uhn.fhir.jpa.subscription.channel.models.ReceivingChannelParameters; 028import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; 029import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; 030import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration; 031import com.google.common.annotations.VisibleForTesting; 032import com.google.common.collect.Multimap; 033import com.google.common.collect.MultimapBuilder; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036import org.springframework.beans.factory.annotation.Autowired; 037import org.springframework.messaging.MessageChannel; 038import org.springframework.messaging.MessageHandler; 039 040import java.util.Map; 041import java.util.Optional; 042import java.util.concurrent.ConcurrentHashMap; 043 044public class SubscriptionChannelRegistry { 045 private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class); 046 047 private final SubscriptionChannelCache myDeliveryReceiverChannels = new SubscriptionChannelCache(); 048 // This map is a reference count so we know to destroy the channel when there are no more active subscriptions using 049 // it 050 // Key Channel Name, Value Subscription Id 051 private final Multimap<String, String> myActiveSubscriptionByChannelName = 052 MultimapBuilder.hashKeys().arrayListValues().build(); 053 private final Map<String, IChannelProducer> myChannelNameToSender = new ConcurrentHashMap<>(); 054 055 @Autowired 056 private SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory; 057 058 @Autowired 059 private SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory; 060 061 public synchronized void add(ActiveSubscription theActiveSubscription) { 062 String channelName = theActiveSubscription.getChannelName(); 063 ourLog.info("Adding subscription {} to channel {}", theActiveSubscription.getId(), channelName); 064 myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription.getId()); 065 066 if (myDeliveryReceiverChannels.containsKey(channelName)) { 067 ourLog.info("Channel {} already exists. Not creating.", channelName); 068 return; 069 } 070 071 // we get the retry configurations from the cannonicalized subscriber 072 // these will be provided to both the producer and receiver channel 073 ChannelRetryConfiguration retryConfigParameters = theActiveSubscription.getRetryConfigurationParameters(); 074 075 /* 076 * When we create a subscription, we create both 077 * a producing/sending channel and 078 * a receiving channel. 079 * 080 * Matched subscriptions are sent to the Sending channel 081 * and the sending channel sends to subscription matching service. 082 * 083 * Receiving channel will send it out to 084 * the subscriber hook (REST, email, etc). 085 */ 086 087 // the receiving channel 088 // this sends to the hook (resthook/message/email/whatever) 089 ReceivingChannelParameters receivingParameters = new ReceivingChannelParameters(channelName); 090 receivingParameters.setRetryConfiguration(retryConfigParameters); 091 092 IChannelReceiver channelReceiver = newReceivingChannel(receivingParameters); 093 Optional<MessageHandler> deliveryHandler = 094 mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getChannelType()); 095 096 SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = 097 new SubscriptionChannelWithHandlers(channelName, channelReceiver); 098 deliveryHandler.ifPresent(subscriptionChannelWithHandlers::addHandler); 099 myDeliveryReceiverChannels.put(channelName, subscriptionChannelWithHandlers); 100 101 // create the producing channel. 102 // channel used for sending to subscription matcher 103 ProducingChannelParameters producingChannelParameters = new ProducingChannelParameters(channelName); 104 producingChannelParameters.setRetryConfiguration(retryConfigParameters); 105 106 IChannelProducer sendingChannel = newSendingChannel(producingChannelParameters); 107 myChannelNameToSender.put(channelName, sendingChannel); 108 } 109 110 protected IChannelReceiver newReceivingChannel(ReceivingChannelParameters theParameters) { 111 ChannelConsumerSettings settings = new ChannelConsumerSettings(); 112 settings.setRetryConfiguration(theParameters.getRetryConfiguration()); 113 return mySubscriptionDeliveryChannelFactory.newDeliveryReceivingChannel( 114 theParameters.getChannelName(), settings); 115 } 116 117 protected IChannelProducer newSendingChannel(ProducingChannelParameters theParameters) { 118 ChannelProducerSettings settings = new ChannelProducerSettings(); 119 settings.setRetryConfiguration(theParameters.getRetryConfiguration()); 120 return mySubscriptionDeliveryChannelFactory.newDeliverySendingChannel(theParameters.getChannelName(), settings); 121 } 122 123 public synchronized void remove(ActiveSubscription theActiveSubscription) { 124 String channelName = theActiveSubscription.getChannelName(); 125 ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId(), channelName); 126 boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId()); 127 128 if (!removed) { 129 ourLog.warn("Failed to remove subscription {} from channel {}", theActiveSubscription.getId(), channelName); 130 } 131 132 // This was the last one. Close and remove the channel 133 if (!myActiveSubscriptionByChannelName.containsKey(channelName)) { 134 SubscriptionChannelWithHandlers channel = myDeliveryReceiverChannels.get(channelName); 135 if (channel != null) { 136 channel.close(); 137 } 138 myDeliveryReceiverChannels.closeAndRemove(channelName); 139 myChannelNameToSender.remove(channelName); 140 } 141 } 142 143 public synchronized SubscriptionChannelWithHandlers getDeliveryReceiverChannel(String theChannelName) { 144 return myDeliveryReceiverChannels.get(theChannelName); 145 } 146 147 public synchronized MessageChannel getDeliverySenderChannel(String theChannelName) { 148 return myChannelNameToSender.get(theChannelName); 149 } 150 151 public synchronized int size() { 152 return myDeliveryReceiverChannels.size(); 153 } 154 155 @VisibleForTesting 156 public void logForUnitTest() { 157 myDeliveryReceiverChannels.logForUnitTest(); 158 } 159}