package ca.uhn.fhir.jpa.bulk.imprt.svc;

/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2022 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.job.BulkImportJobConfig;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkImportJobDao;
import ca.uhn.fhir.jpa.dao.data.IBulkImportJobFileDao;
import ca.uhn.fhir.jpa.entity.BulkImportJobEntity;
import ca.uhn.fhir.jpa.entity.BulkImportJobFileEntity;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.util.ValidateUtil;
import org.apache.commons.lang3.time.DateUtils;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;

import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import javax.transaction.Transactional;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Semaphore;

import static org.apache.commons.lang3.StringUtils.isNotBlank;

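/**
 * Database-backed implementation of {@link IBulkDataImportSvc}. Jobs and their
 * files are staged in {@link BulkImportJobEntity} and {@link BulkImportJobFileEntity}
 * rows, then handed off to Spring Batch once activated. A scheduled
 * {@link ActivationJob} polls on each node for jobs in READY status, and a
 * semaphore ensures only one activation attempt runs at a time per node.
 */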
public class BulkDataImportSvcImpl implements IBulkDataImportSvc {
	private static final Logger ourLog = LoggerFactory.getLogger(BulkDataImportSvcImpl.class);
	private final Semaphore myRunningJobSemaphore = new Semaphore(1);
	@Autowired
	private IBulkImportJobDao myJobDao;
	@Autowired
	private IBulkImportJobFileDao myJobFileDao;
	@Autowired
	private PlatformTransactionManager myTxManager;
	private TransactionTemplate myTxTemplate;
	@Autowired
	private ISchedulerService mySchedulerService;
	@Autowired
	private IBatchJobSubmitter myJobSubmitter;
	@Autowired
	@Qualifier(BatchConstants.BULK_IMPORT_JOB_NAME)
	private org.springframework.batch.core.Job myBulkImportJob;
	@Autowired
	private DaoConfig myDaoConfig;

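	/**
	 * Registers the {@link ActivationJob} with the local scheduler so that this
	 * node polls for READY jobs every 10 seconds.
	 */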
	@PostConstruct
	public void start() {
		myTxTemplate = new TransactionTemplate(myTxManager);

		// This job should be local so that each node in the cluster can pick up jobs
		ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
		jobDetail.setId(ActivationJob.class.getName());
		jobDetail.setJobClass(ActivationJob.class);
		mySchedulerService.scheduleLocalJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
	}

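	/**
	 * Creates a new bulk import job in STAGING status, assigns it a random UUID,
	 * and stages the given initial files. Further files may be added with
	 * {@link #addFilesToJob(String, List)} until the job is activated via
	 * {@link #markJobAsReadyForActivation(String)}.
	 * <p>
	 * A minimal usage sketch. It assumes the JSON models expose bean-style
	 * setters mirroring the getters used in this class; the variable names and
	 * the processing-mode enum shown here are illustrative and should be
	 * verified against the model package:
	 * <pre>{@code
	 * BulkImportJobJson jobDescription = new BulkImportJobJson();
	 * jobDescription.setProcessingMode(JobFileRowProcessingModeEnum.FHIR_TRANSACTION);
	 * jobDescription.setBatchSize(100);
	 * BulkImportJobFileJson file = new BulkImportJobFileJson();
	 * file.setContents(ndjsonPayload); // NDJSON, one resource per line
	 * String jobId = svc.createNewJob(jobDescription, Collections.singletonList(file));
	 * svc.markJobAsReadyForActivation(jobId);
	 * }</pre>
	 *
	 * @return the generated job ID
	 */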
	@Override
	@Transactional
	public String createNewJob(BulkImportJobJson theJobDescription, @Nonnull List<BulkImportJobFileJson> theInitialFiles) {
		ValidateUtil.isNotNullOrThrowUnprocessableEntity(theJobDescription, "Job must not be null");
		ValidateUtil.isNotNullOrThrowUnprocessableEntity(theJobDescription.getProcessingMode(), "Job File Processing mode must not be null");
		ValidateUtil.isTrueOrThrowInvalidRequest(theJobDescription.getBatchSize() > 0, "Job File Batch Size must be > 0");

		String jobId = UUID.randomUUID().toString();

		ourLog.info("Creating new Bulk Import job with {} files, assigning job ID: {}", theInitialFiles.size(), jobId);

		BulkImportJobEntity job = new BulkImportJobEntity();
		job.setJobId(jobId);
		job.setFileCount(theInitialFiles.size());
		job.setStatus(BulkImportJobStatusEnum.STAGING);
		job.setJobDescription(theJobDescription.getJobDescription());
		job.setBatchSize(theJobDescription.getBatchSize());
		job.setRowProcessingMode(theJobDescription.getProcessingMode());
		job = myJobDao.save(job);

		// File sequence numbering starts at 0 for the initial files
		addFilesToJob(theInitialFiles, job, 0);

		return jobId;
	}

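	/**
	 * Appends files to an existing job. Only legal while the job is still in
	 * STAGING status; sequence numbers continue from the current file count.
	 */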
	@Override
	@Transactional
	public void addFilesToJob(String theJobId, List<BulkImportJobFileJson> theFiles) {
		ourLog.info("Adding {} files to bulk import job: {}", theFiles.size(), theJobId);

		BulkImportJobEntity job = findJobByJobId(theJobId);

		ValidateUtil.isTrueOrThrowInvalidRequest(job.getStatus() == BulkImportJobStatusEnum.STAGING, "Job %s has status %s and cannot be added to", theJobId, job.getStatus());

		addFilesToJob(theFiles, job, job.getFileCount());

		job.setFileCount(job.getFileCount() + theFiles.size());
		myJobDao.save(job);
	}

	private BulkImportJobEntity findJobByJobId(String theJobId) {
		return myJobDao
			.findByJobId(theJobId)
			.orElseThrow(() -> new InvalidRequestException("Unknown job ID: " + theJobId));
	}

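	/**
	 * Moves a STAGING job to READY status, making it eligible for pickup by the
	 * scheduled {@link ActivationJob}.
	 */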
	@Override
	@Transactional
	public void markJobAsReadyForActivation(String theJobId) {
		ourLog.info("Marking bulk import job {} as ready for activation", theJobId);

		BulkImportJobEntity job = findJobByJobId(theJobId);
		ValidateUtil.isTrueOrThrowInvalidRequest(job.getStatus() == BulkImportJobStatusEnum.STAGING, "Bulk import job %s cannot be activated in status: %s", theJobId, job.getStatus());

		job.setStatus(BulkImportJobStatusEnum.READY);
		myJobDao.save(job);
	}

	/**
	 * To be called by the job scheduler. Picks one job in READY status (if any)
	 * and submits it to Spring Batch, returning {@code true} if a job was
	 * activated. Guarded by a semaphore so that at most one activation attempt
	 * runs per node at a time, and skipped entirely if bulk import execution is
	 * disabled in {@link DaoConfig}.
	 */
	@Transactional(value = Transactional.TxType.NEVER)
	@Override
	public boolean activateNextReadyJob() {
		if (!myDaoConfig.isEnableTaskBulkImportJobExecution()) {
			Logs.getBatchTroubleshootingLog().trace("Bulk import job execution is not enabled on this server. No action taken.");
			return false;
		}

		if (!myRunningJobSemaphore.tryAcquire()) {
			Logs.getBatchTroubleshootingLog().trace("Already have a running batch job, not going to check for more");
			return false;
		}

		try {
			return doActivateNextReadyJob();
		} finally {
			myRunningJobSemaphore.release();
		}
	}

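	/**
	 * Fetches a single READY job inside a transaction and submits it for
	 * processing. On failure, the job is moved to ERROR status with the
	 * exception message recorded.
	 */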
	private boolean doActivateNextReadyJob() {
		Optional<BulkImportJobEntity> jobToProcessOpt = Objects.requireNonNull(myTxTemplate.execute(t -> {
			Pageable page = PageRequest.of(0, 1);
			Slice<BulkImportJobEntity> submittedJobs = myJobDao.findByStatus(page, BulkImportJobStatusEnum.READY);
			if (submittedJobs.isEmpty()) {
				return Optional.empty();
			}
			return Optional.of(submittedJobs.getContent().get(0));
		}));

		if (!jobToProcessOpt.isPresent()) {
			return false;
		}

		BulkImportJobEntity bulkImportJobEntity = jobToProcessOpt.get();

		String jobUuid = bulkImportJobEntity.getJobId();
		try {
			processJob(bulkImportJobEntity);
		} catch (Exception e) {
			ourLog.error("Failure while preparing bulk import job", e);
			myTxTemplate.execute(t -> {
				Optional<BulkImportJobEntity> submittedJobs = myJobDao.findByJobId(jobUuid);
				if (submittedJobs.isPresent()) {
					BulkImportJobEntity jobEntity = submittedJobs.get();
					jobEntity.setStatus(BulkImportJobStatusEnum.ERROR);
					jobEntity.setStatusMessage(e.getMessage());
					myJobDao.save(jobEntity);
				}
				return false;
			});
		}

		return true;
	}

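	/**
	 * Updates a job's status, optionally with a status message (for example, an
	 * error description when moving to ERROR status).
	 */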
	@Override
	@Transactional
	public void setJobToStatus(String theJobId, BulkImportJobStatusEnum theStatus) {
		setJobToStatus(theJobId, theStatus, null);
	}

	@Override
	@Transactional
	public void setJobToStatus(String theJobId, BulkImportJobStatusEnum theStatus, String theStatusMessage) {
		BulkImportJobEntity job = findJobByJobId(theJobId);
		job.setStatus(theStatus);
		job.setStatusMessage(theStatusMessage);
		myJobDao.save(job);
	}

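	/**
	 * Returns the stored job definition as its JSON model.
	 */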
	@Override
	@Transactional
	public BulkImportJobJson fetchJob(String theJobId) {
		BulkImportJobEntity job = findJobByJobId(theJobId);
		return job.toJson();
	}

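	/**
	 * Returns the current status, status message, and status timestamp for a job.
	 */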
	@Override
	public JobInfo getJobStatus(String theJobId) {
		BulkImportJobEntity job = findJobByJobId(theJobId);
		return new JobInfo()
			.setStatus(job.getStatus())
			.setStatusMessage(job.getStatusMessage())
			.setStatusTime(job.getStatusTime());
	}

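	/**
	 * Fetches the contents of a single staged file by its zero-based sequence
	 * index within the job.
	 */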
	@Transactional
	@Override
	public BulkImportJobFileJson fetchFile(String theJobId, int theFileIndex) {
		BulkImportJobEntity job = findJobByJobId(theJobId);

		return myJobFileDao
			.findForJob(job, theFileIndex)
			.map(BulkImportJobFileEntity::toJson)
			.orElseThrow(() -> new IllegalArgumentException("Invalid index " + theFileIndex + " for job " + theJobId));
	}

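	/**
	 * Returns the stored description for a file, or an empty string if none was
	 * provided.
	 */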
	@Transactional
	@Override
	public String getFileDescription(String theJobId, int theFileIndex) {
		BulkImportJobEntity job = findJobByJobId(theJobId);

		return myJobFileDao.findFileDescriptionForJob(job, theFileIndex).orElse("");
	}

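	/**
	 * Deletes a job and all of its staged files. File rows are removed
	 * individually by ID before the job row itself is deleted.
	 */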
	@Override
	@Transactional
	public void deleteJobFiles(String theJobId) {
		BulkImportJobEntity job = findJobByJobId(theJobId);
		List<Long> files = myJobFileDao.findAllIdsForJob(theJobId);
		for (Long next : files) {
			myJobFileDao.deleteById(next);
		}
		myJobDao.delete(job);
	}

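	/**
	 * Builds the Spring Batch job parameters (job UUID, commit interval, and
	 * optional job description) and submits the bulk import job for execution.
	 */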
	private void processJob(BulkImportJobEntity theBulkImportJobEntity) throws JobParametersInvalidException {
		String jobId = theBulkImportJobEntity.getJobId();
		int batchSize = theBulkImportJobEntity.getBatchSize();
		ValidateUtil.isTrueOrThrowInvalidRequest(batchSize > 0, "Batch size must be positive");

		JobParametersBuilder parameters = new JobParametersBuilder()
			.addString(BatchConstants.JOB_UUID_PARAMETER, jobId)
			.addLong(BulkImportJobConfig.JOB_PARAM_COMMIT_INTERVAL, (long) batchSize);

		if (isNotBlank(theBulkImportJobEntity.getJobDescription())) {
			parameters.addString(BatchConstants.JOB_DESCRIPTION, theBulkImportJobEntity.getJobDescription());
		}

		ourLog.info("Submitting bulk import job {} to job scheduler", jobId);

		myJobSubmitter.runJob(myBulkImportJob, parameters.toJobParameters());
	}

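	/**
	 * Persists one {@link BulkImportJobFileEntity} per file, assigning sequence
	 * numbers starting at the given value. File contents must not be blank.
	 */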
	private void addFilesToJob(@Nonnull List<BulkImportJobFileJson> theInitialFiles, BulkImportJobEntity theJob, int theNextSequence) {
		int nextSequence = theNextSequence;
		for (BulkImportJobFileJson nextFile : theInitialFiles) {
			ValidateUtil.isNotBlankOrThrowUnprocessableEntity(nextFile.getContents(), "Job file contents must not be blank");

			BulkImportJobFileEntity jobFile = new BulkImportJobFileEntity();
			jobFile.setJob(theJob);
			jobFile.setContents(nextFile.getContents());
			jobFile.setTenantName(nextFile.getTenantName());
			jobFile.setFileDescription(nextFile.getDescription());
			jobFile.setFileSequence(nextSequence++);
			myJobFileDao.save(jobFile);
		}
	}

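	/**
	 * Quartz job registered in {@link #start()}. The service is injected by
	 * Spring, and each execution simply delegates to
	 * {@link IBulkDataImportSvc#activateNextReadyJob()}.
	 */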
	public static class ActivationJob implements HapiJob {
		@Autowired
		private IBulkDataImportSvc myTarget;

		@Override
		public void execute(JobExecutionContext theContext) {
			myTarget.activateNextReadyJob();
		}
	}
}