
/*-
 * #%L
 * HAPI FHIR Subscription Server
 * %%
 * Copyright (C) 2014 - 2023 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.subscription.channel.subscription;

import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;

import java.io.Closeable;
import java.util.Collection;
import java.util.HashSet;

/**
 * Pairs a named {@link SubscribableChannel} with the {@link MessageHandler}s that were
 * subscribed to it through {@link #addHandler(MessageHandler)}, so that the handlers and the
 * channel can be torn down together via {@link #close()}.
 */
public class SubscriptionChannelWithHandlers implements Closeable {
    // NOTE(review): the logger category is ActiveSubscription, not this class. This looks like a
    // copy-paste leftover; confirm that no logging configuration relies on it before changing.
    private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class);

    private final String myChannelName;
    private final SubscribableChannel mySubscribableChannel;
    private final Collection<MessageHandler> myDeliveryHandlerSet = new HashSet<>();

    /**
     * @param theChannelName         name of the channel, used in log messages
     * @param theSubscribableChannel channel that handlers are subscribed to and unsubscribed from
     */
    public SubscriptionChannelWithHandlers(String theChannelName, SubscribableChannel theSubscribableChannel) {
        myChannelName = theChannelName;
        mySubscribableChannel = theSubscribableChannel;
    }

    /**
     * Subscribes the handler to the channel and starts tracking it so that {@link #close()}
     * can later remove it.
     *
     * @param theHandler handler to subscribe
     */
    public void addHandler(MessageHandler theHandler) {
        mySubscribableChannel.subscribe(theHandler);
        myDeliveryHandlerSet.add(theHandler);
    }

    /**
     * Stops tracking the handler, unsubscribes it from the channel (when a channel is present)
     * and, if the handler is a {@link DisposableBean}, destroys it. A failure while destroying
     * the handler is logged as a warning and not rethrown.
     *
     * @param theMessageHandler handler to remove
     */
    public void removeHandler(MessageHandler theMessageHandler) {
        // Forget the handler so a destroyed handler is never handed out again or torn down twice.
        myDeliveryHandlerSet.remove(theMessageHandler);

        if (mySubscribableChannel != null) {
            mySubscribableChannel.unsubscribe(theMessageHandler);
        }
        if (theMessageHandler instanceof DisposableBean) {
            try {
                ((DisposableBean) theMessageHandler).destroy();
            } catch (Exception e) {
                ourLog.warn(
                        "Could not destroy {} handler for {}",
                        theMessageHandler.getClass().getSimpleName(),
                        myChannelName,
                        e);
            }
        }
    }

    /**
     * Returns an arbitrary tracked handler; intended for unit tests only.
     *
     * @return one of the currently tracked handlers
     * @throws java.util.NoSuchElementException if no handler is currently tracked
     */
    @VisibleForTesting
    public MessageHandler getDeliveryHandlerForUnitTest() {
        return myDeliveryHandlerSet.iterator().next();
    }

    /**
     * Removes every tracked handler (see {@link #removeHandler(MessageHandler)}) and then, if the
     * channel is a {@link DisposableBean}, destroys the channel.
     */
    @Override
    public void close() {
        // Iterate over a snapshot: removeHandler() mutates myDeliveryHandlerSet, which would
        // otherwise trigger a ConcurrentModificationException.
        for (MessageHandler messageHandler : new HashSet<>(myDeliveryHandlerSet)) {
            removeHandler(messageHandler);
        }
        if (mySubscribableChannel instanceof DisposableBean) {
            tryDestroyChannel((DisposableBean) mySubscribableChannel);
        }
    }

    /**
     * Destroys the channel bean, logging (rather than propagating) any failure.
     */
    private void tryDestroyChannel(DisposableBean theSubscribableChannel) {
        try {
            ourLog.info("Destroying channel {}", myChannelName);
            theSubscribableChannel.destroy();
        } catch (Exception e) {
            ourLog.error("Failed to destroy channel bean", e);
        }
    }

    /**
     * @return the underlying channel, exactly as supplied to the constructor
     */
    public MessageChannel getChannel() {
        return mySubscribableChannel;
    }
}