001/*-
002 * #%L
003 * HAPI FHIR Storage api
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.dao.expunge;
021
022import ca.uhn.fhir.i18n.Msg;
023import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
024import ca.uhn.fhir.rest.api.server.RequestDetails;
025import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
026import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
027import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
028import ca.uhn.fhir.util.StopWatch;
029import com.google.common.collect.Lists;
030import jakarta.annotation.Nullable;
031import org.apache.commons.lang3.concurrent.BasicThreadFactory;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import java.util.ArrayList;
036import java.util.List;
037import java.util.concurrent.Callable;
038import java.util.concurrent.ExecutionException;
039import java.util.concurrent.ExecutorService;
040import java.util.concurrent.Future;
041import java.util.concurrent.LinkedBlockingQueue;
042import java.util.concurrent.RejectedExecutionException;
043import java.util.concurrent.RejectedExecutionHandler;
044import java.util.concurrent.ThreadPoolExecutor;
045import java.util.concurrent.TimeUnit;
046import java.util.function.Consumer;
047import java.util.stream.Collectors;
048
049public class PartitionRunner {
050        private static final Logger ourLog = LoggerFactory.getLogger(PartitionRunner.class);
051        private static final int MAX_POOL_SIZE = 1000;
052
053        private final String myProcessName;
054        private final String myThreadPrefix;
055        private final int myBatchSize;
056        private final int myThreadCount;
057        private final HapiTransactionService myTransactionService;
058        private final RequestDetails myRequestDetails;
059
060        /**
061         * Constructor - Use this constructor if you do not want any transaction management
062         */
063        public PartitionRunner(String theProcessName, String theThreadPrefix, int theBatchSize, int theThreadCount) {
064                this(theProcessName, theThreadPrefix, theBatchSize, theThreadCount, null, null);
065        }
066
067        /**
068         * Constructor - Use this constructor and provide a {@link RequestDetails} and {@link HapiTransactionService} if
069         * you want each individual callable task to be performed in a managed transaction.
070         */
071        public PartitionRunner(
072                        String theProcessName,
073                        String theThreadPrefix,
074                        int theBatchSize,
075                        int theThreadCount,
076                        @Nullable HapiTransactionService theTransactionService,
077                        @Nullable RequestDetails theRequestDetails) {
078                myProcessName = theProcessName;
079                myThreadPrefix = theThreadPrefix;
080                myBatchSize = theBatchSize;
081                myThreadCount = theThreadCount;
082                myTransactionService = theTransactionService;
083                myRequestDetails = theRequestDetails;
084        }
085
086        public void runInPartitionedThreads(
087                        List<IResourcePersistentId> theResourceIds, Consumer<List<IResourcePersistentId>> partitionConsumer) {
088
089                List<Callable<Void>> runnableTasks = buildCallableTasks(theResourceIds, partitionConsumer);
090                if (runnableTasks.size() == 0) {
091                        return;
092                }
093
094                if (myTransactionService != null) {
095                        // Wrap each Callable task in an invocation to HapiTransactionService#execute
096                        runnableTasks = runnableTasks.stream()
097                                        .map(t -> (Callable<Void>) () -> {
098                                                return myTransactionService
099                                                                .withRequest(myRequestDetails)
100                                                                .execute(t);
101                                        })
102                                        .collect(Collectors.toList());
103                }
104
105                if (runnableTasks.size() == 1) {
106                        try {
107                                runnableTasks.get(0).call();
108                                return;
109                        } catch (PreconditionFailedException preconditionFailedException) {
110                                throw preconditionFailedException;
111                        } catch (Exception e) {
112                                ourLog.error("Error while " + myProcessName, e);
113                                throw new InternalErrorException(Msg.code(1084) + e);
114                        }
115                }
116
117                ExecutorService executorService = buildExecutor(runnableTasks.size());
118                try {
119                        List<Future<?>> futures = runnableTasks.stream()
120                                        .map(t -> executorService.submit(() -> t.call()))
121                                        .collect(Collectors.toList());
122                        // wait for all the threads to finish
123                        for (Future<?> future : futures) {
124                                future.get();
125                        }
126                } catch (InterruptedException e) {
127                        ourLog.error("Interrupted while " + myProcessName, e);
128                        Thread.currentThread().interrupt();
129                } catch (ExecutionException e) {
130                        ourLog.error("Error while " + myProcessName, e);
131                        throw new InternalErrorException(Msg.code(1085) + e);
132                } finally {
133                        executorService.shutdown();
134                }
135        }
136
137        private List<Callable<Void>> buildCallableTasks(
138                        List<IResourcePersistentId> theResourceIds, Consumer<List<IResourcePersistentId>> partitionConsumer) {
139                List<Callable<Void>> retval = new ArrayList<>();
140
141                if (myBatchSize > theResourceIds.size()) {
142                        ourLog.info("Splitting batch job of {} entries into chunks of {}", theResourceIds.size(), myBatchSize);
143                } else {
144                        ourLog.info("Creating batch job of {} entries", theResourceIds.size());
145                }
146                List<List<IResourcePersistentId>> partitions = Lists.partition(theResourceIds, myBatchSize);
147
148                for (List<IResourcePersistentId> nextPartition : partitions) {
149                        if (nextPartition.size() > 0) {
150                                Callable<Void> callableTask = () -> {
151                                        ourLog.info(myProcessName + " {} resources", nextPartition.size());
152                                        partitionConsumer.accept(nextPartition);
153                                        return null;
154                                };
155                                retval.add(callableTask);
156                        }
157                }
158
159                return retval;
160        }
161
162        private ExecutorService buildExecutor(int numberOfTasks) {
163                int threadCount = Math.min(numberOfTasks, myThreadCount);
164                assert (threadCount > 0);
165
166                ourLog.info(myProcessName + " with {} threads", threadCount);
167                LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(MAX_POOL_SIZE);
168                BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
169                                .namingPattern(myThreadPrefix + "-%d")
170                                .daemon(false)
171                                .priority(Thread.NORM_PRIORITY)
172                                .build();
173                RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
174                        ourLog.info(
175                                        "Note: " + myThreadPrefix
176                                                        + " executor queue is full ({} elements), waiting for a slot to become available!",
177                                        executorQueue.size());
178                        StopWatch sw = new StopWatch();
179                        try {
180                                executorQueue.put(theRunnable);
181                        } catch (InterruptedException e) {
182                                throw new RejectedExecutionException(
183                                                Msg.code(1086) + "Task " + theRunnable.toString() + " rejected from " + e);
184                        }
185                        ourLog.info("Slot become available after {}ms", sw.getMillis());
186                };
187
188                // setting corePoolSize and maximumPoolSize to be the same as threadCount
189                // to ensure that the number of allocated threads for the expunge operation does not exceed the configured limit
190                // see ThreadPoolExecutor documentation for details
191                return new ThreadPoolExecutor(
192                                threadCount,
193                                threadCount,
194                                0L,
195                                TimeUnit.MILLISECONDS,
196                                executorQueue,
197                                threadFactory,
198                                rejectedExecutionHandler);
199        }
200}