001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2023 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
024import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
025import ca.uhn.fhir.jpa.model.entity.StorageSettings;
026import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
027import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
028import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
029import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
030import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
031import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
032import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
033import ca.uhn.fhir.subscription.SubscriptionConstants;
034import ca.uhn.fhir.util.SubscriptionUtil;
035import org.hl7.fhir.dstu2.model.Subscription;
036import org.hl7.fhir.instance.model.api.IBaseResource;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039import org.springframework.beans.factory.annotation.Autowired;
040import org.springframework.messaging.Message;
041import org.springframework.messaging.MessageHandler;
042import org.springframework.messaging.MessagingException;
043
044import javax.annotation.Nonnull;
045
046/**
047 * Responsible for transitioning subscription resources from REQUESTED to ACTIVE
048 * Once activated, the subscription is added to the SubscriptionRegistry.
049 * <p>
050 * Also validates criteria.  If invalid, rejects the subscription without persisting the subscription.
051 */
052public class SubscriptionActivatingSubscriber implements MessageHandler {
053        private final Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class);
054
055        @Autowired
056        private FhirContext myFhirContext;
057
058        @Autowired
059        private DaoRegistry myDaoRegistry;
060
061        @Autowired
062        private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
063
064        @Autowired
065        private StorageSettings myStorageSettings;
066
067        /**
068         * Constructor
069         */
070        public SubscriptionActivatingSubscriber() {
071                super();
072        }
073
074        @Override
075        public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException {
076                if (!(theMessage instanceof ResourceModifiedJsonMessage)) {
077                        ourLog.warn("Received message of unexpected type on matching channel: {}", theMessage);
078                        return;
079                }
080
081                ResourceModifiedMessage payload = ((ResourceModifiedJsonMessage) theMessage).getPayload();
082                if (!payload.hasPayloadType(myFhirContext, "Subscription")) {
083                        return;
084                }
085
086                switch (payload.getOperationType()) {
087                        case CREATE:
088                        case UPDATE:
089                                activateSubscriptionIfRequired(payload.getNewPayload(myFhirContext));
090                                break;
091                        case TRANSACTION:
092                        case DELETE:
093                        case MANUALLY_TRIGGERED:
094                        default:
095                                break;
096                }
097        }
098
099        /**
100         * Note: This is synchronized because this is called both by matching channel messages
101         * as well as from Subscription Loader (which periodically refreshes from the DB to make
102         * sure nothing got missed). If these two mechanisms try to activate the same subscription
103         * at the same time they can get a constraint error.
104         */
105        public synchronized boolean activateSubscriptionIfRequired(final IBaseResource theSubscription) {
106                // Grab the value for "Subscription.channel.type" so we can see if this
107                // subscriber applies..
108                CanonicalSubscriptionChannelType subscriptionChannelType =
109                                mySubscriptionCanonicalizer.getChannelType(theSubscription);
110
111                // Only activate supported subscriptions
112                if (subscriptionChannelType == null
113                                || !myStorageSettings.getSupportedSubscriptionTypes().contains(subscriptionChannelType.toCanonical())) {
114                        return false;
115                }
116
117                String statusString = mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription);
118
119                if (SubscriptionConstants.REQUESTED_STATUS.equals(statusString)) {
120                        return activateSubscription(theSubscription);
121                }
122
123                return false;
124        }
125
126        @SuppressWarnings("unchecked")
127        private boolean activateSubscription(final IBaseResource theSubscription) {
128                IFhirResourceDao subscriptionDao = myDaoRegistry.getSubscriptionDao();
129                SystemRequestDetails srd = SystemRequestDetails.forAllPartitions();
130
131                IBaseResource subscription = null;
132                try {
133                        // read can throw ResourceGoneException
134                        // if this happens, we will treat this as a failure to activate
135                        subscription =
136                                        subscriptionDao.read(theSubscription.getIdElement(), SystemRequestDetails.forAllPartitions());
137                        subscription.setId(subscription.getIdElement().toVersionless());
138
139                        ourLog.info(
140                                        "Activating subscription {} from status {} to {}",
141                                        subscription.getIdElement().toUnqualified().getValue(),
142                                        SubscriptionConstants.REQUESTED_STATUS,
143                                        SubscriptionConstants.ACTIVE_STATUS);
144                        SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ACTIVE_STATUS);
145                        subscriptionDao.update(subscription, srd);
146                        return true;
147                } catch (final UnprocessableEntityException | ResourceGoneException e) {
148                        subscription = subscription != null ? subscription : theSubscription;
149                        ourLog.error("Failed to activate subscription " + subscription.getIdElement() + " : " + e.getMessage());
150                        ourLog.info("Changing status of {} to ERROR", subscription.getIdElement());
151                        SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ERROR_STATUS);
152                        SubscriptionUtil.setReason(myFhirContext, subscription, e.getMessage());
153                        subscriptionDao.update(subscription, srd);
154                        return false;
155                }
156        }
157
158        public boolean isChannelTypeSupported(IBaseResource theSubscription) {
159                Subscription.SubscriptionChannelType channelType =
160                                mySubscriptionCanonicalizer.getChannelType(theSubscription).toCanonical();
161                return myStorageSettings.getSupportedSubscriptionTypes().contains(channelType);
162        }
163}