
/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.batch2;

import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO;
import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO;
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkMetadataViewRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkMetadataView;
import ca.uhn.fhir.model.api.PagingIterator;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
import ca.uhn.fhir.util.Logs;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.persistence.EntityManager;
import jakarta.persistence.LockModeType;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import java.time.Instant;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
import static ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity.ERROR_MSG_MAX_LENGTH;
import static org.apache.commons.lang3.StringUtils.isBlank;

public class JpaJobPersistenceImpl implements IJobPersistence {
	private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
	public static final String CREATE_TIME = "myCreateTime";

	private final IBatch2JobInstanceRepository myJobInstanceRepository;
	private final IBatch2WorkChunkRepository myWorkChunkRepository;
	private final IBatch2WorkChunkMetadataViewRepository myWorkChunkMetadataViewRepo;
	private final EntityManager myEntityManager;
	private final IHapiTransactionService myTransactionService;
	private final IInterceptorBroadcaster myInterceptorBroadcaster;

	/**
	 * Constructor
	 */
	public JpaJobPersistenceImpl(
			IBatch2JobInstanceRepository theJobInstanceRepository,
			IBatch2WorkChunkRepository theWorkChunkRepository,
			IBatch2WorkChunkMetadataViewRepository theWorkChunkMetadataViewRepo,
			IHapiTransactionService theTransactionService,
			EntityManager theEntityManager,
			IInterceptorBroadcaster theInterceptorBroadcaster) {
		Validate.notNull(theJobInstanceRepository, "theJobInstanceRepository");
		Validate.notNull(theWorkChunkRepository, "theWorkChunkRepository");
		myJobInstanceRepository = theJobInstanceRepository;
		myWorkChunkRepository = theWorkChunkRepository;
		myWorkChunkMetadataViewRepo = theWorkChunkMetadataViewRepo;
		myTransactionService = theTransactionService;
		myEntityManager = theEntityManager;
		myInterceptorBroadcaster = theInterceptorBroadcaster;
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRED)
	public String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) {
		Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity();
		entity.setId(UUID.randomUUID().toString());
		entity.setSequence(theBatchWorkChunk.sequence);
		entity.setJobDefinitionId(theBatchWorkChunk.jobDefinitionId);
		entity.setJobDefinitionVersion(theBatchWorkChunk.jobDefinitionVersion);
		entity.setTargetStepId(theBatchWorkChunk.targetStepId);
		entity.setInstanceId(theBatchWorkChunk.instanceId);
		entity.setSerializedData(theBatchWorkChunk.serializedData);
		entity.setCreateTime(new Date());
		entity.setStartTime(new Date());
		entity.setStatus(getOnCreateStatus(theBatchWorkChunk));

		ourLog.debug("Create work chunk {}/{}/{}", entity.getInstanceId(), entity.getId(), entity.getTargetStepId());
		ourLog.trace(
				"Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData());
		myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> myWorkChunkRepository.save(entity));

		return entity.getId();
	}

	/**
	 * Gets the initial onCreate state for the given work chunk.
	 * Gated job chunks start in GATE_WAITING; they are transitioned to READY during the maintenance pass once all
	 * chunks in the previous step are COMPLETED.
	 * Non-gated job chunks start in READY.
	 */
	private static WorkChunkStatusEnum getOnCreateStatus(WorkChunkCreateEvent theBatchWorkChunk) {
		if (theBatchWorkChunk.isGatedExecution) {
			return WorkChunkStatusEnum.GATE_WAITING;
		} else {
			return WorkChunkStatusEnum.READY;
		}
	}
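	/*
	 * For orientation, a summary of the chunk-state transitions implemented in this class
	 * (a reading aid, not an exhaustive state machine):
	 *
	 *   GATE_WAITING --(previous step completes)---> READY        (advanceJobStepAndUpdateChunkStatus)
	 *   READY        --(sent to queue)-------------> QUEUED       (enqueueWorkChunkForProcessing)
	 *   QUEUED       --(worker dequeues)-----------> IN_PROGRESS  (onWorkChunkDequeue)
	 *   IN_PROGRESS  --(success)-------------------> COMPLETED    (onWorkChunkCompletion)
	 *   IN_PROGRESS  --(retryable error)-----------> ERRORED      (onWorkChunkError)
	 *   IN_PROGRESS  --(fatal / too many errors)---> FAILED       (onWorkChunkFailed / onWorkChunkError)
	 *   IN_PROGRESS  --(poll delay requested)------> POLL_WAITING (onWorkChunkPollDelay)
	 *   POLL_WAITING --(deadline passed)-----------> READY        (updatePollWaitingChunksForJobIfReady)
	 */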
	@Override
	@Transactional(propagation = Propagation.REQUIRED)
	public Optional<WorkChunk> onWorkChunkDequeue(String theChunkId) {
		// take a lock on the chunk id to ensure that the maintenance run isn't doing anything.
		Batch2WorkChunkEntity chunkLock =
				myEntityManager.find(Batch2WorkChunkEntity.class, theChunkId, LockModeType.PESSIMISTIC_WRITE);

		if (chunkLock == null) {
			ourLog.warn("Unknown chunk id {} encountered. Message will be discarded.", theChunkId);
			return Optional.empty();
		}

		// remove from the current state to avoid stale data.
		myEntityManager.detach(chunkLock);

		// NOTE: Ideally, IN_PROGRESS wouldn't be allowed here, and restarting after a chunk failure
		// probably shouldn't be allowed either. But how would a re-run happen if k8s kills a
		// processor mid-run?
		List<WorkChunkStatusEnum> priorStates =
				List.of(WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.ERRORED, WorkChunkStatusEnum.IN_PROGRESS);
		int rowsModified = myWorkChunkRepository.updateChunkStatusForStart(
				theChunkId, new Date(), WorkChunkStatusEnum.IN_PROGRESS, priorStates);

		if (rowsModified == 0) {
			ourLog.info("Attempting to start chunk {} but it was already started.", theChunkId);
			return Optional.empty();
		} else {
			Optional<Batch2WorkChunkEntity> chunk = myWorkChunkRepository.findById(theChunkId);
			return chunk.map(this::toChunk);
		}
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRED)
	public String storeNewInstance(JobInstance theInstance) {
		Validate.isTrue(isBlank(theInstance.getInstanceId()));

		invokePreStorageBatchHooks(theInstance);

		Batch2JobInstanceEntity entity = new Batch2JobInstanceEntity();
		entity.setId(UUID.randomUUID().toString());
		entity.setDefinitionId(theInstance.getJobDefinitionId());
		entity.setDefinitionVersion(theInstance.getJobDefinitionVersion());
		entity.setStatus(theInstance.getStatus());
		entity.setParams(theInstance.getParameters());
		entity.setCurrentGatedStepId(theInstance.getCurrentGatedStepId());
		entity.setFastTracking(theInstance.isFastTracking());
		entity.setCreateTime(new Date());
		entity.setStartTime(new Date());
		entity.setReport(theInstance.getReport());
		entity.setTriggeringUsername(theInstance.getTriggeringUsername());
		entity.setTriggeringClientId(theInstance.getTriggeringClientId());
		entity.setUserDataJson(theInstance.getUserDataAsString());

		entity = myJobInstanceRepository.save(entity);
		return entity.getId();
	}
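	/*
	 * storeNewInstance() fires the STORAGE_PRESTORAGE_BATCH_JOB_CREATE pointcut before the entity
	 * is saved (see invokePreStorageBatchHooks() below). Based on the HookParams built there, an
	 * interceptor observing job creation would look roughly like this (illustrative sketch only;
	 * registration of the interceptor happens elsewhere):
	 *
	 *   @Hook(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE)
	 *   public void onJobCreate(JobInstance theInstance, RequestDetails theRequestDetails) {
	 *       // inspect or decorate the instance before it is persisted
	 *   }
	 */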
	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public List<JobInstance> fetchInstances(
			String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) {
		return toInstanceList(myJobInstanceRepository.findInstancesByJobIdAndStatusAndExpiry(
				theJobDefinitionId, theStatuses, theCutoff, thePageable));
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(
			String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) {
		PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
		return toInstanceList(myJobInstanceRepository.fetchInstancesByJobDefinitionIdAndStatus(
				theJobDefinitionId, theRequestedStatuses, pageRequest));
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public List<JobInstance> fetchInstancesByJobDefinitionId(
			String theJobDefinitionId, int thePageSize, int thePageIndex) {
		PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
		return toInstanceList(myJobInstanceRepository.findInstancesByJobDefinitionId(theJobDefinitionId, pageRequest));
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest) {
		PageRequest pageRequest =
				PageRequest.of(theRequest.getPageStart(), theRequest.getBatchSize(), theRequest.getSort());

		String jobStatus = theRequest.getJobStatus();
		if (Objects.equals(jobStatus, "")) {
			Page<Batch2JobInstanceEntity> pageOfEntities = myJobInstanceRepository.findAll(pageRequest);
			return pageOfEntities.map(this::toInstance);
		}

		StatusEnum status = StatusEnum.valueOf(jobStatus);
		List<JobInstance> jobs = toInstanceList(myJobInstanceRepository.findInstancesByJobStatus(status, pageRequest));
		Integer jobsOfStatus = myJobInstanceRepository.findTotalJobsOfStatus(status);
		return new PageImpl<>(jobs, pageRequest, jobsOfStatus);
	}
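	/*
	 * Illustrative use of fetchJobInstances() (sketch only; the setter names are assumed to
	 * mirror the getters used above):
	 *
	 *   JobInstanceFetchRequest request = new JobInstanceFetchRequest();
	 *   request.setPageStart(0);
	 *   request.setBatchSize(20);
	 *   request.setSort(Sort.by(Sort.Direction.DESC, JpaJobPersistenceImpl.CREATE_TIME));
	 *   request.setJobStatus(""); // blank means "all statuses"
	 *   Page<JobInstance> page = jobPersistence.fetchJobInstances(request);
	 */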
	private List<JobInstance> toInstanceList(List<Batch2JobInstanceEntity> theInstancesByJobDefinitionId) {
		return theInstancesByJobDefinitionId.stream().map(this::toInstance).collect(Collectors.toList());
	}

	@Override
	@Nonnull
	public Optional<JobInstance> fetchInstance(String theInstanceId) {
		return myTransactionService
				.withSystemRequestOnDefaultPartition()
				.execute(() -> myJobInstanceRepository.findById(theInstanceId).map(this::toInstance));
	}

	@Nonnull
	@Override
	public List<BatchWorkChunkStatusDTO> fetchWorkChunkStatusForInstance(String theInstanceId) {
		return myTransactionService
				.withSystemRequestOnDefaultPartition()
				.execute(() -> myWorkChunkRepository.fetchWorkChunkStatusForInstance(theInstanceId));
	}

	@Nonnull
	@Override
	public BatchInstanceStatusDTO fetchBatchInstanceStatus(String theInstanceId) {
		return myTransactionService
				.withSystemRequestOnDefaultPartition()
				.execute(() -> myJobInstanceRepository.fetchBatchInstanceStatus(theInstanceId));
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int thePage, int theBatchSize) {
		String definitionId = theRequest.getJobDefinition();
		String params = theRequest.getParameters();
		Set<StatusEnum> statuses = theRequest.getStatuses();

		Pageable pageable = PageRequest.of(thePage, theBatchSize);

		List<Batch2JobInstanceEntity> instanceEntities;

		if (statuses != null && !statuses.isEmpty()) {
			if (Batch2JobDefinitionConstants.BULK_EXPORT.equals(definitionId)) {
				String truncatedParams = originalRequestUrlTruncation(params);
				if (truncatedParams != null) {
					params = truncatedParams;
				}
			}
			instanceEntities = myJobInstanceRepository.findInstancesByJobIdParamsAndStatus(
					definitionId, params, statuses, pageable);
		} else {
			instanceEntities = myJobInstanceRepository.findInstancesByJobIdAndParams(definitionId, params, pageable);
		}
		return toInstanceList(instanceEntities);
	}

	private String originalRequestUrlTruncation(String theParams) {
		try {
			ObjectMapper mapper = new ObjectMapper();
			mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
			mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
			JsonNode rootNode = mapper.readTree(theParams);
			String originalUrl = "originalRequestUrl";

			if (rootNode instanceof ObjectNode) {
				ObjectNode objectNode = (ObjectNode) rootNode;

				if (objectNode.has(originalUrl)) {
					String url = objectNode.get(originalUrl).asText();
					if (url.contains("?")) {
						// strip the query string so that lookups match on the base URL only
						objectNode.put(originalUrl, url.split("\\?")[0]);
					}
				}
				return mapper.writeValueAsString(objectNode);
			}
		} catch (Exception e) {
			ourLog.info("Error truncating originalRequestUrl", e);
		}
		return null;
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public List<JobInstance> fetchInstances(int thePageSize, int thePageIndex) {
		// default sort is myCreateTime Asc
		PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
		return myTransactionService
				.withSystemRequestOnDefaultPartition()
				.execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
						.map(this::toInstance)
						.collect(Collectors.toList()));
	}

	@Override
	public void enqueueWorkChunkForProcessing(String theChunkId, Consumer<Integer> theCallback) {
		// move READY -> QUEUED; the callback receives the number of rows updated
		// (0 means the chunk was not in READY, e.g. another thread got to it first)
		int updated = myWorkChunkRepository.updateChunkStatus(
				theChunkId, WorkChunkStatusEnum.READY, WorkChunkStatusEnum.QUEUED);
		theCallback.accept(updated);
	}

	@Override
	public int updatePollWaitingChunksForJobIfReady(String theInstanceId) {
		return myWorkChunkRepository.updateWorkChunksForPollWaiting(
				theInstanceId,
				Date.from(Instant.now()),
				Set.of(WorkChunkStatusEnum.POLL_WAITING),
				WorkChunkStatusEnum.READY);
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex) {
		PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.DESC, CREATE_TIME);
		return myTransactionService
				.withSystemRequestOnDefaultPartition()
				.execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
						.map(this::toInstance)
						.collect(Collectors.toList()));
	}

	private WorkChunk toChunk(Batch2WorkChunkEntity theEntity) {
		return JobInstanceUtil.fromEntityToWorkChunk(theEntity);
	}

	private JobInstance toInstance(Batch2JobInstanceEntity theEntity) {
		return JobInstanceUtil.fromEntityToInstance(theEntity);
	}

	@Override
	public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) {
		String chunkId = theParameters.getChunkId();
		String errorMessage = truncateErrorMessage(theParameters.getErrorMsg());

		return myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> {
			int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
					chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED);
			Validate.isTrue(changeCount > 0, "No changed chunk matching %s", chunkId);

			// a chunk that has errored too many times is permanently escalated to FAILED
			int failChangeCount = myWorkChunkRepository.updateChunkForTooManyErrors(
					WorkChunkStatusEnum.FAILED, chunkId, MAX_CHUNK_ERROR_COUNT, ERROR_MSG_MAX_LENGTH);

			if (failChangeCount > 0) {
				return WorkChunkStatusEnum.FAILED;
			} else {
				return WorkChunkStatusEnum.ERRORED;
			}
		});
	}
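	/**
	 * Parks a chunk in POLL_WAITING with a deadline. The chunk stays parked until the scheduled
	 * maintenance pass calls updatePollWaitingChunksForJobIfReady() (above), which moves any chunk
	 * whose deadline has passed back to READY so it can be queued again.
	 */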
	@Override
	public void onWorkChunkPollDelay(String theChunkId, Date theDeadline) {
		int updated = myWorkChunkRepository.updateWorkChunkNextPollTime(
				theChunkId, WorkChunkStatusEnum.POLL_WAITING, Set.of(WorkChunkStatusEnum.IN_PROGRESS), theDeadline);

		if (updated != 1) {
			ourLog.warn("Expected to update 1 work chunk's poll delay; but found {}", updated);
		}
	}

	@Override
	public void onWorkChunkFailed(String theChunkId, String theErrorMessage) {
		ourLog.info("Marking chunk {} as failed with message: {}", theChunkId, theErrorMessage);
		String errorMessage = truncateErrorMessage(theErrorMessage);
		myTransactionService
				.withSystemRequestOnDefaultPartition()
				.execute(() -> myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
						theChunkId, new Date(), errorMessage, WorkChunkStatusEnum.FAILED));
	}

	@Override
	public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) {
		myTransactionService
				.withSystemRequestOnDefaultPartition()
				.execute(() -> myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(
						theEvent.getChunkId(),
						new Date(),
						theEvent.getRecordsProcessed(),
						theEvent.getRecoveredErrorCount(),
						WorkChunkStatusEnum.COMPLETED,
						theEvent.getRecoveredWarningMessage()));
	}

	@Nullable
	private static String truncateErrorMessage(String theErrorMessage) {
		String errorMessage;
		if (theErrorMessage != null && theErrorMessage.length() > ERROR_MSG_MAX_LENGTH) {
			ourLog.warn("Truncating error message that is too long to store in database: {}", theErrorMessage);
			errorMessage = theErrorMessage.substring(0, ERROR_MSG_MAX_LENGTH);
		} else {
			errorMessage = theErrorMessage;
		}
		return errorMessage;
	}

	@Override
	public void markWorkChunksWithStatusAndWipeData(
			String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMessage) {
		assert TransactionSynchronizationManager.isActualTransactionActive();

		ourLog.debug("Marking all chunks for instance {} to status {}", theInstanceId, theStatus);
		String errorMessage = truncateErrorMessage(theErrorMessage);
		// update in batches of 100 ids to keep the IN clause at a manageable size
		List<List<String>> listOfListOfIds = ListUtils.partition(theChunkIds, 100);
		for (List<String> idList : listOfListOfIds) {
			myWorkChunkRepository.updateAllChunksForInstanceStatusClearDataAndSetError(
					idList, new Date(), theStatus, errorMessage);
		}
	}

	@Override
	public Set<WorkChunkStatusEnum> getDistinctWorkChunkStatesForJobAndStep(
			String theInstanceId, String theCurrentStepId) {
		if (getRunningJob(theInstanceId) == null) {
			return Collections.emptySet();
		}
		return myWorkChunkRepository.getDistinctStatusesForStep(theInstanceId, theCurrentStepId);
	}

	private Batch2JobInstanceEntity getRunningJob(String theInstanceId) {
		Optional<Batch2JobInstanceEntity> instance = myJobInstanceRepository.findById(theInstanceId);
		if (instance.isEmpty()) {
			return null;
		}
		if (instance.get().getStatus().isEnded()) {
			return null;
		}
		return instance.get();
	}
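	/*
	 * Each page of chunks is fetched in its own REQUIRES_NEW transaction. This lets
	 * fetchAllWorkChunksIterator() (below) stream a large instance's chunks page by page without
	 * holding a single long-running transaction open for the whole iteration.
	 */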
	private void fetchChunks(
			String theInstanceId,
			boolean theIncludeData,
			int thePageSize,
			int thePageIndex,
			Consumer<WorkChunk> theConsumer) {
		myTransactionService
				.withSystemRequestOnDefaultPartition()
				.withPropagation(Propagation.REQUIRES_NEW)
				.execute(() -> {
					List<Batch2WorkChunkEntity> chunks;
					if (theIncludeData) {
						chunks = myWorkChunkRepository.fetchChunks(
								PageRequest.of(thePageIndex, thePageSize), theInstanceId);
					} else {
						chunks = myWorkChunkRepository.fetchChunksNoData(
								PageRequest.of(thePageIndex, thePageSize), theInstanceId);
					}
					for (Batch2WorkChunkEntity chunk : chunks) {
						theConsumer.accept(toChunk(chunk));
					}
				});
	}

	@Override
	public void updateInstanceUpdateTime(String theInstanceId) {
		myJobInstanceRepository.updateInstanceUpdateTime(theInstanceId, new Date());
	}

	@Override
	public WorkChunk createWorkChunk(WorkChunk theWorkChunk) {
		if (theWorkChunk.getId() == null) {
			theWorkChunk.setId(UUID.randomUUID().toString());
		}
		return toChunk(myWorkChunkRepository.save(Batch2WorkChunkEntity.fromWorkChunk(theWorkChunk)));
	}

	/**
	 * Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
	 */
	@Override
	public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
		return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) ->
				fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
	}

	@Override
	public Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) {
		return myWorkChunkRepository
				.fetchChunksForStep(theInstanceId, theStepId)
				.map(this::toChunk);
	}

	@Override
	public Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(
			Pageable thePageable, String theInstanceId, Set<WorkChunkStatusEnum> theStates) {
		Page<Batch2WorkChunkMetadataView> page =
				myWorkChunkMetadataViewRepo.fetchWorkChunkMetadataForJobInStates(thePageable, theInstanceId, theStates);

		return page.map(Batch2WorkChunkMetadataView::toChunkMetadata);
	}

	@Override
	public boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier) {
		/*
		 * We may already have a copy of the entity in the L1 cache, and it may be
		 * stale if the scheduled maintenance service has touched it recently. So
		 * we fetch it and then refresh-lock it so that we don't fail if someone
		 * else has touched it.
		 */
		Batch2JobInstanceEntity instanceEntity = myEntityManager.find(Batch2JobInstanceEntity.class, theInstanceId);
		if (null == instanceEntity) {
			ourLog.error("No instance found with Id {}", theInstanceId);
			return false;
		}
		// the null check must come before the refresh: refreshing a null entity would throw
		myEntityManager.refresh(instanceEntity, LockModeType.PESSIMISTIC_WRITE);

		// convert to JobInstance for public api
		JobInstance jobInstance = JobInstanceUtil.fromEntityToInstance(instanceEntity);

		// run the modification callback
		boolean wasModified = theModifier.doUpdate(jobInstance);

		if (wasModified) {
			// copy fields back for flush.
			JobInstanceUtil.fromInstanceToEntity(jobInstance, instanceEntity);
		}

		return wasModified;
	}
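	/*
	 * Illustrative use of updateInstance() (sketch only; setProgress() stands in for whatever
	 * modification the caller needs):
	 *
	 *   boolean changed = jobPersistence.updateInstance(instanceId, instance -> {
	 *       instance.setProgress(0.5);
	 *       return true; // true = persist the modified instance
	 *   });
	 */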
	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public void deleteInstanceAndChunks(String theInstanceId) {
		ourLog.info("Deleting instance and chunks: {}", theInstanceId);
		myWorkChunkRepository.deleteAllForInstance(theInstanceId);
		myJobInstanceRepository.deleteById(theInstanceId);
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public void deleteChunksAndMarkInstanceAsChunksPurged(String theInstanceId) {
		ourLog.info("Deleting all chunks for instance ID: {}", theInstanceId);
		int updateCount = myJobInstanceRepository.updateWorkChunksPurgedTrue(theInstanceId);
		int deleteCount = myWorkChunkRepository.deleteAllForInstance(theInstanceId);
		ourLog.debug("Purged {} chunks, and updated {} instance(s).", deleteCount, updateCount);
	}

	@Override
	public boolean markInstanceAsStatusWhenStatusIn(
			String theInstanceId, StatusEnum theStatusEnum, Set<StatusEnum> thePriorStates) {
		int recordsChanged =
				myJobInstanceRepository.updateInstanceStatusIfIn(theInstanceId, theStatusEnum, thePriorStates);
		ourLog.debug(
				"Update job {} to status {} if in status {}: {}",
				theInstanceId,
				theStatusEnum,
				thePriorStates,
				recordsChanged > 0);
		return recordsChanged > 0;
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public JobOperationResultJson cancelInstance(String theInstanceId) {
		int recordsChanged = myJobInstanceRepository.updateInstanceCancelled(theInstanceId, true);
		String operationString = "Cancel job instance " + theInstanceId;

		// wipmb For 6.8 - This is too detailed to be down here - this should be up at the api layer.
		// Replace with boolean result or ResourceNotFound exception. Build the message up at the ui.
		String messagePrefix = "Job instance <" + theInstanceId + ">";
		if (recordsChanged > 0) {
			return JobOperationResultJson.newSuccess(operationString, messagePrefix + " successfully cancelled.");
		} else {
			Optional<JobInstance> instance = fetchInstance(theInstanceId);
			if (instance.isPresent()) {
				return JobOperationResultJson.newFailure(
						operationString, messagePrefix + " was already cancelled. Nothing to do.");
			} else {
				return JobOperationResultJson.newFailure(operationString, messagePrefix + " not found.");
			}
		}
	}

	private void invokePreStorageBatchHooks(JobInstance theJobInstance) {
		if (myInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE)) {
			HookParams params = new HookParams()
					.add(JobInstance.class, theJobInstance)
					.add(RequestDetails.class, new SystemRequestDetails());

			myInterceptorBroadcaster.callHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE, params);
		}
	}
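	/**
	 * Advances a gated job to its next step and releases that step's chunks. The instance's
	 * current-gated-step pointer is moved first (inside updateInstance(), so a concurrent caller
	 * that already advanced it is detected and skipped), and only then are the step's chunks
	 * transitioned so they can be picked up for processing.
	 */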
	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public boolean advanceJobStepAndUpdateChunkStatus(
			String theJobInstanceId, String theNextStepId, boolean theIsReductionStep) {
		boolean changed = updateInstance(theJobInstanceId, instance -> {
			if (instance.getCurrentGatedStepId().equals(theNextStepId)) {
				// someone else beat us here. No changes
				return false;
			}
			ourLog.debug("Moving gated instance {} to the next step {}.", theJobInstanceId, theNextStepId);
			instance.setCurrentGatedStepId(theNextStepId);
			return true;
		});

		if (changed) {
			ourLog.debug(
					"Updating chunk status from GATE_WAITING to READY for gated instance {} in step {}.",
					theJobInstanceId,
					theNextStepId);
			WorkChunkStatusEnum nextStep =
					theIsReductionStep ? WorkChunkStatusEnum.REDUCTION_READY : WorkChunkStatusEnum.READY;
			// when we reach here, the current step id is equal to theNextStepId
			// Up to 7.1, gated jobs' work chunks were created in status QUEUED but not actually queued for the
			// workers. To stay compatible with those jobs, turn QUEUED chunks into READY, too.
			// TODO: 'QUEUED' from the IN clause will be removed after 7.6.0.
			int numChanged = myWorkChunkRepository.updateAllChunksForStepWithStatus(
					theJobInstanceId,
					theNextStepId,
					List.of(WorkChunkStatusEnum.GATE_WAITING, WorkChunkStatusEnum.QUEUED),
					nextStep);
			ourLog.debug(
					"Updated {} chunks of gated instance {} for step {} from fake QUEUED to READY.",
					numChanged,
					theJobInstanceId,
					theNextStepId);
		}

		return changed;
	}
}