001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.submit.interceptor; 021 022import ca.uhn.fhir.jpa.subscription.async.AsyncResourceModifiedProcessingSchedulerSvc; 023import ca.uhn.fhir.jpa.subscription.channel.api.PayloadTooLargeException; 024import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer; 025import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; 026import org.springframework.beans.factory.annotation.Autowired; 027import org.springframework.messaging.MessageDeliveryException; 028import org.springframework.transaction.support.TransactionSynchronizationAdapter; 029import org.springframework.transaction.support.TransactionSynchronizationManager; 030 031/** 032 * The purpose of this interceptor is to synchronously submit ResourceModifiedMessage to the 033 * subscription processing pipeline, ie, as part of processing the operation on a resource. 034 * It is meant to replace the SubscriptionMatcherInterceptor in integrated tests where 035 * scheduling is disabled. See {@link AsyncResourceModifiedProcessingSchedulerSvc} 036 * for further details on asynchronous submissions. 037 */ 038public class SynchronousSubscriptionMatcherInterceptor extends SubscriptionMatcherInterceptor { 039 040 @Autowired 041 private IResourceModifiedConsumer myResourceModifiedConsumer; 042 043 @Override 044 protected void processResourceModifiedMessage(ResourceModifiedMessage theResourceModifiedMessage) { 045 if (TransactionSynchronizationManager.isSynchronizationActive()) { 046 TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { 047 @Override 048 public int getOrder() { 049 return 0; 050 } 051 052 @Override 053 public void afterCommit() { 054 doSubmitResourceModified(theResourceModifiedMessage); 055 } 056 }); 057 } else { 058 doSubmitResourceModified(theResourceModifiedMessage); 059 } 060 } 061 062 /** 063 * Submit the message through the broker channel to the matcher. 064 * 065 * Note: most of our integrated tests for subscription assume we can successfully inflate the message and therefore 066 * does not run with an actual database to persist the data. In these cases, submitting the complete message (i.e. 067 * with payload) is OK. However, there are a few tests that do not assume it and do run with an actual DB. For them, 068 * we should null out the payload body before submitting. This try-catch block only covers the case where the 069 * payload is too large, which is enough for now. However, for better practice we might want to consider splitting 070 * this interceptor into two, each for tests with/without DB connection. 071 * @param theResourceModifiedMessage 072 */ 073 private void doSubmitResourceModified(ResourceModifiedMessage theResourceModifiedMessage) { 074 try { 075 myResourceModifiedConsumer.submitResourceModified(theResourceModifiedMessage); 076 } catch (MessageDeliveryException e) { 077 if (e.getCause() instanceof PayloadTooLargeException) { 078 theResourceModifiedMessage.setPayloadToNull(); 079 myResourceModifiedConsumer.submitResourceModified(theResourceModifiedMessage); 080 } 081 } 082 } 083}