001/*-
002 * #%L
003 * HAPI FHIR Storage api
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.channel.subscription;
021
022import ca.uhn.fhir.broker.api.ChannelConsumerSettings;
023import ca.uhn.fhir.broker.api.ChannelProducerSettings;
024import ca.uhn.fhir.broker.api.IBrokerClient;
025import ca.uhn.fhir.broker.api.IChannelConsumer;
026import ca.uhn.fhir.broker.api.IChannelProducer;
027import ca.uhn.fhir.broker.api.IMessageListener;
028import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage;
029import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
030import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
031import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
032import ca.uhn.fhir.subscription.SubscriptionConstants;
033import org.apache.commons.lang3.Validate;
034
035public class SubscriptionChannelFactory {
036        private final IBrokerClient myBrokerClient;
037
038        /**
039         * Constructor
040         */
041        public SubscriptionChannelFactory(IBrokerClient theBrokerClient) {
042                Validate.notNull(theBrokerClient);
043                myBrokerClient = theBrokerClient;
044        }
045
046        public IChannelProducer<ResourceDeliveryMessage> newDeliveryProducer(
047                        String theChannelName, ChannelProducerSettings theChannelSettings) {
048                ChannelProducerSettings config = newProducerConfigForDeliveryChannel(theChannelSettings);
049                config.setRetryConfiguration(theChannelSettings.getRetryConfigurationParameters());
050                return myBrokerClient.getOrCreateProducer(theChannelName, ResourceDeliveryJsonMessage.class, config);
051        }
052
053        public IChannelConsumer<ResourceDeliveryMessage> newDeliveryConsumer(
054                        String theChannelName,
055                        IMessageListener<ResourceDeliveryMessage> theListener,
056                        ChannelConsumerSettings theChannelSettings) {
057                ChannelConsumerSettings config = newConsumerConfigForDeliveryChannel(theChannelSettings);
058                return myBrokerClient.getOrCreateConsumer(
059                                theChannelName, ResourceDeliveryJsonMessage.class, theListener, config);
060        }
061
062        public IChannelProducer<ResourceModifiedMessage> newMatchingProducer(
063                        String theChannelName, ChannelProducerSettings theChannelSettings) {
064                ChannelProducerSettings config = newProducerConfigForMatchingChannel(theChannelSettings);
065                return myBrokerClient.getOrCreateProducer(theChannelName, ResourceModifiedJsonMessage.class, config);
066        }
067
068        public IChannelConsumer<ResourceModifiedMessage> newMatchingConsumer(
069                        String theChannelName,
070                        IMessageListener<ResourceModifiedMessage> theListener,
071                        ChannelConsumerSettings theChannelSettings) {
072                ChannelConsumerSettings config = newConsumerConfigForMatchingChannel(theChannelSettings);
073                return myBrokerClient.getOrCreateConsumer(
074                                theChannelName, ResourceModifiedJsonMessage.class, theListener, config);
075        }
076
077        protected ChannelProducerSettings newProducerConfigForDeliveryChannel(ChannelProducerSettings theOptions) {
078                ChannelProducerSettings config = new ChannelProducerSettings();
079                config.setConcurrentConsumers(getDeliveryChannelConcurrentConsumers());
080                config.setRetryConfiguration(theOptions.getRetryConfigurationParameters());
081                return config;
082        }
083
084        protected ChannelConsumerSettings newConsumerConfigForDeliveryChannel(ChannelConsumerSettings theOptions) {
085                ChannelConsumerSettings config = new ChannelConsumerSettings();
086                config.setConcurrentConsumers(getDeliveryChannelConcurrentConsumers());
087                if (theOptions != null) {
088                        config.setRetryConfiguration(theOptions.getRetryConfigurationParameters());
089                }
090                return config;
091        }
092
093        protected ChannelProducerSettings newProducerConfigForMatchingChannel(ChannelProducerSettings theOptions) {
094                ChannelProducerSettings config = new ChannelProducerSettings();
095                if (theOptions != null) {
096                        config.setRetryConfiguration(theOptions.getRetryConfigurationParameters());
097                        config.setQualifyChannelName(theOptions.isQualifyChannelName());
098                }
099                config.setConcurrentConsumers(getMatchingChannelConcurrentConsumers());
100                return config;
101        }
102
103        protected ChannelConsumerSettings newConsumerConfigForMatchingChannel(ChannelConsumerSettings theOptions) {
104                ChannelConsumerSettings config = new ChannelConsumerSettings();
105                config.setConcurrentConsumers(getMatchingChannelConcurrentConsumers());
106                if (theOptions != null) {
107                        config.setQualifyChannelName(theOptions.isQualifyChannelName());
108                        config.setRetryConfiguration(theOptions.getRetryConfigurationParameters());
109                }
110                return config;
111        }
112
113        public int getDeliveryChannelConcurrentConsumers() {
114                return SubscriptionConstants.DELIVERY_CHANNEL_CONCURRENT_CONSUMERS;
115        }
116
117        public int getMatchingChannelConcurrentConsumers() {
118                return SubscriptionConstants.MATCHING_CHANNEL_CONCURRENT_CONSUMERS;
119        }
120
121        public IBrokerClient getBrokerClient() {
122                return myBrokerClient;
123        }
124}