
/*-
 * #%L
 * HAPI FHIR Storage api
 * %%
 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.subscription.channel.impl;

import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelSettings;
import ca.uhn.fhir.jpa.subscription.channel.subscription.IChannelNamer;
import ca.uhn.fhir.subscription.SubscriptionConstants;
import ca.uhn.fhir.util.ThreadPoolUtil;
import jakarta.annotation.Nonnull;
import jakarta.annotation.PreDestroy;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * {@link IChannelFactory} that hands out {@link LinkedBlockingChannel} instances, each backed by its own
 * {@link ThreadPoolTaskExecutor}.
 *
 * <p>Channels are cached by the name returned from the configured {@link IChannelNamer}: the first request
 * for a given resolved name builds the channel, and later requests for that name (whether for a receiver
 * or a producer) get the cached instance back.
 *
 * <p>The cache is a {@link Collections#synchronizedMap(Map) synchronized map}, so lookups and insertions
 * are serialized on that map.
 */
public class LinkedBlockingChannelFactory implements IChannelFactory {

	private final IChannelNamer myChannelNamer;
	private final Map<String, LinkedBlockingChannel> myChannels = Collections.synchronizedMap(new HashMap<>());

	// Assigned once in the constructor and never reassigned, hence final.
	private final RetryPolicyProvider myRetryPolicyProvider;

	/**
	 * @param theChannelNamer resolves the name under which a channel is cached and created
	 * @param theRetryPolicyProvider passed through to every {@link LinkedBlockingChannel} this factory builds
	 */
	public LinkedBlockingChannelFactory(IChannelNamer theChannelNamer, RetryPolicyProvider theRetryPolicyProvider) {
		myChannelNamer = theChannelNamer;
		myRetryPolicyProvider = theRetryPolicyProvider;
	}

	/**
	 * Returns the channel registered under the resolved name, creating it on first use.
	 *
	 * @param theChannelName the requested channel name, resolved through the {@link IChannelNamer}
	 * @param theMessageType not used by this implementation
	 * @param theChannelSettings supplies the concurrent consumer count and is forwarded to the channel namer
	 * @return the cached or newly created channel
	 */
	@Override
	public IChannelReceiver getOrCreateReceiver(
			String theChannelName, Class<?> theMessageType, ChannelConsumerSettings theChannelSettings) {
		return getOrCreateChannel(theChannelName, theChannelSettings.getConcurrentConsumers(), theChannelSettings);
	}

	/**
	 * Returns the channel registered under the resolved name, creating it on first use.
	 *
	 * @param theChannelName the requested channel name, resolved through the {@link IChannelNamer}
	 * @param theMessageType not used by this implementation
	 * @param theChannelSettings supplies the concurrent consumer count and is forwarded to the channel namer
	 * @return the cached or newly created channel
	 */
	@Override
	public IChannelProducer getOrCreateProducer(
			String theChannelName, Class<?> theMessageType, ChannelProducerSettings theChannelSettings) {
		return getOrCreateChannel(theChannelName, theChannelSettings.getConcurrentConsumers(), theChannelSettings);
	}

	/**
	 * @return the channel namer supplied at construction time
	 */
	@Override
	public IChannelNamer getChannelNamer() {
		return myChannelNamer;
	}

	/**
	 * Resolves the effective channel name and returns the cached channel for it, building one if none exists.
	 *
	 * <p>The concurrent consumer count only matters for the call that actually creates the channel; a
	 * channel that is already cached is returned as-is.
	 */
	private LinkedBlockingChannel getOrCreateChannel(
			String theChannelName, int theConcurrentConsumers, IChannelSettings theChannelSettings) {
		// TODO - does this need retry settings?
		final String channelName = myChannelNamer.getChannelName(theChannelName, theChannelSettings);

		return myChannels.computeIfAbsent(
				channelName, t -> buildLinkedBlockingChannel(theConcurrentConsumers, channelName));
	}

	/**
	 * Builds a new channel together with the thread pool that backs it.
	 *
	 * @param theConcurrentConsumers passed as both pool-size arguments of
	 *        {@link ThreadPoolUtil#newThreadPool} (presumably core and maximum size - confirm against that helper)
	 * @param theChannelName the resolved channel name; also used, with a trailing {@code -}, as the thread
	 *        name prefix
	 */
	@Nonnull
	private LinkedBlockingChannel buildLinkedBlockingChannel(int theConcurrentConsumers, String theChannelName) {
		String threadNamePrefix = theChannelName + "-";
		ThreadPoolTaskExecutor threadPoolExecutor = ThreadPoolUtil.newThreadPool(
				theConcurrentConsumers,
				theConcurrentConsumers,
				threadNamePrefix,
				SubscriptionConstants.DELIVERY_EXECUTOR_QUEUE_SIZE);

		// The channel reports its queue depth by asking the executor it runs on.
		return new LinkedBlockingChannel(
				theChannelName, threadPoolExecutor, threadPoolExecutor::getQueueSize, myRetryPolicyProvider);
	}

	/**
	 * Empties the channel cache.
	 *
	 * <p>NOTE(review): this only drops the factory's references; nothing is invoked on the channels or on
	 * their executors here - confirm they are shut down elsewhere if that is required.
	 */
	@PreDestroy
	public void stop() {
		myChannels.clear();
	}
}