/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.bulk.imprt.svc;

import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.importpull.models.Batch2BulkImportPullJobParameters;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.model.ActivateJobResult;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkImportJobDao;
import ca.uhn.fhir.jpa.dao.data.IBulkImportJobFileDao;
import ca.uhn.fhir.jpa.entity.BulkImportJobEntity;
import ca.uhn.fhir.jpa.entity.BulkImportJobFileEntity;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.ValidateUtil;
import jakarta.annotation.Nonnull;
import jakarta.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Semaphore;

import static ca.uhn.fhir.batch2.jobs.importpull.BulkImportPullConfig.BULK_IMPORT_JOB_NAME;

/**
 * JPA-backed implementation of {@link IBulkDataImportSvc}. Bulk import jobs and their
 * staged files are persisted through Spring Data DAOs, and the scheduled
 * {@link ActivationJob} periodically promotes the next job in READY status to a
 * batch2 job instance.
 */
public class BulkDataImportSvcImpl implements IBulkDataImportSvc, IHasScheduledJobs {
	private static final Logger ourLog = LoggerFactory.getLogger(BulkDataImportSvcImpl.class);
	private final Semaphore myRunningJobSemaphore = new Semaphore(1);

	@Autowired
	private IBulkImportJobDao myJobDao;

	@Autowired
	private IBulkImportJobFileDao myJobFileDao;

	@Autowired
	private PlatformTransactionManager myTxManager;

	private TransactionTemplate myTxTemplate;

	@Autowired
	private IJobCoordinator myJobCoordinator;

	@Autowired
	private JpaStorageSettings myStorageSettings;
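	/**
	 * Initializes the {@link TransactionTemplate} used for programmatic transaction
	 * management once dependency injection has completed.
	 */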
	@PostConstruct
	public void start() {
		myTxTemplate = new TransactionTemplate(myTxManager);
	}

	@Override
	public void scheduleJobs(ISchedulerService theSchedulerService) {
		// This job should be local so that each node in the cluster can pick up jobs
		ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
		jobDetail.setId(ActivationJob.class.getName());
		jobDetail.setJobClass(ActivationJob.class);
		theSchedulerService.scheduleLocalJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
	}
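	/**
	 * Creates a new bulk import job in STAGING status, persists it together with its
	 * initial files, and returns the generated bijob id.
	 */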
	@Override
	@Transactional
	public String createNewJob(
			BulkImportJobJson theJobDescription, @Nonnull List<BulkImportJobFileJson> theInitialFiles) {
		ValidateUtil.isNotNullOrThrowUnprocessableEntity(theJobDescription, "Job must not be null");
		ValidateUtil.isNotNullOrThrowUnprocessableEntity(
				theJobDescription.getProcessingMode(), "Job File Processing mode must not be null");
		ValidateUtil.isTrueOrThrowInvalidRequest(
				theJobDescription.getBatchSize() > 0, "Job File Batch Size must be > 0");

		String biJobId = UUID.randomUUID().toString();

		ourLog.info(
				"Creating new Bulk Import job with {} files, assigning bijob ID: {}", theInitialFiles.size(), biJobId);

		BulkImportJobEntity job = new BulkImportJobEntity();
		job.setJobId(biJobId);
		job.setFileCount(theInitialFiles.size());
		job.setStatus(BulkImportJobStatusEnum.STAGING);
		job.setJobDescription(theJobDescription.getJobDescription());
		job.setBatchSize(theJobDescription.getBatchSize());
		job.setRowProcessingMode(theJobDescription.getProcessingMode());
		job = myJobDao.save(job);

		int nextSequence = 0;
		addFilesToJob(theInitialFiles, job, nextSequence);

		return biJobId;
	}

	@Override
	@Transactional
	public void addFilesToJob(String theBiJobId, List<BulkImportJobFileJson> theFiles) {
		ourLog.info("Adding {} files to bulk import job with bijob id {}", theFiles.size(), theBiJobId);

		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);

		ValidateUtil.isTrueOrThrowInvalidRequest(
				job.getStatus() == BulkImportJobStatusEnum.STAGING,
				"bijob id %s has status %s and can not be added to",
				theBiJobId,
				job.getStatus());

		addFilesToJob(theFiles, job, job.getFileCount());

		job.setFileCount(job.getFileCount() + theFiles.size());
		myJobDao.save(job);
	}

	private BulkImportJobEntity findJobByBiJobId(String theBiJobId) {
		return myJobDao.findByJobId(theBiJobId)
				.orElseThrow(() -> new InvalidRequestException("Unknown bijob id: " + theBiJobId));
	}

	@Override
	@Transactional
	public void markJobAsReadyForActivation(String theBiJobId) {
		ourLog.info("Activating bulk import bijob {}", theBiJobId);

		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);
		ValidateUtil.isTrueOrThrowInvalidRequest(
				job.getStatus() == BulkImportJobStatusEnum.STAGING,
				"Bulk import bijob %s can not be activated in status: %s",
				theBiJobId,
				job.getStatus());

		job.setStatus(BulkImportJobStatusEnum.READY);
		myJobDao.save(job);
	}

	/**
	 * To be called by the job scheduler
	 */
	@Transactional(propagation = Propagation.NEVER)
	@Override
	public ActivateJobResult activateNextReadyJob() {
		if (!myStorageSettings.isEnableTaskBulkImportJobExecution()) {
			Logs.getBatchTroubleshootingLog()
					.trace("Bulk import job execution is not enabled on this server. No action taken.");
			return new ActivateJobResult(false, null);
		}

		if (!myRunningJobSemaphore.tryAcquire()) {
			Logs.getBatchTroubleshootingLog().trace("Already have a running batch job, not going to check for more");
			return new ActivateJobResult(false, null);
		}

		try {
			ActivateJobResult retval = doActivateNextReadyJob();
			if (!StringUtils.isBlank(retval.jobId)) {
				ourLog.info("Batch job submitted with batch job id {}", retval.jobId);
			}
			return retval;
		} finally {
			myRunningJobSemaphore.release();
		}
	}

	private ActivateJobResult doActivateNextReadyJob() {
		Optional<BulkImportJobEntity> jobToProcessOpt = Objects.requireNonNull(myTxTemplate.execute(t -> {
			Pageable page = PageRequest.of(0, 1);
			Slice<BulkImportJobEntity> submittedJobs = myJobDao.findByStatus(page, BulkImportJobStatusEnum.READY);
			if (submittedJobs.isEmpty()) {
				return Optional.empty();
			}
			return Optional.of(submittedJobs.getContent().get(0));
		}));

		if (!jobToProcessOpt.isPresent()) {
			return new ActivateJobResult(false, null);
		}

		BulkImportJobEntity bulkImportJobEntity = jobToProcessOpt.get();

		String jobUuid = bulkImportJobEntity.getJobId();
		String biJobId = null;
		try {
			biJobId = processJob(bulkImportJobEntity);
		} catch (Exception e) {
			ourLog.error("Failure while preparing bulk import job", e);
			myTxTemplate.execute(t -> {
				// Mark the job as failed so that it is not picked up again
				Optional<BulkImportJobEntity> submittedJobs = myJobDao.findByJobId(jobUuid);
				if (submittedJobs.isPresent()) {
					BulkImportJobEntity jobEntity = submittedJobs.get();
					jobEntity.setStatus(BulkImportJobStatusEnum.ERROR);
					jobEntity.setStatusMessage(e.getMessage());
					myJobDao.save(jobEntity);
				}
				return null;
			});
		}

		return new ActivateJobResult(true, biJobId);
	}

	@Override
	@Transactional
	public void setJobToStatus(String theBiJobId, BulkImportJobStatusEnum theStatus) {
		setJobToStatus(theBiJobId, theStatus, null);
	}

	@Override
	@Transactional
	public void setJobToStatus(String theBiJobId, BulkImportJobStatusEnum theStatus, String theStatusMessage) {
		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);
		job.setStatus(theStatus);
		job.setStatusMessage(theStatusMessage);
		myJobDao.save(job);
	}

	@Override
	@Transactional
	public BulkImportJobJson fetchJob(String theBiJobId) {
		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);
		return job.toJson();
	}

	@Override
	public JobInfo getJobStatus(String theBiJobId) {
		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);
		return new JobInfo()
				.setStatus(job.getStatus())
				.setStatusMessage(job.getStatusMessage())
				.setStatusTime(job.getStatusTime());
	}

	@Transactional
	@Override
	public BulkImportJobFileJson fetchFile(String theBiJobId, int theFileIndex) {
		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);

		return myJobFileDao
				.findForJob(job, theFileIndex)
				.map(BulkImportJobFileEntity::toJson)
				.orElseThrow(() ->
						new IllegalArgumentException("Invalid index " + theFileIndex + " for bijob " + theBiJobId));
	}

	@Transactional
	@Override
	public String getFileDescription(String theBiJobId, int theFileIndex) {
		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);

		return myJobFileDao.findFileDescriptionForJob(job, theFileIndex).orElse("");
	}
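	/**
	 * Deletes all staged files belonging to the given bijob id, followed by the job itself.
	 */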
	@Override
	@Transactional
	public void deleteJobFiles(String theBiJobId) {
		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);
		List<Long> files = myJobFileDao.findAllIdsForJob(theBiJobId);
		for (Long next : files) {
			myJobFileDao.deleteById(next);
		}
		myJobDao.delete(job);
	}

	private String processJob(BulkImportJobEntity theBulkImportJobEntity) {
		String biJobId = theBulkImportJobEntity.getJobId();
		int batchSize = theBulkImportJobEntity.getBatchSize();

		Batch2BulkImportPullJobParameters jobParameters = new Batch2BulkImportPullJobParameters();
		jobParameters.setJobId(biJobId);
		jobParameters.setBatchSize(batchSize);

		JobInstanceStartRequest request = new JobInstanceStartRequest();
		request.setJobDefinitionId(BULK_IMPORT_JOB_NAME);
		request.setParameters(jobParameters);

		ourLog.info("Submitting bulk import with bijob id {} to job scheduler", biJobId);

		return myJobCoordinator.startInstance(request).getInstanceId();
	}

	private void addFilesToJob(
			@Nonnull List<BulkImportJobFileJson> theInitialFiles, BulkImportJobEntity job, int nextSequence) {
		for (BulkImportJobFileJson nextFile : theInitialFiles) {
			ValidateUtil.isNotBlankOrThrowUnprocessableEntity(
					nextFile.getContents(), "Job File Contents must not be blank");

			BulkImportJobFileEntity jobFile = new BulkImportJobFileEntity();
			jobFile.setJob(job);
			jobFile.setContents(nextFile.getContents());
			jobFile.setTenantName(nextFile.getTenantName());
			jobFile.setFileDescription(nextFile.getDescription());
			jobFile.setFileSequence(nextSequence++);
			myJobFileDao.save(jobFile);
		}
	}

	public static class ActivationJob implements HapiJob {
		@Autowired
		private IBulkDataImportSvc myTarget;

		@Override
		public void execute(JobExecutionContext theContext) {
			myTarget.activateNextReadyJob();
		}
	}
}