001package ca.uhn.fhir.jpa.subscription.submit.svc;
002
003/*-
004 * #%L
005 * HAPI FHIR Subscription Server
006 * %%
007 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.broker.api.ChannelProducerSettings;
024import ca.uhn.fhir.broker.api.IChannelProducer;
025import ca.uhn.fhir.broker.api.ISendResult;
026import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
027import ca.uhn.fhir.jpa.model.config.SubscriptionSettings;
028import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage;
029import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessagePK;
030import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
031import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
032import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
033import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
034import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
035import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
036import ca.uhn.fhir.subscription.api.IResourceModifiedConsumerWithRetries;
037import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
038import ca.uhn.fhir.util.IoUtils;
039import com.google.common.annotations.VisibleForTesting;
040import jakarta.annotation.PreDestroy;
041import org.apache.commons.lang3.Validate;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044import org.springframework.context.event.ContextRefreshedEvent;
045import org.springframework.context.event.EventListener;
046import org.springframework.messaging.MessageDeliveryException;
047import org.springframework.transaction.annotation.Propagation;
048import org.springframework.transaction.support.TransactionCallback;
049
050import static ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingListener.SUBSCRIPTION_MATCHING_CHANNEL_NAME;
051
052/**
053 * This service provides two distinct contexts in which it submits messages to the subscription pipeline.
054 *
055 * It implements {@link IResourceModifiedConsumer} for synchronous submissions where retry upon failures is not required.
056 *
057 * It implements {@link IResourceModifiedConsumerWithRetries} for synchronous submissions performed as part of processing
058 * an operation on a resource (see {@link SubscriptionMatcherInterceptor}).  Submissions in such context require retries
059 * upon submission failure.
060 *
061 *
062 */
063public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer, IResourceModifiedConsumerWithRetries {
064
065        private static final Logger ourLog = LoggerFactory.getLogger(ResourceModifiedSubmitterSvc.class);
066        private volatile IChannelProducer<ResourceModifiedMessage> myMatchingChannelProducer;
067
068        private final SubscriptionSettings mySubscriptionSettings;
069        private final SubscriptionChannelFactory mySubscriptionChannelFactory;
070        private final IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
071        private final IHapiTransactionService myHapiTransactionService;
072
073        @EventListener(classes = {ContextRefreshedEvent.class})
074        public void startIfNeeded() {
075                if (!mySubscriptionSettings.hasSupportedSubscriptionTypes()) {
076                        ourLog.debug(
077                                        "Subscriptions are disabled on this server.  Skipping {} channel creation.",
078                                        SUBSCRIPTION_MATCHING_CHANNEL_NAME);
079                        return;
080                }
081                if (myMatchingChannelProducer == null) {
082                        myMatchingChannelProducer = mySubscriptionChannelFactory.newMatchingProducer(
083                                        SUBSCRIPTION_MATCHING_CHANNEL_NAME, getChannelProducerSettings());
084                }
085        }
086
087        public ResourceModifiedSubmitterSvc(
088                        SubscriptionSettings theSubscriptionSettings,
089                        SubscriptionChannelFactory theSubscriptionChannelFactory,
090                        IResourceModifiedMessagePersistenceSvc resourceModifiedMessagePersistenceSvc,
091                        IHapiTransactionService theHapiTransactionService) {
092                mySubscriptionSettings = theSubscriptionSettings;
093                mySubscriptionChannelFactory = theSubscriptionChannelFactory;
094                myResourceModifiedMessagePersistenceSvc = resourceModifiedMessagePersistenceSvc;
095                myHapiTransactionService = theHapiTransactionService;
096        }
097
098        /**
099         * @inheritDoc Submit a message to the broker without retries.
100         * <p>
101         * Implementation of the {@link IResourceModifiedConsumer}
102         * @return the result of the send operation
103         */
104        @Override
105        public ISendResult submitResourceModified(ResourceModifiedMessage theMsg) {
106                startIfNeeded();
107
108                ourLog.trace("Sending resource modified message to processing channel");
109                Validate.notNull(
110                                myMatchingChannelProducer,
111                                "A SubscriptionMatcherInterceptor has been registered without calling start() on it.");
112                return myMatchingChannelProducer.send(new ResourceModifiedJsonMessage(theMsg));
113        }
114
115        /**
116         * This method will inflate the ResourceModifiedMessage represented by the IPersistedResourceModifiedMessage and attempts
117         * to submit it to the subscription processing pipeline.
118         *
119         * If submission succeeds, the IPersistedResourceModifiedMessage is deleted and true is returned.  In the event where submission
120         * fails, we return false and the IPersistedResourceModifiedMessage is rollback for later re-submission.
121         *
122         * @param thePersistedResourceModifiedMessage A ResourceModifiedMessage in it's IPersistedResourceModifiedMessage that requires submission.
123         * @return Whether the message was successfully submitted to the broker.
124         */
125        @Override
126        public boolean submitPersisedResourceModifiedMessage(
127                        IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) {
128                return myHapiTransactionService
129                                .withSystemRequest()
130                                .withPropagation(Propagation.REQUIRES_NEW)
131                                .execute(doProcessResourceModifiedInTransaction(thePersistedResourceModifiedMessage));
132        }
133
134        /**
135         * This method is the cornerstone in the submit and retry upon failure mechanism for messages needing submission to the subscription processing pipeline.
136         * It requires execution in a transaction for rollback of deleting the persistedResourceModifiedMessage pointed to by <code>thePersistedResourceModifiedMessage<code/>
137         * in the event where submission would fail.
138         *
139         * @param thePersistedResourceModifiedMessage the primary key pointing to the persisted version (IPersistedResourceModifiedMessage) of a ResourceModifiedMessage needing submission
140         * @return true upon successful submission, false otherwise.
141         */
142        protected TransactionCallback<Boolean> doProcessResourceModifiedInTransaction(
143                        IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) {
144                return theStatus -> {
145                        boolean processed = true;
146                        ResourceModifiedMessage resourceModifiedMessage = null;
147
148                        try {
149                                // delete the entry to lock the row to ensure unique processing
150                                boolean wasDeleted = deletePersistedResourceModifiedMessage(
151                                                thePersistedResourceModifiedMessage.getPersistedResourceModifiedMessagePk());
152
153                                // submit the resource modified message with empty payload, actual inflation is done by the matcher.
154                                resourceModifiedMessage =
155                                                createResourceModifiedMessageWithoutInflation(thePersistedResourceModifiedMessage);
156
157                                if (wasDeleted) {
158                                        submitResourceModified(resourceModifiedMessage);
159                                }
160                        } catch (MessageDeliveryException exception) {
161                                // we encountered an issue when trying to send the message so mark the transaction for rollback
162                                String payloadId = "[unknown]";
163                                String subscriptionId = "[unknown]";
164                                if (resourceModifiedMessage != null) {
165                                        payloadId = resourceModifiedMessage.getPayloadId();
166                                        subscriptionId = resourceModifiedMessage.getSubscriptionId();
167                                }
168                                ourLog.error(
169                                                "Channel submission failed for resource with id {} matching subscription with id {}.  Further attempts will be performed at later time.",
170                                                payloadId,
171                                                subscriptionId,
172                                                exception);
173                                processed = false;
174                                theStatus.setRollbackOnly();
175                        } catch (Exception ex) {
176                                // catch other errors
177                                ourLog.error(
178                                                "Unexpected error encountered while processing resource modified message. Marking as processed to prevent further errors.",
179                                                ex);
180                                processed = true;
181                        }
182
183                        return processed;
184                };
185        }
186
187        private ResourceModifiedMessage createResourceModifiedMessageWithoutInflation(
188                        IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) {
189                return myResourceModifiedMessagePersistenceSvc.createResourceModifiedMessageFromEntityWithoutInflation(
190                                thePersistedResourceModifiedMessage);
191        }
192
193        private boolean deletePersistedResourceModifiedMessage(IPersistedResourceModifiedMessagePK theResourceModifiedPK) {
194                try {
195                        // delete the entry to lock the row to ensure unique processing
196                        return myResourceModifiedMessagePersistenceSvc.deleteByPK(theResourceModifiedPK);
197                } catch (ResourceNotFoundException exception) {
198                        ourLog.warn(
199                                        "thePersistedResourceModifiedMessage with {} and version {} could not be deleted as it may have already been deleted.",
200                                        theResourceModifiedPK.getResourcePid(),
201                                        theResourceModifiedPK.getResourceVersion());
202                        // we were not able to delete the pk.  this implies that someone else did read/delete the PK and processed
203                        // the message
204                        // successfully before we did.
205
206                        return false;
207                } catch (Exception ex) {
208                        ourLog.error("Unknown exception when deleting persisted resource modified message. Returning false.", ex);
209                        return false;
210                }
211        }
212
213        private ChannelProducerSettings getChannelProducerSettings() {
214                ChannelProducerSettings channelProducerSettings = new ChannelProducerSettings();
215                channelProducerSettings.setQualifyChannelName(
216                                mySubscriptionSettings.isQualifySubscriptionMatchingChannelName());
217                return channelProducerSettings;
218        }
219
220        @PreDestroy
221        public void shutdown() throws Exception {
222                if (myMatchingChannelProducer instanceof AutoCloseable producer) {
223                        IoUtils.closeQuietly(producer, ourLog);
224                }
225        }
226
227        @VisibleForTesting
228        public IChannelProducer<ResourceModifiedMessage> getMatchingChannelProducerForUnitTest() {
229                startIfNeeded();
230                return myMatchingChannelProducer;
231        }
232}