
001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber; 021 022import ca.uhn.fhir.IHapiBootOrder; 023import ca.uhn.fhir.broker.api.ChannelConsumerSettings; 024import ca.uhn.fhir.broker.api.IChannelConsumer; 025import ca.uhn.fhir.broker.impl.MultiplexingListener; 026import ca.uhn.fhir.jpa.model.config.SubscriptionSettings; 027import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory; 028import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; 029import ca.uhn.fhir.jpa.topic.SubscriptionTopicMatchingListener; 030import ca.uhn.fhir.jpa.topic.SubscriptionTopicRegisteringListener; 031import ca.uhn.fhir.util.IoUtils; 032import jakarta.annotation.PreDestroy; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035import org.springframework.beans.factory.annotation.Autowired; 036import org.springframework.context.event.ContextRefreshedEvent; 037import org.springframework.context.event.EventListener; 038import org.springframework.core.annotation.Order; 039 040import static ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingListener.SUBSCRIPTION_MATCHING_CHANNEL_NAME; 041 042public class MatchingQueueSubscriberLoader { 043 private static final Logger ourLog = LoggerFactory.getLogger(MatchingQueueSubscriberLoader.class); 044 045 @Autowired 046 private SubscriptionMatchingListener mySubscriptionMatchingListener; 047 048 @Autowired 049 private SubscriptionActivatingListener mySubscriptionActivatingListener; 050 051 @Autowired(required = false) 052 private SubscriptionTopicMatchingListener mySubscriptionTopicMatchingListener; 053 054 @Autowired 055 private SubscriptionRegisteringListener mySubscriptionRegisteringListener; 056 057 @Autowired 058 private SubscriptionChannelFactory mySubscriptionChannelFactory; 059 060 @Autowired(required = false) 061 private SubscriptionTopicRegisteringListener mySubscriptionTopicRegisteringListener; 062 063 @Autowired 064 private SubscriptionSettings mySubscriptionSettings; 065 066 protected IChannelConsumer<ResourceModifiedMessage> myMatchingConsumer; 067 private MultiplexingListener<ResourceModifiedMessage> myMultiplexingListener; 068 069 @EventListener(ContextRefreshedEvent.class) 070 @Order(IHapiBootOrder.SUBSCRIPTION_MATCHING_CHANNEL_HANDLER) 071 public void subscribeToMatchingChannel() { 072 if (myMatchingConsumer == null) { 073 myMultiplexingListener = new MultiplexingListener<>(ResourceModifiedMessage.class); 074 075 myMatchingConsumer = mySubscriptionChannelFactory.newMatchingConsumer( 076 SUBSCRIPTION_MATCHING_CHANNEL_NAME, myMultiplexingListener, getChannelConsumerSettings()); 077 myMultiplexingListener.addListener(mySubscriptionMatchingListener); 078 myMultiplexingListener.addListener(mySubscriptionActivatingListener); 079 myMultiplexingListener.addListener(mySubscriptionRegisteringListener); 080 if (mySubscriptionTopicMatchingListener != null) { 081 ourLog.info("Starting SubscriptionTopic Matching Subscriber"); 082 myMultiplexingListener.addListener(mySubscriptionTopicMatchingListener); 083 } 084 if (mySubscriptionTopicRegisteringListener != null) { 085 myMultiplexingListener.addListener(mySubscriptionTopicRegisteringListener); 086 } 087 if (myMatchingConsumer != null) { // can be null in mock tests 088 ourLog.info( 089 "Subscription Matching Subscriber subscribed to Matching Channel {} with name {}", 090 myMatchingConsumer.getClass().getName(), 091 SUBSCRIPTION_MATCHING_CHANNEL_NAME); 092 } 093 } 094 } 095 096 private ChannelConsumerSettings getChannelConsumerSettings() { 097 ChannelConsumerSettings channelConsumerSettings = new ChannelConsumerSettings(); 098 channelConsumerSettings.setQualifyChannelName( 099 mySubscriptionSettings.isQualifySubscriptionMatchingChannelName()); 100 return channelConsumerSettings; 101 } 102 103 @SuppressWarnings("unused") 104 @PreDestroy 105 public void stop() throws Exception { 106 if (myMatchingConsumer != null) { 107 ourLog.info( 108 "Destroying matching Channel {} with name {}", 109 myMatchingConsumer.getClass().getName(), 110 SUBSCRIPTION_MATCHING_CHANNEL_NAME); 111 IoUtils.closeQuietly(myMatchingConsumer, ourLog); 112 } 113 if (myMultiplexingListener != null) { 114 IoUtils.closeQuietly(myMultiplexingListener, ourLog); 115 } 116 } 117}