001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.match.registry;
021
022import ca.uhn.fhir.cache.BaseResourceCacheSynchronizer;
023import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
024import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber;
025import ca.uhn.fhir.rest.param.TokenOrListParam;
026import ca.uhn.fhir.rest.param.TokenParam;
027import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
028import ca.uhn.fhir.subscription.SubscriptionConstants;
029import com.google.common.annotations.VisibleForTesting;
030import jakarta.annotation.Nonnull;
031import org.apache.commons.lang3.StringUtils;
032import org.hl7.fhir.instance.model.api.IBaseResource;
033import org.hl7.fhir.r4.model.Subscription;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036import org.springframework.beans.factory.annotation.Autowired;
037
038import java.util.HashSet;
039import java.util.List;
040import java.util.Set;
041
042public class SubscriptionLoader extends BaseResourceCacheSynchronizer {
043        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionLoader.class);
044
045        @Autowired
046        private SubscriptionRegistry mySubscriptionRegistry;
047
048        @Autowired
049        private SubscriptionActivatingSubscriber mySubscriptionActivatingInterceptor;
050
051        @Autowired
052        private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
053
054        @Autowired
055        protected ISearchParamRegistry mySearchParamRegistry;
056
057        /**
058         * Constructor
059         */
060        public SubscriptionLoader() {
061                super("Subscription");
062        }
063
064        @VisibleForTesting
065        public int doSyncSubscriptionsForUnitTest() {
066                return super.doSyncResourcesForUnitTest();
067        }
068
069        @Override
070        @Nonnull
071        protected SearchParameterMap getSearchParameterMap() {
072                SearchParameterMap map = new SearchParameterMap();
073
074                if (mySearchParamRegistry.getActiveSearchParam(
075                                                "Subscription", "status", ISearchParamRegistry.SearchParamLookupContextEnum.ALL)
076                                != null) {
077                        map.add(
078                                        Subscription.SP_STATUS,
079                                        new TokenOrListParam()
080                                                        .addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode()))
081                                                        .addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode())));
082                }
083                map.setLoadSynchronousUpTo(SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS);
084                return map;
085        }
086
087        @Override
088        protected void handleInit(List<IBaseResource> resourceList) {
089                updateSubscriptionRegistry(resourceList);
090        }
091
092        @Override
093        protected int syncResourcesIntoCache(List<IBaseResource> resourceList) {
094                return updateSubscriptionRegistry(resourceList);
095        }
096
097        private int updateSubscriptionRegistry(List<IBaseResource> theResourceList) {
098                Set<String> allIds = new HashSet<>();
099                int activatedCount = 0;
100                int registeredCount = 0;
101
102                for (IBaseResource resource : theResourceList) {
103                        String nextId = resource.getIdElement().getIdPart();
104                        allIds.add(nextId);
105
106                        boolean activated = activateSubscriptionIfRequested(resource);
107                        if (activated) {
108                                ++activatedCount;
109                        }
110
111                        boolean registered = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource);
112                        if (registered) {
113                                registeredCount++;
114                        }
115                }
116
117                mySubscriptionRegistry.unregisterAllSubscriptionsNotInCollection(allIds);
118                ourLog.debug(
119                                "Finished sync subscriptions - activated {} and registered {}",
120                                theResourceList.size(),
121                                registeredCount);
122                return activatedCount;
123        }
124
125        /**
126         * Check status of theSubscription and update to "active" if needed.
127         * @return true if activated
128         */
129        private boolean activateSubscriptionIfRequested(IBaseResource theSubscription) {
130                boolean successfullyActivated = false;
131
132                if (SubscriptionConstants.REQUESTED_STATUS.equals(
133                                mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription))) {
134                        if (mySubscriptionActivatingInterceptor.isChannelTypeSupported(theSubscription)) {
135                                // internally, subscriptions that cannot activate will be set to error
136                                if (mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(theSubscription)) {
137                                        successfullyActivated = true;
138                                } else {
139                                        logSubscriptionNotActivatedPlusErrorIfPossible(theSubscription);
140                                }
141                        } else {
142                                ourLog.debug(
143                                                "Could not activate subscription {} because channel type {} is not supported.",
144                                                theSubscription.getIdElement(),
145                                                mySubscriptionCanonicalizer.getChannelType(theSubscription));
146                        }
147                }
148
149                return successfullyActivated;
150        }
151
152        /**
153         * Logs
154         *
155         * @param theSubscription
156         */
157        private void logSubscriptionNotActivatedPlusErrorIfPossible(IBaseResource theSubscription) {
158                String error;
159                if (theSubscription instanceof Subscription) {
160                        error = ((Subscription) theSubscription).getError();
161                } else if (theSubscription instanceof org.hl7.fhir.dstu3.model.Subscription) {
162                        error = ((org.hl7.fhir.dstu3.model.Subscription) theSubscription).getError();
163                } else if (theSubscription instanceof org.hl7.fhir.dstu2.model.Subscription) {
164                        error = ((org.hl7.fhir.dstu2.model.Subscription) theSubscription).getError();
165                } else {
166                        error = "";
167                }
168                ourLog.error(
169                                "Subscription {} could not be activated. "
170                                                + "This will not prevent startup, but it could lead to undesirable outcomes! {}",
171                                theSubscription.getIdElement().getIdPart(),
172                                (StringUtils.isBlank(error) ? "" : "Error: " + error));
173        }
174
175        public void syncSubscriptions() {
176                super.syncDatabaseToCache();
177        }
178}