001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.topic;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.interceptor.model.RequestPartitionId;
024import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
025import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
026import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
027import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
028import ca.uhn.fhir.rest.api.server.RequestDetails;
029import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
030import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
031import ca.uhn.fhir.util.Logs;
032import ca.uhn.hapi.converters.canonical.SubscriptionTopicCanonicalizer;
033import jakarta.annotation.Nonnull;
034import org.hl7.fhir.instance.model.api.IBaseResource;
035import org.hl7.fhir.instance.model.api.IIdType;
036import org.hl7.fhir.r5.model.Enumerations;
037import org.hl7.fhir.r5.model.SubscriptionTopic;
038import org.slf4j.Logger;
039import org.springframework.beans.factory.annotation.Autowired;
040import org.springframework.messaging.Message;
041import org.springframework.messaging.MessageHandler;
042import org.springframework.messaging.MessagingException;
043
044/**
045 * Responsible for transitioning subscription resources from REQUESTED to ACTIVE
046 * Once activated, the subscription is added to the SubscriptionRegistry.
047 * <p>
048 * Also validates criteria.  If invalid, rejects the subscription without persisting the subscription.
049 */
050public class SubscriptionTopicRegisteringSubscriber implements MessageHandler {
051        private static final Logger ourLog = Logs.getSubscriptionTopicLog();
052
053        @Autowired
054        private FhirContext myFhirContext;
055
056        @Autowired
057        private SubscriptionTopicRegistry mySubscriptionTopicRegistry;
058
059        @Autowired
060        private DaoRegistry myDaoRegistry;
061
062        /**
063         * Constructor
064         */
065        public SubscriptionTopicRegisteringSubscriber() {
066                super();
067        }
068
069        @Override
070        public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException {
071                if (!(theMessage instanceof ResourceModifiedJsonMessage)) {
072                        ourLog.warn("Received message of unexpected type on matching channel: {}", theMessage);
073                        return;
074                }
075
076                ResourceModifiedMessage payload = ((ResourceModifiedJsonMessage) theMessage).getPayload();
077
078                if (!payload.hasPayloadType(myFhirContext, "SubscriptionTopic")) {
079                        return;
080                }
081
082                switch (payload.getOperationType()) {
083                        case MANUALLY_TRIGGERED:
084                        case TRANSACTION:
085                                return;
086                        case CREATE:
087                        case UPDATE:
088                        case DELETE:
089                                break;
090                }
091
092                // We read the resource back from the DB instead of using the supplied copy for
093                // two reasons:
094                // - in order to store partition id in the userdata of the resource for partitioned subscriptions
095                // - in case we're processing out of order and a create-then-delete has been processed backwards (or vice versa)
096
097                IBaseResource payloadResource;
098                IIdType payloadId = payload.getPayloadId(myFhirContext).toUnqualifiedVersionless();
099                try {
100                        IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getResourceDao("SubscriptionTopic");
101                        RequestDetails systemRequestDetails = getPartitionAwareRequestDetails(payload);
102                        payloadResource = subscriptionDao.read(payloadId, systemRequestDetails);
103                        if (payloadResource == null) {
104                                // Only for unit test
105                                payloadResource = payload.getPayload(myFhirContext);
106                        }
107                } catch (ResourceGoneException e) {
108                        mySubscriptionTopicRegistry.unregister(payloadId.getIdPart());
109                        return;
110                }
111
112                SubscriptionTopic subscriptionTopic =
113                                SubscriptionTopicCanonicalizer.canonicalizeTopic(myFhirContext, payloadResource);
114                if (subscriptionTopic.getStatus() == Enumerations.PublicationStatus.ACTIVE) {
115                        mySubscriptionTopicRegistry.register(subscriptionTopic);
116                } else {
117                        mySubscriptionTopicRegistry.unregister(payloadId.getIdPart());
118                }
119        }
120
121        /**
122         * There were some situations where the RequestDetails attempted to use the default partition
123         * and the partition name was a list containing null values (i.e. using the package installer to STORE_AND_INSTALL
124         * Subscriptions while partitioning was enabled). If any partition matches these criteria,
125         * {@link RequestPartitionId#defaultPartition()} is used to obtain the default partition.
126         */
127        private RequestDetails getPartitionAwareRequestDetails(ResourceModifiedMessage payload) {
128                RequestPartitionId payloadPartitionId = payload.getPartitionId();
129                if (payloadPartitionId == null || payloadPartitionId.isDefaultPartition()) {
130                        // This may look redundant but the package installer STORE_AND_INSTALL Subscriptions when partitioning is
131                        // enabled
132                        // creates a corrupt default partition.  This resets it to a clean one.
133                        payloadPartitionId = RequestPartitionId.defaultPartition();
134                }
135                return new SystemRequestDetails().setRequestPartitionId(payloadPartitionId);
136        }
137}