/*-
 * #%L
 * HAPI FHIR Storage api
 * %%
 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.dao.expunge;

import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.collect.Lists;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

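/**
 * Splits a list of work items into batches of at most the configured batch size and hands each batch
 * to a consumer, either inline when there is only a single batch or on a bounded thread pool otherwise.
 * A minimal usage sketch; the process name, thread prefix, sizes, and the {@code loadIdsToProcess} and
 * {@code processIds} callbacks below are illustrative placeholders, not part of this API:
 *
 * <pre>{@code
 * PartitionRunner runner = new PartitionRunner("Deleting", "delete-worker", 100, 4);
 * List<Long> ids = loadIdsToProcess(); // hypothetical helper
 * runner.runInPartitionedThreads(ids, idBatch -> processIds(idBatch)); // hypothetical consumer
 * }</pre>
 */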
public class PartitionRunner {
	private static final Logger ourLog = LoggerFactory.getLogger(PartitionRunner.class);
	private static final int MAX_POOL_SIZE = 1000;

	private final String myProcessName;
	private final String myThreadPrefix;
	private final int myBatchSize;
	private final int myThreadCount;
	private final HapiTransactionService myTransactionService;
	private final RequestDetails myRequestDetails;

	/**
	 * Constructor - Use this constructor if you do not want any transaction management
	 */
	public PartitionRunner(String theProcessName, String theThreadPrefix, int theBatchSize, int theThreadCount) {
		this(theProcessName, theThreadPrefix, theBatchSize, theThreadCount, null, null);
	}

	/**
	 * Constructor - Use this constructor and provide a {@link RequestDetails} and {@link HapiTransactionService} if
	 * you want each individual callable task to be performed in a managed transaction.
	 */
	public PartitionRunner(
			String theProcessName,
			String theThreadPrefix,
			int theBatchSize,
			int theThreadCount,
			@Nullable HapiTransactionService theTransactionService,
			@Nullable RequestDetails theRequestDetails) {
		myProcessName = theProcessName;
		myThreadPrefix = theThreadPrefix;
		myBatchSize = theBatchSize;
		myThreadCount = theThreadCount;
		myTransactionService = theTransactionService;
		myRequestDetails = theRequestDetails;
	}

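	/**
	 * Splits {@code theResourceIds} into batches and passes each batch to {@code partitionConsumer}.
	 * A single batch is executed on the calling thread; multiple batches are executed concurrently on
	 * a thread pool sized up to the configured thread count. If a {@link HapiTransactionService} was
	 * provided, each batch is executed inside its own managed transaction.
	 */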
	public <T> void runInPartitionedThreads(List<T> theResourceIds, Consumer<List<T>> partitionConsumer) {

		List<Callable<Void>> runnableTasks = buildCallableTasks(theResourceIds, partitionConsumer);
		if (runnableTasks.isEmpty()) {
			return;
		}

		if (myTransactionService != null) {
			// Wrap each Callable task in an invocation to HapiTransactionService#execute
			runnableTasks = runnableTasks.stream()
					.map(t -> (Callable<Void>) () -> {
						return myTransactionService
								.withRequest(myRequestDetails)
								.execute(t);
					})
					.collect(Collectors.toList());
		}

		if (runnableTasks.size() == 1) {
			try {
				runnableTasks.get(0).call();
				return;
			} catch (PreconditionFailedException preconditionFailedException) {
				throw preconditionFailedException;
			} catch (Exception e) {
				ourLog.error("Error while " + myProcessName, e);
				throw new InternalErrorException(Msg.code(1084) + e);
			}
		}

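		// More than one task: run them on a bounded thread pool and block until they all complete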
		ExecutorService executorService = buildExecutor(runnableTasks.size());
		try {
			List<Future<?>> futures = runnableTasks.stream()
					.map(executorService::submit)
					.collect(Collectors.toList());
			// wait for all the tasks to finish
			for (Future<?> future : futures) {
				future.get();
			}
		} catch (InterruptedException e) {
			ourLog.error("Interrupted while " + myProcessName, e);
			Thread.currentThread().interrupt();
		} catch (ExecutionException e) {
			ourLog.error("Error while " + myProcessName, e);
			throw new InternalErrorException(Msg.code(1085) + e);
		} finally {
			executorService.shutdown();
		}
	}

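	/**
	 * Partitions {@code theResourceIds} into sub-lists of at most {@code myBatchSize} entries and wraps
	 * each non-empty sub-list in a {@link Callable} that passes it to {@code partitionConsumer}.
	 */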
	private <T> List<Callable<Void>> buildCallableTasks(List<T> theResourceIds, Consumer<List<T>> partitionConsumer) {
		List<Callable<Void>> retval = new ArrayList<>();

		if (theResourceIds.size() > myBatchSize) {
			ourLog.info("Splitting batch job of {} entries into chunks of {}", theResourceIds.size(), myBatchSize);
		} else {
			ourLog.info("Creating batch job of {} entries", theResourceIds.size());
		}
		List<List<T>> partitions = Lists.partition(theResourceIds, myBatchSize);

		for (List<T> nextPartition : partitions) {
			if (!nextPartition.isEmpty()) {
				Callable<Void> callableTask = () -> {
					ourLog.info(myProcessName + " {} resources", nextPartition.size());
					partitionConsumer.accept(nextPartition);
					return null;
				};
				retval.add(callableTask);
			}
		}

		return retval;
	}

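	/**
	 * Builds a fixed-size thread pool with at most {@code myThreadCount} threads (never more threads than
	 * there are tasks), backed by a bounded queue. When the queue is full, submission blocks until a slot
	 * becomes available rather than rejecting the task.
	 */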
	private ExecutorService buildExecutor(int numberOfTasks) {
		int threadCount = Math.min(numberOfTasks, myThreadCount);
		assert (threadCount > 0);

		ourLog.info(myProcessName + " with {} threads", threadCount);
		LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(MAX_POOL_SIZE);
		BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
				.namingPattern(myThreadPrefix + "-%d")
				.daemon(false)
				.priority(Thread.NORM_PRIORITY)
				.build();
		RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
			ourLog.info(
					"Note: " + myThreadPrefix
							+ " executor queue is full ({} elements), waiting for a slot to become available!",
					executorQueue.size());
			StopWatch sw = new StopWatch();
			try {
				executorQueue.put(theRunnable);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				throw new RejectedExecutionException(
						Msg.code(1086) + "Task " + theRunnable + " rejected: interrupted while waiting for a queue slot",
						e);
			}
			ourLog.info("Slot became available after {}ms", sw.getMillis());
		};

		// setting corePoolSize and maximumPoolSize to be the same as threadCount
		// to ensure that the number of allocated threads for the expunge operation does not exceed the configured limit
		// see ThreadPoolExecutor documentation for details
		return new ThreadPoolExecutor(
				threadCount,
				threadCount,
				0L,
				TimeUnit.MILLISECONDS,
				executorQueue,
				threadFactory,
				rejectedExecutionHandler);
	}
}