001/*- 002 * #%L 003 * HAPI FHIR Storage api 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.channel.subscription; 021 022import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; 023import org.apache.commons.lang3.Validate; 024import org.springframework.messaging.Message; 025import org.springframework.messaging.MessageHandler; 026import org.springframework.messaging.SubscribableChannel; 027import org.springframework.messaging.support.AbstractSubscribableChannel; 028import org.springframework.messaging.support.ChannelInterceptor; 029 030import java.util.Set; 031 032public class BroadcastingSubscribableChannelWrapper extends AbstractSubscribableChannel implements IChannelReceiver { 033 034 private final IChannelReceiver myWrappedChannel; 035 private final MessageHandler myHandler; 036 037 public BroadcastingSubscribableChannelWrapper(IChannelReceiver theChannel) { 038 myHandler = message -> send(message); 039 theChannel.subscribe(myHandler); 040 myWrappedChannel = theChannel; 041 } 042 043 public SubscribableChannel getWrappedChannel() { 044 return myWrappedChannel; 045 } 046 047 @Override 048 protected boolean sendInternal(Message<?> theMessage, long timeout) { 049 Set<MessageHandler> subscribers = getSubscribers(); 050 Validate.isTrue(subscribers.size() > 0, "Channel has zero subscribers"); 051 for (MessageHandler next : subscribers) { 052 next.handleMessage(theMessage); 053 } 054 return true; 055 } 056 057 @Override 058 public void destroy() throws Exception { 059 myWrappedChannel.destroy(); 060 myWrappedChannel.unsubscribe(myHandler); 061 } 062 063 @Override 064 public void addInterceptor(ChannelInterceptor interceptor) { 065 super.addInterceptor(interceptor); 066 myWrappedChannel.addInterceptor(interceptor); 067 } 068 069 @Override 070 public String getName() { 071 return myWrappedChannel.getName(); 072 } 073}