001/*-
002 * #%L
003 * HAPI FHIR Storage api
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.channel.impl;
021
022import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
023import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
024import jakarta.annotation.Nonnull;
025import org.springframework.messaging.MessageHandler;
026import org.springframework.messaging.support.ExecutorSubscribableChannel;
027
028import java.util.ArrayList;
029import java.util.Optional;
030import java.util.concurrent.Executor;
031import java.util.function.Supplier;
032
033import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
034
035public class LinkedBlockingChannel extends ExecutorSubscribableChannel implements IChannelProducer, IChannelReceiver {
036
037        private final String myName;
038        private final Supplier<Integer> myQueueSizeSupplier;
039
040        private final RetryPolicyProvider myRetryPolicyProvider;
041
042        public LinkedBlockingChannel(
043                        String theName,
044                        Executor theExecutor,
045                        Supplier<Integer> theQueueSizeSupplier,
046                        RetryPolicyProvider theRetryPolicyProvider) {
047                super(theExecutor);
048                myName = theName;
049                myQueueSizeSupplier = theQueueSizeSupplier;
050                myRetryPolicyProvider = theRetryPolicyProvider;
051        }
052
053        public int getQueueSizeForUnitTest() {
054                return defaultIfNull(myQueueSizeSupplier.get(), 0);
055        }
056
057        public void clearInterceptorsForUnitTest() {
058                setInterceptors(new ArrayList<>());
059        }
060
061        @Override
062        public String getName() {
063                return myName;
064        }
065
066        @Override
067        public boolean hasSubscription(@Nonnull MessageHandler handler) {
068                return getSubscribers().stream()
069                                .map(t -> (RetryingMessageHandlerWrapper) t)
070                                .anyMatch(t -> t.getWrappedHandler() == handler);
071        }
072
073        @Override
074        public boolean subscribe(@Nonnull MessageHandler theHandler) {
075                return super.subscribe(new RetryingMessageHandlerWrapper(theHandler, getName(), myRetryPolicyProvider));
076        }
077
078        @Override
079        public boolean unsubscribe(@Nonnull MessageHandler handler) {
080                Optional<RetryingMessageHandlerWrapper> match = getSubscribers().stream()
081                                .map(t -> (RetryingMessageHandlerWrapper) t)
082                                .filter(t -> t.getWrappedHandler() == handler)
083                                .findFirst();
084                match.ifPresent(super::unsubscribe);
085                return match.isPresent();
086        }
087
088        @Override
089        public void destroy() {
090                // nothing
091        }
092
093        /**
094         * Creates a synchronous channel, mostly intended for testing
095         */
096        public static LinkedBlockingChannel newSynchronous(String theName, RetryPolicyProvider theRetryPolicyProvider) {
097                return new LinkedBlockingChannel(theName, null, () -> 0, theRetryPolicyProvider);
098        }
099}