
001package ca.uhn.fhir.jpa.bulk.imprt.job; 002 003/*- 004 * #%L 005 * HAPI FHIR JPA Server 006 * %% 007 * Copyright (C) 2014 - 2022 Smile CDR, Inc. 008 * %% 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 * #L% 021 */ 022 023import ca.uhn.fhir.jpa.api.config.DaoConfig; 024import ca.uhn.fhir.jpa.batch.config.BatchConstants; 025import ca.uhn.fhir.jpa.bulk.imprt.model.ParsedBulkImportRecord; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028import org.springframework.batch.core.Job; 029import org.springframework.batch.core.JobParametersValidator; 030import org.springframework.batch.core.Step; 031import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; 032import org.springframework.batch.core.configuration.annotation.JobScope; 033import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; 034import org.springframework.batch.core.configuration.annotation.StepScope; 035import org.springframework.batch.core.partition.PartitionHandler; 036import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler; 037import org.springframework.batch.core.step.item.KeyGenerator; 038import org.springframework.batch.item.ItemWriter; 039import org.springframework.batch.repeat.CompletionPolicy; 040import org.springframework.batch.repeat.RepeatContext; 041import org.springframework.batch.repeat.exception.ExceptionHandler; 042import org.springframework.beans.factory.annotation.Autowired; 043import org.springframework.beans.factory.annotation.Qualifier; 044import org.springframework.context.annotation.Bean; 045import org.springframework.context.annotation.Configuration; 046import org.springframework.context.annotation.Lazy; 047import org.springframework.core.task.TaskExecutor; 048import org.springframework.retry.RetryPolicy; 049import org.springframework.retry.policy.CompositeRetryPolicy; 050import org.springframework.retry.policy.SimpleRetryPolicy; 051import org.springframework.retry.policy.TimeoutRetryPolicy; 052 053import javax.batch.api.chunk.listener.RetryProcessListener; 054 055import static ca.uhn.fhir.jpa.batch.config.BatchConstants.BULK_IMPORT_JOB_NAME; 056import static ca.uhn.fhir.jpa.batch.config.BatchConstants.BULK_IMPORT_PROCESSING_STEP; 057 058/** 059 * Spring batch Job configuration file. Contains all necessary plumbing to run a 060 * Bulk Export job. 061 */ 062@Configuration 063public class BulkImportJobConfig { 064 065 public static final String JOB_PARAM_COMMIT_INTERVAL = "commitInterval"; 066 private static final Logger ourLog = LoggerFactory.getLogger(BulkImportJobConfig.class); 067 @Autowired 068 private StepBuilderFactory myStepBuilderFactory; 069 @Autowired 070 private JobBuilderFactory myJobBuilderFactory; 071 @Autowired 072 @Qualifier(BatchConstants.JOB_LAUNCHING_TASK_EXECUTOR) 073 private TaskExecutor myTaskExecutor; 074 @Autowired 075 private DaoConfig myDaoConfig; 076 077 @Bean(name = BULK_IMPORT_JOB_NAME) 078 @Lazy 079 public Job bulkImportJob() throws Exception { 080 return myJobBuilderFactory.get(BULK_IMPORT_JOB_NAME) 081 .validator(bulkImportJobParameterValidator()) 082 .start(bulkImportProcessingStep()) 083 .next(bulkImportCloseJobStep()) 084 .build(); 085 } 086 087 @Bean 088 public JobParametersValidator bulkImportJobParameterValidator() { 089 return new BulkImportJobParameterValidator(); 090 } 091 092 @Bean 093 public CreateBulkImportEntityTasklet createBulkImportEntityTasklet() { 094 return new CreateBulkImportEntityTasklet(); 095 } 096 097 @Bean 098 @JobScope 099 public ActivateBulkImportEntityStepListener activateBulkImportEntityStepListener() { 100 return new ActivateBulkImportEntityStepListener(); 101 } 102 103 @Bean 104 public Step bulkImportProcessingStep() throws Exception { 105 return myStepBuilderFactory.get(BULK_IMPORT_PROCESSING_STEP) 106 .partitioner(BULK_IMPORT_PROCESSING_STEP, bulkImportPartitioner()) 107 .partitionHandler(partitionHandler()) 108 .listener(activateBulkImportEntityStepListener()) 109 .listener(errorLisener()) 110 .gridSize(10) 111 .build(); 112 } 113 114 private ChunkAroundListener errorLisener() { 115 return new ChunkAroundListener(); 116 } 117 118 private PartitionHandler partitionHandler() throws Exception { 119 assert myTaskExecutor != null; 120 121 TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler(); 122 retVal.setStep(bulkImportProcessFilesStep()); 123 retVal.setTaskExecutor(myTaskExecutor); 124 retVal.afterPropertiesSet(); 125 return retVal; 126 } 127 128 @Bean 129 public Step bulkImportCloseJobStep() { 130 return myStepBuilderFactory.get("bulkImportCloseJobStep") 131 .tasklet(bulkImportJobCloser()) 132 .build(); 133 } 134 135 @Bean 136 @JobScope 137 public BulkImportJobCloser bulkImportJobCloser() { 138 return new BulkImportJobCloser(); 139 } 140 141 @Bean 142 @JobScope 143 public BulkImportPartitioner bulkImportPartitioner() { 144 return new BulkImportPartitioner(); 145 } 146 147 @Bean 148 public Step bulkImportProcessFilesStep() { 149 150 return myStepBuilderFactory.get("bulkImportProcessFilesStep") 151 .<ParsedBulkImportRecord, ParsedBulkImportRecord>chunk(completionPolicy()) 152 .reader(bulkImportFileReader()) 153 .writer(bulkImportFileWriter()) 154 .listener(bulkImportStepListener()) 155 .listener(completionPolicy()) 156 .faultTolerant() 157 .retryPolicy(bulkImportProcessFilesStepRetryPolicy()) 158 .build(); 159 } 160 161 private RetryPolicy bulkImportProcessFilesStepRetryPolicy() { 162 TimeoutRetryPolicy timeoutPolicy = new TimeoutRetryPolicy(); 163 timeoutPolicy.setTimeout(10000); 164 165 SimpleRetryPolicy countRetryPolicy = new SimpleRetryPolicy(myDaoConfig.getBulkImportMaxRetryCount()); 166 167 CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy(); 168 compositeRetryPolicy.setPolicies(new RetryPolicy[]{timeoutPolicy, countRetryPolicy}); 169 return compositeRetryPolicy; 170 } 171 172 @Bean 173 @StepScope 174 public CompletionPolicy completionPolicy() { 175 return new BulkImportProcessStepCompletionPolicy(); 176 } 177 178 @Bean 179 @StepScope 180 public BulkImportFileWriter bulkImportFileWriter() { 181 return new BulkImportFileWriter(); 182 } 183 184 @Bean 185 @StepScope 186 public BulkImportFileReader bulkImportFileReader() { 187 return new BulkImportFileReader(); 188 } 189 190 @Bean 191 @StepScope 192 public BulkImportStepListener bulkImportStepListener() { 193 return new BulkImportStepListener(); 194 } 195 196 public static class ChunkAroundListener implements RetryProcessListener { 197 198 @Override 199 public void onRetryProcessException(Object item, Exception ex) throws Exception { 200 throw ex; 201 } 202 } 203 204}