001/*-
002 * #%L
003 * HAPI FHIR Storage api
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.storage.interceptor.balp;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.rest.client.api.IGenericClient;
024import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
025import ca.uhn.fhir.util.BundleBuilder;
026import ca.uhn.fhir.util.ThreadPoolUtil;
027import jakarta.annotation.Nonnull;
028import jakarta.annotation.Nullable;
029import jakarta.annotation.PreDestroy;
030import org.hl7.fhir.instance.model.api.IBaseBundle;
031import org.hl7.fhir.instance.model.api.IBaseResource;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
035
036import java.util.ArrayList;
037import java.util.List;
038import java.util.concurrent.atomic.AtomicLong;
039
040/**
041 * This implementation of the {@link IBalpAuditEventSink} transmits audit events to
042 * a FHIR endpoint for creation, using a standard fhir <i>create</i> event. The target
043 * server FHIR version does not need to match the FHIR version of the AuditEvent source,
044 * events will be converted automatically prior to sending.
045 * <p>
046 * This sink transmits events asynchronously using an in-memory queue. This means that
047 * in the event of a server shutdown or unavailability of the target server <b>data could be lost</b>.
048 * </p>
049 */
050public class AsyncMemoryQueueBackedFhirClientBalpSink extends FhirClientBalpSink implements IBalpAuditEventSink {
051
052        public static final IBaseResource[] EMPTY_RESOURCE_ARRAY = new IBaseResource[0];
053        private static final AtomicLong ourNextThreadId = new AtomicLong(0);
054        private static final Logger ourLog = LoggerFactory.getLogger(AsyncMemoryQueueBackedFhirClientBalpSink.class);
055        private final List<IBaseResource> myQueue = new ArrayList<>(100);
056        private final ThreadPoolTaskExecutor myThreadPool;
057        private final Runnable myTransmitterTask = new TransmitterTask();
058
059        /**
060         * Sets the FhirContext to use when initiating outgoing connections
061         *
062         * @param theFhirContext   The FhirContext instance. This context must be
063         *                         for the FHIR Version supported by the target/sink
064         *                         server (as opposed to the FHIR Version supported
065         *                         by the audit source).
066         * @param theTargetBaseUrl The FHIR server base URL for the target/sink server to
067         *                         receive audit events.
068         */
069        public AsyncMemoryQueueBackedFhirClientBalpSink(
070                        @Nonnull FhirContext theFhirContext, @Nonnull String theTargetBaseUrl) {
071                this(theFhirContext, theTargetBaseUrl, null);
072        }
073
074        /**
075         * Sets the FhirContext to use when initiating outgoing connections
076         *
077         * @param theFhirContext        The FhirContext instance. This context must be
078         *                              for the FHIR Version supported by the target/sink
079         *                              server (as opposed to the FHIR Version supported
080         *                              by the audit source).
081         * @param theTargetBaseUrl      The FHIR server base URL for the target/sink server to
082         *                              receive audit events.
083         * @param theClientInterceptors An optional list of interceptors to register against
084         *                              the client. May be {@literal null}.
085         */
086        public AsyncMemoryQueueBackedFhirClientBalpSink(
087                        @Nonnull FhirContext theFhirContext,
088                        @Nonnull String theTargetBaseUrl,
089                        @Nullable List<Object> theClientInterceptors) {
090                this(createClient(theFhirContext, theTargetBaseUrl, theClientInterceptors));
091        }
092
093        /**
094         * Constructor
095         *
096         * @param theClient The FHIR client to use as a sink.
097         */
098        public AsyncMemoryQueueBackedFhirClientBalpSink(IGenericClient theClient) {
099                super(theClient);
100                myThreadPool = ThreadPoolUtil.newThreadPool(
101                                1, 1, "BalpClientSink-" + ourNextThreadId.getAndIncrement() + "-", Integer.MAX_VALUE);
102        }
103
104        @Override
105        protected void recordAuditEvent(IBaseResource theAuditEvent) {
106                synchronized (myQueue) {
107                        myQueue.add(theAuditEvent);
108                }
109                myThreadPool.submit(myTransmitterTask);
110        }
111
112        @PreDestroy
113        public void stop() {
114                myThreadPool.shutdown();
115        }
116
117        private class TransmitterTask implements Runnable {
118
119                @Override
120                public void run() {
121                        IBaseResource[] queue;
122                        synchronized (myQueue) {
123                                if (myQueue.isEmpty()) {
124                                        queue = EMPTY_RESOURCE_ARRAY;
125                                } else {
126                                        queue = myQueue.toArray(EMPTY_RESOURCE_ARRAY);
127                                        myQueue.clear();
128                                }
129                        }
130
131                        if (queue.length == 0) {
132                                return;
133                        }
134
135                        BundleBuilder bundleBuilder = new BundleBuilder(myClient.getFhirContext());
136                        for (IBaseResource next : queue) {
137                                bundleBuilder.addTransactionCreateEntry(next);
138                        }
139
140                        IBaseBundle transactionBundle = bundleBuilder.getBundle();
141                        try {
142                                myClient.transaction().withBundle(transactionBundle).execute();
143                                return;
144                        } catch (BaseServerResponseException e) {
145                                ourLog.error(
146                                                "Failed to transmit AuditEvent items to target. Will re-attempt {} failed events once. Error: {}",
147                                                queue.length,
148                                                e.toString());
149                        }
150
151                        // Retry once then give up
152                        for (IBaseResource next : queue) {
153                                try {
154                                        myClient.create().resource(next).execute();
155                                } catch (BaseServerResponseException e) {
156                                        ourLog.error("Second failure uploading AuditEvent. Error: {}", e.toString());
157                                }
158                        }
159                }
160        }
161}