001/*-
002 * #%L
003 * HAPI FHIR Storage api
004 * %%
005 * Copyright (C) 2014 - 2023 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.dao.expunge;
021
022import ca.uhn.fhir.i18n.Msg;
023import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
024import ca.uhn.fhir.rest.api.server.RequestDetails;
025import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
026import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
027import ca.uhn.fhir.util.StopWatch;
028import com.google.common.collect.Lists;
029import org.apache.commons.lang3.concurrent.BasicThreadFactory;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033import javax.annotation.Nullable;
034import java.util.ArrayList;
035import java.util.List;
036import java.util.concurrent.Callable;
037import java.util.concurrent.ExecutionException;
038import java.util.concurrent.ExecutorService;
039import java.util.concurrent.Future;
040import java.util.concurrent.LinkedBlockingQueue;
041import java.util.concurrent.RejectedExecutionException;
042import java.util.concurrent.RejectedExecutionHandler;
043import java.util.concurrent.ThreadPoolExecutor;
044import java.util.concurrent.TimeUnit;
045import java.util.function.Consumer;
046import java.util.stream.Collectors;
047
048public class PartitionRunner {
049        private static final Logger ourLog = LoggerFactory.getLogger(PartitionRunner.class);
050        private static final int MAX_POOL_SIZE = 1000;
051
052        private final String myProcessName;
053        private final String myThreadPrefix;
054        private final int myBatchSize;
055        private final int myThreadCount;
056        private final HapiTransactionService myTransactionService;
057        private final RequestDetails myRequestDetails;
058
059        /**
060         * Constructor - Use this constructor if you do not want any transaction management
061         */
062        public PartitionRunner(String theProcessName, String theThreadPrefix, int theBatchSize, int theThreadCount) {
063                this(theProcessName, theThreadPrefix, theBatchSize, theThreadCount, null, null);
064        }
065
066        /**
067         * Constructor - Use this constructor and provide a {@link RequestDetails} and {@link HapiTransactionService} if
068         * you want each individual callable task to be performed in a managed transaction.
069         */
070        public PartitionRunner(String theProcessName, String theThreadPrefix, int theBatchSize, int theThreadCount, @Nullable HapiTransactionService theTransactionService, @Nullable RequestDetails theRequestDetails) {
071                myProcessName = theProcessName;
072                myThreadPrefix = theThreadPrefix;
073                myBatchSize = theBatchSize;
074                myThreadCount = theThreadCount;
075                myTransactionService = theTransactionService;
076                myRequestDetails = theRequestDetails;
077        }
078
079        public void runInPartitionedThreads(List<IResourcePersistentId> theResourceIds, Consumer<List<IResourcePersistentId>> partitionConsumer) {
080
081                List<Callable<Void>> runnableTasks = buildCallableTasks(theResourceIds, partitionConsumer);
082                if (runnableTasks.size() == 0) {
083                        return;
084                }
085
086                if (myTransactionService != null) {
087                        // Wrap each Callable task in an invocation to HapiTransactionService#execute
088                        runnableTasks = runnableTasks
089                                .stream()
090                                .map(t -> (Callable<Void>) () -> {
091                                        return myTransactionService
092                                                .withRequest(myRequestDetails)
093                                                .execute(t);
094                                })
095                                .collect(Collectors.toList());
096                }
097
098                if (runnableTasks.size() == 1) {
099                        try {
100                                runnableTasks.get(0).call();
101                                return;
102                        } catch (Exception e) {
103                                ourLog.error("Error while " + myProcessName, e);
104                                throw new InternalErrorException(Msg.code(1084) + e);
105                        }
106                }
107
108                ExecutorService executorService = buildExecutor(runnableTasks.size());
109                try {
110                        List<Future<?>> futures = runnableTasks
111                                .stream()
112                                .map(t -> executorService.submit(() -> t.call()))
113                                .collect(Collectors.toList());
114                        // wait for all the threads to finish
115                        for (Future<?> future : futures) {
116                                future.get();
117                        }
118                } catch (InterruptedException e) {
119                        ourLog.error("Interrupted while " + myProcessName, e);
120                        Thread.currentThread().interrupt();
121                } catch (ExecutionException e) {
122                        ourLog.error("Error while " + myProcessName, e);
123                        throw new InternalErrorException(Msg.code(1085) + e);
124                } finally {
125                        executorService.shutdown();
126                }
127        }
128
129        private List<Callable<Void>> buildCallableTasks(List<IResourcePersistentId> theResourceIds, Consumer<List<IResourcePersistentId>> partitionConsumer) {
130                List<Callable<Void>> retval = new ArrayList<>();
131
132                if (myBatchSize > theResourceIds.size()) {
133                        ourLog.info("Splitting batch job of {} entries into chunks of {}", theResourceIds.size(), myBatchSize);
134                } else {
135                        ourLog.info("Creating batch job of {} entries", theResourceIds.size());
136                }
137                List<List<IResourcePersistentId>> partitions = Lists.partition(theResourceIds, myBatchSize);
138
139                for (List<IResourcePersistentId> nextPartition : partitions) {
140                        if (nextPartition.size() > 0) {
141                                Callable<Void> callableTask = () -> {
142                                        ourLog.info(myProcessName + " {} resources", nextPartition.size());
143                                        partitionConsumer.accept(nextPartition);
144                                        return null;
145                                };
146                                retval.add(callableTask);
147                        }
148                }
149
150                return retval;
151        }
152
153        private ExecutorService buildExecutor(int numberOfTasks) {
154                int threadCount = Math.min(numberOfTasks, myThreadCount);
155                assert (threadCount > 0);
156
157                ourLog.info(myProcessName + " with {} threads", threadCount);
158                LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(MAX_POOL_SIZE);
159                BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
160                        .namingPattern(myThreadPrefix + "-%d")
161                        .daemon(false)
162                        .priority(Thread.NORM_PRIORITY)
163                        .build();
164                RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
165                        ourLog.info("Note: " + myThreadPrefix + " executor queue is full ({} elements), waiting for a slot to become available!", executorQueue.size());
166                        StopWatch sw = new StopWatch();
167                        try {
168                                executorQueue.put(theRunnable);
169                        } catch (InterruptedException e) {
170                                throw new RejectedExecutionException(Msg.code(1086) + "Task " + theRunnable.toString() +
171                                        " rejected from " + e);
172                        }
173                        ourLog.info("Slot become available after {}ms", sw.getMillis());
174                };
175                return new ThreadPoolExecutor(
176                        threadCount,
177                        MAX_POOL_SIZE,
178                        0L,
179                        TimeUnit.MILLISECONDS,
180                        executorQueue,
181                        threadFactory,
182                        rejectedExecutionHandler);
183        }
184}