001package ca.uhn.fhir.jpa.subscription.submit.svc; 002 003/*- 004 * #%L 005 * HAPI FHIR Subscription Server 006 * %% 007 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 008 * %% 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 * #L% 021 */ 022 023import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; 024import ca.uhn.fhir.jpa.model.config.SubscriptionSettings; 025import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage; 026import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessagePK; 027import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings; 028import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer; 029import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory; 030import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer; 031import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; 032import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; 033import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor; 034import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; 035import ca.uhn.fhir.subscription.api.IResourceModifiedConsumerWithRetries; 036import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; 037import org.apache.commons.lang3.Validate; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040import org.springframework.context.event.ContextRefreshedEvent; 041import org.springframework.context.event.EventListener; 042import org.springframework.messaging.MessageChannel; 043import org.springframework.messaging.MessageDeliveryException; 044import org.springframework.transaction.annotation.Propagation; 045import org.springframework.transaction.support.TransactionCallback; 046 047import static ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME; 048 049/** 050 * This service provides two distinct contexts in which it submits messages to the subscription pipeline. 051 * 052 * It implements {@link IResourceModifiedConsumer} for synchronous submissions where retry upon failures is not required. 053 * 054 * It implements {@link IResourceModifiedConsumerWithRetries} for synchronous submissions performed as part of processing 055 * an operation on a resource (see {@link SubscriptionMatcherInterceptor}). Submissions in such context require retries 056 * upon submission failure. 057 * 058 * 059 */ 060public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer, IResourceModifiedConsumerWithRetries { 061 062 private static final Logger ourLog = LoggerFactory.getLogger(ResourceModifiedSubmitterSvc.class); 063 private volatile MessageChannel myMatchingChannel; 064 065 private final SubscriptionSettings mySubscriptionSettings; 066 private final SubscriptionChannelFactory mySubscriptionChannelFactory; 067 private final IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc; 068 private final IHapiTransactionService myHapiTransactionService; 069 070 @EventListener(classes = {ContextRefreshedEvent.class}) 071 public void startIfNeeded() { 072 if (!mySubscriptionSettings.hasSupportedSubscriptionTypes()) { 073 ourLog.debug( 074 "Subscriptions are disabled on this server. Skipping {} channel creation.", 075 SUBSCRIPTION_MATCHING_CHANNEL_NAME); 076 return; 077 } 078 if (myMatchingChannel == null) { 079 myMatchingChannel = mySubscriptionChannelFactory.newMatchingSendingChannel( 080 SUBSCRIPTION_MATCHING_CHANNEL_NAME, getChannelProducerSettings()); 081 } 082 } 083 084 public ResourceModifiedSubmitterSvc( 085 SubscriptionSettings theSubscriptionSettings, 086 SubscriptionChannelFactory theSubscriptionChannelFactory, 087 IResourceModifiedMessagePersistenceSvc resourceModifiedMessagePersistenceSvc, 088 IHapiTransactionService theHapiTransactionService) { 089 mySubscriptionSettings = theSubscriptionSettings; 090 mySubscriptionChannelFactory = theSubscriptionChannelFactory; 091 myResourceModifiedMessagePersistenceSvc = resourceModifiedMessagePersistenceSvc; 092 myHapiTransactionService = theHapiTransactionService; 093 } 094 095 /** 096 * @inheritDoc 097 * Submit a message to the broker without retries. 098 * 099 * Implementation of the {@link IResourceModifiedConsumer} 100 * 101 */ 102 @Override 103 public void submitResourceModified(ResourceModifiedMessage theMsg) { 104 startIfNeeded(); 105 106 ourLog.trace("Sending resource modified message to processing channel"); 107 Validate.notNull( 108 myMatchingChannel, 109 "A SubscriptionMatcherInterceptor has been registered without calling start() on it."); 110 myMatchingChannel.send(new ResourceModifiedJsonMessage(theMsg)); 111 } 112 113 /** 114 * This method will inflate the ResourceModifiedMessage represented by the IPersistedResourceModifiedMessage and attempts 115 * to submit it to the subscription processing pipeline. 116 * 117 * If submission succeeds, the IPersistedResourceModifiedMessage is deleted and true is returned. In the event where submission 118 * fails, we return false and the IPersistedResourceModifiedMessage is rollback for later re-submission. 119 * 120 * @param thePersistedResourceModifiedMessage A ResourceModifiedMessage in it's IPersistedResourceModifiedMessage that requires submission. 121 * @return Whether the message was successfully submitted to the broker. 122 */ 123 @Override 124 public boolean submitPersisedResourceModifiedMessage( 125 IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) { 126 return myHapiTransactionService 127 .withSystemRequest() 128 .withPropagation(Propagation.REQUIRES_NEW) 129 .execute(doProcessResourceModifiedInTransaction(thePersistedResourceModifiedMessage)); 130 } 131 132 /** 133 * This method is the cornerstone in the submit and retry upon failure mechanism for messages needing submission to the subscription processing pipeline. 134 * It requires execution in a transaction for rollback of deleting the persistedResourceModifiedMessage pointed to by <code>thePersistedResourceModifiedMessage<code/> 135 * in the event where submission would fail. 136 * 137 * @param thePersistedResourceModifiedMessage the primary key pointing to the persisted version (IPersistedResourceModifiedMessage) of a ResourceModifiedMessage needing submission 138 * @return true upon successful submission, false otherwise. 139 */ 140 protected TransactionCallback<Boolean> doProcessResourceModifiedInTransaction( 141 IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) { 142 return theStatus -> { 143 boolean processed = true; 144 ResourceModifiedMessage resourceModifiedMessage = null; 145 146 try { 147 // delete the entry to lock the row to ensure unique processing 148 boolean wasDeleted = deletePersistedResourceModifiedMessage( 149 thePersistedResourceModifiedMessage.getPersistedResourceModifiedMessagePk()); 150 151 // submit the resource modified message with empty payload, actual inflation is done by the matcher. 152 resourceModifiedMessage = 153 createResourceModifiedMessageWithoutInflation(thePersistedResourceModifiedMessage); 154 155 if (wasDeleted) { 156 submitResourceModified(resourceModifiedMessage); 157 } 158 } catch (MessageDeliveryException exception) { 159 // we encountered an issue when trying to send the message so mark the transaction for rollback 160 String payloadId = "[unknown]"; 161 String subscriptionId = "[unknown]"; 162 if (resourceModifiedMessage != null) { 163 payloadId = resourceModifiedMessage.getPayloadId(); 164 subscriptionId = resourceModifiedMessage.getSubscriptionId(); 165 } 166 ourLog.error( 167 "Channel submission failed for resource with id {} matching subscription with id {}. Further attempts will be performed at later time.", 168 payloadId, 169 subscriptionId, 170 exception); 171 processed = false; 172 theStatus.setRollbackOnly(); 173 } catch (Exception ex) { 174 // catch other errors 175 ourLog.error( 176 "Unexpected error encountered while processing resource modified message. Marking as processed to prevent further errors.", 177 ex); 178 processed = true; 179 } 180 181 return processed; 182 }; 183 } 184 185 private ResourceModifiedMessage createResourceModifiedMessageWithoutInflation( 186 IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) { 187 return myResourceModifiedMessagePersistenceSvc.createResourceModifiedMessageFromEntityWithoutInflation( 188 thePersistedResourceModifiedMessage); 189 } 190 191 private boolean deletePersistedResourceModifiedMessage(IPersistedResourceModifiedMessagePK theResourceModifiedPK) { 192 try { 193 // delete the entry to lock the row to ensure unique processing 194 return myResourceModifiedMessagePersistenceSvc.deleteByPK(theResourceModifiedPK); 195 } catch (ResourceNotFoundException exception) { 196 ourLog.warn( 197 "thePersistedResourceModifiedMessage with {} and version {} could not be deleted as it may have already been deleted.", 198 theResourceModifiedPK.getResourcePid(), 199 theResourceModifiedPK.getResourceVersion()); 200 // we were not able to delete the pk. this implies that someone else did read/delete the PK and processed 201 // the message 202 // successfully before we did. 203 204 return false; 205 } catch (Exception ex) { 206 ourLog.error("Unknown exception when deleting persisted resource modified message. Returning false.", ex); 207 return false; 208 } 209 } 210 211 private ChannelProducerSettings getChannelProducerSettings() { 212 ChannelProducerSettings channelProducerSettings = new ChannelProducerSettings(); 213 channelProducerSettings.setQualifyChannelName( 214 mySubscriptionSettings.isQualifySubscriptionMatchingChannelName()); 215 return channelProducerSettings; 216 } 217 218 public IChannelProducer getProcessingChannelForUnitTest() { 219 startIfNeeded(); 220 return (IChannelProducer) myMatchingChannel; 221 } 222}