001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.interceptor.model.RequestPartitionId;
024import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
025import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
026import ca.uhn.fhir.jpa.model.config.SubscriptionSettings;
027import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
028import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
029import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
030import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
031import ca.uhn.fhir.rest.api.Constants;
032import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
033import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
034import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
035import ca.uhn.fhir.subscription.SubscriptionConstants;
036import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
037import ca.uhn.fhir.util.SubscriptionUtil;
038import jakarta.annotation.Nonnull;
039import org.hl7.fhir.dstu2.model.Subscription;
040import org.hl7.fhir.instance.model.api.IBaseResource;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043import org.springframework.beans.factory.annotation.Autowired;
044import org.springframework.messaging.Message;
045import org.springframework.messaging.MessageHandler;
046import org.springframework.messaging.MessagingException;
047
048import java.util.Optional;
049
050/**
051 * Responsible for transitioning subscription resources from REQUESTED to ACTIVE
052 * Once activated, the subscription is added to the SubscriptionRegistry.
053 * <p>
054 * Also validates criteria.  If invalid, rejects the subscription without persisting the subscription.
055 */
056public class SubscriptionActivatingSubscriber implements MessageHandler {
057        private final Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class);
058
059        @Autowired
060        private FhirContext myFhirContext;
061
062        @Autowired
063        private DaoRegistry myDaoRegistry;
064
065        @Autowired
066        private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
067
068        @Autowired
069        private SubscriptionSettings mySubscriptionSettings;
070
071        @Autowired
072        private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
073        /**
074         * Constructor
075         */
076        public SubscriptionActivatingSubscriber() {
077                super();
078        }
079
080        @Override
081        public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException {
082                if (!(theMessage instanceof ResourceModifiedJsonMessage)) {
083                        ourLog.warn("Received message of unexpected type on matching channel: {}", theMessage);
084                        return;
085                }
086
087                ResourceModifiedMessage payload = ((ResourceModifiedJsonMessage) theMessage).getPayload();
088                if (!payload.hasPayloadType(myFhirContext, "Subscription")) {
089                        return;
090                }
091
092                switch (payload.getOperationType()) {
093                        case CREATE:
094                        case UPDATE:
095                                if (payload.getPayload(myFhirContext) == null) {
096                                        Optional<ResourceModifiedMessage> inflatedMsg =
097                                                        myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull(
098                                                                        payload);
099                                        if (inflatedMsg.isEmpty()) {
100                                                return;
101                                        }
102                                        payload = inflatedMsg.get();
103                                }
104
105                                activateSubscriptionIfRequired(payload.getNewPayload(myFhirContext));
106                                break;
107                        case TRANSACTION:
108                        case DELETE:
109                        case MANUALLY_TRIGGERED:
110                        default:
111                                break;
112                }
113        }
114
115        /**
116         * Note: This is synchronized because this is called both by matching channel messages
117         * as well as from Subscription Loader (which periodically refreshes from the DB to make
118         * sure nothing got missed). If these two mechanisms try to activate the same subscription
119         * at the same time they can get a constraint error.
120         */
121        public synchronized boolean activateSubscriptionIfRequired(final IBaseResource theSubscription) {
122                // Grab the value for "Subscription.channel.type" so we can see if this
123                // subscriber applies.
124                CanonicalSubscriptionChannelType subscriptionChannelType =
125                                mySubscriptionCanonicalizer.getChannelType(theSubscription);
126
127                // Only activate supported subscriptions
128                if (subscriptionChannelType == null
129                                || !mySubscriptionSettings
130                                                .getSupportedSubscriptionTypes()
131                                                .contains(subscriptionChannelType.toCanonical())) {
132                        return false;
133                }
134
135                String statusString = mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription);
136
137                if (SubscriptionConstants.REQUESTED_STATUS.equals(statusString)) {
138                        return activateSubscription(theSubscription);
139                }
140
141                return false;
142        }
143
144        @SuppressWarnings("unchecked")
145        private boolean activateSubscription(final IBaseResource theSubscription) {
146                IFhirResourceDao subscriptionDao = myDaoRegistry.getSubscriptionDao();
147                SystemRequestDetails srd = SystemRequestDetails.forAllPartitions();
148
149                IBaseResource subscription = null;
150                try {
151                        // read can throw ResourceGoneException
152                        // if this happens, we will treat this as a failure to activate
153                        subscription =
154                                        subscriptionDao.read(theSubscription.getIdElement(), SystemRequestDetails.forAllPartitions());
155                        subscription.setId(subscription.getIdElement().toVersionless());
156
157                        ourLog.info(
158                                        "Activating subscription {} from status {} to {}",
159                                        subscription.getIdElement().toUnqualified().getValue(),
160                                        SubscriptionConstants.REQUESTED_STATUS,
161                                        SubscriptionConstants.ACTIVE_STATUS);
162                        SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ACTIVE_STATUS);
163
164                        RequestPartitionId partitionId =
165                                        (RequestPartitionId) subscription.getUserData(Constants.RESOURCE_PARTITION_ID);
166                        subscriptionDao.update(subscription, new SystemRequestDetails().setRequestPartitionId(partitionId));
167                        return true;
168                } catch (final UnprocessableEntityException | ResourceGoneException e) {
169                        subscription = subscription != null ? subscription : theSubscription;
170                        ourLog.error("Failed to activate subscription " + subscription.getIdElement() + " : " + e.getMessage());
171                        ourLog.info("Changing status of {} to ERROR", subscription.getIdElement());
172                        SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ERROR_STATUS);
173                        SubscriptionUtil.setReason(myFhirContext, subscription, e.getMessage());
174                        subscriptionDao.update(subscription, srd);
175                        return false;
176                }
177        }
178
179        public boolean isChannelTypeSupported(IBaseResource theSubscription) {
180                Subscription.SubscriptionChannelType channelType =
181                                mySubscriptionCanonicalizer.getChannelType(theSubscription).toCanonical();
182                return mySubscriptionSettings.getSupportedSubscriptionTypes().contains(channelType);
183        }
184}