
001package ca.uhn.fhir.jpa.subscription.submit.svc; 002 003/*- 004 * #%L 005 * HAPI FHIR Subscription Server 006 * %% 007 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 008 * %% 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 * #L% 021 */ 022 023import ca.uhn.fhir.broker.api.ChannelProducerSettings; 024import ca.uhn.fhir.broker.api.IChannelProducer; 025import ca.uhn.fhir.broker.api.ISendResult; 026import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; 027import ca.uhn.fhir.jpa.model.config.SubscriptionSettings; 028import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage; 029import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessagePK; 030import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory; 031import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer; 032import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; 033import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; 034import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor; 035import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; 036import ca.uhn.fhir.subscription.api.IResourceModifiedConsumerWithRetries; 037import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; 038import ca.uhn.fhir.util.IoUtils; 039import com.google.common.annotations.VisibleForTesting; 040import jakarta.annotation.PreDestroy; 041import org.apache.commons.lang3.Validate; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044import org.springframework.context.event.ContextRefreshedEvent; 045import org.springframework.context.event.EventListener; 046import org.springframework.messaging.MessageDeliveryException; 047import org.springframework.transaction.annotation.Propagation; 048import org.springframework.transaction.support.TransactionCallback; 049 050import static ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingListener.SUBSCRIPTION_MATCHING_CHANNEL_NAME; 051 052/** 053 * This service provides two distinct contexts in which it submits messages to the subscription pipeline. 054 * 055 * It implements {@link IResourceModifiedConsumer} for synchronous submissions where retry upon failures is not required. 056 * 057 * It implements {@link IResourceModifiedConsumerWithRetries} for synchronous submissions performed as part of processing 058 * an operation on a resource (see {@link SubscriptionMatcherInterceptor}). Submissions in such context require retries 059 * upon submission failure. 060 * 061 * 062 */ 063public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer, IResourceModifiedConsumerWithRetries { 064 065 private static final Logger ourLog = LoggerFactory.getLogger(ResourceModifiedSubmitterSvc.class); 066 private volatile IChannelProducer<ResourceModifiedMessage> myMatchingChannelProducer; 067 068 private final SubscriptionSettings mySubscriptionSettings; 069 private final SubscriptionChannelFactory mySubscriptionChannelFactory; 070 private final IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc; 071 private final IHapiTransactionService myHapiTransactionService; 072 073 @EventListener(classes = {ContextRefreshedEvent.class}) 074 public void startIfNeeded() { 075 if (!mySubscriptionSettings.hasSupportedSubscriptionTypes()) { 076 ourLog.debug( 077 "Subscriptions are disabled on this server. Skipping {} channel creation.", 078 SUBSCRIPTION_MATCHING_CHANNEL_NAME); 079 return; 080 } 081 if (myMatchingChannelProducer == null) { 082 myMatchingChannelProducer = mySubscriptionChannelFactory.newMatchingProducer( 083 SUBSCRIPTION_MATCHING_CHANNEL_NAME, getChannelProducerSettings()); 084 } 085 } 086 087 public ResourceModifiedSubmitterSvc( 088 SubscriptionSettings theSubscriptionSettings, 089 SubscriptionChannelFactory theSubscriptionChannelFactory, 090 IResourceModifiedMessagePersistenceSvc resourceModifiedMessagePersistenceSvc, 091 IHapiTransactionService theHapiTransactionService) { 092 mySubscriptionSettings = theSubscriptionSettings; 093 mySubscriptionChannelFactory = theSubscriptionChannelFactory; 094 myResourceModifiedMessagePersistenceSvc = resourceModifiedMessagePersistenceSvc; 095 myHapiTransactionService = theHapiTransactionService; 096 } 097 098 /** 099 * @inheritDoc Submit a message to the broker without retries. 100 * <p> 101 * Implementation of the {@link IResourceModifiedConsumer} 102 * @return the result of the send operation 103 */ 104 @Override 105 public ISendResult submitResourceModified(ResourceModifiedMessage theMsg) { 106 startIfNeeded(); 107 108 ourLog.trace("Sending resource modified message to processing channel"); 109 Validate.notNull( 110 myMatchingChannelProducer, 111 "A SubscriptionMatcherInterceptor has been registered without calling start() on it."); 112 return myMatchingChannelProducer.send(new ResourceModifiedJsonMessage(theMsg)); 113 } 114 115 /** 116 * This method will inflate the ResourceModifiedMessage represented by the IPersistedResourceModifiedMessage and attempts 117 * to submit it to the subscription processing pipeline. 118 * 119 * If submission succeeds, the IPersistedResourceModifiedMessage is deleted and true is returned. In the event where submission 120 * fails, we return false and the IPersistedResourceModifiedMessage is rollback for later re-submission. 121 * 122 * @param thePersistedResourceModifiedMessage A ResourceModifiedMessage in it's IPersistedResourceModifiedMessage that requires submission. 123 * @return Whether the message was successfully submitted to the broker. 124 */ 125 @Override 126 public boolean submitPersisedResourceModifiedMessage( 127 IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) { 128 return myHapiTransactionService 129 .withSystemRequest() 130 .withPropagation(Propagation.REQUIRES_NEW) 131 .execute(doProcessResourceModifiedInTransaction(thePersistedResourceModifiedMessage)); 132 } 133 134 /** 135 * This method is the cornerstone in the submit and retry upon failure mechanism for messages needing submission to the subscription processing pipeline. 136 * It requires execution in a transaction for rollback of deleting the persistedResourceModifiedMessage pointed to by <code>thePersistedResourceModifiedMessage<code/> 137 * in the event where submission would fail. 138 * 139 * @param thePersistedResourceModifiedMessage the primary key pointing to the persisted version (IPersistedResourceModifiedMessage) of a ResourceModifiedMessage needing submission 140 * @return true upon successful submission, false otherwise. 141 */ 142 protected TransactionCallback<Boolean> doProcessResourceModifiedInTransaction( 143 IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) { 144 return theStatus -> { 145 boolean processed = true; 146 ResourceModifiedMessage resourceModifiedMessage = null; 147 148 try { 149 // delete the entry to lock the row to ensure unique processing 150 boolean wasDeleted = deletePersistedResourceModifiedMessage( 151 thePersistedResourceModifiedMessage.getPersistedResourceModifiedMessagePk()); 152 153 // submit the resource modified message with empty payload, actual inflation is done by the matcher. 154 resourceModifiedMessage = 155 createResourceModifiedMessageWithoutInflation(thePersistedResourceModifiedMessage); 156 157 if (wasDeleted) { 158 submitResourceModified(resourceModifiedMessage); 159 } 160 } catch (MessageDeliveryException exception) { 161 // we encountered an issue when trying to send the message so mark the transaction for rollback 162 String payloadId = "[unknown]"; 163 String subscriptionId = "[unknown]"; 164 if (resourceModifiedMessage != null) { 165 payloadId = resourceModifiedMessage.getPayloadId(); 166 subscriptionId = resourceModifiedMessage.getSubscriptionId(); 167 } 168 ourLog.error( 169 "Channel submission failed for resource with id {} matching subscription with id {}. Further attempts will be performed at later time.", 170 payloadId, 171 subscriptionId, 172 exception); 173 processed = false; 174 theStatus.setRollbackOnly(); 175 } catch (Exception ex) { 176 // catch other errors 177 ourLog.error( 178 "Unexpected error encountered while processing resource modified message. Marking as processed to prevent further errors.", 179 ex); 180 processed = true; 181 } 182 183 return processed; 184 }; 185 } 186 187 private ResourceModifiedMessage createResourceModifiedMessageWithoutInflation( 188 IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) { 189 return myResourceModifiedMessagePersistenceSvc.createResourceModifiedMessageFromEntityWithoutInflation( 190 thePersistedResourceModifiedMessage); 191 } 192 193 private boolean deletePersistedResourceModifiedMessage(IPersistedResourceModifiedMessagePK theResourceModifiedPK) { 194 try { 195 // delete the entry to lock the row to ensure unique processing 196 return myResourceModifiedMessagePersistenceSvc.deleteByPK(theResourceModifiedPK); 197 } catch (ResourceNotFoundException exception) { 198 ourLog.warn( 199 "thePersistedResourceModifiedMessage with {} and version {} could not be deleted as it may have already been deleted.", 200 theResourceModifiedPK.getResourcePid(), 201 theResourceModifiedPK.getResourceVersion()); 202 // we were not able to delete the pk. this implies that someone else did read/delete the PK and processed 203 // the message 204 // successfully before we did. 205 206 return false; 207 } catch (Exception ex) { 208 ourLog.error("Unknown exception when deleting persisted resource modified message. Returning false.", ex); 209 return false; 210 } 211 } 212 213 private ChannelProducerSettings getChannelProducerSettings() { 214 ChannelProducerSettings channelProducerSettings = new ChannelProducerSettings(); 215 channelProducerSettings.setQualifyChannelName( 216 mySubscriptionSettings.isQualifySubscriptionMatchingChannelName()); 217 return channelProducerSettings; 218 } 219 220 @PreDestroy 221 public void shutdown() throws Exception { 222 if (myMatchingChannelProducer instanceof AutoCloseable producer) { 223 IoUtils.closeQuietly(producer, ourLog); 224 } 225 } 226 227 @VisibleForTesting 228 public IChannelProducer<ResourceModifiedMessage> getMatchingChannelProducerForUnitTest() { 229 startIfNeeded(); 230 return myMatchingChannelProducer; 231 } 232}