001package ca.uhn.fhir.jpa.bulk.imprt.job;
002
003/*-
004 * #%L
005 * HAPI FHIR JPA Server
006 * %%
007 * Copyright (C) 2014 - 2021 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
024import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
025import ca.uhn.fhir.jpa.batch.config.BatchConstants;
026import ca.uhn.fhir.jpa.bulk.imprt.model.JobFileRowProcessingModeEnum;
027import ca.uhn.fhir.jpa.bulk.imprt.model.ParsedBulkImportRecord;
028import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
029import ca.uhn.fhir.util.StopWatch;
030import org.hl7.fhir.instance.model.api.IBaseResource;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033import org.springframework.batch.item.ItemWriter;
034import org.springframework.beans.factory.annotation.Autowired;
035import org.springframework.beans.factory.annotation.Value;
036import org.springframework.transaction.support.TransactionSynchronizationManager;
037
038import java.util.List;
039
040public class BulkImportFileWriter implements ItemWriter<ParsedBulkImportRecord> {
041
042        private static final Logger ourLog = LoggerFactory.getLogger(BulkImportFileWriter.class);
043        @Value("#{stepExecutionContext['" + BatchConstants.JOB_UUID_PARAMETER + "']}")
044        private String myJobUuid;
045        @Value("#{stepExecutionContext['" + BulkImportPartitioner.FILE_INDEX + "']}")
046        private int myFileIndex;
047        @Value("#{stepExecutionContext['" + BulkImportPartitioner.ROW_PROCESSING_MODE + "']}")
048        private JobFileRowProcessingModeEnum myRowProcessingMode;
049        @Autowired
050        private DaoRegistry myDaoRegistry;
051
052        @SuppressWarnings({"SwitchStatementWithTooFewBranches", "rawtypes", "unchecked"})
053        @Override
054        public void write(List<? extends ParsedBulkImportRecord> theItemLists) throws Exception {
055                assert TransactionSynchronizationManager.isActualTransactionActive();
056
057                String offsets = "unknown";
058                if (theItemLists.size() > 0) {
059                        offsets = theItemLists.get(0).getLineIndex() + " - " + theItemLists.get(theItemLists.size()-1).getLineIndex();
060                }
061
062                ourLog.info("Beginning bulk import write {} rows Job[{}] FileIndex[{}] Offset[{}]", theItemLists.size(), myJobUuid, myFileIndex, offsets);
063                StopWatch sw = new StopWatch();
064
065                for (ParsedBulkImportRecord nextItem : theItemLists) {
066
067                        SystemRequestDetails requestDetails = new SystemRequestDetails();
068                        requestDetails.setTenantId(nextItem.getTenantName());
069
070                        // Yeah this is a lame switch - We'll add more later I swear
071                        switch (myRowProcessingMode) {
072                                default:
073                                case FHIR_TRANSACTION:
074                                        IFhirSystemDao systemDao = myDaoRegistry.getSystemDao();
075                                        IBaseResource inputBundle = nextItem.getRowContent();
076                                        systemDao.transactionNested(requestDetails, inputBundle);
077                                        break;
078                        }
079
080                }
081
082                ourLog.info("Completed bulk import write {} rows Job[{}] FileIndex[{}] Offset[{}] in {}", theItemLists.size(), myJobUuid, myFileIndex, offsets, sw);
083        }
084
085}