001/*-
002 * #%L
003 * HAPI FHIR Storage api
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.broker.jms;
021
022import ca.uhn.fhir.broker.api.ChannelConsumerStartFailureException;
023import ca.uhn.fhir.broker.api.IChannelConsumer;
024import ca.uhn.fhir.broker.api.IMessageListener;
025import ca.uhn.fhir.i18n.Msg;
026import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
027import ca.uhn.fhir.rest.server.messaging.IMessage;
028import ca.uhn.fhir.util.IoUtils;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031import org.springframework.beans.factory.DisposableBean;
032import org.springframework.messaging.MessageHandler;
033
034/**
035 * Adapt a Spring Messaging (JMS) Queue to {@link IChannelConsumer}
036 *
037 * @param <T> the type of payload this message consumer is expecting to receive
038 */
039public class SpringMessagingReceiverAdapter<T> implements IChannelConsumer<T> {
040        private static final Logger ourLog = LoggerFactory.getLogger(SpringMessagingReceiverAdapter.class);
041        private final Class<? extends IMessage<T>> myMessageType;
042        private final ISpringMessagingChannelReceiver mySpringMessagingChannelReceiver;
043        private final IMessageListener<T> myMessageListener;
044        private MessageHandler myMessageHandler;
045        private boolean myClosed;
046
047        public SpringMessagingReceiverAdapter(
048                        Class<? extends IMessage<T>> theMessageType,
049                        ISpringMessagingChannelReceiver theSpringMessagingChannelReceiver,
050                        IMessageListener<T> theMessageListener) {
051                myMessageType = theMessageType;
052                mySpringMessagingChannelReceiver = theSpringMessagingChannelReceiver;
053                myMessageListener = theMessageListener;
054        }
055
056        public void subscribe(MessageHandler theMessageHandler) {
057                checkState();
058                if (myMessageHandler != null) {
059                        throw new IllegalArgumentException("Only one subscriber allowed");
060                }
061                myMessageHandler = theMessageHandler;
062                mySpringMessagingChannelReceiver.subscribe(theMessageHandler);
063        }
064
065        @Override
066        public void close() {
067                myClosed = true;
068                if (myMessageHandler != null) {
069                        mySpringMessagingChannelReceiver.unsubscribe(myMessageHandler);
070                        closeAndDestroyQuietly(myMessageHandler);
071                }
072                destroyQuietly(mySpringMessagingChannelReceiver);
073                closeQuietly(myMessageListener);
074        }
075
076        private void closeAndDestroyQuietly(MessageHandler theMessageHandler) {
077                if (theMessageHandler instanceof AutoCloseable) {
078                        IoUtils.closeQuietly((AutoCloseable) theMessageHandler, ourLog);
079                }
080                if (theMessageHandler instanceof DisposableBean) {
081                        try {
082                                ((DisposableBean) theMessageHandler).destroy();
083                        } catch (Exception e) {
084                                throw new InternalErrorException(Msg.code(2647) + "Failed to destroy MessageHandler", e);
085                        }
086                }
087        }
088
089        private void destroyQuietly(ISpringMessagingChannelReceiver theSpringMessagingChannelReceiver) {
090                try {
091                        theSpringMessagingChannelReceiver.destroy();
092                } catch (Exception e) {
093                        ourLog.error("Error destroying Spring Messaging ChannelReceiver", e);
094                }
095        }
096
097        private void closeQuietly(IMessageListener<T> theMessageListener) {
098                if (theMessageListener instanceof AutoCloseable) {
099                        IoUtils.closeQuietly((AutoCloseable) theMessageListener, ourLog);
100                }
101        }
102
103        @Override
104        public boolean isClosed() {
105                return myClosed;
106        }
107
108        @Override
109        public String getChannelName() {
110                return mySpringMessagingChannelReceiver.getChannelName();
111        }
112
113        @Override
114        public void start() throws ChannelConsumerStartFailureException {
115                checkState();
116                mySpringMessagingChannelReceiver.start();
117        }
118
119        public ISpringMessagingChannelReceiver getSpringMessagingChannelReceiver() {
120                return mySpringMessagingChannelReceiver;
121        }
122
123        @Override
124        public Class<? extends IMessage<T>> getMessageType() {
125                return myMessageType;
126        }
127
128        @Override
129        public IMessageListener<T> getMessageListener() {
130                return myMessageListener;
131        }
132
133        @Override
134        public void pause() {
135                mySpringMessagingChannelReceiver.pause();
136        }
137
138        @Override
139        public void resume() {
140                checkState();
141                mySpringMessagingChannelReceiver.resume();
142        }
143}