001package ca.uhn.fhir.jpa.subscription.match.registry;
002
003/*-
004 * #%L
005 * HAPI FHIR Subscription Server
006 * %%
007 * Copyright (C) 2014 - 2021 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.interceptor.api.HookParams;
024import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
025import ca.uhn.fhir.interceptor.api.Pointcut;
026import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer;
027import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelRegistry;
028import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
029import org.apache.commons.lang3.Validate;
030import org.hl7.fhir.instance.model.api.IBaseResource;
031import org.hl7.fhir.instance.model.api.IIdType;
032import org.hl7.fhir.r4.model.Subscription;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035import org.springframework.beans.factory.annotation.Autowired;
036
037import javax.annotation.PreDestroy;
038import java.util.Collection;
039import java.util.Collections;
040import java.util.List;
041import java.util.Optional;
042
043/**
044 * Cache of active subscriptions.  When a new subscription is added to the cache, a new Spring Channel is created
045 * and a new MessageHandler for that subscription is subscribed to that channel.  These subscriptions, channels, and
 * handlers are all cached in this registry so they can be removed if the subscription is deleted.
047 */
048
049public class SubscriptionRegistry {
050        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class);
051        private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache();
052        @Autowired
053        private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
054        @Autowired
055        private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer;
056        @Autowired
057        private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
058        @Autowired
059        private IInterceptorBroadcaster myInterceptorBroadcaster;
060
061        /**
062         * Constructor
063         */
064        public SubscriptionRegistry() {
065                super();
066        }
067
068        public ActiveSubscription get(String theIdPart) {
069                return myActiveSubscriptionCache.get(theIdPart);
070        }
071
072        public Collection<ActiveSubscription> getAll() {
073                return myActiveSubscriptionCache.getAll();
074        }
075
076        private Optional<CanonicalSubscription> hasSubscription(IIdType theId) {
077                Validate.notNull(theId);
078                Validate.notBlank(theId.getIdPart());
079                Optional<ActiveSubscription> activeSubscription = Optional.ofNullable(myActiveSubscriptionCache.get(theId.getIdPart()));
080                return activeSubscription.map(ActiveSubscription::getSubscription);
081        }
082
083        private void registerSubscription(IIdType theId, IBaseResource theSubscription) {
084                Validate.notNull(theId);
085                String subscriptionId = theId.getIdPart();
086                Validate.notBlank(subscriptionId);
087                Validate.notNull(theSubscription);
088
089                CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription);
090
091                String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(canonicalized);
092
093                ActiveSubscription activeSubscription = new ActiveSubscription(canonicalized, channelName);
094                mySubscriptionChannelRegistry.add(activeSubscription);
095                myActiveSubscriptionCache.put(subscriptionId, activeSubscription);
096
097                ourLog.info("Registered active subscription Subscription/{} - Have {} registered", subscriptionId, myActiveSubscriptionCache.size());
098
099                // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED
100                HookParams params = new HookParams()
101                        .add(CanonicalSubscription.class, canonicalized);
102                myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params);
103
104        }
105
106        public void unregisterSubscriptionIfRegistered(String theSubscriptionId) {
107                Validate.notNull(theSubscriptionId);
108
109                ActiveSubscription activeSubscription = myActiveSubscriptionCache.remove(theSubscriptionId);
110                if (activeSubscription != null) {
111                        mySubscriptionChannelRegistry.remove(activeSubscription);
112                        ourLog.info("Unregistered active subscription {} - Have {} registered", theSubscriptionId, myActiveSubscriptionCache.size());
113
114                        // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_UNREGISTERED
115                        HookParams params = new HookParams();
116                        myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_UNREGISTERED, params);
117
118                }
119        }
120
121        @PreDestroy
122        public void unregisterAllSubscriptions() {
123                // Once to set flag
124                unregisterAllSubscriptionsNotInCollection(Collections.emptyList());
125                // Twice to remove
126                unregisterAllSubscriptionsNotInCollection(Collections.emptyList());
127        }
128
129        void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) {
130
131                List<String> idsToDelete = myActiveSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(theAllIds);
132                for (String id : idsToDelete) {
133                        unregisterSubscriptionIfRegistered(id);
134                }
135        }
136
137        public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) {
138                Optional<CanonicalSubscription> existingSubscription = hasSubscription(theSubscription.getIdElement());
139                CanonicalSubscription newSubscription = mySubscriptionCanonicalizer.canonicalize(theSubscription);
140
141                if (existingSubscription.isPresent()) {
142                        if (newSubscription.equals(existingSubscription.get())) {
143                                // No changes
144                                return false;
145                        }
146                        ourLog.info("Updating already-registered active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
147                        if (channelTypeSame(existingSubscription.get(), newSubscription)) {
148                                ourLog.info("Channel type is same.  Updating active subscription and re-using existing channel and handlers.");
149                                updateSubscription(theSubscription);
150                                return true;
151                        }
152                        unregisterSubscriptionIfRegistered(theSubscription.getIdElement().getIdPart());
153                }
154                if (Subscription.SubscriptionStatus.ACTIVE.equals(newSubscription.getStatus())) {
155                        registerSubscription(theSubscription.getIdElement(), theSubscription);
156                        return true;
157                } else {
158                        return false;
159                }
160        }
161
162        private void updateSubscription(IBaseResource theSubscription) {
163                IIdType theId = theSubscription.getIdElement();
164                Validate.notNull(theId);
165                Validate.notBlank(theId.getIdPart());
166                ActiveSubscription activeSubscription = myActiveSubscriptionCache.get(theId.getIdPart());
167                Validate.notNull(activeSubscription);
168                CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription);
169                activeSubscription.setSubscription(canonicalized);
170
171                // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED
172                HookParams params = new HookParams()
173                        .add(CanonicalSubscription.class, canonicalized);
174                myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params);
175        }
176
177        private boolean channelTypeSame(CanonicalSubscription theExistingSubscription, CanonicalSubscription theNewSubscription) {
178                return theExistingSubscription.getChannelType().equals(theNewSubscription.getChannelType());
179        }
180
181        public int size() {
182                return myActiveSubscriptionCache.size();
183        }
184}