001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.channel.subscription;
021
022import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
023import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
024import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
025import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
026import ca.uhn.fhir.jpa.subscription.channel.models.ProducingChannelParameters;
027import ca.uhn.fhir.jpa.subscription.channel.models.ReceivingChannelParameters;
028import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
029import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
030import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration;
031import com.google.common.annotations.VisibleForTesting;
032import com.google.common.collect.Multimap;
033import com.google.common.collect.MultimapBuilder;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036import org.springframework.beans.factory.annotation.Autowired;
037import org.springframework.messaging.MessageChannel;
038import org.springframework.messaging.MessageHandler;
039
040import java.util.Map;
041import java.util.Optional;
042import java.util.concurrent.ConcurrentHashMap;
043
044public class SubscriptionChannelRegistry {
045        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class);
046
047        private final SubscriptionChannelCache myDeliveryReceiverChannels = new SubscriptionChannelCache();
048        // This map is a reference count so we know to destroy the channel when there are no more active subscriptions using
049        // it
050        // Key Channel Name, Value Subscription Id
051        private final Multimap<String, String> myActiveSubscriptionByChannelName =
052                        MultimapBuilder.hashKeys().arrayListValues().build();
053        private final Map<String, IChannelProducer> myChannelNameToSender = new ConcurrentHashMap<>();
054
055        @Autowired
056        private SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory;
057
058        @Autowired
059        private SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory;
060
061        public synchronized void add(ActiveSubscription theActiveSubscription) {
062                String channelName = theActiveSubscription.getChannelName();
063                ourLog.info("Adding subscription {} to channel {}", theActiveSubscription.getId(), channelName);
064                myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription.getId());
065
066                if (myDeliveryReceiverChannels.containsKey(channelName)) {
067                        ourLog.info("Channel {} already exists.  Not creating.", channelName);
068                        return;
069                }
070
071                // we get the retry configurations from the cannonicalized subscriber
072                // these will be provided to both the producer and receiver channel
073                ChannelRetryConfiguration retryConfigParameters = theActiveSubscription.getRetryConfigurationParameters();
074
075                /*
076                 * When we create a subscription, we create both
077                 * a producing/sending channel and
078                 * a receiving channel.
079                 *
080                 * Matched subscriptions are sent to the Sending channel
081                 * and the sending channel sends to subscription matching service.
082                 *
083                 * Receiving channel will send it out to
084                 * the subscriber hook (REST, email, etc).
085                 */
086
087                // the receiving channel
088                // this sends to the hook (resthook/message/email/whatever)
089                ReceivingChannelParameters receivingParameters = new ReceivingChannelParameters(channelName);
090                receivingParameters.setRetryConfiguration(retryConfigParameters);
091
092                IChannelReceiver channelReceiver = newReceivingChannel(receivingParameters);
093                Optional<MessageHandler> deliveryHandler =
094                                mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getChannelType());
095
096                SubscriptionChannelWithHandlers subscriptionChannelWithHandlers =
097                                new SubscriptionChannelWithHandlers(channelName, channelReceiver);
098                deliveryHandler.ifPresent(subscriptionChannelWithHandlers::addHandler);
099                myDeliveryReceiverChannels.put(channelName, subscriptionChannelWithHandlers);
100
101                // create the producing channel.
102                // channel used for sending to subscription matcher
103                ProducingChannelParameters producingChannelParameters = new ProducingChannelParameters(channelName);
104                producingChannelParameters.setRetryConfiguration(retryConfigParameters);
105
106                IChannelProducer sendingChannel = newSendingChannel(producingChannelParameters);
107                myChannelNameToSender.put(channelName, sendingChannel);
108        }
109
110        protected IChannelReceiver newReceivingChannel(ReceivingChannelParameters theParameters) {
111                ChannelConsumerSettings settings = new ChannelConsumerSettings();
112                settings.setRetryConfiguration(theParameters.getRetryConfiguration());
113                return mySubscriptionDeliveryChannelFactory.newDeliveryReceivingChannel(
114                                theParameters.getChannelName(), settings);
115        }
116
117        protected IChannelProducer newSendingChannel(ProducingChannelParameters theParameters) {
118                ChannelProducerSettings settings = new ChannelProducerSettings();
119                settings.setRetryConfiguration(theParameters.getRetryConfiguration());
120                return mySubscriptionDeliveryChannelFactory.newDeliverySendingChannel(theParameters.getChannelName(), settings);
121        }
122
123        public synchronized void remove(ActiveSubscription theActiveSubscription) {
124                String channelName = theActiveSubscription.getChannelName();
125                ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId(), channelName);
126                boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId());
127
128                if (!removed) {
129                        ourLog.warn("Failed to remove subscription {} from channel {}", theActiveSubscription.getId(), channelName);
130                }
131
132                // This was the last one.  Close and remove the channel
133                if (!myActiveSubscriptionByChannelName.containsKey(channelName)) {
134                        SubscriptionChannelWithHandlers channel = myDeliveryReceiverChannels.get(channelName);
135                        if (channel != null) {
136                                channel.close();
137                        }
138                        myDeliveryReceiverChannels.closeAndRemove(channelName);
139                        myChannelNameToSender.remove(channelName);
140                }
141        }
142
143        public synchronized SubscriptionChannelWithHandlers getDeliveryReceiverChannel(String theChannelName) {
144                return myDeliveryReceiverChannels.get(theChannelName);
145        }
146
147        public synchronized MessageChannel getDeliverySenderChannel(String theChannelName) {
148                return myChannelNameToSender.get(theChannelName);
149        }
150
151        public synchronized int size() {
152                return myDeliveryReceiverChannels.size();
153        }
154
155        @VisibleForTesting
156        public void logForUnitTest() {
157                myDeliveryReceiverChannels.logForUnitTest();
158        }
159}