
001/*- 002 * #%L 003 * HAPI FHIR Storage api 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.broker.jms; 021 022import ca.uhn.fhir.broker.api.ChannelConsumerStartFailureException; 023import ca.uhn.fhir.broker.api.IChannelConsumer; 024import ca.uhn.fhir.broker.api.IMessageListener; 025import ca.uhn.fhir.i18n.Msg; 026import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 027import ca.uhn.fhir.rest.server.messaging.IMessage; 028import ca.uhn.fhir.util.IoUtils; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031import org.springframework.beans.factory.DisposableBean; 032import org.springframework.messaging.MessageHandler; 033 034/** 035 * Adapt a Spring Messaging (JMS) Queue to {@link IChannelConsumer} 036 * 037 * @param <T> the type of payload this message consumer is expecting to receive 038 */ 039public class SpringMessagingReceiverAdapter<T> implements IChannelConsumer<T> { 040 private static final Logger ourLog = LoggerFactory.getLogger(SpringMessagingReceiverAdapter.class); 041 private final Class<? extends IMessage<T>> myMessageType; 042 private final ISpringMessagingChannelReceiver mySpringMessagingChannelReceiver; 043 private final IMessageListener<T> myMessageListener; 044 private MessageHandler myMessageHandler; 045 private boolean myClosed; 046 047 public SpringMessagingReceiverAdapter( 048 Class<? extends IMessage<T>> theMessageType, 049 ISpringMessagingChannelReceiver theSpringMessagingChannelReceiver, 050 IMessageListener<T> theMessageListener) { 051 myMessageType = theMessageType; 052 mySpringMessagingChannelReceiver = theSpringMessagingChannelReceiver; 053 myMessageListener = theMessageListener; 054 } 055 056 public void subscribe(MessageHandler theMessageHandler) { 057 checkState(); 058 if (myMessageHandler != null) { 059 throw new IllegalArgumentException("Only one subscriber allowed"); 060 } 061 myMessageHandler = theMessageHandler; 062 mySpringMessagingChannelReceiver.subscribe(theMessageHandler); 063 } 064 065 @Override 066 public void close() { 067 myClosed = true; 068 if (myMessageHandler != null) { 069 mySpringMessagingChannelReceiver.unsubscribe(myMessageHandler); 070 closeAndDestroyQuietly(myMessageHandler); 071 } 072 destroyQuietly(mySpringMessagingChannelReceiver); 073 closeQuietly(myMessageListener); 074 } 075 076 private void closeAndDestroyQuietly(MessageHandler theMessageHandler) { 077 if (theMessageHandler instanceof AutoCloseable) { 078 IoUtils.closeQuietly((AutoCloseable) theMessageHandler, ourLog); 079 } 080 if (theMessageHandler instanceof DisposableBean) { 081 try { 082 ((DisposableBean) theMessageHandler).destroy(); 083 } catch (Exception e) { 084 throw new InternalErrorException(Msg.code(2647) + "Failed to destroy MessageHandler", e); 085 } 086 } 087 } 088 089 private void destroyQuietly(ISpringMessagingChannelReceiver theSpringMessagingChannelReceiver) { 090 try { 091 theSpringMessagingChannelReceiver.destroy(); 092 } catch (Exception e) { 093 ourLog.error("Error destroying Spring Messaging ChannelReceiver", e); 094 } 095 } 096 097 private void closeQuietly(IMessageListener<T> theMessageListener) { 098 if (theMessageListener instanceof AutoCloseable) { 099 IoUtils.closeQuietly((AutoCloseable) theMessageListener, ourLog); 100 } 101 } 102 103 @Override 104 public boolean isClosed() { 105 return myClosed; 106 } 107 108 @Override 109 public String getChannelName() { 110 return mySpringMessagingChannelReceiver.getChannelName(); 111 } 112 113 @Override 114 public void start() throws ChannelConsumerStartFailureException { 115 checkState(); 116 mySpringMessagingChannelReceiver.start(); 117 } 118 119 public ISpringMessagingChannelReceiver getSpringMessagingChannelReceiver() { 120 return mySpringMessagingChannelReceiver; 121 } 122 123 @Override 124 public Class<? extends IMessage<T>> getMessageType() { 125 return myMessageType; 126 } 127 128 @Override 129 public IMessageListener<T> getMessageListener() { 130 return myMessageListener; 131 } 132 133 @Override 134 public void pause() { 135 mySpringMessagingChannelReceiver.pause(); 136 } 137 138 @Override 139 public void resume() { 140 checkState(); 141 mySpringMessagingChannelReceiver.resume(); 142 } 143}