
/*-
 * #%L
 * HAPI FHIR Storage api
 * %%
 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.subscription.channel.impl;

import ca.uhn.fhir.broker.api.ChannelConsumerSettings;
import ca.uhn.fhir.broker.api.ChannelProducerSettings;
import ca.uhn.fhir.broker.api.IChannelNamer;
import ca.uhn.fhir.broker.api.IChannelSettings;
import ca.uhn.fhir.broker.jms.ISpringMessagingChannelProducer;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import ca.uhn.fhir.util.ThreadPoolUtil;
import jakarta.annotation.Nonnull;
import jakarta.annotation.PreDestroy;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Factory that hands out {@link LinkedBlockingChannel} instances and caches them by name.
 *
 * <p>The cache key is the name returned by the configured {@link IChannelNamer}, so two requests
 * that resolve to the same name share a single channel instance. The cache is a synchronized map.
 */
public class LinkedBlockingChannelFactory {

    private final IChannelNamer myChannelNamer;
    private final Map<String, LinkedBlockingChannel> myChannels = Collections.synchronizedMap(new HashMap<>());

    protected RetryPolicyProvider myRetryPolicyProvider;

    /**
     * @param theChannelNamer used to turn a requested channel name plus settings into the cache key
     * @param theRetryPolicyProvider passed to every {@link LinkedBlockingChannel} this factory builds
     */
    public LinkedBlockingChannelFactory(IChannelNamer theChannelNamer, RetryPolicyProvider theRetryPolicyProvider) {
        myRetryPolicyProvider = theRetryPolicyProvider;
        myChannelNamer = theChannelNamer;
    }

    /**
     * Returns the channel registered under the resolved name, building it on first request.
     *
     * @param theChannelName the requested (unresolved) channel name
     * @param theChannelSettings consumer settings; supplies the concurrent consumer count and is
     *     forwarded to the channel namer
     * @return the cached or newly built channel
     */
    public LinkedBlockingChannel getOrCreateReceiver(
            String theChannelName, ChannelConsumerSettings theChannelSettings) {
        int consumerCount = theChannelSettings.getConcurrentConsumers();
        return getOrCreateChannel(theChannelName, consumerCount, theChannelSettings);
    }

    /**
     * Returns the channel registered under the resolved name, building it on first request.
     *
     * @param theChannelName the requested (unresolved) channel name
     * @param theChannelSettings producer settings; supplies the concurrent consumer count and is
     *     forwarded to the channel namer
     * @return the cached or newly built channel, exposed through its producer interface
     */
    public ISpringMessagingChannelProducer getOrCreateProducer(
            String theChannelName, ChannelProducerSettings theChannelSettings) {
        int consumerCount = theChannelSettings.getConcurrentConsumers();
        return getOrCreateChannel(theChannelName, consumerCount, theChannelSettings);
    }

    /**
     * @return the channel namer supplied at construction
     */
    public IChannelNamer getChannelNamer() {
        return myChannelNamer;
    }

    /**
     * Resolves the channel name through the channel namer and looks it up in the cache, building
     * and storing a new channel when no entry exists for that name.
     */
    private LinkedBlockingChannel getOrCreateChannel(
            String theChannelName, int theConcurrentConsumers, IChannelSettings theChannelSettings) {
        // TODO - does this need retry settings?
        final String resolvedName = myChannelNamer.getChannelName(theChannelName, theChannelSettings);

        // The mapping function receives the same resolved name that is used as the cache key.
        LinkedBlockingChannel channel = myChannels.computeIfAbsent(
                resolvedName, key -> buildLinkedBlockingChannel(theConcurrentConsumers, key));
        return channel;
    }

    /**
     * Builds a new channel backed by its own thread pool.
     *
     * <p>The pool's threads are named with the channel name followed by {@code "-"}, and its queue
     * size comes from {@link SubscriptionConstants#DELIVERY_EXECUTOR_QUEUE_SIZE}.
     *
     * @param theConcurrentConsumers passed twice to {@code ThreadPoolUtil.newThreadPool} (presumably
     *     as both the core and the maximum pool size - confirm against that helper)
     * @param theChannelName the already-resolved channel name
     * @return a new channel wired to the pool, the pool's queue-size accessor and the retry policy
     *     provider
     */
    @Nonnull
    protected LinkedBlockingChannel buildLinkedBlockingChannel(int theConcurrentConsumers, String theChannelName) {
        ThreadPoolTaskExecutor executor = ThreadPoolUtil.newThreadPool(
                theConcurrentConsumers,
                theConcurrentConsumers,
                theChannelName + "-",
                SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE);

        return new LinkedBlockingChannel(theChannelName, executor, executor::getQueueSize, myRetryPolicyProvider);
    }

    /**
     * Empties the channel cache when the bean is destroyed.
     *
     * <p>NOTE(review): only the map is cleared here; the executors created for the cached channels
     * are not shut down by this method - confirm they are released elsewhere.
     */
    @PreDestroy
    public void stop() {
        myChannels.clear();
    }
}