
001/*- 002 * #%L 003 * HAPI FHIR Storage api 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.channel.subscription; 021 022import ca.uhn.fhir.broker.api.ChannelConsumerSettings; 023import ca.uhn.fhir.broker.api.ChannelProducerSettings; 024import ca.uhn.fhir.broker.api.IBrokerClient; 025import ca.uhn.fhir.broker.api.IChannelConsumer; 026import ca.uhn.fhir.broker.api.IChannelProducer; 027import ca.uhn.fhir.broker.api.IMessageListener; 028import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryJsonMessage; 029import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage; 030import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; 031import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; 032import ca.uhn.fhir.subscription.SubscriptionConstants; 033import org.apache.commons.lang3.Validate; 034 035public class SubscriptionChannelFactory { 036 private final IBrokerClient myBrokerClient; 037 038 /** 039 * Constructor 040 */ 041 public SubscriptionChannelFactory(IBrokerClient theBrokerClient) { 042 Validate.notNull(theBrokerClient); 043 myBrokerClient = theBrokerClient; 044 } 045 046 public IChannelProducer<ResourceDeliveryMessage> newDeliveryProducer( 047 String theChannelName, ChannelProducerSettings theChannelSettings) { 048 ChannelProducerSettings config = newProducerConfigForDeliveryChannel(theChannelSettings); 049 config.setRetryConfiguration(theChannelSettings.getRetryConfigurationParameters()); 050 return myBrokerClient.getOrCreateProducer(theChannelName, ResourceDeliveryJsonMessage.class, config); 051 } 052 053 public IChannelConsumer<ResourceDeliveryMessage> newDeliveryConsumer( 054 String theChannelName, 055 IMessageListener<ResourceDeliveryMessage> theListener, 056 ChannelConsumerSettings theChannelSettings) { 057 ChannelConsumerSettings config = newConsumerConfigForDeliveryChannel(theChannelSettings); 058 return myBrokerClient.getOrCreateConsumer( 059 theChannelName, ResourceDeliveryJsonMessage.class, theListener, config); 060 } 061 062 public IChannelProducer<ResourceModifiedMessage> newMatchingProducer( 063 String theChannelName, ChannelProducerSettings theChannelSettings) { 064 ChannelProducerSettings config = newProducerConfigForMatchingChannel(theChannelSettings); 065 return myBrokerClient.getOrCreateProducer(theChannelName, ResourceModifiedJsonMessage.class, config); 066 } 067 068 public IChannelConsumer<ResourceModifiedMessage> newMatchingConsumer( 069 String theChannelName, 070 IMessageListener<ResourceModifiedMessage> theListener, 071 ChannelConsumerSettings theChannelSettings) { 072 ChannelConsumerSettings config = newConsumerConfigForMatchingChannel(theChannelSettings); 073 return myBrokerClient.getOrCreateConsumer( 074 theChannelName, ResourceModifiedJsonMessage.class, theListener, config); 075 } 076 077 protected ChannelProducerSettings newProducerConfigForDeliveryChannel(ChannelProducerSettings theOptions) { 078 ChannelProducerSettings config = new ChannelProducerSettings(); 079 config.setConcurrentConsumers(getDeliveryChannelConcurrentConsumers()); 080 config.setRetryConfiguration(theOptions.getRetryConfigurationParameters()); 081 return config; 082 } 083 084 protected ChannelConsumerSettings newConsumerConfigForDeliveryChannel(ChannelConsumerSettings theOptions) { 085 ChannelConsumerSettings config = new ChannelConsumerSettings(); 086 config.setConcurrentConsumers(getDeliveryChannelConcurrentConsumers()); 087 if (theOptions != null) { 088 config.setRetryConfiguration(theOptions.getRetryConfigurationParameters()); 089 } 090 return config; 091 } 092 093 protected ChannelProducerSettings newProducerConfigForMatchingChannel(ChannelProducerSettings theOptions) { 094 ChannelProducerSettings config = new ChannelProducerSettings(); 095 if (theOptions != null) { 096 config.setRetryConfiguration(theOptions.getRetryConfigurationParameters()); 097 config.setQualifyChannelName(theOptions.isQualifyChannelName()); 098 } 099 config.setConcurrentConsumers(getMatchingChannelConcurrentConsumers()); 100 return config; 101 } 102 103 protected ChannelConsumerSettings newConsumerConfigForMatchingChannel(ChannelConsumerSettings theOptions) { 104 ChannelConsumerSettings config = new ChannelConsumerSettings(); 105 config.setConcurrentConsumers(getMatchingChannelConcurrentConsumers()); 106 if (theOptions != null) { 107 config.setQualifyChannelName(theOptions.isQualifyChannelName()); 108 config.setRetryConfiguration(theOptions.getRetryConfigurationParameters()); 109 } 110 return config; 111 } 112 113 public int getDeliveryChannelConcurrentConsumers() { 114 return SubscriptionConstants.DELIVERY_CHANNEL_CONCURRENT_CONSUMERS; 115 } 116 117 public int getMatchingChannelConcurrentConsumers() { 118 return SubscriptionConstants.MATCHING_CHANNEL_CONCURRENT_CONSUMERS; 119 } 120 121 public IBrokerClient getBrokerClient() { 122 return myBrokerClient; 123 } 124}