/*-
 * #%L
 * HAPI FHIR Storage api
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.dao.expunge;

import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.collect.Lists;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class PartitionRunner {
	private static final Logger ourLog = LoggerFactory.getLogger(PartitionRunner.class);
	private static final int MAX_POOL_SIZE = 1000;

	private final String myProcessName;
	private final String myThreadPrefix;
	private final int myBatchSize;
	private final int myThreadCount;
	private final HapiTransactionService myTransactionService;
	private final RequestDetails myRequestDetails;

	/**
	 * Constructor - Use this constructor if you do not want any transaction management
	 */
	public PartitionRunner(String theProcessName, String theThreadPrefix, int theBatchSize, int theThreadCount) {
		this(theProcessName, theThreadPrefix, theBatchSize, theThreadCount, null, null);
	}

	/**
	 * Constructor - Use this constructor and provide a {@link RequestDetails} and {@link HapiTransactionService} if
	 * you want each individual callable task to be performed in a managed transaction.
	 */
	public PartitionRunner(
			String theProcessName,
			String theThreadPrefix,
			int theBatchSize,
			int theThreadCount,
			@Nullable HapiTransactionService theTransactionService,
			@Nullable RequestDetails theRequestDetails) {
		myProcessName = theProcessName;
		myThreadPrefix = theThreadPrefix;
		myBatchSize = theBatchSize;
		myThreadCount = theThreadCount;
		myTransactionService = theTransactionService;
		myRequestDetails = theRequestDetails;
	}

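	/**
	 * Splits the given IDs into batches of at most the configured batch size and hands each batch to
	 * {@code partitionConsumer}. A single batch is executed on the calling thread; multiple batches are
	 * executed concurrently on a dedicated pool of up to the configured thread count. If a
	 * {@link HapiTransactionService} was provided, each batch runs inside its own managed transaction.
	 * <p>
	 * A minimal usage sketch (the caller-side names below are illustrative only, not part of this API):
	 * <pre>{@code
	 * PartitionRunner runner = new PartitionRunner("Expunging", "expunge", 100, 4);
	 * runner.runInPartitionedThreads(resourceIds, batch -> myDao.expungeBatch(batch));
	 * }</pre>
	 */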
	public void runInPartitionedThreads(
			List<IResourcePersistentId> theResourceIds, Consumer<List<IResourcePersistentId>> partitionConsumer) {

		List<Callable<Void>> runnableTasks = buildCallableTasks(theResourceIds, partitionConsumer);
		if (runnableTasks.isEmpty()) {
			return;
		}

		if (myTransactionService != null) {
			// Wrap each Callable task in an invocation to HapiTransactionService#execute
			runnableTasks = runnableTasks.stream()
					.map(t -> (Callable<Void>) () -> {
						return myTransactionService
								.withRequest(myRequestDetails)
								.execute(t);
					})
					.collect(Collectors.toList());
		}

		if (runnableTasks.size() == 1) {
			try {
				runnableTasks.get(0).call();
				return;
			} catch (PreconditionFailedException preconditionFailedException) {
				throw preconditionFailedException;
			} catch (Exception e) {
				ourLog.error("Error while " + myProcessName, e);
				throw new InternalErrorException(Msg.code(1084) + e);
			}
		}

		ExecutorService executorService = buildExecutor(runnableTasks.size());
		try {
			List<Future<?>> futures = runnableTasks.stream()
					.map(t -> executorService.submit(() -> t.call()))
					.collect(Collectors.toList());
			// wait for all the threads to finish
			for (Future<?> future : futures) {
				future.get();
			}
		} catch (InterruptedException e) {
			ourLog.error("Interrupted while " + myProcessName, e);
			Thread.currentThread().interrupt();
		} catch (ExecutionException e) {
			ourLog.error("Error while " + myProcessName, e);
			throw new InternalErrorException(Msg.code(1085) + e);
		} finally {
			executorService.shutdown();
		}
	}

	private List<Callable<Void>> buildCallableTasks(
			List<IResourcePersistentId> theResourceIds, Consumer<List<IResourcePersistentId>> partitionConsumer) {
		List<Callable<Void>> retval = new ArrayList<>();

		if (theResourceIds.size() > myBatchSize) {
			ourLog.info("Splitting batch job of {} entries into chunks of {}", theResourceIds.size(), myBatchSize);
		} else {
			ourLog.info("Creating batch job of {} entries", theResourceIds.size());
		}
		List<List<IResourcePersistentId>> partitions = Lists.partition(theResourceIds, myBatchSize);

		for (List<IResourcePersistentId> nextPartition : partitions) {
			if (!nextPartition.isEmpty()) {
				Callable<Void> callableTask = () -> {
					ourLog.info(myProcessName + " {} resources", nextPartition.size());
					partitionConsumer.accept(nextPartition);
					return null;
				};
				retval.add(callableTask);
			}
		}

		return retval;
	}

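	/**
	 * Builds a fixed-size pool of {@code min(numberOfTasks, myThreadCount)} threads backed by a bounded
	 * work queue of {@link #MAX_POOL_SIZE} entries. If the queue fills up, the rejection handler blocks
	 * the submitting thread (via {@link LinkedBlockingQueue#put(Object)}) until a slot becomes available,
	 * rather than discarding the task.
	 */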
	private ExecutorService buildExecutor(int numberOfTasks) {
		int threadCount = Math.min(numberOfTasks, myThreadCount);
		assert (threadCount > 0);

		ourLog.info(myProcessName + " with {} threads", threadCount);
		LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(MAX_POOL_SIZE);
		BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
				.namingPattern(myThreadPrefix + "-%d")
				.daemon(false)
				.priority(Thread.NORM_PRIORITY)
				.build();
		RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
			ourLog.info(
					"Note: " + myThreadPrefix
							+ " executor queue is full ({} elements), waiting for a slot to become available!",
					executorQueue.size());
			StopWatch sw = new StopWatch();
			try {
				executorQueue.put(theRunnable);
			} catch (InterruptedException e) {
				throw new RejectedExecutionException(
						Msg.code(1086) + "Task " + theRunnable.toString() + " rejected from " + e);
			}
			ourLog.info("Slot became available after {}ms", sw.getMillis());
		};

		// setting corePoolSize and maximumPoolSize to be the same as threadCount
		// to ensure that the number of allocated threads for the expunge operation does not exceed the
		// configured limit
		// see ThreadPoolExecutor documentation for details
		return new ThreadPoolExecutor(
				threadCount,
				threadCount,
				0L,
				TimeUnit.MILLISECONDS,
				executorQueue,
				threadFactory,
				rejectedExecutionHandler);
	}
}