001/*-
002 * #%L
003 * HAPI FHIR Storage api
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.channel.impl;
021
022import ca.uhn.fhir.broker.jms.ISpringMessagingChannelProducer;
023import ca.uhn.fhir.broker.jms.ISpringMessagingChannelReceiver;
024import com.google.common.annotations.VisibleForTesting;
025import jakarta.annotation.Nonnull;
026import org.springframework.messaging.MessageHandler;
027import org.springframework.messaging.support.ExecutorSubscribableChannel;
028
029import java.util.ArrayList;
030import java.util.Optional;
031import java.util.concurrent.Executor;
032import java.util.function.Supplier;
033
034import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
035
036public class LinkedBlockingChannel extends ExecutorSubscribableChannel
037                implements ISpringMessagingChannelProducer, ISpringMessagingChannelReceiver {
038
039        private final String myChannelName;
040        private final Supplier<Integer> myQueueSizeSupplier;
041
042        private final RetryPolicyProvider myRetryPolicyProvider;
043
044        public LinkedBlockingChannel(
045                        String theChannelName,
046                        Executor theExecutor,
047                        Supplier<Integer> theQueueSizeSupplier,
048                        RetryPolicyProvider theRetryPolicyProvider) {
049                super(theExecutor);
050                myChannelName = theChannelName;
051                myQueueSizeSupplier = theQueueSizeSupplier;
052                myRetryPolicyProvider = theRetryPolicyProvider;
053        }
054
055        @VisibleForTesting
056        public int getQueueSizeForUnitTest() {
057                return defaultIfNull(myQueueSizeSupplier.get(), 0);
058        }
059
060        @VisibleForTesting
061        public void clearInterceptorsForUnitTest() {
062                setInterceptors(new ArrayList<>());
063        }
064
065        @Override
066        public String getChannelName() {
067                return myChannelName;
068        }
069
070        @Override
071        public boolean hasSubscription(@Nonnull MessageHandler handler) {
072                return getSubscribers().stream()
073                                .map(t -> (RetryingMessageHandlerWrapper) t)
074                                .anyMatch(t -> t.getWrappedHandler() == handler);
075        }
076
077        @Override
078        public boolean subscribe(@Nonnull MessageHandler theHandler) {
079                return super.subscribe(new RetryingMessageHandlerWrapper(theHandler, getChannelName(), myRetryPolicyProvider));
080        }
081
082        @Override
083        public boolean unsubscribe(@Nonnull MessageHandler handler) {
084                Optional<RetryingMessageHandlerWrapper> match = getSubscribers().stream()
085                                .map(t -> (RetryingMessageHandlerWrapper) t)
086                                .filter(t -> t.getWrappedHandler() == handler)
087                                .findFirst();
088                match.ifPresent(super::unsubscribe);
089                return match.isPresent();
090        }
091
092        @Override
093        public void destroy() {
094                // nothing
095        }
096
097        /**
098         * Creates a synchronous channel for testing
099         */
100        @VisibleForTesting
101        public static LinkedBlockingChannel newSynchronous(String theName, RetryPolicyProvider theRetryPolicyProvider) {
102                return new LinkedBlockingChannel(theName, null, () -> 0, theRetryPolicyProvider);
103        }
104}