001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.channel.subscription;
021
022import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
023import com.google.common.annotations.VisibleForTesting;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026import org.springframework.beans.factory.DisposableBean;
027import org.springframework.messaging.MessageChannel;
028import org.springframework.messaging.MessageHandler;
029import org.springframework.messaging.SubscribableChannel;
030
031import java.io.Closeable;
032import java.util.Collection;
033import java.util.HashSet;
034
035public class SubscriptionChannelWithHandlers implements Closeable {
036        private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class);
037
038        private final String myChannelName;
039        private final SubscribableChannel mySubscribableChannel;
040        private final Collection<MessageHandler> myDeliveryHandlerSet = new HashSet<>();
041
042        public SubscriptionChannelWithHandlers(String theChannelName, SubscribableChannel theSubscribableChannel) {
043                myChannelName = theChannelName;
044                mySubscribableChannel = theSubscribableChannel;
045        }
046
047        public void addHandler(MessageHandler theHandler) {
048                mySubscribableChannel.subscribe(theHandler);
049                myDeliveryHandlerSet.add(theHandler);
050        }
051
052        public void removeHandler(MessageHandler theMessageHandler) {
053                if (mySubscribableChannel != null) {
054                        mySubscribableChannel.unsubscribe(theMessageHandler);
055                }
056                if (theMessageHandler instanceof DisposableBean) {
057                        try {
058                                ((DisposableBean) theMessageHandler).destroy();
059                        } catch (Exception e) {
060                                ourLog.warn(
061                                                "Could not destroy {} handler for {}",
062                                                theMessageHandler.getClass().getSimpleName(),
063                                                myChannelName,
064                                                e);
065                        }
066                }
067        }
068
069        @VisibleForTesting
070        public MessageHandler getDeliveryHandlerForUnitTest() {
071                return myDeliveryHandlerSet.iterator().next();
072        }
073
074        @Override
075        public void close() {
076                for (MessageHandler messageHandler : myDeliveryHandlerSet) {
077                        removeHandler(messageHandler);
078                }
079                if (mySubscribableChannel instanceof DisposableBean) {
080                        tryDestroyChannel((DisposableBean) mySubscribableChannel);
081                }
082        }
083
084        private void tryDestroyChannel(DisposableBean theSubscribableChannel) {
085                try {
086                        ourLog.info("Destroying channel {}", myChannelName);
087                        theSubscribableChannel.destroy();
088                } catch (Exception e) {
089                        ourLog.error("Failed to destroy channel bean", e);
090                }
091        }
092
093        public MessageChannel getChannel() {
094                return mySubscribableChannel;
095        }
096}