001/*- 002 * #%L 003 * HAPI FHIR Storage api 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.dao.expunge; 021 022import ca.uhn.fhir.i18n.Msg; 023import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 024import ca.uhn.fhir.rest.api.server.RequestDetails; 025import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 026import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException; 027import ca.uhn.fhir.util.StopWatch; 028import com.google.common.collect.Lists; 029import jakarta.annotation.Nullable; 030import org.apache.commons.lang3.concurrent.BasicThreadFactory; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034import java.util.ArrayList; 035import java.util.List; 036import java.util.concurrent.Callable; 037import java.util.concurrent.ExecutionException; 038import java.util.concurrent.ExecutorService; 039import java.util.concurrent.Future; 040import java.util.concurrent.LinkedBlockingQueue; 041import java.util.concurrent.RejectedExecutionException; 042import java.util.concurrent.RejectedExecutionHandler; 043import java.util.concurrent.ThreadPoolExecutor; 044import java.util.concurrent.TimeUnit; 045import java.util.function.Consumer; 046import java.util.stream.Collectors; 047 048public class PartitionRunner { 049 private static final Logger ourLog = LoggerFactory.getLogger(PartitionRunner.class); 050 private static final int MAX_POOL_SIZE = 1000; 051 052 private final String myProcessName; 053 private final String myThreadPrefix; 054 private final int myBatchSize; 055 private final int myThreadCount; 056 private final HapiTransactionService myTransactionService; 057 private final RequestDetails myRequestDetails; 058 059 /** 060 * Constructor - Use this constructor if you do not want any transaction management 061 */ 062 public PartitionRunner(String theProcessName, String theThreadPrefix, int theBatchSize, int theThreadCount) { 063 this(theProcessName, theThreadPrefix, theBatchSize, theThreadCount, null, null); 064 } 065 066 /** 067 * Constructor - Use this constructor and provide a {@link RequestDetails} and {@link HapiTransactionService} if 068 * you want each individual callable task to be performed in a managed transaction. 069 */ 070 public PartitionRunner( 071 String theProcessName, 072 String theThreadPrefix, 073 int theBatchSize, 074 int theThreadCount, 075 @Nullable HapiTransactionService theTransactionService, 076 @Nullable RequestDetails theRequestDetails) { 077 myProcessName = theProcessName; 078 myThreadPrefix = theThreadPrefix; 079 myBatchSize = theBatchSize; 080 myThreadCount = theThreadCount; 081 myTransactionService = theTransactionService; 082 myRequestDetails = theRequestDetails; 083 } 084 085 public <T> void runInPartitionedThreads(List<T> theResourceIds, Consumer<List<T>> partitionConsumer) { 086 087 List<Callable<Void>> runnableTasks = buildCallableTasks(theResourceIds, partitionConsumer); 088 if (runnableTasks.isEmpty()) { 089 return; 090 } 091 092 if (myTransactionService != null) { 093 // Wrap each Callable task in an invocation to HapiTransactionService#execute 094 runnableTasks = runnableTasks.stream() 095 .map(t -> (Callable<Void>) () -> { 096 return myTransactionService 097 .withRequest(myRequestDetails) 098 .execute(t); 099 }) 100 .collect(Collectors.toList()); 101 } 102 103 if (runnableTasks.size() == 1) { 104 try { 105 runnableTasks.get(0).call(); 106 return; 107 } catch (PreconditionFailedException preconditionFailedException) { 108 throw preconditionFailedException; 109 } catch (Exception e) { 110 ourLog.error("Error while " + myProcessName, e); 111 throw new InternalErrorException(Msg.code(1084) + e); 112 } 113 } 114 115 ExecutorService executorService = buildExecutor(runnableTasks.size()); 116 try { 117 List<Future<?>> futures = runnableTasks.stream() 118 .map(t -> executorService.submit(() -> t.call())) 119 .collect(Collectors.toList()); 120 // wait for all the threads to finish 121 for (Future<?> future : futures) { 122 future.get(); 123 } 124 } catch (InterruptedException e) { 125 ourLog.error("Interrupted while " + myProcessName, e); 126 Thread.currentThread().interrupt(); 127 } catch (ExecutionException e) { 128 ourLog.error("Error while " + myProcessName, e); 129 throw new InternalErrorException(Msg.code(1085) + e); 130 } finally { 131 executorService.shutdown(); 132 } 133 } 134 135 private <T> List<Callable<Void>> buildCallableTasks(List<T> theResourceIds, Consumer<List<T>> partitionConsumer) { 136 List<Callable<Void>> retval = new ArrayList<>(); 137 138 if (myBatchSize > theResourceIds.size()) { 139 ourLog.info("Splitting batch job of {} entries into chunks of {}", theResourceIds.size(), myBatchSize); 140 } else { 141 ourLog.info("Creating batch job of {} entries", theResourceIds.size()); 142 } 143 List<List<T>> partitions = Lists.partition(theResourceIds, myBatchSize); 144 145 for (List<T> nextPartition : partitions) { 146 if (!nextPartition.isEmpty()) { 147 Callable<Void> callableTask = () -> { 148 ourLog.info(myProcessName + " {} resources", nextPartition.size()); 149 partitionConsumer.accept(nextPartition); 150 return null; 151 }; 152 retval.add(callableTask); 153 } 154 } 155 156 return retval; 157 } 158 159 private ExecutorService buildExecutor(int numberOfTasks) { 160 int threadCount = Math.min(numberOfTasks, myThreadCount); 161 assert (threadCount > 0); 162 163 ourLog.info(myProcessName + " with {} threads", threadCount); 164 LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(MAX_POOL_SIZE); 165 BasicThreadFactory threadFactory = new BasicThreadFactory.Builder() 166 .namingPattern(myThreadPrefix + "-%d") 167 .daemon(false) 168 .priority(Thread.NORM_PRIORITY) 169 .build(); 170 RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> { 171 ourLog.info( 172 "Note: " + myThreadPrefix 173 + " executor queue is full ({} elements), waiting for a slot to become available!", 174 executorQueue.size()); 175 StopWatch sw = new StopWatch(); 176 try { 177 executorQueue.put(theRunnable); 178 } catch (InterruptedException e) { 179 throw new RejectedExecutionException( 180 Msg.code(1086) + "Task " + theRunnable.toString() + " rejected from " + e); 181 } 182 ourLog.info("Slot become available after {}ms", sw.getMillis()); 183 }; 184 185 // setting corePoolSize and maximumPoolSize to be the same as threadCount 186 // to ensure that the number of allocated threads for the expunge operation does not exceed the configured limit 187 // see ThreadPoolExecutor documentation for details 188 return new ThreadPoolExecutor( 189 threadCount, 190 threadCount, 191 0L, 192 TimeUnit.MILLISECONDS, 193 executorQueue, 194 threadFactory, 195 rejectedExecutionHandler); 196 } 197}