001package ca.uhn.fhir.jpa.subscription.channel.subscription;
002
003/*-
004 * #%L
005 * HAPI FHIR Subscription Server
006 * %%
007 * Copyright (C) 2014 - 2021 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
024import com.google.common.annotations.VisibleForTesting;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027import org.springframework.beans.factory.DisposableBean;
028import org.springframework.messaging.MessageChannel;
029import org.springframework.messaging.MessageHandler;
030import org.springframework.messaging.SubscribableChannel;
031
032import java.io.Closeable;
033import java.util.Collection;
034import java.util.HashSet;
035
036public class SubscriptionChannelWithHandlers implements Closeable {
037        private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class);
038
039        private final String myChannelName;
040        private final SubscribableChannel mySubscribableChannel;
041        private final Collection<MessageHandler> myDeliveryHandlerSet = new HashSet<>();
042
043        public SubscriptionChannelWithHandlers(String theChannelName, SubscribableChannel theSubscribableChannel) {
044                myChannelName = theChannelName;
045                mySubscribableChannel = theSubscribableChannel;
046        }
047
048        public void addHandler(MessageHandler theHandler) {
049                mySubscribableChannel.subscribe(theHandler);
050                myDeliveryHandlerSet.add(theHandler);
051        }
052
053        public void removeHandler(MessageHandler theMessageHandler) {
054                if (mySubscribableChannel != null) {
055                        mySubscribableChannel.unsubscribe(theMessageHandler);
056                }
057        }
058
059        @VisibleForTesting
060        public MessageHandler getDeliveryHandlerForUnitTest() {
061                return myDeliveryHandlerSet.iterator().next();
062        }
063
064        @Override
065        public void close() {
066                for (MessageHandler messageHandler : myDeliveryHandlerSet) {
067                        removeHandler(messageHandler);
068                }
069                if (mySubscribableChannel instanceof DisposableBean) {
070                        tryDestroyChannel((DisposableBean) mySubscribableChannel);
071                }
072        }
073
074        private void tryDestroyChannel(DisposableBean theSubscribableChannel) {
075                try {
076                        ourLog.info("Destroying channel {}", myChannelName);
077                        theSubscribableChannel.destroy();
078                } catch (Exception e) {
079                        ourLog.error("Failed to destroy channel bean", e);
080                }
081        }
082
083        public MessageChannel getChannel() {
084                return mySubscribableChannel;
085        }
086}