
package ca.uhn.fhir.jpa.bulk.imprt.svc;

/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2022 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.batch.log.Logs;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.job.BulkImportJobConfig;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkImportJobDao;
import ca.uhn.fhir.jpa.dao.data.IBulkImportJobFileDao;
import ca.uhn.fhir.jpa.entity.BulkImportJobEntity;
import ca.uhn.fhir.jpa.entity.BulkImportJobFileEntity;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.util.ValidateUtil;
import org.apache.commons.lang3.time.DateUtils;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;

import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import javax.transaction.Transactional;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Semaphore;

import static org.apache.commons.lang3.StringUtils.isNotBlank;
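
/**
 * Database-backed implementation of {@link IBulkDataImportSvc}: jobs and their files are
 * staged in the database, and a locally scheduled poller activates jobs in READY status
 * by submitting them to Spring Batch.
 * <p>
 * A rough usage sketch (illustrative only; the {@code BulkImportJobJson} setters and the
 * {@code JobFileRowProcessingModeEnum} value are assumed from this class's accessor
 * usage, not verified against a specific HAPI FHIR version):
 * <pre>{@code
 * BulkImportJobJson jobJson = new BulkImportJobJson();
 * jobJson.setProcessingMode(JobFileRowProcessingModeEnum.FHIR_TRANSACTION);
 * jobJson.setBatchSize(100);
 *
 * String jobId = mySvc.createNewJob(jobJson, initialFiles); // job starts in STAGING
 * mySvc.addFilesToJob(jobId, moreFiles);                    // allowed while STAGING
 * mySvc.markJobAsReadyForActivation(jobId);                 // poller will pick it up
 * }</pre>
 */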
public class BulkDataImportSvcImpl implements IBulkDataImportSvc {
   private static final Logger ourLog = LoggerFactory.getLogger(BulkDataImportSvcImpl.class);
   private final Semaphore myRunningJobSemaphore = new Semaphore(1);
   @Autowired
   private IBulkImportJobDao myJobDao;
   @Autowired
   private IBulkImportJobFileDao myJobFileDao;
   @Autowired
   private PlatformTransactionManager myTxManager;
   private TransactionTemplate myTxTemplate;
   @Autowired
   private ISchedulerService mySchedulerService;
   @Autowired
   private IBatchJobSubmitter myJobSubmitter;
   @Autowired
   @Qualifier(BatchConstants.BULK_IMPORT_JOB_NAME)
   private org.springframework.batch.core.Job myBulkImportJob;
   @Autowired
   private DaoConfig myDaoConfig;

   @PostConstruct
   public void start() {
      myTxTemplate = new TransactionTemplate(myTxManager);

      // This job should be local so that each node in the cluster can pick up jobs
      ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
      jobDetail.setId(ActivationJob.class.getName());
      jobDetail.setJobClass(ActivationJob.class);
      mySchedulerService.scheduleLocalJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
   }

   @Override
   @Transactional
   public String createNewJob(BulkImportJobJson theJobDescription, @Nonnull List<BulkImportJobFileJson> theInitialFiles) {
      ValidateUtil.isNotNullOrThrowUnprocessableEntity(theJobDescription, "Job must not be null");
      ValidateUtil.isNotNullOrThrowUnprocessableEntity(theJobDescription.getProcessingMode(), "Job File Processing mode must not be null");
      ValidateUtil.isTrueOrThrowInvalidRequest(theJobDescription.getBatchSize() > 0, "Job File Batch Size must be > 0");

      String jobId = UUID.randomUUID().toString();

      ourLog.info("Creating new Bulk Import job with {} files, assigning job ID: {}", theInitialFiles.size(), jobId);

      BulkImportJobEntity job = new BulkImportJobEntity();
      job.setJobId(jobId);
      job.setFileCount(theInitialFiles.size());
      job.setStatus(BulkImportJobStatusEnum.STAGING);
      job.setJobDescription(theJobDescription.getJobDescription());
      job.setBatchSize(theJobDescription.getBatchSize());
      job.setRowProcessingMode(theJobDescription.getProcessingMode());
      job = myJobDao.save(job);

      int nextSequence = 0;
      addFilesToJob(theInitialFiles, job, nextSequence);

      return jobId;
   }

   @Override
   @Transactional
   public void addFilesToJob(String theJobId, List<BulkImportJobFileJson> theFiles) {
      ourLog.info("Adding {} files to bulk import job: {}", theFiles.size(), theJobId);

      BulkImportJobEntity job = findJobByJobId(theJobId);

      ValidateUtil.isTrueOrThrowInvalidRequest(job.getStatus() == BulkImportJobStatusEnum.STAGING, "Job %s has status %s and can not be added to", theJobId, job.getStatus());

      addFilesToJob(theFiles, job, job.getFileCount());

      job.setFileCount(job.getFileCount() + theFiles.size());
      myJobDao.save(job);
   }

   private BulkImportJobEntity findJobByJobId(String theJobId) {
      return myJobDao
         .findByJobId(theJobId)
         .orElseThrow(() -> new InvalidRequestException("Unknown job ID: " + theJobId));
   }

   @Override
   @Transactional
   public void markJobAsReadyForActivation(String theJobId) {
      ourLog.info("Activating bulk import job {}", theJobId);

      BulkImportJobEntity job = findJobByJobId(theJobId);
      ValidateUtil.isTrueOrThrowInvalidRequest(job.getStatus() == BulkImportJobStatusEnum.STAGING, "Bulk import job %s can not be activated in status: %s", theJobId, job.getStatus());

      job.setStatus(BulkImportJobStatusEnum.READY);
      myJobDao.save(job);
   }

   /**
    * To be called by the job scheduler
    */
   @Transactional(value = Transactional.TxType.NEVER)
   @Override
   public boolean activateNextReadyJob() {
      if (!myDaoConfig.isEnableTaskBulkImportJobExecution()) {
         Logs.getBatchTroubleshootingLog().trace("Bulk import job execution is not enabled on this server. No action taken.");
         return false;
      }

      if (!myRunningJobSemaphore.tryAcquire()) {
         Logs.getBatchTroubleshootingLog().trace("Already have a running batch job, not going to check for more");
         return false;
      }

      try {
         return doActivateNextReadyJob();
      } finally {
         myRunningJobSemaphore.release();
      }
   }
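
   /**
    * Fetches at most one job in READY status inside a transaction, then submits it to
    * Spring Batch outside that transaction. If submission fails, the job is moved to
    * ERROR status with the failure message recorded. Returns {@code true} if a READY
    * job was found, even if its submission subsequently failed.
    */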
   private boolean doActivateNextReadyJob() {
      Optional<BulkImportJobEntity> jobToProcessOpt = Objects.requireNonNull(myTxTemplate.execute(t -> {
         Pageable page = PageRequest.of(0, 1);
         Slice<BulkImportJobEntity> submittedJobs = myJobDao.findByStatus(page, BulkImportJobStatusEnum.READY);
         if (submittedJobs.isEmpty()) {
            return Optional.empty();
         }
         return Optional.of(submittedJobs.getContent().get(0));
      }));

      if (!jobToProcessOpt.isPresent()) {
         return false;
      }

      BulkImportJobEntity bulkImportJobEntity = jobToProcessOpt.get();

      String jobUuid = bulkImportJobEntity.getJobId();
      try {
         processJob(bulkImportJobEntity);
      } catch (Exception e) {
         ourLog.error("Failure while preparing bulk import job", e);
         myTxTemplate.execute(t -> {
            Optional<BulkImportJobEntity> submittedJobs = myJobDao.findByJobId(jobUuid);
            if (submittedJobs.isPresent()) {
               BulkImportJobEntity jobEntity = submittedJobs.get();
               jobEntity.setStatus(BulkImportJobStatusEnum.ERROR);
               jobEntity.setStatusMessage(e.getMessage());
               myJobDao.save(jobEntity);
            }
            return false;
         });
      }

      return true;
   }

   @Override
   @Transactional
   public void setJobToStatus(String theJobId, BulkImportJobStatusEnum theStatus) {
      setJobToStatus(theJobId, theStatus, null);
   }

   @Override
   public void setJobToStatus(String theJobId, BulkImportJobStatusEnum theStatus, String theStatusMessage) {
      BulkImportJobEntity job = findJobByJobId(theJobId);
      job.setStatus(theStatus);
      job.setStatusMessage(theStatusMessage);
      myJobDao.save(job);
   }

   @Override
   @Transactional
   public BulkImportJobJson fetchJob(String theJobId) {
      BulkImportJobEntity job = findJobByJobId(theJobId);
      return job.toJson();
   }

   @Override
   public JobInfo getJobStatus(String theJobId) {
      BulkImportJobEntity theJob = findJobByJobId(theJobId);
      return new JobInfo()
         .setStatus(theJob.getStatus())
         .setStatusMessage(theJob.getStatusMessage())
         .setStatusTime(theJob.getStatusTime());
   }

   @Transactional
   @Override
   public BulkImportJobFileJson fetchFile(String theJobId, int theFileIndex) {
      BulkImportJobEntity job = findJobByJobId(theJobId);

      return myJobFileDao
         .findForJob(job, theFileIndex)
         .map(t -> t.toJson())
         .orElseThrow(() -> new IllegalArgumentException("Invalid index " + theFileIndex + " for job " + theJobId));
   }

   @Transactional
   @Override
   public String getFileDescription(String theJobId, int theFileIndex) {
      BulkImportJobEntity job = findJobByJobId(theJobId);

      return myJobFileDao.findFileDescriptionForJob(job, theFileIndex).orElse("");
   }

   @Override
   @Transactional
   public void deleteJobFiles(String theJobId) {
      BulkImportJobEntity job = findJobByJobId(theJobId);
      List<Long> files = myJobFileDao.findAllIdsForJob(theJobId);
      for (Long next : files) {
         myJobFileDao.deleteById(next);
      }
      myJobDao.delete(job);
   }
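
   /**
    * Builds the Spring Batch parameters (job UUID, commit interval derived from the
    * job's batch size, and optional job description) and hands the job off to the
    * batch job submitter.
    */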
   private void processJob(BulkImportJobEntity theBulkImportJobEntity) throws JobParametersInvalidException {
      String jobId = theBulkImportJobEntity.getJobId();
      int batchSize = theBulkImportJobEntity.getBatchSize();
      ValidateUtil.isTrueOrThrowInvalidRequest(batchSize > 0, "Batch size must be positive");

      JobParametersBuilder parameters = new JobParametersBuilder()
         .addString(BatchConstants.JOB_UUID_PARAMETER, jobId)
         .addLong(BulkImportJobConfig.JOB_PARAM_COMMIT_INTERVAL, (long) batchSize);

      if (isNotBlank(theBulkImportJobEntity.getJobDescription())) {
         parameters.addString(BatchConstants.JOB_DESCRIPTION, theBulkImportJobEntity.getJobDescription());
      }

      ourLog.info("Submitting bulk import job {} to job scheduler", jobId);

      myJobSubmitter.runJob(myBulkImportJob, parameters.toJobParameters());
   }

   private void addFilesToJob(@Nonnull List<BulkImportJobFileJson> theInitialFiles, BulkImportJobEntity job, int nextSequence) {
      for (BulkImportJobFileJson nextFile : theInitialFiles) {
         ValidateUtil.isNotBlankOrThrowUnprocessableEntity(nextFile.getContents(), "Job File Contents must not be null");

         BulkImportJobFileEntity jobFile = new BulkImportJobFileEntity();
         jobFile.setJob(job);
         jobFile.setContents(nextFile.getContents());
         jobFile.setTenantName(nextFile.getTenantName());
         jobFile.setFileDescription(nextFile.getDescription());
         jobFile.setFileSequence(nextSequence++);
         myJobFileDao.save(jobFile);
      }
   }

   public static class ActivationJob implements HapiJob {
      @Autowired
      private IBulkDataImportSvc myTarget;

      @Override
      public void execute(JobExecutionContext theContext) {
         myTarget.activateNextReadyJob();
      }
   }
}