001package ca.uhn.fhir.jpa.subscription.channel.subscription;
002
003/*-
004 * #%L
005 * HAPI FHIR Subscription Server
006 * %%
007 * Copyright (C) 2014 - 2021 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
024import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
025import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
026import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
027import com.google.common.collect.Multimap;
028import com.google.common.collect.MultimapBuilder;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031import org.springframework.beans.factory.annotation.Autowired;
032import org.springframework.messaging.MessageChannel;
033import org.springframework.messaging.MessageHandler;
034
035import java.util.Map;
036import java.util.Optional;
037import java.util.concurrent.ConcurrentHashMap;
038
039public class SubscriptionChannelRegistry {
040        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class);
041
042        private final SubscriptionChannelCache myDeliveryReceiverChannels = new SubscriptionChannelCache();
043        // This map is a reference count so we know to destroy the channel when there are no more active subscriptions using it
044        // Key Channel Name, Value Subscription Id
045        private final Multimap<String, String> myActiveSubscriptionByChannelName = MultimapBuilder.hashKeys().arrayListValues().build();
046        private final Map<String, IChannelProducer> myChannelNameToSender = new ConcurrentHashMap<>();
047
048        @Autowired
049        private SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory;
050        @Autowired
051        private SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory;
052
053        public synchronized void add(ActiveSubscription theActiveSubscription) {
054                String channelName = theActiveSubscription.getChannelName();
055                ourLog.info("Adding subscription {} to channel {}", theActiveSubscription.getId(), channelName);
056                myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription.getId());
057
058                if (myDeliveryReceiverChannels.containsKey(channelName)) {
059                        ourLog.info("Channel {} already exists.  Not creating.", channelName);
060                        return;
061                }
062
063                IChannelReceiver channelReceiver = newReceivingChannel(channelName);
064                Optional<MessageHandler> deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getChannelType());
065
066                SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = new SubscriptionChannelWithHandlers(channelName, channelReceiver);
067                deliveryHandler.ifPresent(subscriptionChannelWithHandlers::addHandler);
068                myDeliveryReceiverChannels.put(channelName, subscriptionChannelWithHandlers);
069
070                IChannelProducer sendingChannel = newSendingChannel(channelName);
071                myChannelNameToSender.put(channelName, sendingChannel);
072        }
073
074        protected IChannelReceiver newReceivingChannel(String theChannelName) {
075                return mySubscriptionDeliveryChannelFactory.newDeliveryReceivingChannel(theChannelName, null);
076        }
077
078        protected IChannelProducer newSendingChannel(String theChannelName) {
079                return mySubscriptionDeliveryChannelFactory.newDeliverySendingChannel(theChannelName, null);
080        }
081
082        public synchronized void remove(ActiveSubscription theActiveSubscription) {
083                String channelName = theActiveSubscription.getChannelName();
084                ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId(), channelName);
085                boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId());
086                if (!removed) {
087                        ourLog.warn("Failed to remove subscription {} from channel {}", theActiveSubscription.getId(), channelName);
088                }
089
090                // This was the last one.  Close and remove the channel
091                if (!myActiveSubscriptionByChannelName.containsKey(channelName)) {
092                        SubscriptionChannelWithHandlers channel = myDeliveryReceiverChannels.get(channelName);
093                        if (channel != null) {
094                                channel.close();
095                        }
096                        myDeliveryReceiverChannels.closeAndRemove(channelName);
097                        myChannelNameToSender.remove(channelName);
098                }
099
100        }
101
102        public synchronized SubscriptionChannelWithHandlers getDeliveryReceiverChannel(String theChannelName) {
103                return myDeliveryReceiverChannels.get(theChannelName);
104        }
105
106        public synchronized MessageChannel getDeliverySenderChannel(String theChannelName) {
107                return myChannelNameToSender.get(theChannelName);
108        }
109
110        public synchronized int size() {
111                return myDeliveryReceiverChannels.size();
112        }
113}