001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.submit.interceptor;
021
022import ca.uhn.fhir.broker.api.ISendResult;
023import ca.uhn.fhir.broker.api.PayloadTooLargeException;
024import ca.uhn.fhir.jpa.subscription.async.AsyncResourceModifiedProcessingSchedulerSvc;
025import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
026import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
027import ca.uhn.fhir.util.Logs;
028import org.slf4j.Logger;
029import org.springframework.beans.factory.annotation.Autowired;
030import org.springframework.transaction.support.TransactionSynchronizationAdapter;
031import org.springframework.transaction.support.TransactionSynchronizationManager;
032
033/**
034 * The purpose of this interceptor is to synchronously submit ResourceModifiedMessage to the
035 * subscription processing pipeline, ie, as part of processing the operation on a resource.
036 * It is meant to replace the SubscriptionMatcherInterceptor in integrated tests where
037 * scheduling is disabled.  See {@link AsyncResourceModifiedProcessingSchedulerSvc}
038 * for further details on asynchronous submissions.
039 */
040public class SynchronousSubscriptionMatcherInterceptor extends SubscriptionMatcherInterceptor {
041        private static final Logger ourLog = Logs.getSubscriptionTroubleshootingLog();
042
043        @Autowired
044        private IResourceModifiedConsumer myResourceModifiedConsumer;
045
046        @Override
047        protected void processResourceModifiedMessage(ResourceModifiedMessage theResourceModifiedMessage) {
048                if (TransactionSynchronizationManager.isSynchronizationActive()) {
049                        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
050                                @Override
051                                public int getOrder() {
052                                        return 0;
053                                }
054
055                                @Override
056                                public void afterCommit() {
057                                        doSubmitResourceModified(theResourceModifiedMessage);
058                                }
059                        });
060                } else {
061                        doSubmitResourceModified(theResourceModifiedMessage);
062                }
063        }
064
065        /**
066         * Submit the message through the broker channel to the matcher.
067         *
068         * Note: most of our integrated tests for subscription assume we can successfully inflate the message and therefore
069         * does not run with an actual database to persist the data. In these cases, submitting the complete message (i.e.
070         * with payload) is OK. However, there are a few tests that do not assume it and do run with an actual DB. For them,
071         * we should null out the payload body before submitting. This try-catch block only covers the case where the
072         * payload is too large, which is enough for now. However, for better practice we might want to consider splitting
073         * this interceptor into two, each for tests with/without DB connection.
074         * @param theResourceModifiedMessage
075         * @return the outcome of sending the message to the broker
076         */
077        private ISendResult doSubmitResourceModified(ResourceModifiedMessage theResourceModifiedMessage) {
078                try {
079                        ISendResult retval = myResourceModifiedConsumer.submitResourceModified(theResourceModifiedMessage);
080                        if (!retval.isSuccessful()) {
081                                ourLog.warn("Failed to send message to Delivery Channel.");
082                        }
083                        return retval;
084                } catch (Exception e) {
085                        if (e instanceof PayloadTooLargeException || e.getCause() instanceof PayloadTooLargeException) {
086                                ourLog.warn(
087                                                "Failed to send message to Subscription Matching Channel because the payload size is larger than broker "
088                                                                + "max message size. Retry is about to be performed without payload.");
089                                theResourceModifiedMessage.setPayloadToNull();
090                                return myResourceModifiedConsumer.submitResourceModified(theResourceModifiedMessage);
091                        } else {
092                                ourLog.error("Failed to send message to Subscription Matching Channel", e);
093                                throw e;
094                        }
095                }
096        }
097}