001package ca.uhn.fhir.jpa.subscription.submit.svc;
002
003/*-
004 * #%L
005 * HAPI FHIR Subscription Server
006 * %%
007 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
024import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage;
025import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessagePK;
026import ca.uhn.fhir.jpa.model.entity.StorageSettings;
027import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
028import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
029import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
030import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
031import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
032import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
033import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
034import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
035import ca.uhn.fhir.subscription.api.IResourceModifiedConsumerWithRetries;
036import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
037import org.apache.commons.lang3.Validate;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040import org.springframework.context.event.ContextRefreshedEvent;
041import org.springframework.context.event.EventListener;
042import org.springframework.messaging.MessageChannel;
043import org.springframework.messaging.MessageDeliveryException;
044import org.springframework.transaction.annotation.Propagation;
045import org.springframework.transaction.support.TransactionCallback;
046
047import static ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME;
048
049/**
050 * This service provides two distinct contexts in which it submits messages to the subscription pipeline.
051 *
052 * It implements {@link IResourceModifiedConsumer} for synchronous submissions where retry upon failures is not required.
053 *
054 * It implements {@link IResourceModifiedConsumerWithRetries} for synchronous submissions performed as part of processing
055 * an operation on a resource (see {@link SubscriptionMatcherInterceptor}).  Submissions in such context require retries
056 * upon submission failure.
057 *
058 *
059 */
060public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer, IResourceModifiedConsumerWithRetries {
061
062        private static final Logger ourLog = LoggerFactory.getLogger(ResourceModifiedSubmitterSvc.class);
063        private volatile MessageChannel myMatchingChannel;
064
065        private final StorageSettings myStorageSettings;
066        private final SubscriptionChannelFactory mySubscriptionChannelFactory;
067        private final IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
068        private final IHapiTransactionService myHapiTransactionService;
069
070        @EventListener(classes = {ContextRefreshedEvent.class})
071        public void startIfNeeded() {
072                if (!myStorageSettings.hasSupportedSubscriptionTypes()) {
073                        ourLog.debug(
074                                        "Subscriptions are disabled on this server.  Skipping {} channel creation.",
075                                        SUBSCRIPTION_MATCHING_CHANNEL_NAME);
076                        return;
077                }
078                if (myMatchingChannel == null) {
079                        myMatchingChannel = mySubscriptionChannelFactory.newMatchingSendingChannel(
080                                        SUBSCRIPTION_MATCHING_CHANNEL_NAME, getChannelProducerSettings());
081                }
082        }
083
084        public ResourceModifiedSubmitterSvc(
085                        StorageSettings theStorageSettings,
086                        SubscriptionChannelFactory theSubscriptionChannelFactory,
087                        IResourceModifiedMessagePersistenceSvc resourceModifiedMessagePersistenceSvc,
088                        IHapiTransactionService theHapiTransactionService) {
089                myStorageSettings = theStorageSettings;
090                mySubscriptionChannelFactory = theSubscriptionChannelFactory;
091                myResourceModifiedMessagePersistenceSvc = resourceModifiedMessagePersistenceSvc;
092                myHapiTransactionService = theHapiTransactionService;
093        }
094
095        /**
096         * @inheritDoc
097         * Submit a message to the broker without retries.
098         *
099         * Implementation of the {@link IResourceModifiedConsumer}
100         *
101         */
102        @Override
103        public void submitResourceModified(ResourceModifiedMessage theMsg) {
104                startIfNeeded();
105
106                ourLog.trace("Sending resource modified message to processing channel");
107                Validate.notNull(
108                                myMatchingChannel,
109                                "A SubscriptionMatcherInterceptor has been registered without calling start() on it.");
110                myMatchingChannel.send(new ResourceModifiedJsonMessage(theMsg));
111        }
112
113        /**
114         * This method will inflate the ResourceModifiedMessage represented by the IPersistedResourceModifiedMessage and attempts
115         * to submit it to the subscription processing pipeline.
116         *
117         * If submission succeeds, the IPersistedResourceModifiedMessage is deleted and true is returned.  In the event where submission
118         * fails, we return false and the IPersistedResourceModifiedMessage is rollback for later re-submission.
119         *
120         * @param thePersistedResourceModifiedMessage A ResourceModifiedMessage in it's IPersistedResourceModifiedMessage that requires submission.
121         * @return Whether the message was successfully submitted to the broker.
122         */
123        @Override
124        public boolean submitPersisedResourceModifiedMessage(
125                        IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) {
126                return myHapiTransactionService
127                                .withSystemRequest()
128                                .withPropagation(Propagation.REQUIRES_NEW)
129                                .execute(doProcessResourceModifiedInTransaction(thePersistedResourceModifiedMessage));
130        }
131
132        /**
133         * This method is the cornerstone in the submit and retry upon failure mechanism for messages needing submission to the subscription processing pipeline.
134         * It requires execution in a transaction for rollback of deleting the persistedResourceModifiedMessage pointed to by <code>thePersistedResourceModifiedMessage<code/>
135         * in the event where submission would fail.
136         *
137         * @param thePersistedResourceModifiedMessage the primary key pointing to the persisted version (IPersistedResourceModifiedMessage) of a ResourceModifiedMessage needing submission
138         * @return true upon successful submission, false otherwise.
139         */
140        protected TransactionCallback<Boolean> doProcessResourceModifiedInTransaction(
141                        IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) {
142                return theStatus -> {
143                        boolean processed = true;
144                        ResourceModifiedMessage resourceModifiedMessage = null;
145
146                        try {
147                                // delete the entry to lock the row to ensure unique processing
148                                boolean wasDeleted = deletePersistedResourceModifiedMessage(
149                                                thePersistedResourceModifiedMessage.getPersistedResourceModifiedMessagePk());
150
151                                // submit the resource modified message with empty payload, actual inflation is done by the matcher.
152                                resourceModifiedMessage =
153                                                createResourceModifiedMessageWithoutInflation(thePersistedResourceModifiedMessage);
154
155                                if (wasDeleted) {
156                                        submitResourceModified(resourceModifiedMessage);
157                                }
158                        } catch (MessageDeliveryException exception) {
159                                // we encountered an issue when trying to send the message so mark the transaction for rollback
160                                String payloadId = "[unknown]";
161                                String subscriptionId = "[unknown]";
162                                if (resourceModifiedMessage != null) {
163                                        payloadId = resourceModifiedMessage.getPayloadId();
164                                        subscriptionId = resourceModifiedMessage.getSubscriptionId();
165                                }
166                                ourLog.error(
167                                                "Channel submission failed for resource with id {} matching subscription with id {}.  Further attempts will be performed at later time.",
168                                                payloadId,
169                                                subscriptionId,
170                                                exception);
171                                processed = false;
172                                theStatus.setRollbackOnly();
173                        } catch (Exception ex) {
174                                // catch other errors
175                                ourLog.error(
176                                                "Unexpected error encountered while processing resource modified message. Marking as processed to prevent further errors.",
177                                                ex);
178                                processed = true;
179                        }
180
181                        return processed;
182                };
183        }
184
185        private ResourceModifiedMessage createResourceModifiedMessageWithoutInflation(
186                        IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) {
187                return myResourceModifiedMessagePersistenceSvc.createResourceModifiedMessageFromEntityWithoutInflation(
188                                thePersistedResourceModifiedMessage);
189        }
190
191        private boolean deletePersistedResourceModifiedMessage(IPersistedResourceModifiedMessagePK theResourceModifiedPK) {
192                try {
193                        // delete the entry to lock the row to ensure unique processing
194                        return myResourceModifiedMessagePersistenceSvc.deleteByPK(theResourceModifiedPK);
195                } catch (ResourceNotFoundException exception) {
196                        ourLog.warn(
197                                        "thePersistedResourceModifiedMessage with {} and version {} could not be deleted as it may have already been deleted.",
198                                        theResourceModifiedPK.getResourcePid(),
199                                        theResourceModifiedPK.getResourceVersion());
200                        // we were not able to delete the pk.  this implies that someone else did read/delete the PK and processed
201                        // the message
202                        // successfully before we did.
203
204                        return false;
205                } catch (Exception ex) {
206                        ourLog.error("Unknown exception when deleting persisted resource modified message. Returning false.", ex);
207                        return false;
208                }
209        }
210
211        private ChannelProducerSettings getChannelProducerSettings() {
212                ChannelProducerSettings channelProducerSettings = new ChannelProducerSettings();
213                channelProducerSettings.setQualifyChannelName(myStorageSettings.isQualifySubscriptionMatchingChannelName());
214                return channelProducerSettings;
215        }
216
217        public IChannelProducer getProcessingChannelForUnitTest() {
218                startIfNeeded();
219                return (IChannelProducer) myMatchingChannel;
220        }
221}