001/*-
002 * #%L
003 * HAPI FHIR Storage api
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.channel.impl;
021
022import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
023import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
024import jakarta.annotation.Nonnull;
025import org.springframework.messaging.MessageHandler;
026import org.springframework.messaging.support.ExecutorSubscribableChannel;
027
028import java.util.ArrayList;
029import java.util.Optional;
030import java.util.concurrent.Executor;
031import java.util.function.Supplier;
032
033import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
034
035public class LinkedBlockingChannel extends ExecutorSubscribableChannel implements IChannelProducer, IChannelReceiver {
036
037        private final String myName;
038        private final Supplier<Integer> myQueueSizeSupplier;
039
040        public LinkedBlockingChannel(String theName, Executor theExecutor, Supplier<Integer> theQueueSizeSupplier) {
041                super(theExecutor);
042                myName = theName;
043                myQueueSizeSupplier = theQueueSizeSupplier;
044        }
045
046        public int getQueueSizeForUnitTest() {
047                return defaultIfNull(myQueueSizeSupplier.get(), 0);
048        }
049
050        public void clearInterceptorsForUnitTest() {
051                setInterceptors(new ArrayList<>());
052        }
053
054        @Override
055        public String getName() {
056                return myName;
057        }
058
059        @Override
060        public boolean hasSubscription(@Nonnull MessageHandler handler) {
061                return getSubscribers().stream()
062                                .map(t -> (RetryingMessageHandlerWrapper) t)
063                                .anyMatch(t -> t.getWrappedHandler() == handler);
064        }
065
066        @Override
067        public boolean subscribe(@Nonnull MessageHandler theHandler) {
068                return super.subscribe(new RetryingMessageHandlerWrapper(theHandler, getName()));
069        }
070
071        @Override
072        public boolean unsubscribe(@Nonnull MessageHandler handler) {
073                Optional<RetryingMessageHandlerWrapper> match = getSubscribers().stream()
074                                .map(t -> (RetryingMessageHandlerWrapper) t)
075                                .filter(t -> t.getWrappedHandler() == handler)
076                                .findFirst();
077                match.ifPresent(super::unsubscribe);
078                return match.isPresent();
079        }
080
081        @Override
082        public void destroy() {
083                // nothing
084        }
085
086        /**
087         * Creates a synchronous channel, mostly intended for testing
088         */
089        public static LinkedBlockingChannel newSynchronous(String theName) {
090                return new LinkedBlockingChannel(theName, null, () -> 0);
091        }
092}