001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.match.registry; 021 022import ca.uhn.fhir.interceptor.api.HookParams; 023import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 024import ca.uhn.fhir.interceptor.api.Pointcut; 025import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer; 026import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry; 027import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; 028import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration; 029import ca.uhn.fhir.util.HapiExtensions; 030import jakarta.annotation.PreDestroy; 031import org.apache.commons.lang3.Validate; 032import org.hl7.fhir.instance.model.api.IBaseResource; 033import org.hl7.fhir.instance.model.api.IIdType; 034import org.hl7.fhir.r4.model.Subscription; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037import org.springframework.beans.factory.annotation.Autowired; 038 039import java.util.Collection; 040import java.util.Collections; 041import java.util.List; 042import java.util.Optional; 043 044/** 045 * Cache of active subscriptions. When a new subscription is added to the cache, a new Spring Channel is created 046 * and a new MessageHandler for that subscription is subscribed to that channel. These subscriptions, channels, and 047 * handlers are all caches in this registry so they can be removed it the subscription is deleted. 048 */ 049public class SubscriptionRegistry { 050 private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class); 051 private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache(); 052 053 @Autowired 054 private SubscriptionCanonicalizer mySubscriptionCanonicalizer; 055 056 @Autowired 057 private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer; 058 059 @Autowired 060 private SubscriptionChannelRegistry mySubscriptionChannelRegistry; 061 062 @Autowired 063 private IInterceptorBroadcaster myInterceptorBroadcaster; 064 065 /** 066 * Constructor 067 */ 068 public SubscriptionRegistry() { 069 super(); 070 } 071 072 public synchronized ActiveSubscription get(String theIdPart) { 073 return myActiveSubscriptionCache.get(theIdPart); 074 } 075 076 public synchronized Collection<ActiveSubscription> getAll() { 077 return myActiveSubscriptionCache.getAll(); 078 } 079 080 public synchronized List<ActiveSubscription> getTopicSubscriptionsByTopic(String theTopic) { 081 return myActiveSubscriptionCache.getTopicSubscriptionsForTopic(theTopic); 082 } 083 084 private Optional<CanonicalSubscription> hasSubscription(IIdType theId) { 085 Validate.notNull(theId); 086 Validate.notBlank(theId.getIdPart()); 087 Optional<ActiveSubscription> activeSubscription = 088 Optional.ofNullable(myActiveSubscriptionCache.get(theId.getIdPart())); 089 return activeSubscription.map(ActiveSubscription::getSubscription); 090 } 091 092 /** 093 * Extracts the retry configuration settings from the CanonicalSubscription object. 094 * 095 * Returns the configuration, or null, if no retry (or a bad retry value) 096 * is specified. 097 */ 098 private ChannelRetryConfiguration getRetryConfigurationFromSubscriptionExtensions( 099 CanonicalSubscription theSubscription) { 100 ChannelRetryConfiguration configuration = new ChannelRetryConfiguration(); 101 102 List<String> retryCount = theSubscription.getChannelExtensions(HapiExtensions.EX_RETRY_COUNT); 103 if (retryCount.size() == 1) { 104 String val = retryCount.get(0); 105 configuration.setRetryCount(Integer.parseInt(val)); 106 } 107 // else - 0 or more than 1 means no retry policy at all 108 109 // retry count is required for any retry policy 110 if (configuration.getRetryCount() == null || configuration.getRetryCount() < 0) { 111 configuration = null; 112 } 113 114 return configuration; 115 } 116 117 private void registerSubscription(IIdType theId, CanonicalSubscription theCanonicalSubscription) { 118 Validate.notNull(theId); 119 String subscriptionId = theId.getIdPart(); 120 Validate.notBlank(subscriptionId); 121 Validate.notNull(theCanonicalSubscription); 122 123 String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(theCanonicalSubscription); 124 125 // get the actual retry configuration 126 ChannelRetryConfiguration configuration = 127 getRetryConfigurationFromSubscriptionExtensions(theCanonicalSubscription); 128 129 ActiveSubscription activeSubscription = new ActiveSubscription(theCanonicalSubscription, channelName); 130 activeSubscription.setRetryConfiguration(configuration); 131 132 // add to our registries 133 mySubscriptionChannelRegistry.add(activeSubscription); 134 myActiveSubscriptionCache.put(subscriptionId, activeSubscription); 135 136 ourLog.info( 137 "Registered active subscription Subscription/{} - Have {} registered", 138 subscriptionId, 139 myActiveSubscriptionCache.size()); 140 141 // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED 142 HookParams params = new HookParams().add(CanonicalSubscription.class, theCanonicalSubscription); 143 myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params); 144 } 145 146 public synchronized void unregisterSubscriptionIfRegistered(String theSubscriptionId) { 147 Validate.notNull(theSubscriptionId); 148 149 ActiveSubscription activeSubscription = myActiveSubscriptionCache.remove(theSubscriptionId); 150 if (activeSubscription != null) { 151 mySubscriptionChannelRegistry.remove(activeSubscription); 152 ourLog.info( 153 "Unregistered active subscription {} - Have {} registered", 154 theSubscriptionId, 155 myActiveSubscriptionCache.size()); 156 157 // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_UNREGISTERED 158 HookParams params = new HookParams(); 159 myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_UNREGISTERED, params); 160 } 161 } 162 163 @PreDestroy 164 public synchronized void unregisterAllSubscriptions() { 165 // Once to set flag 166 unregisterAllSubscriptionsNotInCollection(Collections.emptyList()); 167 // Twice to remove 168 unregisterAllSubscriptionsNotInCollection(Collections.emptyList()); 169 } 170 171 synchronized void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) { 172 173 List<String> idsToDelete = 174 myActiveSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(theAllIds); 175 for (String id : idsToDelete) { 176 unregisterSubscriptionIfRegistered(id); 177 } 178 } 179 180 public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) { 181 Validate.notNull(theSubscription); 182 Optional<CanonicalSubscription> existingSubscription = hasSubscription(theSubscription.getIdElement()); 183 CanonicalSubscription newSubscription = mySubscriptionCanonicalizer.canonicalize(theSubscription); 184 185 if (existingSubscription.isPresent()) { 186 if (newSubscription.equals(existingSubscription.get())) { 187 // No changes 188 return false; 189 } 190 ourLog.info( 191 "Updating already-registered active subscription {}", 192 theSubscription.getIdElement().toUnqualified().getValue()); 193 if (channelTypeSame(existingSubscription.get(), newSubscription)) { 194 ourLog.info( 195 "Channel type is same. Updating active subscription and re-using existing channel and handlers."); 196 updateSubscription(theSubscription); 197 return true; 198 } 199 unregisterSubscriptionIfRegistered(theSubscription.getIdElement().getIdPart()); 200 } 201 if (Subscription.SubscriptionStatus.ACTIVE.equals(newSubscription.getStatus())) { 202 registerSubscription(theSubscription.getIdElement(), newSubscription); 203 return true; 204 } else { 205 return false; 206 } 207 } 208 209 private void updateSubscription(IBaseResource theSubscription) { 210 IIdType theId = theSubscription.getIdElement(); 211 Validate.notNull(theId); 212 Validate.notBlank(theId.getIdPart()); 213 ActiveSubscription activeSubscription = myActiveSubscriptionCache.get(theId.getIdPart()); 214 Validate.notNull(activeSubscription); 215 CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription); 216 activeSubscription.setSubscription(canonicalized); 217 218 // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED 219 HookParams params = new HookParams().add(CanonicalSubscription.class, canonicalized); 220 myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params); 221 } 222 223 private boolean channelTypeSame( 224 CanonicalSubscription theExistingSubscription, CanonicalSubscription theNewSubscription) { 225 return theExistingSubscription.getChannelType().equals(theNewSubscription.getChannelType()); 226 } 227 228 public int size() { 229 return myActiveSubscriptionCache.size(); 230 } 231 232 public synchronized List<ActiveSubscription> getAllNonTopicSubscriptions() { 233 return myActiveSubscriptionCache.getAllNonTopicSubscriptions(); 234 } 235}