
/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.batch2;

import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO;
import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO;
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkMetadataViewRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkMetadataView;
import ca.uhn.fhir.model.api.PagingIterator;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
import ca.uhn.fhir.util.Logs;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.persistence.EntityManager;
import jakarta.persistence.LockModeType;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import java.time.Instant;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
import static ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity.ERROR_MSG_MAX_LENGTH;
import static org.apache.commons.lang3.StringUtils.isBlank;

/**
 * JPA-backed {@link IJobPersistence}: reads and writes Batch2 job instances and work chunks
 * through the Spring Data repositories and the {@link EntityManager} supplied at construction.
 */
public class JpaJobPersistenceImpl implements IJobPersistence {
    private static final Logger ourLog = Logs.getBatchTroubleshootingLog();

    /** Entity property used as the sort key when paging job instances by creation time. */
    public static final String CREATE_TIME = "myCreateTime";

    /**
     * Lenient JSON mapper (unquoted field names and single quotes accepted) used to rewrite the
     * serialized job parameters. Configured once here and only used for reading/writing afterwards.
     */
    private static final ObjectMapper ourLenientParamsMapper = new ObjectMapper()
            .configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true)
            .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);

    private final IBatch2JobInstanceRepository myJobInstanceRepository;
    private final IBatch2WorkChunkRepository myWorkChunkRepository;
    private final IBatch2WorkChunkMetadataViewRepository myWorkChunkMetadataViewRepo;
    private final EntityManager myEntityManager;
    private final IHapiTransactionService myTransactionService;
    private final IInterceptorBroadcaster myInterceptorBroadcaster;

    /**
     * Constructor
     *
     * @param theJobInstanceRepository repository for job instance entities; must not be null
     * @param theWorkChunkRepository repository for work chunk entities; must not be null
     * @param theWorkChunkMetadataViewRepo repository for the work chunk metadata view
     * @param theTransactionService transaction service used to run repository calls
     * @param theEntityManager entity manager used for locking and refreshing entities
     * @param theInterceptorBroadcaster broadcaster used to fire the pre-storage batch job hook
     */
    public JpaJobPersistenceImpl(
            IBatch2JobInstanceRepository theJobInstanceRepository,
            IBatch2WorkChunkRepository theWorkChunkRepository,
            IBatch2WorkChunkMetadataViewRepository theWorkChunkMetadataViewRepo,
            IHapiTransactionService theTransactionService,
            EntityManager theEntityManager,
            IInterceptorBroadcaster theInterceptorBroadcaster) {
        Validate.notNull(theJobInstanceRepository, "theJobInstanceRepository");
        Validate.notNull(theWorkChunkRepository, "theWorkChunkRepository");
        myJobInstanceRepository = theJobInstanceRepository;
        myWorkChunkRepository = theWorkChunkRepository;
        myWorkChunkMetadataViewRepo = theWorkChunkMetadataViewRepo;
        myTransactionService = theTransactionService;
        myEntityManager = theEntityManager;
        myInterceptorBroadcaster = theInterceptorBroadcaster;
    }

    /**
     * Persists a new work chunk built from the given event under a freshly generated UUID.
     *
     * @param theBatchWorkChunk the creation event describing the chunk
     * @return the id assigned to the new chunk
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRED)
    public String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) {
        Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity();
        entity.setId(UUID.randomUUID().toString());
        entity.setSequence(theBatchWorkChunk.sequence);
        entity.setJobDefinitionId(theBatchWorkChunk.jobDefinitionId);
        entity.setJobDefinitionVersion(theBatchWorkChunk.jobDefinitionVersion);
        entity.setTargetStepId(theBatchWorkChunk.targetStepId);
        entity.setInstanceId(theBatchWorkChunk.instanceId);
        entity.setSerializedData(theBatchWorkChunk.serializedData);
        entity.setCreateTime(new Date());
        entity.setStartTime(new Date());
        entity.setStatus(getOnCreateStatus(theBatchWorkChunk));

        ourLog.debug("Create work chunk {}/{}/{}", entity.getInstanceId(), entity.getId(), entity.getTargetStepId());
        ourLog.trace(
                "Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData());
        myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> myWorkChunkRepository.save(entity));

        return entity.getId();
    }

    /**
     * Gets the initial onCreate state for the given workchunk.
     * Gated job chunks start in GATE_WAITING; they will be transitioned to READY during maintenance pass when all
     * chunks in the previous step are COMPLETED.
     * Non gated job chunks start in READY
     */
    private static WorkChunkStatusEnum getOnCreateStatus(WorkChunkCreateEvent theBatchWorkChunk) {
        if (theBatchWorkChunk.isGatedExecution) {
            return WorkChunkStatusEnum.GATE_WAITING;
        } else {
            return WorkChunkStatusEnum.READY;
        }
    }

    /**
     * Moves a chunk to IN_PROGRESS so a worker can start on it.
     *
     * @param theChunkId id of the chunk being dequeued
     * @return the chunk if its status was updated; empty if the chunk is unknown or no row was
     *         updated because it was not in one of the accepted prior states
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRED)
    public Optional<WorkChunk> onWorkChunkDequeue(String theChunkId) {
        // take a lock on the chunk id to ensure that the maintenance run isn't doing anything.
        Batch2WorkChunkEntity chunkLock =
                myEntityManager.find(Batch2WorkChunkEntity.class, theChunkId, LockModeType.PESSIMISTIC_WRITE);

        if (chunkLock == null) {
            ourLog.warn("Unknown chunk id {} encountered. Message will be discarded.", theChunkId);
            return Optional.empty();
        }

        // remove from the current state to avoid stale data.
        myEntityManager.detach(chunkLock);

        // NOTE: Ideally, IN_PROGRESS wouldn't be allowed here. On chunk failure, we probably shouldn't be allowed.
        // But how does re-run happen if k8s kills a processor mid run?
        List<WorkChunkStatusEnum> priorStates =
                List.of(WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.ERRORED, WorkChunkStatusEnum.IN_PROGRESS);
        int rowsModified = myWorkChunkRepository.updateChunkStatusForStart(
                theChunkId, new Date(), WorkChunkStatusEnum.IN_PROGRESS, priorStates);

        if (rowsModified == 0) {
            ourLog.info("Attempting to start chunk {} but it was already started.", theChunkId);
            return Optional.empty();
        } else {
            Optional<Batch2WorkChunkEntity> chunk = myWorkChunkRepository.findById(theChunkId);
            return chunk.map(this::toChunk);
        }
    }

    /**
     * Fires the pre-storage hook and then stores a new job instance under a freshly generated UUID.
     *
     * @param theInstance the instance to store; its instance id must be blank
     * @return the id assigned to the stored instance
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRED)
    public String storeNewInstance(JobInstance theInstance) {
        Validate.isTrue(isBlank(theInstance.getInstanceId()));

        invokePreStorageBatchHooks(theInstance);

        Batch2JobInstanceEntity entity = new Batch2JobInstanceEntity();
        entity.setId(UUID.randomUUID().toString());
        entity.setDefinitionId(theInstance.getJobDefinitionId());
        entity.setDefinitionVersion(theInstance.getJobDefinitionVersion());
        entity.setStatus(theInstance.getStatus());
        entity.setParams(theInstance.getParameters());
        entity.setCurrentGatedStepId(theInstance.getCurrentGatedStepId());
        entity.setFastTracking(theInstance.isFastTracking());
        entity.setCreateTime(new Date());
        entity.setStartTime(new Date());
        entity.setReport(theInstance.getReport());
        entity.setTriggeringUsername(theInstance.getTriggeringUsername());
        entity.setTriggeringClientId(theInstance.getTriggeringClientId());
        entity.setUserDataJson(theInstance.getUserDataAsString());

        entity = myJobInstanceRepository.save(entity);
        return entity.getId();
    }

    /**
     * Fetches instances of a job definition by status and cutoff date, as selected by the
     * repository's {@code findInstancesByJobIdAndStatusAndExpiry} query.
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public List<JobInstance> fetchInstances(
            String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) {
        return toInstanceList(myJobInstanceRepository.findInstancesByJobIdAndStatusAndExpiry(
                theJobDefinitionId, theStatuses, theCutoff, thePageable));
    }

    /**
     * Fetches one page of instances for a job definition in the requested statuses,
     * sorted by creation time ascending.
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(
            String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) {
        PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
        return toInstanceList(myJobInstanceRepository.fetchInstancesByJobDefinitionIdAndStatus(
                theJobDefinitionId, theRequestedStatuses, pageRequest));
    }

    /**
     * Fetches one page of instances for a job definition, sorted by creation time ascending.
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public List<JobInstance> fetchInstancesByJobDefinitionId(
            String theJobDefinitionId, int thePageSize, int thePageIndex) {
        PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
        return toInstanceList(myJobInstanceRepository.findInstancesByJobDefinitionId(theJobDefinitionId, pageRequest));
    }

    /**
     * Fetches a page of instances using the filters, paging and sort carried by the request.
     * A non-empty job status string is converted with {@link StatusEnum#valueOf(String)};
     * an empty or null one is passed to the repository as {@code null}.
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest) {
        PageRequest pageRequest =
                PageRequest.of(theRequest.getPageStart(), theRequest.getBatchSize(), theRequest.getSort());

        return myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> myJobInstanceRepository
                .findByJobDefinitionIdOrStatusOrIdOrCreateTime(
                        theRequest.getJobDefinitionId(),
                        StringUtils.isNotEmpty(theRequest.getJobStatus())
                                ? StatusEnum.valueOf(theRequest.getJobStatus())
                                : null,
                        theRequest.getJobId(),
                        theRequest.getJobCreateTimeFrom(),
                        theRequest.getJobCreateTimeTo(),
                        pageRequest)
                .map(this::toInstance));
    }

    /** Converts a list of instance entities to their {@link JobInstance} form, preserving order. */
    private List<JobInstance> toInstanceList(List<Batch2JobInstanceEntity> theInstancesByJobDefinitionId) {
        return theInstancesByJobDefinitionId.stream().map(this::toInstance).collect(Collectors.toList());
    }

    /**
     * Looks up a single job instance by id.
     *
     * @return the instance, or empty if the repository has no row with that id
     */
    @Override
    @Nonnull
    public Optional<JobInstance> fetchInstance(String theInstanceId) {
        return myTransactionService
                .withSystemRequestOnDefaultPartition()
                .execute(() -> myJobInstanceRepository.findById(theInstanceId).map(this::toInstance));
    }

    /** Delegates to the work chunk repository's status query for the given instance. */
    @Nonnull
    @Override
    public List<BatchWorkChunkStatusDTO> fetchWorkChunkStatusForInstance(String theInstanceId) {
        return myTransactionService
                .withSystemRequestOnDefaultPartition()
                .execute(() -> myWorkChunkRepository.fetchWorkChunkStatusForInstance(theInstanceId));
    }

    /** Delegates to the job instance repository's status query for the given instance. */
    @Nonnull
    @Override
    public BatchInstanceStatusDTO fetchBatchInstanceStatus(String theInstanceId) {
        return myTransactionService
                .withSystemRequestOnDefaultPartition()
                .execute(() -> myJobInstanceRepository.fetchBatchInstanceStatus(theInstanceId));
    }

    /**
     * Fetches a page of instances matching the job definition and serialized parameters in the
     * request, optionally restricted to a set of statuses.
     * <p>
     * For bulk export jobs that are filtered by status, the {@code originalRequestUrl} parameter
     * has its query string removed before matching (when the parameters can be parsed as a JSON
     * object).
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int thePage, int theBatchSize) {
        String definitionId = theRequest.getJobDefinition();
        String params = theRequest.getParameters();
        Set<StatusEnum> statuses = theRequest.getStatuses();

        Pageable pageable = PageRequest.of(thePage, theBatchSize);

        List<Batch2JobInstanceEntity> instanceEntities;

        if (statuses != null && !statuses.isEmpty()) {
            // if we're not looking for cancelled jobs, we don't want jobs that
            // are in the process of being cancelled
            // (cancelling isn't instantaneous, but uses this flag)
            boolean isCancelled = statuses.contains(StatusEnum.CANCELLED);

            if (Batch2JobDefinitionConstants.BULK_EXPORT.equals(definitionId)) {
                // parse/rewrite the parameters once and reuse the result
                String truncatedParams = originalRequestUrlTruncation(params);
                if (truncatedParams != null) {
                    params = truncatedParams;
                }
            }
            instanceEntities = myJobInstanceRepository.findInstancesByJobIdParamsAndStatus(
                    definitionId, params, statuses, isCancelled, pageable);
        } else {
            instanceEntities = myJobInstanceRepository.findInstancesByJobIdAndParams(definitionId, params, pageable);
        }
        return toInstanceList(instanceEntities);
    }

    /**
     * Re-serializes the given JSON parameters with any query string stripped from the
     * {@code originalRequestUrl} field.
     *
     * @param theParams serialized job parameters
     * @return the re-serialized parameters, or {@code null} if the input is not a JSON object or
     *         could not be processed (the failure is logged at info level)
     */
    private String originalRequestUrlTruncation(String theParams) {
        try {
            JsonNode rootNode = ourLenientParamsMapper.readTree(theParams);
            String originalUrl = "originalRequestUrl";

            if (rootNode instanceof ObjectNode) {
                ObjectNode objectNode = (ObjectNode) rootNode;

                if (objectNode.has(originalUrl)) {
                    String url = objectNode.get(originalUrl).asText();
                    // keep everything before the first '?'; indexOf/substring also copes with a
                    // URL made up solely of '?' characters, where split() yields an empty array
                    int queryStart = url.indexOf('?');
                    if (queryStart >= 0) {
                        objectNode.put(originalUrl, url.substring(0, queryStart));
                    }
                }
                return ourLenientParamsMapper.writeValueAsString(objectNode);
            }
        } catch (Exception e) {
            ourLog.info("Error Truncating Original Request Url", e);
        }
        return null;
    }

    /**
     * Fetches one page of all job instances, oldest first.
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public List<JobInstance> fetchInstances(int thePageSize, int thePageIndex) {
        // default sort is myCreateTime Asc
        PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
        return myTransactionService
                .withSystemRequestOnDefaultPartition()
                .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
                        .map(this::toInstance)
                        .collect(Collectors.toList()));
    }

    /**
     * Asks the repository to move the chunk from READY to QUEUED and reports the number of
     * updated rows to the callback.
     *
     * @param theChunkId id of the chunk to enqueue
     * @param theCallback receives the update count returned by the repository
     */
    @Override
    public void enqueueWorkChunkForProcessing(String theChunkId, Consumer<Integer> theCallback) {
        int updated = myWorkChunkRepository.updateChunkStatus(
                theChunkId, WorkChunkStatusEnum.READY, WorkChunkStatusEnum.QUEUED);
        theCallback.accept(updated);
    }

    /**
     * Asks the repository to move POLL_WAITING chunks of the instance to READY, passing the
     * current time for the repository query to compare against.
     *
     * @return the update count returned by the repository
     */
    @Override
    public int updatePollWaitingChunksForJobIfReady(String theInstanceId) {
        return myWorkChunkRepository.updateWorkChunksForPollWaiting(
                theInstanceId,
                Date.from(Instant.now()),
                Set.of(WorkChunkStatusEnum.POLL_WAITING),
                WorkChunkStatusEnum.READY);
    }

    /**
     * Fetches one page of all job instances, newest first.
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex) {
        PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.DESC, CREATE_TIME);
        return myTransactionService
                .withSystemRequestOnDefaultPartition()
                .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
                        .map(this::toInstance)
                        .collect(Collectors.toList()));
    }

    /** Converts a work chunk entity to its {@link WorkChunk} model form. */
    private WorkChunk toChunk(Batch2WorkChunkEntity theEntity) {
        return JobInstanceUtil.fromEntityToWorkChunk(theEntity);
    }

    /** Converts a job instance entity to its {@link JobInstance} model form. */
    private JobInstance toInstance(Batch2JobInstanceEntity theEntity) {
        return JobInstanceUtil.fromEntityToInstance(theEntity);
    }

    /**
     * Records an error against a chunk: marks it ERRORED (incrementing its error count) and then
     * runs the repository's "too many errors" update, which is given FAILED as the target status
     * and {@code MAX_CHUNK_ERROR_COUNT} as the limit.
     *
     * @param theParameters carries the chunk id and error message (truncated to fit the column)
     * @return FAILED if the "too many errors" update changed a row, otherwise ERRORED
     * @throws IllegalArgumentException if no chunk row was updated for the given id
     */
    @Override
    public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) {
        String chunkId = theParameters.getChunkId();
        String errorMessage = truncateErrorMessage(theParameters.getErrorMsg());

        return myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> {
            int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
                    chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED);
            Validate.isTrue(changeCount > 0, "No changed chunk matching %s", chunkId);

            int failChangeCount = myWorkChunkRepository.updateChunkForTooManyErrors(
                    WorkChunkStatusEnum.FAILED, chunkId, MAX_CHUNK_ERROR_COUNT, ERROR_MSG_MAX_LENGTH);

            if (failChangeCount > 0) {
                return WorkChunkStatusEnum.FAILED;
            } else {
                return WorkChunkStatusEnum.ERRORED;
            }
        });
    }

    /**
     * Asks the repository to move an IN_PROGRESS chunk to POLL_WAITING with the given deadline
     * as its next poll time. Logs a warning if the update count is not exactly one.
     */
    @Override
    public void onWorkChunkPollDelay(String theChunkId, Date theDeadline) {
        int updated = myWorkChunkRepository.updateWorkChunkNextPollTime(
                theChunkId, WorkChunkStatusEnum.POLL_WAITING, Set.of(WorkChunkStatusEnum.IN_PROGRESS), theDeadline);

        if (updated != 1) {
            ourLog.warn("Expected to update 1 work chunk's poll delay; but found {}", updated);
        }
    }

    /**
     * Marks a chunk FAILED with the given error message (truncated to fit the column).
     */
    @Override
    public void onWorkChunkFailed(String theChunkId, String theErrorMessage) {
        ourLog.info("Marking chunk {} as failed with message: {}", theChunkId, theErrorMessage);
        String errorMessage = truncateErrorMessage(theErrorMessage);
        myTransactionService
                .withSystemRequestOnDefaultPartition()
                .execute(() -> myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
                        theChunkId, new Date(), errorMessage, WorkChunkStatusEnum.FAILED));
    }

    /**
     * Marks a chunk COMPLETED, recording the processed-record count, recovered error count and
     * recovered warning message carried by the event.
     */
    @Override
    public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) {
        myTransactionService
                .withSystemRequestOnDefaultPartition()
                .execute(() -> myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(
                        theEvent.getChunkId(),
                        new Date(),
                        theEvent.getRecordsProcessed(),
                        theEvent.getRecoveredErrorCount(),
                        WorkChunkStatusEnum.COMPLETED,
                        theEvent.getRecoveredWarningMessage()));
    }

    /**
     * Shortens an error message to at most {@code ERROR_MSG_MAX_LENGTH} characters, logging a
     * warning with the full message when truncation happens.
     *
     * @param theErrorMessage the message to store; may be null
     * @return the (possibly truncated) message, or null if the input was null
     */
    @Nullable
    private static String truncateErrorMessage(String theErrorMessage) {
        String errorMessage;
        if (theErrorMessage != null && theErrorMessage.length() > ERROR_MSG_MAX_LENGTH) {
            ourLog.warn("Truncating error message that is too long to store in database: {}", theErrorMessage);
            errorMessage = theErrorMessage.substring(0, ERROR_MSG_MAX_LENGTH);
        } else {
            errorMessage = theErrorMessage;
        }
        return errorMessage;
    }

    /**
     * Applies the repository's "set status, clear data and set error" update to the given chunk
     * ids, in batches of 100 ids per call. Asserts that a transaction is already active.
     */
    @Override
    public void markWorkChunksWithStatusAndWipeData(
            String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMessage) {
        assert TransactionSynchronizationManager.isActualTransactionActive();

        ourLog.debug("Marking all chunks for instance {} to status {}", theInstanceId, theStatus);
        String errorMessage = truncateErrorMessage(theErrorMessage);
        List<List<String>> listOfListOfIds = ListUtils.partition(theChunkIds, 100);
        for (List<String> idList : listOfListOfIds) {
            myWorkChunkRepository.updateAllChunksForInstanceStatusClearDataAndSetError(
                    idList, new Date(), theStatus, errorMessage);
        }
    }

    /**
     * Returns the distinct chunk statuses present for a step of a job.
     *
     * @return the statuses reported by the repository, or an empty unmodifiable set if the
     *         instance does not exist or has already ended
     */
    @Override
    public Set<WorkChunkStatusEnum> getDistinctWorkChunkStatesForJobAndStep(
            String theInstanceId, String theCurrentStepId) {
        if (getRunningJob(theInstanceId) == null) {
            return Collections.unmodifiableSet(new HashSet<>());
        }
        return myWorkChunkRepository.getDistinctStatusesForStep(theInstanceId, theCurrentStepId);
    }

    /**
     * Loads the instance entity if it exists and its status is not an ended status.
     *
     * @return the entity, or null if it is missing or ended
     */
    private Batch2JobInstanceEntity getRunningJob(String theInstanceId) {
        Optional<Batch2JobInstanceEntity> instance = myJobInstanceRepository.findById(theInstanceId);
        if (instance.isEmpty()) {
            return null;
        }
        if (instance.get().getStatus().isEnded()) {
            return null;
        }
        return instance.get();
    }

    /**
     * Loads one page of chunks for an instance in a new transaction and feeds each converted
     * chunk to the consumer.
     *
     * @param theIncludeData selects the repository query with data ({@code fetchChunks}) or
     *                       without data ({@code fetchChunksNoData})
     */
    private void fetchChunks(
            String theInstanceId,
            boolean theIncludeData,
            int thePageSize,
            int thePageIndex,
            Consumer<WorkChunk> theConsumer) {
        myTransactionService
                .withSystemRequestOnDefaultPartition()
                .withPropagation(Propagation.REQUIRES_NEW)
                .execute(() -> {
                    List<Batch2WorkChunkEntity> chunks;
                    if (theIncludeData) {
                        chunks = myWorkChunkRepository.fetchChunks(
                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
                    } else {
                        chunks = myWorkChunkRepository.fetchChunksNoData(
                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
                    }
                    for (Batch2WorkChunkEntity chunk : chunks) {
                        theConsumer.accept(toChunk(chunk));
                    }
                });
    }

    /** Sets the instance's update time to the current time. */
    @Override
    public void updateInstanceUpdateTime(String theInstanceId) {
        myJobInstanceRepository.updateInstanceUpdateTime(theInstanceId, new Date());
    }

    /**
     * Saves the given work chunk, assigning it a random UUID first if it has no id.
     *
     * @return the saved chunk, converted back from the persisted entity
     */
    @Override
    public WorkChunk createWorkChunk(WorkChunk theWorkChunk) {
        if (theWorkChunk.getId() == null) {
            theWorkChunk.setId(UUID.randomUUID().toString());
        }
        return toChunk(myWorkChunkRepository.save(Batch2WorkChunkEntity.fromWorkChunk(theWorkChunk)));
    }

    /**
     * Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
     */
    @Override
    public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
        return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) ->
                fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
    }

    /** Streams the chunks of one step of an instance, converted to {@link WorkChunk}. */
    @Override
    public Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) {
        return myWorkChunkRepository
                .fetchChunksForStep(theInstanceId, theStepId)
                .map(this::toChunk);
    }

    /** Fetches a page of chunk metadata for an instance, restricted to the given states. */
    @Override
    public Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(
            Pageable thePageable, String theInstanceId, Set<WorkChunkStatusEnum> theStates) {
        Page<Batch2WorkChunkMetadataView> page =
                myWorkChunkMetadataViewRepo.fetchWorkChunkMetadataForJobInStates(thePageable, theInstanceId, theStates);

        return page.map(Batch2WorkChunkMetadataView::toChunkMetadata);
    }

    /**
     * Loads and locks the instance entity, runs the callback against its {@link JobInstance}
     * form, and copies the fields back onto the entity if the callback reports a change.
     *
     * @param theInstanceId id of the instance to update
     * @param theModifier callback that mutates the instance and returns whether it changed it
     * @return true if the callback reported a modification; false if it did not, or if no
     *         instance exists with the given id
     */
    @Override
    public boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier) {
        /*
         * We may already have a copy of the entity in the L1 cache, and it may be
         * stale if the scheduled maintenance service has touched it recently. So
         * we fetch it and then refresh-lock it so that we don't fail if someone
         * else has touched it.
         */
        Batch2JobInstanceEntity instanceEntity = myEntityManager.find(Batch2JobInstanceEntity.class, theInstanceId);

        // find() returns null for an unknown id; this must be checked before refresh(),
        // which rejects a null argument
        if (null == instanceEntity) {
            ourLog.error("No instance found with Id {}", theInstanceId);
            return false;
        }
        myEntityManager.refresh(instanceEntity, LockModeType.PESSIMISTIC_WRITE);

        // convert to JobInstance for public api
        JobInstance jobInstance = JobInstanceUtil.fromEntityToInstance(instanceEntity);

        // run the modification callback
        boolean wasModified = theModifier.doUpdate(jobInstance);

        if (wasModified) {
            // copy fields back for flush.
            JobInstanceUtil.fromInstanceToEntity(jobInstance, instanceEntity);
        }

        return wasModified;
    }

    /** Deletes all chunks of the instance and then the instance itself, in a new transaction. */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void deleteInstanceAndChunks(String theInstanceId) {
        ourLog.info("Deleting instance and chunks: {}", theInstanceId);
        myWorkChunkRepository.deleteAllForInstance(theInstanceId);
        myJobInstanceRepository.deleteById(theInstanceId);
    }

    /**
     * Flags the instance as having its work chunks purged and deletes all of its chunks,
     * in a new transaction.
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void deleteChunksAndMarkInstanceAsChunksPurged(String theInstanceId) {
        ourLog.info("Deleting all chunks for instance ID: {}", theInstanceId);
        int updateCount = myJobInstanceRepository.updateWorkChunksPurgedTrue(theInstanceId);
        int deleteCount = myWorkChunkRepository.deleteAllForInstance(theInstanceId);
        ourLog.debug("Purged {} chunks, and updated {} instance.", deleteCount, updateCount);
    }

    /**
     * Sets the instance's status to the given value if its current status is one of the prior
     * states.
     *
     * @return true if the repository reported at least one changed row
     */
    @Override
    public boolean markInstanceAsStatusWhenStatusIn(
            String theInstanceId, StatusEnum theStatusEnum, Set<StatusEnum> thePriorStates) {
        int recordsChanged =
                myJobInstanceRepository.updateInstanceStatusIfIn(theInstanceId, theStatusEnum, thePriorStates);
        ourLog.debug(
                "Update job {} to status {} if in status {}: {}",
                theInstanceId,
                theStatusEnum,
                thePriorStates,
                recordsChanged > 0);
        return recordsChanged > 0;
    }

    /**
     * Sets the cancelled flag on an instance and reports the outcome.
     *
     * @return a success result if a row was updated; otherwise a failure result stating either
     *         that the instance was already cancelled or that it was not found
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public JobOperationResultJson cancelInstance(String theInstanceId) {
        int recordsChanged = myJobInstanceRepository.updateInstanceCancelled(theInstanceId, true);
        String operationString = "Cancel job instance " + theInstanceId;

        // wipmb For 6.8 - This is too detailed to be down here - this should be up at the api layer.
        // Replace with boolean result or ResourceNotFound exception. Build the message up at the ui.
        String messagePrefix = "Job instance <" + theInstanceId + ">";
        if (recordsChanged > 0) {
            return JobOperationResultJson.newSuccess(operationString, messagePrefix + " successfully cancelled.");
        } else {
            Optional<JobInstance> instance = fetchInstance(theInstanceId);
            if (instance.isPresent()) {
                return JobOperationResultJson.newFailure(
                        operationString, messagePrefix + " was already cancelled. Nothing to do.");
            } else {
                return JobOperationResultJson.newFailure(operationString, messagePrefix + " not found.");
            }
        }
    }

    /**
     * Calls the STORAGE_PRESTORAGE_BATCH_JOB_CREATE hooks, if any are registered, with the job
     * instance and a new {@link SystemRequestDetails}.
     */
    private void invokePreStorageBatchHooks(JobInstance theJobInstance) {
        if (myInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE)) {
            HookParams params = new HookParams()
                    .add(JobInstance.class, theJobInstance)
                    .add(RequestDetails.class, new SystemRequestDetails());

            myInterceptorBroadcaster.callHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE, params);
        }
    }

    /**
     * Moves a gated job instance to its next step and, if the step actually changed, releases
     * that step's waiting chunks.
     *
     * @param theJobInstanceId id of the gated job instance
     * @param theNextStepId id of the step to advance to
     * @param theIsReductionStep when true the chunks are moved to REDUCTION_READY, otherwise READY
     * @return true if this call changed the instance's current gated step; false if the instance
     *         was already on that step
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public boolean advanceJobStepAndUpdateChunkStatus(
            String theJobInstanceId, String theNextStepId, boolean theIsReductionStep) {
        boolean changed = updateInstance(theJobInstanceId, instance -> {
            if (instance.getCurrentGatedStepId().equals(theNextStepId)) {
                // someone else beat us here. No changes
                return false;
            }
            ourLog.debug("Moving gated instance {} to the next step {}.", theJobInstanceId, theNextStepId);
            instance.setCurrentGatedStepId(theNextStepId);
            return true;
        });

        if (changed) {
            ourLog.debug(
                    "Updating chunk status from GATE_WAITING to READY for gated instance {} in step {}.",
                    theJobInstanceId,
                    theNextStepId);
            WorkChunkStatusEnum nextStatus =
                    theIsReductionStep ? WorkChunkStatusEnum.REDUCTION_READY : WorkChunkStatusEnum.READY;
            // when we reach here, the current step id is equal to theNextStepId
            // Up to 7.1, gated jobs' work chunks are created in status QUEUED but not actually queued for the
            // workers.
            // In order to keep them compatible, turn QUEUED chunks into READY, too.
            // TODO: 'QUEUED' from the IN clause will be removed after 7.6.0.
            int numChanged = myWorkChunkRepository.updateAllChunksForStepWithStatus(
                    theJobInstanceId,
                    theNextStepId,
                    List.of(WorkChunkStatusEnum.GATE_WAITING, WorkChunkStatusEnum.QUEUED),
                    nextStatus);
            ourLog.debug(
                    "Updated {} chunks of gated instance {} for step {} from fake QUEUED to READY.",
                    numChanged,
                    theJobInstanceId,
                    theNextStepId);
        }

        return changed;
    }
}