001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.match.registry;
021
022import ca.uhn.fhir.interceptor.api.HookParams;
023import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
024import ca.uhn.fhir.interceptor.api.Pointcut;
025import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer;
026import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
027import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
028import ca.uhn.fhir.jpa.subscription.model.ChannelRetryConfiguration;
029import ca.uhn.fhir.util.HapiExtensions;
030import jakarta.annotation.PreDestroy;
031import org.apache.commons.lang3.Validate;
032import org.hl7.fhir.instance.model.api.IBaseResource;
033import org.hl7.fhir.instance.model.api.IIdType;
034import org.hl7.fhir.r4.model.Subscription;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037import org.springframework.beans.factory.annotation.Autowired;
038
039import java.util.Collection;
040import java.util.Collections;
041import java.util.List;
042import java.util.Optional;
043
044/**
045 * Cache of active subscriptions.  When a new subscription is added to the cache, a new Spring Channel is created
046 * and a new MessageHandler for that subscription is subscribed to that channel.  These subscriptions, channels, and
 * handlers are all cached in this registry so they can be removed if the subscription is deleted.
048 */
049public class SubscriptionRegistry {
050        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class);
051        private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache();
052
053        @Autowired
054        private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
055
056        @Autowired
057        private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer;
058
059        @Autowired
060        private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
061
062        @Autowired
063        private IInterceptorBroadcaster myInterceptorBroadcaster;
064
065        /**
066         * Constructor
067         */
068        public SubscriptionRegistry() {
069                super();
070        }
071
072        public synchronized ActiveSubscription get(String theIdPart) {
073                return myActiveSubscriptionCache.get(theIdPart);
074        }
075
076        public synchronized Collection<ActiveSubscription> getAll() {
077                return myActiveSubscriptionCache.getAll();
078        }
079
080        public synchronized List<ActiveSubscription> getTopicSubscriptionsByTopic(String theTopic) {
081                return myActiveSubscriptionCache.getTopicSubscriptionsForTopic(theTopic);
082        }
083
084        private Optional<CanonicalSubscription> hasSubscription(IIdType theId) {
085                Validate.notNull(theId);
086                Validate.notBlank(theId.getIdPart());
087                Optional<ActiveSubscription> activeSubscription =
088                                Optional.ofNullable(myActiveSubscriptionCache.get(theId.getIdPart()));
089                return activeSubscription.map(ActiveSubscription::getSubscription);
090        }
091
092        /**
093         * Extracts the retry configuration settings from the CanonicalSubscription object.
094         *
095         * Returns the configuration, or null, if no retry (or a bad retry value)
096         * is specified.
097         */
098        private ChannelRetryConfiguration getRetryConfigurationFromSubscriptionExtensions(
099                        CanonicalSubscription theSubscription) {
100                ChannelRetryConfiguration configuration = new ChannelRetryConfiguration();
101
102                List<String> retryCount = theSubscription.getChannelExtensions(HapiExtensions.EX_RETRY_COUNT);
103                if (retryCount.size() == 1) {
104                        String val = retryCount.get(0);
105                        configuration.setRetryCount(Integer.parseInt(val));
106                }
107                // else - 0 or more than 1 means no retry policy at all
108
109                // retry count is required for any retry policy
110                if (configuration.getRetryCount() == null || configuration.getRetryCount() < 0) {
111                        configuration = null;
112                }
113
114                return configuration;
115        }
116
117        private void registerSubscription(IIdType theId, CanonicalSubscription theCanonicalSubscription) {
118                Validate.notNull(theId);
119                String subscriptionId = theId.getIdPart();
120                Validate.notBlank(subscriptionId);
121                Validate.notNull(theCanonicalSubscription);
122
123                String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(theCanonicalSubscription);
124
125                // get the actual retry configuration
126                ChannelRetryConfiguration configuration =
127                                getRetryConfigurationFromSubscriptionExtensions(theCanonicalSubscription);
128
129                ActiveSubscription activeSubscription = new ActiveSubscription(theCanonicalSubscription, channelName);
130                activeSubscription.setRetryConfiguration(configuration);
131
132                // add to our registries
133                mySubscriptionChannelRegistry.add(activeSubscription);
134                myActiveSubscriptionCache.put(subscriptionId, activeSubscription);
135
136                ourLog.info(
137                                "Registered active subscription Subscription/{} - Have {} registered",
138                                subscriptionId,
139                                myActiveSubscriptionCache.size());
140
141                // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED
142                HookParams params = new HookParams().add(CanonicalSubscription.class, theCanonicalSubscription);
143                myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params);
144        }
145
146        public synchronized void unregisterSubscriptionIfRegistered(String theSubscriptionId) {
147                Validate.notNull(theSubscriptionId);
148
149                ActiveSubscription activeSubscription = myActiveSubscriptionCache.remove(theSubscriptionId);
150                if (activeSubscription != null) {
151                        mySubscriptionChannelRegistry.remove(activeSubscription);
152                        ourLog.info(
153                                        "Unregistered active subscription {} - Have {} registered",
154                                        theSubscriptionId,
155                                        myActiveSubscriptionCache.size());
156
157                        // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_UNREGISTERED
158                        HookParams params = new HookParams();
159                        myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_UNREGISTERED, params);
160                }
161        }
162
163        @PreDestroy
164        public synchronized void unregisterAllSubscriptions() {
165                // Once to set flag
166                unregisterAllSubscriptionsNotInCollection(Collections.emptyList());
167                // Twice to remove
168                unregisterAllSubscriptionsNotInCollection(Collections.emptyList());
169        }
170
171        synchronized void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) {
172
173                List<String> idsToDelete =
174                                myActiveSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(theAllIds);
175                for (String id : idsToDelete) {
176                        unregisterSubscriptionIfRegistered(id);
177                }
178        }
179
180        public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) {
181                Validate.notNull(theSubscription);
182                Optional<CanonicalSubscription> existingSubscription = hasSubscription(theSubscription.getIdElement());
183                CanonicalSubscription newSubscription = mySubscriptionCanonicalizer.canonicalize(theSubscription);
184
185                if (existingSubscription.isPresent()) {
186                        if (newSubscription.equals(existingSubscription.get())) {
187                                // No changes
188                                return false;
189                        }
190                        ourLog.info(
191                                        "Updating already-registered active subscription {}",
192                                        theSubscription.getIdElement().toUnqualified().getValue());
193                        if (channelTypeSame(existingSubscription.get(), newSubscription)) {
194                                ourLog.info(
195                                                "Channel type is same.  Updating active subscription and re-using existing channel and handlers.");
196                                updateSubscription(theSubscription);
197                                return true;
198                        }
199                        unregisterSubscriptionIfRegistered(theSubscription.getIdElement().getIdPart());
200                }
201                if (Subscription.SubscriptionStatus.ACTIVE.equals(newSubscription.getStatus())) {
202                        registerSubscription(theSubscription.getIdElement(), newSubscription);
203                        return true;
204                } else {
205                        return false;
206                }
207        }
208
209        private void updateSubscription(IBaseResource theSubscription) {
210                IIdType theId = theSubscription.getIdElement();
211                Validate.notNull(theId);
212                Validate.notBlank(theId.getIdPart());
213                ActiveSubscription activeSubscription = myActiveSubscriptionCache.get(theId.getIdPart());
214                Validate.notNull(activeSubscription);
215                CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription);
216                activeSubscription.setSubscription(canonicalized);
217
218                // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED
219                HookParams params = new HookParams().add(CanonicalSubscription.class, canonicalized);
220                myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params);
221        }
222
223        private boolean channelTypeSame(
224                        CanonicalSubscription theExistingSubscription, CanonicalSubscription theNewSubscription) {
225                return theExistingSubscription.getChannelType().equals(theNewSubscription.getChannelType());
226        }
227
228        public int size() {
229                return myActiveSubscriptionCache.size();
230        }
231
232        public synchronized List<ActiveSubscription> getAllNonTopicSubscriptions() {
233                return myActiveSubscriptionCache.getAllNonTopicSubscriptions();
234        }
235}