001package ca.uhn.fhir.jpa.bulk.export.job;
002
003/*-
004 * #%L
005 * HAPI FHIR JPA Server
006 * %%
007 * Copyright (C) 2014 - 2021 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.context.FhirContext;
024import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
025import ca.uhn.fhir.jpa.batch.config.BatchConstants;
026import ca.uhn.fhir.jpa.batch.processor.GoldenResourceAnnotatingProcessor;
027import ca.uhn.fhir.jpa.batch.processor.PidToIBaseResourceProcessor;
028import ca.uhn.fhir.jpa.bulk.export.svc.BulkExportDaoSvc;
029import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc;
030import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
031import org.hl7.fhir.instance.model.api.IBaseResource;
032import org.springframework.batch.core.Job;
033import org.springframework.batch.core.JobParametersValidator;
034import org.springframework.batch.core.Step;
035import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
036import org.springframework.batch.core.configuration.annotation.JobScope;
037import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
038import org.springframework.batch.core.configuration.annotation.StepScope;
039import org.springframework.batch.item.ItemProcessor;
040import org.springframework.batch.item.support.CompositeItemProcessor;
041import org.springframework.beans.factory.annotation.Autowired;
042import org.springframework.context.annotation.Bean;
043import org.springframework.context.annotation.Configuration;
044import org.springframework.context.annotation.Lazy;
045
046import java.util.ArrayList;
047import java.util.List;
048
049/**
050 * Spring batch Job configuration file. Contains all necessary plumbing to run a
051 * Bulk Export job.
052 */
053@Configuration
054public class BulkExportJobConfig {
055
056        public static final String READ_CHUNK_PARAMETER = "readChunkSize";
057        public static final String EXPAND_MDM_PARAMETER = "expandMdm";
058        public static final String GROUP_ID_PARAMETER = "groupId";
059        public static final String RESOURCE_TYPES_PARAMETER = "resourceTypes";
060        public static final int CHUNK_SIZE = 100;
061        public static final String JOB_DESCRIPTION = "jobDescription";
062
063        @Autowired
064        private FhirContext myFhirContext;
065
066        @Autowired
067        private DaoRegistry myDaoRegistry;
068
069        @Autowired
070        private StepBuilderFactory myStepBuilderFactory;
071
072        @Autowired
073        private JobBuilderFactory myJobBuilderFactory;
074
075        @Autowired
076        private PidToIBaseResourceProcessor myPidToIBaseResourceProcessor;
077
078        @Autowired
079        private GoldenResourceAnnotatingProcessor myGoldenResourceAnnotatingProcessor;
080
081        @Bean
082        public BulkExportDaoSvc bulkExportDaoSvc() {
083                return new BulkExportDaoSvc();
084        }
085
086
087        @Bean
088        @Lazy
089        @JobScope
090        public MdmExpansionCacheSvc mdmExpansionCacheSvc() {
091                return new MdmExpansionCacheSvc();
092        }
093
094
095        @Bean
096        @Lazy
097        public Job bulkExportJob() {
098                return myJobBuilderFactory.get(BatchConstants.BULK_EXPORT_JOB_NAME)
099                        .validator(bulkExportJobParameterValidator())
100                        .start(createBulkExportEntityStep())
101                        .next(bulkExportPartitionStep())
102                        .next(closeJobStep())
103                        .build();
104        }
105
106        @Bean
107        @Lazy
108        @StepScope
109        public CompositeItemProcessor<List<ResourcePersistentId>, List<IBaseResource>> inflateResourceThenAnnotateWithGoldenResourceProcessor() {
110                CompositeItemProcessor processor = new CompositeItemProcessor<>();
111                ArrayList<ItemProcessor> delegates = new ArrayList<>();
112                delegates.add(myPidToIBaseResourceProcessor);
113                delegates.add(myGoldenResourceAnnotatingProcessor);
114                processor.setDelegates(delegates);
115                return processor;
116        }
117
118        @Bean
119        @Lazy
120        public Job groupBulkExportJob() {
121                return myJobBuilderFactory.get(BatchConstants.GROUP_BULK_EXPORT_JOB_NAME)
122                        .validator(groupBulkJobParameterValidator())
123                        .validator(bulkExportJobParameterValidator())
124                        .start(createBulkExportEntityStep())
125                        .next(groupPartitionStep())
126                        .next(closeJobStep())
127                        .build();
128        }
129
130        @Bean
131        @Lazy
132        public Job patientBulkExportJob() {
133                return myJobBuilderFactory.get(BatchConstants.PATIENT_BULK_EXPORT_JOB_NAME)
134                        .validator(bulkExportJobParameterValidator())
135                        .start(createBulkExportEntityStep())
136                        .next(patientPartitionStep())
137                        .next(closeJobStep())
138                        .build();
139        }
140
141        @Bean
142        public GroupIdPresentValidator groupBulkJobParameterValidator() {
143                return new GroupIdPresentValidator();
144        }
145
146        @Bean
147        public Step createBulkExportEntityStep() {
148                return myStepBuilderFactory.get("createBulkExportEntityStep")
149                        .tasklet(createBulkExportEntityTasklet())
150                        .listener(bulkExportCreateEntityStepListener())
151                        .build();
152        }
153
154        @Bean
155        public CreateBulkExportEntityTasklet createBulkExportEntityTasklet() {
156                return new CreateBulkExportEntityTasklet();
157        }
158
159
160        @Bean
161        public JobParametersValidator bulkExportJobParameterValidator() {
162                return new BulkExportJobParameterValidator();
163        }
164
165        //Writers
166        @Bean
167        public Step groupBulkExportGenerateResourceFilesStep() {
168                return myStepBuilderFactory.get("groupBulkExportGenerateResourceFilesStep")
169                        .<List<ResourcePersistentId>, List<IBaseResource>>chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time.
170                        .reader(groupBulkItemReader())
171                        .processor(inflateResourceThenAnnotateWithGoldenResourceProcessor())
172                        .writer(resourceToFileWriter())
173                        .listener(bulkExportGenerateResourceFilesStepListener())
174                        .build();
175        }
176
177        @Bean
178        public Step bulkExportGenerateResourceFilesStep() {
179                return myStepBuilderFactory.get(BatchConstants.BULK_EXPORT_GENERATE_RESOURCE_FILES_STEP)
180                        .<List<ResourcePersistentId>, List<IBaseResource>>chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time.
181                        .reader(bulkItemReader())
182                        .processor(myPidToIBaseResourceProcessor)
183                        .writer(resourceToFileWriter())
184                        .listener(bulkExportGenerateResourceFilesStepListener())
185                        .build();
186        }
187
188        @Bean
189        public Step patientBulkExportGenerateResourceFilesStep() {
190                return myStepBuilderFactory.get("patientBulkExportGenerateResourceFilesStep")
191                        .<List<ResourcePersistentId>, List<IBaseResource>>chunk(CHUNK_SIZE) //1000 resources per generated file, as the reader returns 10 resources at a time.
192                        .reader(patientBulkItemReader())
193                        .processor(myPidToIBaseResourceProcessor)
194                        .writer(resourceToFileWriter())
195                        .listener(bulkExportGenerateResourceFilesStepListener())
196                        .build();
197        }
198
199        @Bean
200        @JobScope
201        public BulkExportJobCloser bulkExportJobCloser() {
202                return new BulkExportJobCloser();
203        }
204
205        @Bean
206        public Step closeJobStep() {
207                return myStepBuilderFactory.get("closeJobStep")
208                        .tasklet(bulkExportJobCloser())
209                        .build();
210        }
211
212        @Bean
213        @JobScope
214        public BulkExportCreateEntityStepListener bulkExportCreateEntityStepListener() {
215                return new BulkExportCreateEntityStepListener();
216        }
217
218        @Bean
219        @JobScope
220        public BulkExportGenerateResourceFilesStepListener bulkExportGenerateResourceFilesStepListener() {
221                return new BulkExportGenerateResourceFilesStepListener();
222        }
223
224        @Bean
225        public Step bulkExportPartitionStep() {
226                return myStepBuilderFactory.get("partitionStep")
227                        .partitioner(BatchConstants.BULK_EXPORT_GENERATE_RESOURCE_FILES_STEP, bulkExportResourceTypePartitioner())
228                        .step(bulkExportGenerateResourceFilesStep())
229                        .build();
230        }
231
232        @Bean
233        public Step groupPartitionStep() {
234                return myStepBuilderFactory.get("partitionStep")
235                        .partitioner("groupBulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner())
236                        .step(groupBulkExportGenerateResourceFilesStep())
237                        .build();
238        }
239
240        @Bean
241        public Step patientPartitionStep() {
242                return myStepBuilderFactory.get("partitionStep")
243                        .partitioner("patientBulkExportGenerateResourceFilesStep", bulkExportResourceTypePartitioner())
244                        .step(patientBulkExportGenerateResourceFilesStep())
245                        .build();
246        }
247
248
249        @Bean
250        @StepScope
251        public GroupBulkItemReader groupBulkItemReader() {
252                return new GroupBulkItemReader();
253        }
254
255        @Bean
256        @StepScope
257        public PatientBulkItemReader patientBulkItemReader() {
258                return new PatientBulkItemReader();
259        }
260
261        @Bean
262        @StepScope
263        public BulkItemReader bulkItemReader() {
264                return new BulkItemReader();
265        }
266
267        @Bean
268        @JobScope
269        public ResourceTypePartitioner bulkExportResourceTypePartitioner() {
270                return new ResourceTypePartitioner();
271        }
272
273        @Bean
274        @StepScope
275        public ResourceToFileWriter resourceToFileWriter() {
276                return new ResourceToFileWriter(myFhirContext, myDaoRegistry);
277        }
278
279}