/*-
 * #%L
 * HAPI FHIR Storage api
 * %%
 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.dao.expunge;

import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.collect.Lists;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

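/**
 * Splits a list of work items into fixed-size partitions and hands each partition to a consumer,
 * running a single partition inline on the calling thread and multiple partitions on a bounded,
 * temporary thread pool.
 * <p>
 * A minimal usage sketch; the process name, batch size, thread count, and consumer below are
 * illustrative assumptions rather than values required by this class:
 * <pre>{@code
 * PartitionRunner runner = new PartitionRunner("Deleting", "delete", 100, 4);
 * runner.runInPartitionedThreads(resourceIds, chunk -> deleteChunk(chunk)); // deleteChunk is a hypothetical consumer
 * }</pre>
 */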
public class PartitionRunner {
	private static final Logger ourLog = LoggerFactory.getLogger(PartitionRunner.class);
	private static final int MAX_POOL_SIZE = 1000;

	private final String myProcessName;
	private final String myThreadPrefix;
	private final int myBatchSize;
	private final int myThreadCount;
	private final HapiTransactionService myTransactionService;
	private final RequestDetails myRequestDetails;

	/**
	 * Constructor - Use this constructor if you do not want any transaction management
	 */
	public PartitionRunner(String theProcessName, String theThreadPrefix, int theBatchSize, int theThreadCount) {
		this(theProcessName, theThreadPrefix, theBatchSize, theThreadCount, null, null);
	}

	/**
	 * Constructor - Use this constructor and provide a {@link RequestDetails} and {@link HapiTransactionService} if
	 * you want each individual callable task to be performed in a managed transaction.
	 */
	public PartitionRunner(
			String theProcessName,
			String theThreadPrefix,
			int theBatchSize,
			int theThreadCount,
			@Nullable HapiTransactionService theTransactionService,
			@Nullable RequestDetails theRequestDetails) {
		myProcessName = theProcessName;
		myThreadPrefix = theThreadPrefix;
		myBatchSize = theBatchSize;
		myThreadCount = theThreadCount;
		myTransactionService = theTransactionService;
		myRequestDetails = theRequestDetails;
	}

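	/**
	 * Splits {@literal theResourceIds} into partitions of at most the configured batch size and invokes
	 * {@literal partitionConsumer} once per partition. A single partition is executed on the calling
	 * thread; multiple partitions are executed on a temporary thread pool that is shut down before
	 * this method returns.
	 */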
	public <T> void runInPartitionedThreads(List<T> theResourceIds, Consumer<List<T>> partitionConsumer) {

		List<Callable<Void>> runnableTasks = buildCallableTasks(theResourceIds, partitionConsumer);
		if (runnableTasks.isEmpty()) {
			return;
		}

		if (myTransactionService != null) {
			// Wrap each Callable task in an invocation to HapiTransactionService#execute
			runnableTasks = runnableTasks.stream()
					.map(t -> (Callable<Void>) () ->
							myTransactionService.withRequest(myRequestDetails).execute(t))
					.toList();
		}

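		// A single task is run directly on the calling thread, so no executor needs to be created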
		if (runnableTasks.size() == 1) {
			try {
				runnableTasks.get(0).call();
				return;
			} catch (PreconditionFailedException preconditionFailedException) {
				throw preconditionFailedException;
			} catch (Exception e) {
				ourLog.error("Error while " + myProcessName, e);
				throw new InternalErrorException(Msg.code(1084) + e);
			}
		}

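		// Multiple tasks: distribute the work across a temporary, bounded thread pool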
		ExecutorService executorService = buildExecutor(runnableTasks.size());
		try {
			List<Future<?>> futures =
					runnableTasks.stream().map(executorService::submit).collect(Collectors.toList());
			// wait for all the threads to finish
			for (Future<?> future : futures) {
				future.get();
			}
		} catch (InterruptedException e) {
			ourLog.error("Interrupted while " + myProcessName, e);
			Thread.currentThread().interrupt();
		} catch (ExecutionException e) {
			ourLog.error("Error while " + myProcessName, e);
			throw new InternalErrorException(Msg.code(1085) + e);
		} finally {
			executorService.shutdown();
		}
	}

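	/**
	 * Partitions {@literal theResourceIds} into chunks of at most the configured batch size and wraps each
	 * non-empty chunk in a {@link Callable} that passes the chunk to {@literal partitionConsumer}.
	 */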
	private <T> List<Callable<Void>> buildCallableTasks(List<T> theResourceIds, Consumer<List<T>> partitionConsumer) {
		List<Callable<Void>> retval = new ArrayList<>();

		if (theResourceIds.size() > myBatchSize) {
			ourLog.info("Splitting batch job of {} entries into chunks of {}", theResourceIds.size(), myBatchSize);
		} else {
			ourLog.info("Creating batch job of {} entries", theResourceIds.size());
		}
		List<List<T>> partitions = Lists.partition(theResourceIds, myBatchSize);

		for (List<T> nextPartition : partitions) {
			if (!nextPartition.isEmpty()) {
				Callable<Void> callableTask = () -> {
					ourLog.info(myProcessName + " {} resources", nextPartition.size());
					partitionConsumer.accept(nextPartition);
					return null;
				};
				retval.add(callableTask);
			}
		}

		return retval;
	}

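	/**
	 * Builds a fixed-size executor whose thread count is the lesser of the number of tasks and the configured
	 * thread count. The work queue is bounded, and the rejection handler blocks the submitting thread until a
	 * queue slot becomes available rather than discarding the task.
	 */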
	private ExecutorService buildExecutor(int numberOfTasks) {
		int threadCount = Math.min(numberOfTasks, myThreadCount);
		assert (threadCount > 0);

		ourLog.info(myProcessName + " with {} threads", threadCount);
		LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(MAX_POOL_SIZE);
		BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
				.namingPattern(myThreadPrefix + "-%d")
				.daemon(false)
				.priority(Thread.NORM_PRIORITY)
				.build();
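		// If the bounded queue is full, block the submitting thread until a slot frees up
		// instead of failing fast with a RejectedExecutionException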
		RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
			ourLog.info(
					"Note: " + myThreadPrefix
							+ " executor queue is full ({} elements), waiting for a slot to become available!",
					executorQueue.size());
			StopWatch sw = new StopWatch();
			try {
				executorQueue.put(theRunnable);
			} catch (InterruptedException e) {
				// Restore the interrupt flag before surfacing the failure
				Thread.currentThread().interrupt();
				throw new RejectedExecutionException(
						Msg.code(1086) + "Task " + theRunnable + " rejected while waiting for an executor queue slot: " + e);
			}
			ourLog.info("Slot became available after {}ms", sw.getMillis());
		};

		// Set corePoolSize and maximumPoolSize to the same value (threadCount) to ensure that the number
		// of threads allocated for the expunge operation never exceeds the configured limit.
		// See the ThreadPoolExecutor documentation for details.
		return new ThreadPoolExecutor(
				threadCount,
				threadCount,
				0L,
				TimeUnit.MILLISECONDS,
				executorQueue,
				threadFactory,
				rejectedExecutionHandler);
	}
}