001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.match.registry; 021 022import ca.uhn.fhir.interceptor.api.HookParams; 023import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 024import ca.uhn.fhir.interceptor.api.Pointcut; 025import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer; 026import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry; 027import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; 028import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration; 029import ca.uhn.fhir.util.HapiExtensions; 030import jakarta.annotation.PreDestroy; 031import org.apache.commons.lang3.Validate; 032import org.hl7.fhir.instance.model.api.IBaseResource; 033import org.hl7.fhir.instance.model.api.IIdType; 034import org.hl7.fhir.r4.model.Subscription; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037import org.springframework.beans.factory.annotation.Autowired; 038 039import java.util.Collection; 040import java.util.Collections; 041import java.util.List; 042import java.util.Optional; 043 044/** 045 * Cache of active subscriptions. When a new subscription is added to the cache, a new Spring Channel is created 046 * and a new MessageHandler for that subscription is subscribed to that channel. These subscriptions, channels, and 047 * handlers are all caches in this registry so they can be removed it the subscription is deleted. 048 */ 049public class SubscriptionRegistry { 050 private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class); 051 private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache(); 052 053 @Autowired 054 private SubscriptionCanonicalizer mySubscriptionCanonicalizer; 055 056 @Autowired 057 private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer; 058 059 @Autowired 060 private SubscriptionChannelRegistry mySubscriptionChannelRegistry; 061 062 @Autowired 063 private IInterceptorBroadcaster myInterceptorBroadcaster; 064 065 /** 066 * Constructor 067 */ 068 public SubscriptionRegistry() { 069 super(); 070 } 071 072 public ActiveSubscription get(String theIdPart) { 073 return myActiveSubscriptionCache.get(theIdPart); 074 } 075 076 public Collection<ActiveSubscription> getAll() { 077 return myActiveSubscriptionCache.getAll(); 078 } 079 080 public List<ActiveSubscription> getTopicSubscriptionsByTopic(String theTopic) { 081 return myActiveSubscriptionCache.getTopicSubscriptionsForTopic(theTopic); 082 } 083 084 private Optional<CanonicalSubscription> hasSubscription(IIdType theId) { 085 Validate.notNull(theId, "theId must not be null"); 086 Validate.notBlank(theId.getIdPart(), "theId must have an ID part"); 087 Optional<ActiveSubscription> activeSubscription = 088 Optional.ofNullable(myActiveSubscriptionCache.get(theId.getIdPart())); 089 return activeSubscription.map(ActiveSubscription::getSubscription); 090 } 091 092 /** 093 * Extracts the retry configuration settings from the CanonicalSubscription object. 094 * 095 * @return the configuration, or null, if no retry (or a bad retry value) is specified. 096 */ 097 private ChannelRetryConfiguration getRetryConfigurationFromSubscriptionExtensions( 098 CanonicalSubscription theSubscription) { 099 ChannelRetryConfiguration configuration = new ChannelRetryConfiguration(); 100 101 List<String> retryCount = theSubscription.getChannelExtensions(HapiExtensions.EX_RETRY_COUNT); 102 if (retryCount.size() == 1) { 103 String val = retryCount.get(0); 104 configuration.setRetryCount(Integer.parseInt(val)); 105 } 106 // else - 0 or more than 1 means no retry policy at all 107 108 // retry count is required for any retry policy 109 if (configuration.getRetryCount() == null || configuration.getRetryCount() < 0) { 110 configuration = null; 111 } 112 113 return configuration; 114 } 115 116 private void registerSubscription(IIdType theId, CanonicalSubscription theCanonicalSubscription) { 117 Validate.notNull(theId, "theId must not be null"); 118 String subscriptionId = theId.getIdPart(); 119 Validate.notBlank(subscriptionId, "theId must have an ID part"); 120 Validate.notNull(theCanonicalSubscription, "theCanonicalSubscription must not be null"); 121 122 String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(theCanonicalSubscription); 123 124 // get the actual retry configuration 125 ChannelRetryConfiguration configuration = 126 getRetryConfigurationFromSubscriptionExtensions(theCanonicalSubscription); 127 128 ActiveSubscription activeSubscription = new ActiveSubscription(theCanonicalSubscription, channelName); 129 activeSubscription.setRetryConfiguration(configuration); 130 131 // add to our registries 132 mySubscriptionChannelRegistry.add(activeSubscription); 133 myActiveSubscriptionCache.put(subscriptionId, activeSubscription); 134 135 ourLog.info( 136 "Registered active subscription Subscription/{} - Have {} registered", 137 subscriptionId, 138 myActiveSubscriptionCache.size()); 139 140 // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED 141 HookParams params = new HookParams().add(CanonicalSubscription.class, theCanonicalSubscription); 142 myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params); 143 } 144 145 public void unregisterSubscriptionIfRegistered(String theSubscriptionId) { 146 Validate.notNull(theSubscriptionId, "theSubscriptionId must not be null"); 147 148 ActiveSubscription activeSubscription = myActiveSubscriptionCache.remove(theSubscriptionId); 149 if (activeSubscription != null) { 150 mySubscriptionChannelRegistry.remove(activeSubscription); 151 ourLog.info( 152 "Unregistered active subscription {} - Have {} registered", 153 theSubscriptionId, 154 myActiveSubscriptionCache.size()); 155 156 // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_UNREGISTERED 157 HookParams params = new HookParams(); 158 myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_UNREGISTERED, params); 159 } 160 } 161 162 @PreDestroy 163 public void unregisterAllSubscriptions() { 164 // Once to set flag 165 unregisterAllSubscriptionsNotInCollection(Collections.emptyList()); 166 // Twice to remove 167 unregisterAllSubscriptionsNotInCollection(Collections.emptyList()); 168 } 169 170 void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) { 171 172 List<String> idsToDelete = 173 myActiveSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(theAllIds); 174 for (String id : idsToDelete) { 175 unregisterSubscriptionIfRegistered(id); 176 } 177 } 178 179 public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) { 180 Validate.notNull(theSubscription, "theSubscription must not be null"); 181 Optional<CanonicalSubscription> existingSubscription = hasSubscription(theSubscription.getIdElement()); 182 CanonicalSubscription newSubscription = mySubscriptionCanonicalizer.canonicalize(theSubscription); 183 184 if (existingSubscription.isPresent()) { 185 if (newSubscription.equals(existingSubscription.get())) { 186 // No changes 187 return false; 188 } 189 ourLog.info( 190 "Updating already-registered active subscription {}", 191 theSubscription.getIdElement().toUnqualified().getValue()); 192 if (channelTypeSame(existingSubscription.get(), newSubscription)) { 193 ourLog.info( 194 "Channel type is same. Updating active subscription and re-using existing channel and handlers."); 195 updateSubscription(theSubscription); 196 return true; 197 } 198 unregisterSubscriptionIfRegistered(theSubscription.getIdElement().getIdPart()); 199 } 200 if (Subscription.SubscriptionStatus.ACTIVE.equals(newSubscription.getStatus())) { 201 registerSubscription(theSubscription.getIdElement(), newSubscription); 202 return true; 203 } else { 204 return false; 205 } 206 } 207 208 private void updateSubscription(IBaseResource theSubscription) { 209 IIdType theId = theSubscription.getIdElement(); 210 Validate.notNull(theId, "theId must not be null"); 211 Validate.notBlank(theId.getIdPart(), "theId must have an ID part"); 212 ActiveSubscription activeSubscription = myActiveSubscriptionCache.get(theId.getIdPart()); 213 Validate.notNull(activeSubscription, "Subscription with ID %s not found in cache", theId.getIdPart()); 214 CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription); 215 activeSubscription.setSubscription(canonicalized); 216 217 // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED 218 HookParams params = new HookParams().add(CanonicalSubscription.class, canonicalized); 219 myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params); 220 } 221 222 private boolean channelTypeSame( 223 CanonicalSubscription theExistingSubscription, CanonicalSubscription theNewSubscription) { 224 return theExistingSubscription.getChannelType().equals(theNewSubscription.getChannelType()); 225 } 226 227 public int size() { 228 return myActiveSubscriptionCache.size(); 229 } 230 231 public List<ActiveSubscription> getAllNonTopicSubscriptions() { 232 return myActiveSubscriptionCache.getAllNonTopicSubscriptions(); 233 } 234}