001package ca.uhn.fhir.jpa.bulk.imprt.job;
002
003/*-
004 * #%L
005 * HAPI FHIR JPA Server
006 * %%
007 * Copyright (C) 2014 - 2022 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.jpa.api.config.DaoConfig;
024import ca.uhn.fhir.jpa.batch.config.BatchConstants;
025import ca.uhn.fhir.jpa.bulk.imprt.model.ParsedBulkImportRecord;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028import org.springframework.batch.core.Job;
029import org.springframework.batch.core.JobParametersValidator;
030import org.springframework.batch.core.Step;
031import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
032import org.springframework.batch.core.configuration.annotation.JobScope;
033import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
034import org.springframework.batch.core.configuration.annotation.StepScope;
035import org.springframework.batch.core.partition.PartitionHandler;
036import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
037import org.springframework.batch.core.step.item.KeyGenerator;
038import org.springframework.batch.item.ItemWriter;
039import org.springframework.batch.repeat.CompletionPolicy;
040import org.springframework.batch.repeat.RepeatContext;
041import org.springframework.batch.repeat.exception.ExceptionHandler;
042import org.springframework.beans.factory.annotation.Autowired;
043import org.springframework.beans.factory.annotation.Qualifier;
044import org.springframework.context.annotation.Bean;
045import org.springframework.context.annotation.Configuration;
046import org.springframework.context.annotation.Lazy;
047import org.springframework.core.task.TaskExecutor;
048import org.springframework.retry.RetryPolicy;
049import org.springframework.retry.policy.CompositeRetryPolicy;
050import org.springframework.retry.policy.SimpleRetryPolicy;
051import org.springframework.retry.policy.TimeoutRetryPolicy;
052
053import javax.batch.api.chunk.listener.RetryProcessListener;
054
055import static ca.uhn.fhir.jpa.batch.config.BatchConstants.BULK_IMPORT_JOB_NAME;
056import static ca.uhn.fhir.jpa.batch.config.BatchConstants.BULK_IMPORT_PROCESSING_STEP;
057
058/**
059 * Spring batch Job configuration file. Contains all necessary plumbing to run a
060 * Bulk Export job.
061 */
062@Configuration
063public class BulkImportJobConfig {
064
065        public static final String JOB_PARAM_COMMIT_INTERVAL = "commitInterval";
066        private static final Logger ourLog = LoggerFactory.getLogger(BulkImportJobConfig.class);
067        @Autowired
068        private StepBuilderFactory myStepBuilderFactory;
069        @Autowired
070        private JobBuilderFactory myJobBuilderFactory;
071        @Autowired
072        @Qualifier(BatchConstants.JOB_LAUNCHING_TASK_EXECUTOR)
073        private TaskExecutor myTaskExecutor;
074        @Autowired
075        private DaoConfig myDaoConfig;
076
077        @Bean(name = BULK_IMPORT_JOB_NAME)
078        @Lazy
079        public Job bulkImportJob() throws Exception {
080                return myJobBuilderFactory.get(BULK_IMPORT_JOB_NAME)
081                        .validator(bulkImportJobParameterValidator())
082                        .start(bulkImportProcessingStep())
083                        .next(bulkImportCloseJobStep())
084                        .build();
085        }
086
087        @Bean
088        public JobParametersValidator bulkImportJobParameterValidator() {
089                return new BulkImportJobParameterValidator();
090        }
091
092        @Bean
093        public CreateBulkImportEntityTasklet createBulkImportEntityTasklet() {
094                return new CreateBulkImportEntityTasklet();
095        }
096
097        @Bean
098        @JobScope
099        public ActivateBulkImportEntityStepListener activateBulkImportEntityStepListener() {
100                return new ActivateBulkImportEntityStepListener();
101        }
102
103        @Bean
104        public Step bulkImportProcessingStep() throws Exception {
105                return myStepBuilderFactory.get(BULK_IMPORT_PROCESSING_STEP)
106                        .partitioner(BULK_IMPORT_PROCESSING_STEP, bulkImportPartitioner())
107                        .partitionHandler(partitionHandler())
108                        .listener(activateBulkImportEntityStepListener())
109                        .listener(errorLisener())
110                        .gridSize(10)
111                        .build();
112        }
113
114        private ChunkAroundListener errorLisener() {
115                return new ChunkAroundListener();
116        }
117
118        private PartitionHandler partitionHandler() throws Exception {
119                assert myTaskExecutor != null;
120
121                TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
122                retVal.setStep(bulkImportProcessFilesStep());
123                retVal.setTaskExecutor(myTaskExecutor);
124                retVal.afterPropertiesSet();
125                return retVal;
126        }
127
128        @Bean
129        public Step bulkImportCloseJobStep() {
130                return myStepBuilderFactory.get("bulkImportCloseJobStep")
131                        .tasklet(bulkImportJobCloser())
132                        .build();
133        }
134
135        @Bean
136        @JobScope
137        public BulkImportJobCloser bulkImportJobCloser() {
138                return new BulkImportJobCloser();
139        }
140
141        @Bean
142        @JobScope
143        public BulkImportPartitioner bulkImportPartitioner() {
144                return new BulkImportPartitioner();
145        }
146
147        @Bean
148        public Step bulkImportProcessFilesStep() {
149
150                return myStepBuilderFactory.get("bulkImportProcessFilesStep")
151                        .<ParsedBulkImportRecord, ParsedBulkImportRecord>chunk(completionPolicy())
152                        .reader(bulkImportFileReader())
153                        .writer(bulkImportFileWriter())
154                        .listener(bulkImportStepListener())
155                        .listener(completionPolicy())
156                        .faultTolerant()
157                        .retryPolicy(bulkImportProcessFilesStepRetryPolicy())
158                        .build();
159        }
160
161        private RetryPolicy bulkImportProcessFilesStepRetryPolicy() {
162                TimeoutRetryPolicy timeoutPolicy = new TimeoutRetryPolicy();
163                timeoutPolicy.setTimeout(10000);
164
165                SimpleRetryPolicy countRetryPolicy = new SimpleRetryPolicy(myDaoConfig.getBulkImportMaxRetryCount());
166
167                CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
168                compositeRetryPolicy.setPolicies(new RetryPolicy[]{timeoutPolicy, countRetryPolicy});
169                return compositeRetryPolicy;
170        }
171
172        @Bean
173        @StepScope
174        public CompletionPolicy completionPolicy() {
175                return new BulkImportProcessStepCompletionPolicy();
176        }
177
178        @Bean
179        @StepScope
180        public BulkImportFileWriter bulkImportFileWriter() {
181                return new BulkImportFileWriter();
182        }
183
184        @Bean
185        @StepScope
186        public BulkImportFileReader bulkImportFileReader() {
187                return new BulkImportFileReader();
188        }
189
190        @Bean
191        @StepScope
192        public BulkImportStepListener bulkImportStepListener() {
193                return new BulkImportStepListener();
194        }
195
196        public static class ChunkAroundListener implements RetryProcessListener {
197
198                @Override
199                public void onRetryProcessException(Object item, Exception ex) throws Exception {
200                        throw ex;
201                }
202        }
203
204}