
001/*- 002 * #%L 003 * HAPI FHIR Storage api 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.channel.impl; 021 022import ca.uhn.fhir.broker.jms.ISpringMessagingChannelProducer; 023import ca.uhn.fhir.broker.jms.ISpringMessagingChannelReceiver; 024import com.google.common.annotations.VisibleForTesting; 025import jakarta.annotation.Nonnull; 026import org.springframework.messaging.MessageHandler; 027import org.springframework.messaging.support.ExecutorSubscribableChannel; 028 029import java.util.ArrayList; 030import java.util.Optional; 031import java.util.concurrent.Executor; 032import java.util.function.Supplier; 033 034import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; 035 036public class LinkedBlockingChannel extends ExecutorSubscribableChannel 037 implements ISpringMessagingChannelProducer, ISpringMessagingChannelReceiver { 038 039 private final String myChannelName; 040 private final Supplier<Integer> myQueueSizeSupplier; 041 042 private final RetryPolicyProvider myRetryPolicyProvider; 043 044 public LinkedBlockingChannel( 045 String theChannelName, 046 Executor theExecutor, 047 Supplier<Integer> theQueueSizeSupplier, 048 RetryPolicyProvider theRetryPolicyProvider) { 049 super(theExecutor); 050 myChannelName = theChannelName; 051 myQueueSizeSupplier = theQueueSizeSupplier; 052 myRetryPolicyProvider = theRetryPolicyProvider; 053 } 054 055 @VisibleForTesting 056 public int getQueueSizeForUnitTest() { 057 return defaultIfNull(myQueueSizeSupplier.get(), 0); 058 } 059 060 @VisibleForTesting 061 public void clearInterceptorsForUnitTest() { 062 setInterceptors(new ArrayList<>()); 063 } 064 065 @Override 066 public String getChannelName() { 067 return myChannelName; 068 } 069 070 @Override 071 public boolean hasSubscription(@Nonnull MessageHandler handler) { 072 return getSubscribers().stream() 073 .map(t -> (RetryingMessageHandlerWrapper) t) 074 .anyMatch(t -> t.getWrappedHandler() == handler); 075 } 076 077 @Override 078 public boolean subscribe(@Nonnull MessageHandler theHandler) { 079 return super.subscribe(new RetryingMessageHandlerWrapper(theHandler, getChannelName(), myRetryPolicyProvider)); 080 } 081 082 @Override 083 public boolean unsubscribe(@Nonnull MessageHandler handler) { 084 Optional<RetryingMessageHandlerWrapper> match = getSubscribers().stream() 085 .map(t -> (RetryingMessageHandlerWrapper) t) 086 .filter(t -> t.getWrappedHandler() == handler) 087 .findFirst(); 088 match.ifPresent(super::unsubscribe); 089 return match.isPresent(); 090 } 091 092 @Override 093 public void destroy() { 094 // nothing 095 } 096 097 /** 098 * Creates a synchronous channel for testing 099 */ 100 @VisibleForTesting 101 public static LinkedBlockingChannel newSynchronous(String theName, RetryPolicyProvider theRetryPolicyProvider) { 102 return new LinkedBlockingChannel(theName, null, () -> 0, theRetryPolicyProvider); 103 } 104}