
001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.submit.interceptor; 021 022import ca.uhn.fhir.broker.api.ISendResult; 023import ca.uhn.fhir.broker.api.PayloadTooLargeException; 024import ca.uhn.fhir.jpa.subscription.async.AsyncResourceModifiedProcessingSchedulerSvc; 025import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer; 026import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; 027import ca.uhn.fhir.util.Logs; 028import org.slf4j.Logger; 029import org.springframework.beans.factory.annotation.Autowired; 030import org.springframework.transaction.support.TransactionSynchronizationAdapter; 031import org.springframework.transaction.support.TransactionSynchronizationManager; 032 033/** 034 * The purpose of this interceptor is to synchronously submit ResourceModifiedMessage to the 035 * subscription processing pipeline, ie, as part of processing the operation on a resource. 036 * It is meant to replace the SubscriptionMatcherInterceptor in integrated tests where 037 * scheduling is disabled. See {@link AsyncResourceModifiedProcessingSchedulerSvc} 038 * for further details on asynchronous submissions. 039 */ 040public class SynchronousSubscriptionMatcherInterceptor extends SubscriptionMatcherInterceptor { 041 private static final Logger ourLog = Logs.getSubscriptionTroubleshootingLog(); 042 043 @Autowired 044 private IResourceModifiedConsumer myResourceModifiedConsumer; 045 046 @Override 047 protected void processResourceModifiedMessage(ResourceModifiedMessage theResourceModifiedMessage) { 048 if (TransactionSynchronizationManager.isSynchronizationActive()) { 049 TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { 050 @Override 051 public int getOrder() { 052 return 0; 053 } 054 055 @Override 056 public void afterCommit() { 057 doSubmitResourceModified(theResourceModifiedMessage); 058 } 059 }); 060 } else { 061 doSubmitResourceModified(theResourceModifiedMessage); 062 } 063 } 064 065 /** 066 * Submit the message through the broker channel to the matcher. 067 * 068 * Note: most of our integrated tests for subscription assume we can successfully inflate the message and therefore 069 * does not run with an actual database to persist the data. In these cases, submitting the complete message (i.e. 070 * with payload) is OK. However, there are a few tests that do not assume it and do run with an actual DB. For them, 071 * we should null out the payload body before submitting. This try-catch block only covers the case where the 072 * payload is too large, which is enough for now. However, for better practice we might want to consider splitting 073 * this interceptor into two, each for tests with/without DB connection. 074 * @param theResourceModifiedMessage 075 * @return the outcome of sending the message to the broker 076 */ 077 private ISendResult doSubmitResourceModified(ResourceModifiedMessage theResourceModifiedMessage) { 078 try { 079 ISendResult retval = myResourceModifiedConsumer.submitResourceModified(theResourceModifiedMessage); 080 if (!retval.isSuccessful()) { 081 ourLog.warn("Failed to send message to Delivery Channel."); 082 } 083 return retval; 084 } catch (Exception e) { 085 if (e instanceof PayloadTooLargeException || e.getCause() instanceof PayloadTooLargeException) { 086 ourLog.warn( 087 "Failed to send message to Subscription Matching Channel because the payload size is larger than broker " 088 + "max message size. Retry is about to be performed without payload."); 089 theResourceModifiedMessage.setPayloadToNull(); 090 return myResourceModifiedConsumer.submitResourceModified(theResourceModifiedMessage); 091 } else { 092 ourLog.error("Failed to send message to Subscription Matching Channel", e); 093 throw e; 094 } 095 } 096 } 097}