
001/*- 002 * #%L 003 * HAPI FHIR Storage api 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.broker.impl; 021 022import ca.uhn.fhir.broker.api.ChannelConsumerSettings; 023import ca.uhn.fhir.broker.api.ChannelProducerSettings; 024import ca.uhn.fhir.broker.api.IBrokerClient; 025import ca.uhn.fhir.broker.api.IChannelConsumer; 026import ca.uhn.fhir.broker.api.IChannelNamer; 027import ca.uhn.fhir.broker.api.IChannelProducer; 028import ca.uhn.fhir.broker.api.IMessageListener; 029import ca.uhn.fhir.broker.jms.SpringMessagingMessageHandlerAdapter; 030import ca.uhn.fhir.broker.jms.SpringMessagingProducerAdapter; 031import ca.uhn.fhir.broker.jms.SpringMessagingReceiverAdapter; 032import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel; 033import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannelFactory; 034import ca.uhn.fhir.rest.server.messaging.IMessage; 035import org.springframework.beans.factory.annotation.Autowired; 036import org.springframework.messaging.MessageHandler; 037 038public class LinkedBlockingBrokerClient implements IBrokerClient { 039 private LinkedBlockingChannelFactory myLinkedBlockingChannelFactory; 040 private final IChannelNamer myChannelNamer; 041 042 public LinkedBlockingBrokerClient(IChannelNamer theChannelNamer) { 043 myChannelNamer = theChannelNamer; 044 } 045 046 @Override 047 public <T> IChannelConsumer<T> getOrCreateConsumer( 048 String theChannelName, 049 Class<? extends IMessage<T>> theMessageType, 050 IMessageListener<T> theMessageListener, 051 ChannelConsumerSettings theChannelConsumerSettings) { 052 LinkedBlockingChannel springMessagingChannelReceiver = 053 myLinkedBlockingChannelFactory.getOrCreateReceiver(theChannelName, theChannelConsumerSettings); 054 SpringMessagingReceiverAdapter<T> retval = new SpringMessagingReceiverAdapter<>( 055 theMessageType, springMessagingChannelReceiver, theMessageListener); 056 MessageHandler handler = new SpringMessagingMessageHandlerAdapter<>(theMessageType, theMessageListener); 057 retval.subscribe(handler); 058 return retval; 059 } 060 061 @Override 062 public <T> IChannelProducer<T> getOrCreateProducer( 063 String theChannelName, 064 Class<? extends IMessage<T>> theMessageType, 065 ChannelProducerSettings theChannelProducerSettings) { 066 return new SpringMessagingProducerAdapter<>( 067 theMessageType, 068 myLinkedBlockingChannelFactory.getOrCreateProducer(theChannelName, theChannelProducerSettings)); 069 } 070 071 @Override 072 public IChannelNamer getChannelNamer() { 073 return myChannelNamer; 074 } 075 076 @Autowired 077 public void setLinkedBlockingChannelFactory(LinkedBlockingChannelFactory theLinkedBlockingChannelFactory) { 078 myLinkedBlockingChannelFactory = theLinkedBlockingChannelFactory; 079 } 080}