/*-
 * #%L
 * HAPI FHIR Storage api
 * %%
 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.storage.interceptor.balp;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
import ca.uhn.fhir.util.BundleBuilder;
import ca.uhn.fhir.util.ThreadPoolUtil;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.annotation.PreDestroy;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * This implementation of the {@link IBalpAuditEventSink} transmits audit events to
 * a FHIR endpoint for creation. Queued events are sent together as a single FHIR
 * <i>transaction</i> containing one <i>create</i> entry per event; if the target
 * rejects that transaction, each event is re-sent once as an individual
 * <i>create</i>. The target server FHIR version does not need to match the FHIR
 * version of the AuditEvent source, events will be converted automatically prior
 * to sending.
 * <p>
 * This sink transmits events asynchronously using an in-memory queue. This means that
 * in the event of a server shutdown or unavailability of the target server <b>data could be lost</b>.
 * </p>
 */
public class AsyncMemoryQueueBackedFhirClientBalpSink extends FhirClientBalpSink implements IBalpAuditEventSink {

    /** Shared zero-length array, used both as the "nothing to send" result and as the type token for {@code toArray}. */
    public static final IBaseResource[] EMPTY_RESOURCE_ARRAY = new IBaseResource[0];

    /** Source of the numeric suffix that makes each sink's thread-name prefix distinct. */
    private static final AtomicLong ourNextThreadId = new AtomicLong(0);

    private static final Logger ourLog = LoggerFactory.getLogger(AsyncMemoryQueueBackedFhirClientBalpSink.class);

    /** Events waiting to be transmitted. Every access is guarded by {@code synchronized (myQueue)}. */
    private final List<IBaseResource> myQueue = new ArrayList<>(100);

    private final ThreadPoolTaskExecutor myThreadPool;

    /** Single reusable task instance; it holds no per-invocation state, so it can be submitted repeatedly. */
    private final Runnable myTransmitterTask = new TransmitterTask();

    /**
     * Creates a sink which transmits to the given target server, with no client interceptors.
     *
     * @param theFhirContext   The FhirContext instance. This context must be
     *                         for the FHIR Version supported by the target/sink
     *                         server (as opposed to the FHIR Version supported
     *                         by the audit source).
     * @param theTargetBaseUrl The FHIR server base URL for the target/sink server to
     *                         receive audit events.
     */
    public AsyncMemoryQueueBackedFhirClientBalpSink(
            @Nonnull FhirContext theFhirContext, @Nonnull String theTargetBaseUrl) {
        this(theFhirContext, theTargetBaseUrl, null);
    }

    /**
     * Creates a sink which transmits to the given target server, using a client
     * built from the supplied context, base URL and interceptors.
     *
     * @param theFhirContext        The FhirContext instance. This context must be
     *                              for the FHIR Version supported by the target/sink
     *                              server (as opposed to the FHIR Version supported
     *                              by the audit source).
     * @param theTargetBaseUrl      The FHIR server base URL for the target/sink server to
     *                              receive audit events.
     * @param theClientInterceptors An optional list of interceptors to register against
     *                              the client. May be {@literal null}.
     */
    public AsyncMemoryQueueBackedFhirClientBalpSink(
            @Nonnull FhirContext theFhirContext,
            @Nonnull String theTargetBaseUrl,
            @Nullable List<Object> theClientInterceptors) {
        this(createClient(theFhirContext, theTargetBaseUrl, theClientInterceptors));
    }

    /**
     * Creates a sink which transmits through an existing client, and starts the
     * thread pool on which transmissions run.
     *
     * @param theClient The FHIR client to use as a sink.
     */
    public AsyncMemoryQueueBackedFhirClientBalpSink(IGenericClient theClient) {
        super(theClient);
        // NOTE(review): the arguments are presumably core pool size, max pool size,
        // thread-name prefix and work-queue capacity - confirm against ThreadPoolUtil.
        myThreadPool = ThreadPoolUtil.newThreadPool(
                1, 1, "BalpClientSink-" + ourNextThreadId.getAndIncrement() + "-", Integer.MAX_VALUE);
    }

    /**
     * Adds the event to the in-memory queue and schedules a transmission attempt.
     * The event is not sent on the calling thread.
     *
     * @param theAuditEvent The audit event to transmit.
     */
    @Override
    protected void recordAuditEvent(IBaseResource theAuditEvent) {
        synchronized (myQueue) {
            myQueue.add(theAuditEvent);
        }
        // One task is submitted per event. Whichever task runs first drains the whole
        // queue; the others find it empty and return immediately.
        myThreadPool.submit(myTransmitterTask);
    }

    /**
     * Shuts down the thread pool used for transmission. Annotated with
     * {@link PreDestroy} so that a container can invoke it on teardown.
     */
    @PreDestroy
    public void stop() {
        myThreadPool.shutdown();
    }

    /**
     * Drains the queue and transmits everything that was in it as one batch.
     */
    private class TransmitterTask implements Runnable {

        /**
         * Takes every queued event and attempts to transmit them. Never throws a
         * {@link RuntimeException}: this task is started via {@code submit(...)} and
         * the returned future is discarded, so an escaping exception would otherwise
         * vanish without any trace while the drained events are silently lost.
         */
        @Override
        public void run() {
            IBaseResource[] queue = drainQueue();
            if (queue.length == 0) {
                return;
            }

            try {
                transmit(queue);
            } catch (RuntimeException e) {
                // The events have already been removed from the queue, so make the loss visible.
                ourLog.error(
                        "Unexpected failure transmitting {} AuditEvent items to target. These events will not be re-attempted.",
                        queue.length,
                        e);
            }
        }

        /**
         * Atomically removes and returns everything currently in the queue.
         *
         * @return The drained events, or {@link #EMPTY_RESOURCE_ARRAY} if the queue was empty.
         */
        private IBaseResource[] drainQueue() {
            synchronized (myQueue) {
                if (myQueue.isEmpty()) {
                    return EMPTY_RESOURCE_ARRAY;
                }
                IBaseResource[] drained = myQueue.toArray(EMPTY_RESOURCE_ARRAY);
                myQueue.clear();
                return drained;
            }
        }

        /**
         * Sends the events as a single transaction and, if the target responds with
         * an error, falls back to one individual create per event.
         *
         * @param theEvents The events to send. Must not be empty.
         */
        private void transmit(IBaseResource[] theEvents) {
            BundleBuilder bundleBuilder = new BundleBuilder(myClient.getFhirContext());
            for (IBaseResource next : theEvents) {
                bundleBuilder.addTransactionCreateEntry(next);
            }

            IBaseBundle transactionBundle = bundleBuilder.getBundle();
            try {
                myClient.transaction().withBundle(transactionBundle).execute();
                return;
            } catch (BaseServerResponseException e) {
                ourLog.error(
                        "Failed to transmit AuditEvent items to target. Will re-attempt {} failed events once. Error: {}",
                        theEvents.length,
                        e.toString());
            }

            // Retry once then give up. Any runtime failure is contained per event so
            // that one bad event cannot prevent the remaining ones from being re-sent.
            for (IBaseResource next : theEvents) {
                try {
                    myClient.create().resource(next).execute();
                } catch (RuntimeException e) {
                    ourLog.error("Second failure uploading AuditEvent. Error: {}", e.toString());
                }
            }
        }
    }
}