
/*-
 * #%L
 * HAPI FHIR Storage api
 * %%
 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.dao.expunge;

import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.collect.Lists;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
 * Splits a list into partitions of at most {@code theBatchSize} elements and passes each partition to a
 * caller-supplied consumer.
 * <p>
 * A single partition is processed on the calling thread. Several partitions are processed on a dedicated
 * fixed-size thread pool that is created for the call and shut down before the call returns. If a
 * {@link HapiTransactionService} was supplied, every partition is processed through
 * {@code withRequest(...).execute(...)} on that service.
 */
public class PartitionRunner {
	private static final Logger ourLog = LoggerFactory.getLogger(PartitionRunner.class);

	/**
	 * Capacity of the executor's work queue (despite the name, this is not a thread count). When the queue is
	 * full, the submitting thread blocks in the rejection handler until a slot becomes free.
	 */
	private static final int MAX_POOL_SIZE = 1000;

	private final String myProcessName;
	private final String myThreadPrefix;
	private final int myBatchSize;
	private final int myThreadCount;
	private final HapiTransactionService myTransactionService;
	private final RequestDetails myRequestDetails;

	/**
	 * Constructor - Use this constructor if you do not want any transaction management
	 *
	 * @param theProcessName  description of the work being done, used in log and error messages
	 * @param theThreadPrefix prefix for the names of the worker threads
	 * @param theBatchSize    maximum number of elements handed to the consumer in one partition
	 * @param theThreadCount  upper bound on the number of worker threads
	 */
	public PartitionRunner(String theProcessName, String theThreadPrefix, int theBatchSize, int theThreadCount) {
		this(theProcessName, theThreadPrefix, theBatchSize, theThreadCount, null, null);
	}

	/**
	 * Constructor - Use this constructor and provide a {@link RequestDetails} and {@link HapiTransactionService} if
	 * you want each individual callable task to be performed in a managed transaction.
	 *
	 * @param theProcessName        description of the work being done, used in log and error messages
	 * @param theThreadPrefix       prefix for the names of the worker threads
	 * @param theBatchSize          maximum number of elements handed to the consumer in one partition
	 * @param theThreadCount        upper bound on the number of worker threads
	 * @param theTransactionService transaction service used to wrap each task, or {@code null} for no wrapping
	 * @param theRequestDetails     request passed to the transaction service, may be {@code null}
	 */
	public PartitionRunner(
			String theProcessName,
			String theThreadPrefix,
			int theBatchSize,
			int theThreadCount,
			@Nullable HapiTransactionService theTransactionService,
			@Nullable RequestDetails theRequestDetails) {
		myProcessName = theProcessName;
		myThreadPrefix = theThreadPrefix;
		myBatchSize = theBatchSize;
		myThreadCount = theThreadCount;
		myTransactionService = theTransactionService;
		myRequestDetails = theRequestDetails;
	}

	/**
	 * Partitions {@code theResourceIds} and invokes {@code partitionConsumer} once per non-empty partition,
	 * returning after all partitions have been processed (or after the first failure is observed).
	 * <p>
	 * Does nothing when the list is empty. With exactly one partition the consumer runs on the calling thread;
	 * otherwise partitions are submitted to a thread pool and this method waits for each of them in turn.
	 *
	 * @param theResourceIds    the elements to process
	 * @param partitionConsumer receives each partition; invoked concurrently when there are several partitions
	 * @param <T>               element type
	 * @throws PreconditionFailedException if the consumer throws it while running on the calling thread
	 *                                     (single-partition case only)
	 * @throws InternalErrorException      if processing a partition fails in any other way; the original
	 *                                     failure is attached as the cause
	 */
	public <T> void runInPartitionedThreads(List<T> theResourceIds, Consumer<List<T>> partitionConsumer) {

		List<Callable<Void>> runnableTasks = buildCallableTasks(theResourceIds, partitionConsumer);
		if (runnableTasks.isEmpty()) {
			return;
		}

		if (myTransactionService != null) {
			// Wrap each Callable task in an invocation to HapiTransactionService#execute
			runnableTasks = runnableTasks.stream()
					.map(t -> (Callable<Void>) () ->
							myTransactionService.withRequest(myRequestDetails).execute(t))
					.toList();
		}

		if (runnableTasks.size() == 1) {
			// No point in paying for a thread pool when there is only one unit of work
			runOnCallingThread(runnableTasks.get(0));
			return;
		}

		runOnExecutor(runnableTasks);
	}

	/**
	 * Runs a single task synchronously, translating any failure other than
	 * {@link PreconditionFailedException} into an {@link InternalErrorException}.
	 */
	private void runOnCallingThread(Callable<Void> theTask) {
		try {
			theTask.call();
		} catch (PreconditionFailedException preconditionFailedException) {
			throw preconditionFailedException;
		} catch (Exception e) {
			ourLog.error("Error while {}", myProcessName, e);
			// Keep the original exception as the cause so its stack trace is not lost
			throw new InternalErrorException(Msg.code(1084) + e, e);
		}
	}

	/**
	 * Submits all tasks to a freshly built executor, waits for them in submission order and always shuts the
	 * executor down afterwards.
	 */
	private void runOnExecutor(List<Callable<Void>> theTasks) {
		ExecutorService executorService = buildExecutor(theTasks.size());
		try {
			List<Future<?>> futures =
					theTasks.stream().map(executorService::submit).collect(Collectors.toList());
			// wait for all the threads to finish
			for (Future<?> future : futures) {
				future.get();
			}
		} catch (InterruptedException e) {
			// Restore the interrupt flag for the caller. Tasks that were already submitted are not
			// cancelled: shutdown() below only prevents new submissions.
			ourLog.error("Interrupted while {}", myProcessName, e);
			Thread.currentThread().interrupt();
		} catch (ExecutionException e) {
			ourLog.error("Error while {}", myProcessName, e);
			// Keep the ExecutionException (and through it the task's own exception) as the cause
			throw new InternalErrorException(Msg.code(1085) + e, e);
		} finally {
			executorService.shutdown();
		}
	}

	/**
	 * Builds one task per non-empty partition of {@code theResourceIds}. Each task logs the partition size and
	 * passes the partition to {@code partitionConsumer}.
	 */
	private <T> List<Callable<Void>> buildCallableTasks(List<T> theResourceIds, Consumer<List<T>> partitionConsumer) {
		List<Callable<Void>> retval = new ArrayList<>();

		if (theResourceIds.size() > myBatchSize) {
			ourLog.info("Splitting batch job of {} entries into chunks of {}", theResourceIds.size(), myBatchSize);
		} else {
			ourLog.info("Creating batch job of {} entries", theResourceIds.size());
		}
		List<List<T>> partitions = Lists.partition(theResourceIds, myBatchSize);

		for (List<T> nextPartition : partitions) {
			if (!nextPartition.isEmpty()) {
				Callable<Void> callableTask = () -> {
					ourLog.info("{} {} resources", myProcessName, nextPartition.size());
					partitionConsumer.accept(nextPartition);
					return null;
				};
				retval.add(callableTask);
			}
		}

		return retval;
	}

	/**
	 * Creates a fixed-size thread pool with {@code min(numberOfTasks, myThreadCount)} non-daemon threads and a
	 * bounded work queue. When the queue is full the submitting thread blocks until space is available instead
	 * of having the task rejected.
	 *
	 * @param numberOfTasks number of tasks about to be submitted
	 */
	private ExecutorService buildExecutor(int numberOfTasks) {
		int threadCount = Math.min(numberOfTasks, myThreadCount);
		assert (threadCount > 0);

		ourLog.info("{} with {} threads", myProcessName, threadCount);
		LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(MAX_POOL_SIZE);
		BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
				.namingPattern(myThreadPrefix + "-%d")
				.daemon(false)
				.priority(Thread.NORM_PRIORITY)
				.build();
		RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
			ourLog.info(
					"Note: {} executor queue is full ({} elements), waiting for a slot to become available!",
					myThreadPrefix,
					executorQueue.size());
			StopWatch sw = new StopWatch();
			try {
				// Blocks the submitting thread until the queue has room
				executorQueue.put(theRunnable);
			} catch (InterruptedException e) {
				// Restore the interrupt flag before converting to an unchecked exception
				Thread.currentThread().interrupt();
				throw new RejectedExecutionException(
						Msg.code(1086) + "Task " + theRunnable.toString() + " rejected from " + e, e);
			}
			ourLog.info("Slot become available after {}ms", sw.getMillis());
		};

		// setting corePoolSize and maximumPoolSize to be the same as threadCount
		// to ensure that the number of allocated threads for the expunge operation does not exceed the configured limit
		// see ThreadPoolExecutor documentation for details
		return new ThreadPoolExecutor(
				threadCount,
				threadCount,
				0L,
				TimeUnit.MILLISECONDS,
				executorQueue,
				threadFactory,
				rejectedExecutionHandler);
	}
}