
/*-
 * #%L
 * HAPI FHIR Storage api
 * %%
 * Copyright (C) 2014 - 2023 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.dao.expunge;

import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class PartitionRunner {
	private static final Logger ourLog = LoggerFactory.getLogger(PartitionRunner.class);
	private static final int MAX_POOL_SIZE = 1000;

	private final String myProcessName;
	private final String myThreadPrefix;
	private final int myBatchSize;
	private final int myThreadCount;
	private final HapiTransactionService myTransactionService;
	private final RequestDetails myRequestDetails;

	/**
	 * Constructor - Use this constructor if you do not want any transaction management
	 */
	public PartitionRunner(String theProcessName, String theThreadPrefix, int theBatchSize, int theThreadCount) {
		this(theProcessName, theThreadPrefix, theBatchSize, theThreadCount, null, null);
	}

	/**
	 * Constructor - Use this constructor and provide a {@link RequestDetails} and {@link HapiTransactionService} if
	 * you want each individual callable task to be performed in a managed transaction.
	 */
	public PartitionRunner(
			String theProcessName,
			String theThreadPrefix,
			int theBatchSize,
			int theThreadCount,
			@Nullable HapiTransactionService theTransactionService,
			@Nullable RequestDetails theRequestDetails) {
		myProcessName = theProcessName;
		myThreadPrefix = theThreadPrefix;
		myBatchSize = theBatchSize;
		myThreadCount = theThreadCount;
		myTransactionService = theTransactionService;
		myRequestDetails = theRequestDetails;
	}
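	/*
	 * Usage sketch (illustrative only; the ID list and the chunk consumer below are hypothetical,
	 * any IResourcePersistentId implementation and any consumer will do):
	 *
	 *   List<IResourcePersistentId> ids = ...; // e.g. resolved by a search
	 *   PartitionRunner runner = new PartitionRunner("Expunge", "expunge", 100, 4);
	 *   runner.runInPartitionedThreads(ids, chunk -> myExpungeService.expungeChunk(chunk));
	 *
	 * With 250 IDs, a batch size of 100 and a thread count of 4, three tasks of 100, 100 and
	 * 50 IDs are built and executed on min(3, 4) = 3 worker threads.
	 */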
	public void runInPartitionedThreads(
			List<IResourcePersistentId> theResourceIds, Consumer<List<IResourcePersistentId>> partitionConsumer) {

		List<Callable<Void>> runnableTasks = buildCallableTasks(theResourceIds, partitionConsumer);
		if (runnableTasks.isEmpty()) {
			return;
		}

		if (myTransactionService != null) {
			// Wrap each Callable task in an invocation to HapiTransactionService#execute so that
			// every chunk is processed in its own managed transaction
			runnableTasks = runnableTasks.stream()
					.map(t -> (Callable<Void>) () -> {
						return myTransactionService
								.withRequest(myRequestDetails)
								.execute(t);
					})
					.collect(Collectors.toList());
		}

		// A single task is executed inline on the calling thread, with no thread pool
		if (runnableTasks.size() == 1) {
			try {
				runnableTasks.get(0).call();
				return;
			} catch (Exception e) {
				ourLog.error("Error while " + myProcessName, e);
				throw new InternalErrorException(Msg.code(1084) + e);
			}
		}

		ExecutorService executorService = buildExecutor(runnableTasks.size());
		try {
			List<Future<?>> futures = runnableTasks.stream()
					.map(t -> executorService.submit(() -> t.call()))
					.collect(Collectors.toList());
			// Wait for all the threads to finish
			for (Future<?> future : futures) {
				future.get();
			}
		} catch (InterruptedException e) {
			ourLog.error("Interrupted while " + myProcessName, e);
			Thread.currentThread().interrupt();
		} catch (ExecutionException e) {
			ourLog.error("Error while " + myProcessName, e);
			throw new InternalErrorException(Msg.code(1085) + e);
		} finally {
			executorService.shutdown();
		}
	}
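	/*
	 * Transaction-managed sketch (illustrative only; the injected HapiTransactionService, the
	 * RequestDetails and the consumer are hypothetical): supplying the optional dependencies
	 * makes each chunk run inside its own managed transaction.
	 *
	 *   PartitionRunner runner = new PartitionRunner(
	 *       "Reindex", "reindex", 100, 4, myHapiTransactionService, theRequestDetails);
	 *   runner.runInPartitionedThreads(ids, chunk -> myReindexService.reindexChunk(chunk));
	 */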
	private List<Callable<Void>> buildCallableTasks(
			List<IResourcePersistentId> theResourceIds, Consumer<List<IResourcePersistentId>> partitionConsumer) {
		List<Callable<Void>> retval = new ArrayList<>();

		if (theResourceIds.size() > myBatchSize) {
			ourLog.info("Splitting batch job of {} entries into chunks of {}", theResourceIds.size(), myBatchSize);
		} else {
			ourLog.info("Creating batch job of {} entries", theResourceIds.size());
		}
		List<List<IResourcePersistentId>> partitions = Lists.partition(theResourceIds, myBatchSize);

		for (List<IResourcePersistentId> nextPartition : partitions) {
			if (nextPartition.size() > 0) {
				Callable<Void> callableTask = () -> {
					ourLog.info(myProcessName + " {} resources", nextPartition.size());
					partitionConsumer.accept(nextPartition);
					return null;
				};
				retval.add(callableTask);
			}
		}

		return retval;
	}

	private ExecutorService buildExecutor(int numberOfTasks) {
		int threadCount = Math.min(numberOfTasks, myThreadCount);
		assert (threadCount > 0);

		ourLog.info(myProcessName + " with {} threads", threadCount);
		LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(MAX_POOL_SIZE);
		BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
				.namingPattern(myThreadPrefix + "-%d")
				.daemon(false)
				.priority(Thread.NORM_PRIORITY)
				.build();
		// When the bounded queue is full, block the submitting thread until a slot frees up
		// instead of rejecting the task, which provides back-pressure on the producer
		RejectedExecutionHandler rejectedExecutionHandler = (theRunnable, theExecutor) -> {
			ourLog.info("Note: " + myThreadPrefix + " executor queue is full ({} elements), waiting for a slot to become available!", executorQueue.size());
			StopWatch sw = new StopWatch();
			try {
				executorQueue.put(theRunnable);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				throw new RejectedExecutionException(Msg.code(1086) + "Task " + theRunnable.toString() +
						" rejected from " + e);
			}
			ourLog.info("Slot became available after {}ms", sw.getMillis());
		};
		return new ThreadPoolExecutor(
				threadCount,
				MAX_POOL_SIZE,
				0L,
				TimeUnit.MILLISECONDS,
				executorQueue,
				threadFactory,
				rejectedExecutionHandler);
	}
}