001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.bulk.imprt.svc; 021 022import ca.uhn.fhir.batch2.api.IJobCoordinator; 023import ca.uhn.fhir.batch2.importpull.models.Batch2BulkImportPullJobParameters; 024import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; 025import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 026import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc; 027import ca.uhn.fhir.jpa.bulk.imprt.model.ActivateJobResult; 028import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson; 029import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson; 030import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum; 031import ca.uhn.fhir.jpa.dao.data.IBulkImportJobDao; 032import ca.uhn.fhir.jpa.dao.data.IBulkImportJobFileDao; 033import ca.uhn.fhir.jpa.entity.BulkImportJobEntity; 034import ca.uhn.fhir.jpa.entity.BulkImportJobFileEntity; 035import ca.uhn.fhir.jpa.model.sched.HapiJob; 036import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs; 037import ca.uhn.fhir.jpa.model.sched.ISchedulerService; 038import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; 039import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 040import ca.uhn.fhir.util.Logs; 041import ca.uhn.fhir.util.ValidateUtil; 042import com.apicatalog.jsonld.StringUtils; 043import jakarta.annotation.Nonnull; 044import jakarta.annotation.PostConstruct; 045import org.apache.commons.lang3.time.DateUtils; 046import org.quartz.JobExecutionContext; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049import org.springframework.beans.factory.annotation.Autowired; 050import org.springframework.data.domain.PageRequest; 051import org.springframework.data.domain.Pageable; 052import org.springframework.data.domain.Slice; 053import org.springframework.transaction.PlatformTransactionManager; 054import org.springframework.transaction.annotation.Propagation; 055import org.springframework.transaction.annotation.Transactional; 056import org.springframework.transaction.support.TransactionTemplate; 057 058import java.util.List; 059import java.util.Objects; 060import java.util.Optional; 061import java.util.UUID; 062import java.util.concurrent.Semaphore; 063 064import static ca.uhn.fhir.batch2.jobs.importpull.BulkImportPullConfig.BULK_IMPORT_JOB_NAME; 065 066public class BulkDataImportSvcImpl implements IBulkDataImportSvc, IHasScheduledJobs { 067 private static final Logger ourLog = LoggerFactory.getLogger(BulkDataImportSvcImpl.class); 068 private final Semaphore myRunningJobSemaphore = new Semaphore(1); 069 070 @Autowired 071 private IBulkImportJobDao myJobDao; 072 073 @Autowired 074 private IBulkImportJobFileDao myJobFileDao; 075 076 @Autowired 077 private PlatformTransactionManager myTxManager; 078 079 private TransactionTemplate myTxTemplate; 080 081 @Autowired 082 private IJobCoordinator myJobCoordinator; 083 084 @Autowired 085 private JpaStorageSettings myStorageSettings; 086 087 @PostConstruct 088 public void start() { 089 myTxTemplate = new TransactionTemplate(myTxManager); 090 } 091 092 @Override 093 public void scheduleJobs(ISchedulerService theSchedulerService) { 094 // This job should be local so that each node in the cluster can pick up jobs 095 ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); 096 jobDetail.setId(ActivationJob.class.getName()); 097 jobDetail.setJobClass(ActivationJob.class); 098 theSchedulerService.scheduleLocalJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail); 099 } 100 101 @Override 102 @Transactional 103 public String createNewJob( 104 BulkImportJobJson theJobDescription, @Nonnull List<BulkImportJobFileJson> theInitialFiles) { 105 ValidateUtil.isNotNullOrThrowUnprocessableEntity(theJobDescription, "Job must not be null"); 106 ValidateUtil.isNotNullOrThrowUnprocessableEntity( 107 theJobDescription.getProcessingMode(), "Job File Processing mode must not be null"); 108 ValidateUtil.isTrueOrThrowInvalidRequest( 109 theJobDescription.getBatchSize() > 0, "Job File Batch Size must be > 0"); 110 111 String biJobId = UUID.randomUUID().toString(); 112 113 ourLog.info( 114 "Creating new Bulk Import job with {} files, assigning bijob ID: {}", theInitialFiles.size(), biJobId); 115 116 BulkImportJobEntity job = new BulkImportJobEntity(); 117 job.setJobId(biJobId); 118 job.setFileCount(theInitialFiles.size()); 119 job.setStatus(BulkImportJobStatusEnum.STAGING); 120 job.setJobDescription(theJobDescription.getJobDescription()); 121 job.setBatchSize(theJobDescription.getBatchSize()); 122 job.setRowProcessingMode(theJobDescription.getProcessingMode()); 123 job = myJobDao.save(job); 124 125 int nextSequence = 0; 126 addFilesToJob(theInitialFiles, job, nextSequence); 127 128 return biJobId; 129 } 130 131 @Override 132 @Transactional 133 public void addFilesToJob(String theBiJobId, List<BulkImportJobFileJson> theFiles) { 134 ourLog.info("Adding {} files to bulk import job with bijob id {}", theFiles.size(), theBiJobId); 135 136 BulkImportJobEntity job = findJobByBiJobId(theBiJobId); 137 138 ValidateUtil.isTrueOrThrowInvalidRequest( 139 job.getStatus() == BulkImportJobStatusEnum.STAGING, 140 "bijob id %s has status %s and can not be added to", 141 theBiJobId, 142 job.getStatus()); 143 144 addFilesToJob(theFiles, job, job.getFileCount()); 145 146 job.setFileCount(job.getFileCount() + theFiles.size()); 147 myJobDao.save(job); 148 } 149 150 private BulkImportJobEntity findJobByBiJobId(String theBiJobId) { 151 BulkImportJobEntity job = myJobDao.findByJobId(theBiJobId) 152 .orElseThrow(() -> new InvalidRequestException("Unknown bijob id: " + theBiJobId)); 153 return job; 154 } 155 156 @Override 157 @Transactional 158 public void markJobAsReadyForActivation(String theBiJobId) { 159 ourLog.info("Activating bulk import bijob {}", theBiJobId); 160 161 BulkImportJobEntity job = findJobByBiJobId(theBiJobId); 162 ValidateUtil.isTrueOrThrowInvalidRequest( 163 job.getStatus() == BulkImportJobStatusEnum.STAGING, 164 "Bulk import bijob %s can not be activated in status: %s", 165 theBiJobId, 166 job.getStatus()); 167 168 job.setStatus(BulkImportJobStatusEnum.READY); 169 myJobDao.save(job); 170 } 171 172 /** 173 * To be called by the job scheduler 174 */ 175 @Transactional(propagation = Propagation.NEVER) 176 @Override 177 public ActivateJobResult activateNextReadyJob() { 178 if (!myStorageSettings.isEnableTaskBulkImportJobExecution()) { 179 Logs.getBatchTroubleshootingLog() 180 .trace("Bulk import job execution is not enabled on this server. No action taken."); 181 return new ActivateJobResult(false, null); 182 } 183 184 if (!myRunningJobSemaphore.tryAcquire()) { 185 Logs.getBatchTroubleshootingLog().trace("Already have a running batch job, not going to check for more"); 186 return new ActivateJobResult(false, null); 187 } 188 189 try { 190 ActivateJobResult retval = doActivateNextReadyJob(); 191 if (!StringUtils.isBlank(retval.jobId)) { 192 ourLog.info("Batch job submitted with batch job id {}", retval.jobId); 193 } 194 return retval; 195 } finally { 196 myRunningJobSemaphore.release(); 197 } 198 } 199 200 private ActivateJobResult doActivateNextReadyJob() { 201 Optional<BulkImportJobEntity> jobToProcessOpt = Objects.requireNonNull(myTxTemplate.execute(t -> { 202 Pageable page = PageRequest.of(0, 1); 203 Slice<BulkImportJobEntity> submittedJobs = myJobDao.findByStatus(page, BulkImportJobStatusEnum.READY); 204 if (submittedJobs.isEmpty()) { 205 return Optional.empty(); 206 } 207 return Optional.of(submittedJobs.getContent().get(0)); 208 })); 209 210 if (!jobToProcessOpt.isPresent()) { 211 return new ActivateJobResult(false, null); 212 } 213 214 BulkImportJobEntity bulkImportJobEntity = jobToProcessOpt.get(); 215 216 String jobUuid = bulkImportJobEntity.getJobId(); 217 String biJobId = null; 218 try { 219 biJobId = processJob(bulkImportJobEntity); 220 // set job status to RUNNING so it would not be processed again 221 myTxTemplate.execute(t -> { 222 bulkImportJobEntity.setStatus(BulkImportJobStatusEnum.RUNNING); 223 myJobDao.save(bulkImportJobEntity); 224 return null; 225 }); 226 } catch (Exception e) { 227 ourLog.error("Failure while preparing bulk export extract", e); 228 myTxTemplate.execute(t -> { 229 Optional<BulkImportJobEntity> submittedJobs = myJobDao.findByJobId(jobUuid); 230 if (submittedJobs.isPresent()) { 231 BulkImportJobEntity jobEntity = submittedJobs.get(); 232 jobEntity.setStatus(BulkImportJobStatusEnum.ERROR); 233 jobEntity.setStatusMessage(e.getMessage()); 234 myJobDao.save(jobEntity); 235 } 236 return new ActivateJobResult(false, null); 237 }); 238 } 239 240 return new ActivateJobResult(true, biJobId); 241 } 242 243 @Override 244 @Transactional 245 public void setJobToStatus(String theBiJobId, BulkImportJobStatusEnum theStatus) { 246 setJobToStatus(theBiJobId, theStatus, null); 247 } 248 249 @Override 250 public void setJobToStatus(String theBiJobId, BulkImportJobStatusEnum theStatus, String theStatusMessage) { 251 BulkImportJobEntity job = findJobByBiJobId(theBiJobId); 252 job.setStatus(theStatus); 253 job.setStatusMessage(theStatusMessage); 254 myJobDao.save(job); 255 } 256 257 @Override 258 @Transactional 259 public BulkImportJobJson fetchJob(String theBiJobId) { 260 BulkImportJobEntity job = findJobByBiJobId(theBiJobId); 261 return job.toJson(); 262 } 263 264 @Override 265 @Transactional 266 public JobInfo getJobStatus(String theBiJobId) { 267 BulkImportJobEntity theJob = findJobByBiJobId(theBiJobId); 268 return new JobInfo() 269 .setStatus(theJob.getStatus()) 270 .setStatusMessage(theJob.getStatusMessage()) 271 .setStatusTime(theJob.getStatusTime()); 272 } 273 274 @Transactional 275 @Override 276 public BulkImportJobFileJson fetchFile(String theBiJobId, int theFileIndex) { 277 BulkImportJobEntity job = findJobByBiJobId(theBiJobId); 278 279 return myJobFileDao 280 .findForJob(job, theFileIndex) 281 .map(t -> t.toJson()) 282 .orElseThrow(() -> 283 new IllegalArgumentException("Invalid index " + theFileIndex + " for bijob " + theBiJobId)); 284 } 285 286 @Transactional 287 @Override 288 public String getFileDescription(String theBiJobId, int theFileIndex) { 289 BulkImportJobEntity job = findJobByBiJobId(theBiJobId); 290 291 return myJobFileDao.findFileDescriptionForJob(job, theFileIndex).orElse(""); 292 } 293 294 @Override 295 @Transactional 296 public void deleteJobFiles(String theBiJobId) { 297 BulkImportJobEntity job = findJobByBiJobId(theBiJobId); 298 List<Long> files = myJobFileDao.findAllIdsForJob(theBiJobId); 299 for (Long next : files) { 300 myJobFileDao.deleteById(next); 301 } 302 myJobDao.delete(job); 303 } 304 305 private String processJob(BulkImportJobEntity theBulkExportJobEntity) { 306 String biJobId = theBulkExportJobEntity.getJobId(); 307 int batchSize = theBulkExportJobEntity.getBatchSize(); 308 309 Batch2BulkImportPullJobParameters jobParameters = new Batch2BulkImportPullJobParameters(); 310 jobParameters.setJobId(biJobId); 311 jobParameters.setBatchSize(batchSize); 312 313 JobInstanceStartRequest request = new JobInstanceStartRequest(); 314 request.setJobDefinitionId(BULK_IMPORT_JOB_NAME); 315 request.setParameters(jobParameters); 316 317 ourLog.info("Submitting bulk import with bijob id {} to job scheduler", biJobId); 318 319 return myJobCoordinator.startInstance(request).getInstanceId(); 320 } 321 322 private void addFilesToJob( 323 @Nonnull List<BulkImportJobFileJson> theInitialFiles, BulkImportJobEntity job, int nextSequence) { 324 for (BulkImportJobFileJson nextFile : theInitialFiles) { 325 ValidateUtil.isNotBlankOrThrowUnprocessableEntity( 326 nextFile.getContents(), "Job File Contents mode must not be null"); 327 328 BulkImportJobFileEntity jobFile = new BulkImportJobFileEntity(); 329 jobFile.setJob(job); 330 jobFile.setContents(nextFile.getContents()); 331 jobFile.setTenantName(nextFile.getTenantName()); 332 jobFile.setFileDescription(nextFile.getDescription()); 333 jobFile.setFileSequence(nextSequence++); 334 myJobFileDao.save(jobFile); 335 } 336 } 337 338 public static class ActivationJob implements HapiJob { 339 @Autowired 340 private IBulkDataImportSvc myTarget; 341 342 @Override 343 public void execute(JobExecutionContext theContext) { 344 myTarget.activateNextReadyJob(); 345 } 346 } 347}