001package ca.uhn.fhir.jpa.subscription.match.registry;
002
003/*-
004 * #%L
005 * HAPI FHIR Subscription Server
006 * %%
007 * Copyright (C) 2014 - 2021 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
024import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
025import ca.uhn.fhir.jpa.cache.IResourceChangeEvent;
026import ca.uhn.fhir.jpa.cache.IResourceChangeListener;
027import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache;
028import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry;
029import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
030import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
031import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
032import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
033import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber;
034import ca.uhn.fhir.rest.api.server.IBundleProvider;
035import ca.uhn.fhir.rest.param.TokenOrListParam;
036import ca.uhn.fhir.rest.param.TokenParam;
037import com.google.common.annotations.VisibleForTesting;
038import org.apache.commons.lang3.time.DateUtils;
039import org.hl7.fhir.instance.model.api.IBaseResource;
040import org.hl7.fhir.instance.model.api.IIdType;
041import org.hl7.fhir.r4.model.Subscription;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044import org.springframework.beans.factory.annotation.Autowired;
045
046import javax.annotation.Nonnull;
047import javax.annotation.PostConstruct;
048import javax.annotation.PreDestroy;
049import java.util.Collection;
050import java.util.HashSet;
051import java.util.List;
052import java.util.Set;
053import java.util.concurrent.Semaphore;
054import java.util.stream.Collectors;
055
056
057public class SubscriptionLoader implements IResourceChangeListener {
058        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionLoader.class);
059        private static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes
060        private static long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE;
061
062        private final Object mySyncSubscriptionsLock = new Object();
063        @Autowired
064        private SubscriptionRegistry mySubscriptionRegistry;
065        @Autowired
066        DaoRegistry myDaoRegistry;
067        private Semaphore mySyncSubscriptionsSemaphore = new Semaphore(1);
068        @Autowired
069        private ISchedulerService mySchedulerService;
070        @Autowired
071        private SubscriptionActivatingSubscriber mySubscriptionActivatingInterceptor;
072        @Autowired
073        private ISearchParamRegistry mySearchParamRegistry;
074        @Autowired
075        private IResourceChangeListenerRegistry myResourceChangeListenerRegistry;
076
077        private SearchParameterMap mySearchParameterMap;
078
079        /**
080         * Constructor
081         */
082        public SubscriptionLoader() {
083                super();
084        }
085
086        @PostConstruct
087        public void registerListener() {
088                mySearchParameterMap = getSearchParameterMap();
089                IResourceChangeListenerCache subscriptionCache = myResourceChangeListenerRegistry.registerResourceResourceChangeListener("Subscription", mySearchParameterMap, this, REFRESH_INTERVAL);
090                subscriptionCache.forceRefresh();
091        }
092
093        @PreDestroy
094        public void unregisterListener() {
095                myResourceChangeListenerRegistry.unregisterResourceResourceChangeListener(this);
096        }
097
098        private boolean subscriptionsDaoExists() {
099                return myDaoRegistry != null && myDaoRegistry.isResourceTypeSupported("Subscription");
100        }
101
102        /**
103         * Read the existing subscriptions from the database
104         */
105        public void syncSubscriptions() {
106                if (!subscriptionsDaoExists()) {
107                        return;
108                }
109                if (!mySyncSubscriptionsSemaphore.tryAcquire()) {
110                        return;
111                }
112                try {
113                        doSyncSubscriptionsWithRetry();
114                } finally {
115                        mySyncSubscriptionsSemaphore.release();
116                }
117        }
118
119        @VisibleForTesting
120        public void acquireSemaphoreForUnitTest() throws InterruptedException {
121                mySyncSubscriptionsSemaphore.acquire();
122        }
123
124        @VisibleForTesting
125        public int doSyncSubscriptionsForUnitTest() {
126                // Two passes for delete flag to take effect
127                int first = doSyncSubscriptionsWithRetry();
128                int second = doSyncSubscriptionsWithRetry();
129                return first + second;
130        }
131
132        synchronized int doSyncSubscriptionsWithRetry() {
133                Retrier<Integer> syncSubscriptionRetrier = new Retrier<>(this::doSyncSubscriptions, MAX_RETRIES);
134                return syncSubscriptionRetrier.runWithRetry();
135        }
136
137        private int doSyncSubscriptions() {
138                if (mySchedulerService.isStopping()) {
139                        return 0;
140                }
141
142                synchronized (mySyncSubscriptionsLock) {
143                        ourLog.debug("Starting sync subscriptions");
144
145                        IBundleProvider subscriptionBundleList =  getSubscriptionDao().search(mySearchParameterMap);
146
147                        Integer subscriptionCount = subscriptionBundleList.size();
148                        assert subscriptionCount != null;
149                        if (subscriptionCount >= SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS) {
150                                ourLog.error("Currently over " + SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS + " subscriptions.  Some subscriptions have not been loaded.");
151                        }
152
153                        List<IBaseResource> resourceList = subscriptionBundleList.getResources(0, subscriptionCount);
154
155                        return updateSubscriptionRegistry(resourceList);
156                }
157        }
158
159        private IFhirResourceDao<?> getSubscriptionDao() {
160                return myDaoRegistry.getSubscriptionDao();
161        }
162
163        @Nonnull
164        private SearchParameterMap getSearchParameterMap() {
165                SearchParameterMap map = new SearchParameterMap();
166
167                if (mySearchParamRegistry.getActiveSearchParam("Subscription", "status") != null) {
168                        map.add(Subscription.SP_STATUS, new TokenOrListParam()
169                                .addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode()))
170                                .addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode())));
171                }
172                map.setLoadSynchronousUpTo(SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS);
173                return map;
174        }
175
176        private int updateSubscriptionRegistry(List<IBaseResource> theResourceList) {
177                Set<String> allIds = new HashSet<>();
178                int activatedCount = 0;
179                int registeredCount = 0;
180
181                for (IBaseResource resource : theResourceList) {
182                        String nextId = resource.getIdElement().getIdPart();
183                        allIds.add(nextId);
184
185                        boolean activated = mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(resource);
186                        if (activated) {
187                                activatedCount++;
188                        }
189
190                        boolean registered = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource);
191                        if (registered) {
192                                registeredCount++;
193                        }
194                }
195
196                mySubscriptionRegistry.unregisterAllSubscriptionsNotInCollection(allIds);
197                ourLog.debug("Finished sync subscriptions - activated {} and registered {}", theResourceList.size(), registeredCount);
198                return activatedCount;
199        }
200
201        @Override
202        public void handleInit(Collection<IIdType> theResourceIds) {
203                if (!subscriptionsDaoExists()) {
204                        ourLog.warn("Subsriptions are enabled on this server, but there is no Subscription DAO configured.");
205                        return;
206                }
207                IFhirResourceDao<?> subscriptionDao = getSubscriptionDao();
208                List<IBaseResource> resourceList = theResourceIds.stream().map(subscriptionDao::read).collect(Collectors.toList());
209                updateSubscriptionRegistry(resourceList);
210        }
211
212        @Override
213        public void handleChange(IResourceChangeEvent theResourceChangeEvent) {
214                // For now ignore the contents of theResourceChangeEvent.  In the future, consider updating the registry based on
215                // known subscriptions that have been created, updated & deleted
216                syncSubscriptions();
217        }
218}
219