001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.channel.subscription;
021
022import ca.uhn.fhir.broker.api.ChannelConsumerSettings;
023import ca.uhn.fhir.broker.api.ChannelProducerSettings;
024import ca.uhn.fhir.broker.api.IChannelConsumer;
025import ca.uhn.fhir.broker.api.IChannelProducer;
026import ca.uhn.fhir.broker.api.IMessageListener;
027import ca.uhn.fhir.broker.impl.MultiplexingListener;
028import ca.uhn.fhir.jpa.subscription.api.ISubscriptionDeliveryValidator;
029import ca.uhn.fhir.jpa.subscription.channel.models.ProducingChannelParameters;
030import ca.uhn.fhir.jpa.subscription.channel.models.ReceivingChannelParameters;
031import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
032import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
033import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration;
034import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
035import com.google.common.annotations.VisibleForTesting;
036import com.google.common.collect.Multimap;
037import com.google.common.collect.MultimapBuilder;
038import com.google.common.collect.Multimaps;
039import jakarta.annotation.Nonnull;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042import org.springframework.beans.factory.annotation.Autowired;
043
044import java.util.Optional;
045
046public class SubscriptionChannelRegistry {
047        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class);
048
049        private final SubscriptionConsumerCache myDeliveryConsumerCache = new SubscriptionConsumerCache();
050        private final SubscriptionProducerCache myDeliveryProducerCache = new SubscriptionProducerCache();
051        // This map is a reference count so we know to destroy the channel when there are no more active subscriptions using
052        // it
053        // Key Channel Name, Value Subscription Id
054        private final Multimap<String, String> myActiveSubscriptionByChannelName = Multimaps.synchronizedMultimap(
055                        MultimapBuilder.hashKeys().arrayListValues().build());
056
057        @Autowired
058        private SubscriptionDeliveryListenerFactory mySubscriptionDeliveryListenerFactory;
059
060        @Autowired
061        private SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory;
062
063        @Autowired
064        private ISubscriptionDeliveryValidator mySubscriptionDeliveryValidator;
065
066        public SubscriptionChannelRegistry() {}
067
068        public synchronized void add(ActiveSubscription theActiveSubscription) {
069                String channelName = theActiveSubscription.getChannelName();
070                ourLog.info("Adding subscription {} to channel {}", theActiveSubscription.getId(), channelName);
071                myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription.getId());
072
073                if (myDeliveryConsumerCache.containsKey(channelName)) {
074                        ourLog.info("Channel {} already exists.  Not creating.", channelName);
075                        return;
076                }
077
078                // we get the retry configurations from the cannonicalized subscriber
079                // these will be provided to both the producer and receiver channel
080                ChannelRetryConfiguration retryConfigParameters = theActiveSubscription.getRetryConfigurationParameters();
081
082                /*
083                 * When we create a subscription, we create both
084                 * a producing/sending channel and
085                 * a receiving channel.
086                 *
087                 * Matched subscriptions are sent to the Sending channel
088                 * and the sending channel sends to subscription matching service.
089                 *
090                 * Receiving channel will send it out to
091                 * the subscriber hook (REST, email, etc).
092                 */
093
094                // the receiving channel
095                // this sends to the hook (resthook/message/email/whatever)
096                ReceivingChannelParameters receivingParameters = new ReceivingChannelParameters(channelName);
097                receivingParameters.setRetryConfiguration(retryConfigParameters);
098
099                SubscriptionResourceDeliveryMessageConsumer subscriptionResourceDeliveryMessageConsumer =
100                                buildSubscriptionResourceDeliveryMessageConsumer(theActiveSubscription, receivingParameters);
101                myDeliveryConsumerCache.put(channelName, subscriptionResourceDeliveryMessageConsumer);
102
103                // create the producing channel.
104                // channel used for sending to subscription matcher
105                ProducingChannelParameters producingChannelParameters = new ProducingChannelParameters(channelName);
106                producingChannelParameters.setRetryConfiguration(retryConfigParameters);
107
108                IChannelProducer<ResourceDeliveryMessage> producer = newProducer(producingChannelParameters);
109                myDeliveryProducerCache.put(channelName, producer);
110        }
111
112        /**
113         * We package the consumer together with its listeners so it can be reused when possible without expensive teardown/rebuild.
114         * @param theActiveSubscription The active subscription being delivered to
115         * @param receivingParameters retry parameters
116         * @return a SubscriptionResourceDeliveryMessageConsumer that packages the consumer with listeners and an api to add/remove listeners
117         */
118        @Nonnull
119        private SubscriptionResourceDeliveryMessageConsumer buildSubscriptionResourceDeliveryMessageConsumer(
120                        ActiveSubscription theActiveSubscription, ReceivingChannelParameters receivingParameters) {
121                MultiplexingListener<ResourceDeliveryMessage> multiplexingListener =
122                                new MultiplexingListener<>(ResourceDeliveryMessage.class);
123                IChannelConsumer<ResourceDeliveryMessage> deliveryConsumer =
124                                newDeliveryConsumer(multiplexingListener, receivingParameters);
125                Optional<IMessageListener<ResourceDeliveryMessage>> oDeliveryListener =
126                                mySubscriptionDeliveryListenerFactory.createDeliveryListener(theActiveSubscription.getChannelType());
127
128                SubscriptionResourceDeliveryMessageConsumer subscriptionResourceDeliveryMessageConsumer =
129                                new SubscriptionResourceDeliveryMessageConsumer(deliveryConsumer);
130                SubscriptionValidatingListener subscriptionValidatingListener =
131                                new SubscriptionValidatingListener(mySubscriptionDeliveryValidator, theActiveSubscription.getIdDt());
132                subscriptionResourceDeliveryMessageConsumer.addListener(subscriptionValidatingListener);
133                oDeliveryListener.ifPresent(subscriptionResourceDeliveryMessageConsumer::addListener);
134                return subscriptionResourceDeliveryMessageConsumer;
135        }
136
137        protected IChannelConsumer<ResourceDeliveryMessage> newDeliveryConsumer(
138                        IMessageListener<ResourceDeliveryMessage> theListener, ReceivingChannelParameters theParameters) {
139                ChannelConsumerSettings settings = new ChannelConsumerSettings();
140                settings.setRetryConfiguration(theParameters.getRetryConfiguration());
141                return mySubscriptionDeliveryChannelFactory.newDeliveryConsumer(
142                                theParameters.getChannelName(), theListener, settings);
143        }
144
145        protected IChannelProducer<ResourceDeliveryMessage> newProducer(ProducingChannelParameters theParameters) {
146                ChannelProducerSettings settings = new ChannelProducerSettings();
147                settings.setRetryConfiguration(theParameters.getRetryConfiguration());
148                return mySubscriptionDeliveryChannelFactory.newDeliveryProducer(theParameters.getChannelName(), settings);
149        }
150
151        public void remove(ActiveSubscription theActiveSubscription) {
152                String channelName = theActiveSubscription.getChannelName();
153                ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId(), channelName);
154                boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId());
155
156                if (!removed) {
157                        ourLog.warn("Failed to remove subscription {} from channel {}", theActiveSubscription.getId(), channelName);
158                }
159
160                // This was the last one.  Close and remove the channel
161                if (!myActiveSubscriptionByChannelName.containsKey(channelName)) {
162                        myDeliveryConsumerCache.closeAndRemove(channelName);
163                        myDeliveryProducerCache.closeAndRemove(channelName);
164                }
165        }
166
167        public synchronized SubscriptionResourceDeliveryMessageConsumer getDeliveryConsumerWithListeners(
168                        String theChannelName) {
169                return myDeliveryConsumerCache.get(theChannelName);
170        }
171
172        public synchronized IChannelProducer<ResourceDeliveryMessage> getDeliveryChannelProducer(String theChannelName) {
173                return myDeliveryProducerCache.get(theChannelName);
174        }
175
176        public synchronized int size() {
177                return myDeliveryConsumerCache.size();
178        }
179
180        @VisibleForTesting
181        public void logForUnitTest() {
182                myDeliveryConsumerCache.logForUnitTest();
183        }
184}