001package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
002
003/*-
004 * #%L
005 * HAPI FHIR Subscription Server
006 * %%
007 * Copyright (C) 2014 - 2021 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.jpa.api.config.DaoConfig;
024import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
025import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
026import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
027import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
028import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
029import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
030import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
031import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
032import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
033import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
034import ca.uhn.fhir.util.SubscriptionUtil;
035import org.hl7.fhir.instance.model.api.IBaseResource;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038import org.springframework.beans.factory.annotation.Autowired;
039import org.springframework.messaging.Message;
040import org.springframework.messaging.MessageHandler;
041import org.springframework.messaging.MessagingException;
042
043import javax.annotation.Nonnull;
044
045/**
046 * Responsible for transitioning subscription resources from REQUESTED to ACTIVE
047 * Once activated, the subscription is added to the SubscriptionRegistry.
048 * <p>
049 * Also validates criteria.  If invalid, rejects the subscription without persisting the subscription.
050 */
051public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscriptionResources implements MessageHandler {
052        private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class);
053        @Autowired
054        private SubscriptionRegistry mySubscriptionRegistry;
055        @Autowired
056        private DaoRegistry myDaoRegistry;
057        @Autowired
058        private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
059        @Autowired
060        private DaoConfig myDaoConfig;
061        @Autowired
062        private SubscriptionStrategyEvaluator mySubscriptionStrategyEvaluator;
063
064        /**
065         * Constructor
066         */
067        public SubscriptionActivatingSubscriber() {
068                super();
069        }
070
071        @Override
072        public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException {
073                if (!(theMessage instanceof ResourceModifiedJsonMessage)) {
074                        ourLog.warn("Received message of unexpected type on matching channel: {}", theMessage);
075                        return;
076                }
077
078                ResourceModifiedMessage payload = ((ResourceModifiedJsonMessage) theMessage).getPayload();
079                if (!isSubscription(payload)) {
080                        return;
081                }
082
083                switch (payload.getOperationType()) {
084                        case CREATE:
085                        case UPDATE:
086                                activateSubscriptionIfRequired(payload.getNewPayload(myFhirContext));
087                                break;
088                        case DELETE:
089                        case MANUALLY_TRIGGERED:
090                        default:
091                                break;
092                }
093
094        }
095
096        public boolean activateSubscriptionIfRequired(final IBaseResource theSubscription) {
097                // Grab the value for "Subscription.channel.type" so we can see if this
098                // subscriber applies..
099                CanonicalSubscriptionChannelType subscriptionChannelType = mySubscriptionCanonicalizer.getChannelType(theSubscription);
100
101                // Only activate supported subscriptions
102                if (subscriptionChannelType == null || !myDaoConfig.getSupportedSubscriptionTypes().contains(subscriptionChannelType.toCanonical())) {
103                        return false;
104                }
105
106                String statusString = mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription);
107
108                if (SubscriptionConstants.REQUESTED_STATUS.equals(statusString)) {
109                        return activateSubscription(theSubscription);
110                }
111
112                return false;
113        }
114
115        @SuppressWarnings("unchecked")
116        private boolean activateSubscription(final IBaseResource theSubscription) {
117                IFhirResourceDao subscriptionDao = myDaoRegistry.getSubscriptionDao();
118                IBaseResource subscription = subscriptionDao.read(theSubscription.getIdElement());
119                subscription.setId(subscription.getIdElement().toVersionless());
120
121                ourLog.info("Activating subscription {} from status {} to {}", subscription.getIdElement().toUnqualified().getValue(), SubscriptionConstants.REQUESTED_STATUS, SubscriptionConstants.ACTIVE_STATUS);
122                try {
123                        SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ACTIVE_STATUS);
124                        subscriptionDao.update(subscription);
125                        return true;
126                } catch (final UnprocessableEntityException e) {
127                        ourLog.info("Changing status of {} to ERROR", subscription.getIdElement());
128                        SubscriptionUtil.setStatus(myFhirContext, subscription, "error");
129                        SubscriptionUtil.setReason(myFhirContext, subscription, e.getMessage());
130                        subscriptionDao.update(subscription);
131                        return false;
132                }
133        }
134
135}