001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.match.registry;
021
022import ca.uhn.fhir.interceptor.api.HookParams;
023import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
024import ca.uhn.fhir.interceptor.api.Pointcut;
025import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer;
026import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
027import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
028import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration;
029import ca.uhn.fhir.util.HapiExtensions;
030import jakarta.annotation.PreDestroy;
031import org.apache.commons.lang3.Validate;
032import org.hl7.fhir.instance.model.api.IBaseResource;
033import org.hl7.fhir.instance.model.api.IIdType;
034import org.hl7.fhir.r4.model.Subscription;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037import org.springframework.beans.factory.annotation.Autowired;
038
039import java.util.Collection;
040import java.util.Collections;
041import java.util.List;
042import java.util.Optional;
043
044/**
045 * Cache of active subscriptions.  When a new subscription is added to the cache, a new Spring Channel is created
046 * and a new MessageHandler for that subscription is subscribed to that channel.  These subscriptions, channels, and
047 * handlers are all caches in this registry so they can be removed it the subscription is deleted.
048 */
049public class SubscriptionRegistry {
050        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class);
051        private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache();
052
053        @Autowired
054        private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
055
056        @Autowired
057        private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer;
058
059        @Autowired
060        private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
061
062        @Autowired
063        private IInterceptorBroadcaster myInterceptorBroadcaster;
064
065        /**
066         * Constructor
067         */
068        public SubscriptionRegistry() {
069                super();
070        }
071
072        public ActiveSubscription get(String theIdPart) {
073                return myActiveSubscriptionCache.get(theIdPart);
074        }
075
076        public Collection<ActiveSubscription> getAll() {
077                return myActiveSubscriptionCache.getAll();
078        }
079
080        public List<ActiveSubscription> getTopicSubscriptionsByTopic(String theTopic) {
081                return myActiveSubscriptionCache.getTopicSubscriptionsForTopic(theTopic);
082        }
083
084        private Optional<CanonicalSubscription> hasSubscription(IIdType theId) {
085                Validate.notNull(theId, "theId must not be null");
086                Validate.notBlank(theId.getIdPart(), "theId must have an ID part");
087                Optional<ActiveSubscription> activeSubscription =
088                                Optional.ofNullable(myActiveSubscriptionCache.get(theId.getIdPart()));
089                return activeSubscription.map(ActiveSubscription::getSubscription);
090        }
091
092        /**
093         * Extracts the retry configuration settings from the CanonicalSubscription object.
094         *
095         * @return the configuration, or null, if no retry (or a bad retry value) is specified.
096         */
097        private ChannelRetryConfiguration getRetryConfigurationFromSubscriptionExtensions(
098                        CanonicalSubscription theSubscription) {
099                ChannelRetryConfiguration configuration = new ChannelRetryConfiguration();
100
101                List<String> retryCount = theSubscription.getChannelExtensions(HapiExtensions.EX_RETRY_COUNT);
102                if (retryCount.size() == 1) {
103                        String val = retryCount.get(0);
104                        configuration.setRetryCount(Integer.parseInt(val));
105                }
106                // else - 0 or more than 1 means no retry policy at all
107
108                // retry count is required for any retry policy
109                if (configuration.getRetryCount() == null || configuration.getRetryCount() < 0) {
110                        configuration = null;
111                }
112
113                return configuration;
114        }
115
116        private void registerSubscription(IIdType theId, CanonicalSubscription theCanonicalSubscription) {
117                Validate.notNull(theId, "theId must not be null");
118                String subscriptionId = theId.getIdPart();
119                Validate.notBlank(subscriptionId, "theId must have an ID part");
120                Validate.notNull(theCanonicalSubscription, "theCanonicalSubscription must not be null");
121
122                String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(theCanonicalSubscription);
123
124                // get the actual retry configuration
125                ChannelRetryConfiguration configuration =
126                                getRetryConfigurationFromSubscriptionExtensions(theCanonicalSubscription);
127
128                ActiveSubscription activeSubscription = new ActiveSubscription(theCanonicalSubscription, channelName);
129                activeSubscription.setRetryConfiguration(configuration);
130
131                // add to our registries
132                mySubscriptionChannelRegistry.add(activeSubscription);
133                myActiveSubscriptionCache.put(subscriptionId, activeSubscription);
134
135                ourLog.info(
136                                "Registered active subscription Subscription/{} - Have {} registered",
137                                subscriptionId,
138                                myActiveSubscriptionCache.size());
139
140                // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED
141                HookParams params = new HookParams().add(CanonicalSubscription.class, theCanonicalSubscription);
142                myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params);
143        }
144
145        public void unregisterSubscriptionIfRegistered(String theSubscriptionId) {
146                Validate.notNull(theSubscriptionId, "theSubscriptionId must not be null");
147
148                ActiveSubscription activeSubscription = myActiveSubscriptionCache.remove(theSubscriptionId);
149                if (activeSubscription != null) {
150                        mySubscriptionChannelRegistry.remove(activeSubscription);
151                        ourLog.info(
152                                        "Unregistered active subscription {} - Have {} registered",
153                                        theSubscriptionId,
154                                        myActiveSubscriptionCache.size());
155
156                        // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_UNREGISTERED
157                        HookParams params = new HookParams();
158                        myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_UNREGISTERED, params);
159                }
160        }
161
162        @PreDestroy
163        public void unregisterAllSubscriptions() {
164                // Once to set flag
165                unregisterAllSubscriptionsNotInCollection(Collections.emptyList());
166                // Twice to remove
167                unregisterAllSubscriptionsNotInCollection(Collections.emptyList());
168        }
169
170        void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) {
171
172                List<String> idsToDelete =
173                                myActiveSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(theAllIds);
174                for (String id : idsToDelete) {
175                        unregisterSubscriptionIfRegistered(id);
176                }
177        }
178
179        public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) {
180                Validate.notNull(theSubscription, "theSubscription must not be null");
181                Optional<CanonicalSubscription> existingSubscription = hasSubscription(theSubscription.getIdElement());
182                CanonicalSubscription newSubscription = mySubscriptionCanonicalizer.canonicalize(theSubscription);
183
184                if (existingSubscription.isPresent()) {
185                        if (newSubscription.equals(existingSubscription.get())) {
186                                // No changes
187                                return false;
188                        }
189                        ourLog.info(
190                                        "Updating already-registered active subscription {}",
191                                        theSubscription.getIdElement().toUnqualified().getValue());
192                        if (channelTypeSame(existingSubscription.get(), newSubscription)) {
193                                ourLog.info(
194                                                "Channel type is same.  Updating active subscription and re-using existing channel and handlers.");
195                                updateSubscription(theSubscription);
196                                return true;
197                        }
198                        unregisterSubscriptionIfRegistered(theSubscription.getIdElement().getIdPart());
199                }
200                if (Subscription.SubscriptionStatus.ACTIVE.equals(newSubscription.getStatus())) {
201                        registerSubscription(theSubscription.getIdElement(), newSubscription);
202                        return true;
203                } else {
204                        return false;
205                }
206        }
207
208        private void updateSubscription(IBaseResource theSubscription) {
209                IIdType theId = theSubscription.getIdElement();
210                Validate.notNull(theId, "theId must not be null");
211                Validate.notBlank(theId.getIdPart(), "theId must have an ID part");
212                ActiveSubscription activeSubscription = myActiveSubscriptionCache.get(theId.getIdPart());
213                Validate.notNull(activeSubscription, "Subscription with ID %s not found in cache", theId.getIdPart());
214                CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription);
215                activeSubscription.setSubscription(canonicalized);
216
217                // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED
218                HookParams params = new HookParams().add(CanonicalSubscription.class, canonicalized);
219                myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params);
220        }
221
222        private boolean channelTypeSame(
223                        CanonicalSubscription theExistingSubscription, CanonicalSubscription theNewSubscription) {
224                return theExistingSubscription.getChannelType().equals(theNewSubscription.getChannelType());
225        }
226
227        public int size() {
228                return myActiveSubscriptionCache.size();
229        }
230
231        public List<ActiveSubscription> getAllNonTopicSubscriptions() {
232                return myActiveSubscriptionCache.getAllNonTopicSubscriptions();
233        }
234}