/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.batch2;

import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO;
import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO;
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkMetadataViewRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkMetadataView;
import ca.uhn.fhir.model.api.PagingIterator;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
import ca.uhn.fhir.util.Logs;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.persistence.EntityManager;
import jakarta.persistence.LockModeType;
import jakarta.persistence.Query;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import java.time.Instant;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
import static ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity.ERROR_MSG_MAX_LENGTH;
import static org.apache.commons.lang3.StringUtils.isBlank;

public class JpaJobPersistenceImpl implements IJobPersistence {
    private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
    public static final String CREATE_TIME = "myCreateTime";

    private final IBatch2JobInstanceRepository myJobInstanceRepository;
    private final IBatch2WorkChunkRepository myWorkChunkRepository;
    private final IBatch2WorkChunkMetadataViewRepository myWorkChunkMetadataViewRepo;
    private final EntityManager myEntityManager;
    private final IHapiTransactionService myTransactionService;
    private final IInterceptorBroadcaster myInterceptorBroadcaster;

    /**
     * Constructor
     */
    public JpaJobPersistenceImpl(
            IBatch2JobInstanceRepository theJobInstanceRepository,
            IBatch2WorkChunkRepository theWorkChunkRepository,
            IBatch2WorkChunkMetadataViewRepository theWorkChunkMetadataViewRepo,
            IHapiTransactionService theTransactionService,
            EntityManager theEntityManager,
            IInterceptorBroadcaster theInterceptorBroadcaster) {
        Validate.notNull(theJobInstanceRepository, "theJobInstanceRepository");
        Validate.notNull(theWorkChunkRepository, "theWorkChunkRepository");
        myJobInstanceRepository = theJobInstanceRepository;
        myWorkChunkRepository = theWorkChunkRepository;
        myWorkChunkMetadataViewRepo = theWorkChunkMetadataViewRepo;
        myTransactionService = theTransactionService;
        myEntityManager = theEntityManager;
        myInterceptorBroadcaster = theInterceptorBroadcaster;
    }
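    /**
     * Persists a newly created work chunk. The chunk receives a random UUID, create/start
     * timestamps, and the initial status computed by {@link #getOnCreateStatus(WorkChunkCreateEvent)}.
     */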
    @Override
    @Transactional(propagation = Propagation.REQUIRED)
    public String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) {
        Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity();
        entity.setId(UUID.randomUUID().toString());
        entity.setSequence(theBatchWorkChunk.sequence);
        entity.setJobDefinitionId(theBatchWorkChunk.jobDefinitionId);
        entity.setJobDefinitionVersion(theBatchWorkChunk.jobDefinitionVersion);
        entity.setTargetStepId(theBatchWorkChunk.targetStepId);
        entity.setInstanceId(theBatchWorkChunk.instanceId);
        entity.setSerializedData(theBatchWorkChunk.serializedData);
        entity.setCreateTime(new Date());
        entity.setStartTime(new Date());
        entity.setStatus(getOnCreateStatus(theBatchWorkChunk));

        ourLog.debug("Create work chunk {}/{}/{}", entity.getInstanceId(), entity.getId(), entity.getTargetStepId());
        ourLog.trace(
                "Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData());
        myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> myWorkChunkRepository.save(entity));

        return entity.getId();
    }

    /**
     * Gets the initial on-create state for the given work chunk.
     * Chunks for gated jobs start in GATE_WAITING; they are transitioned to READY during the
     * maintenance pass once all chunks in the previous step are COMPLETED.
     * Chunks for non-gated jobs start in READY.
     */
    private static WorkChunkStatusEnum getOnCreateStatus(WorkChunkCreateEvent theBatchWorkChunk) {
        if (theBatchWorkChunk.isGatedExecution) {
            return WorkChunkStatusEnum.GATE_WAITING;
        } else {
            return WorkChunkStatusEnum.READY;
        }
    }
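    /*
     * Work chunk lifecycle, as a sketch of the transitions implemented by the methods in this
     * class (other components may drive additional transitions):
     *
     *   GATE_WAITING          --(gate advances)--------> READY or REDUCTION_READY
     *   READY                 --(enqueue)--------------> QUEUED
     *   QUEUED                --(dequeue)--------------> IN_PROGRESS
     *   IN_PROGRESS           --(success)--------------> COMPLETED
     *   IN_PROGRESS           --(poll delay)-----------> POLL_WAITING --(deadline passes)--> READY
     *   IN_PROGRESS           --(recoverable error)----> ERRORED (retried via dequeue)
     *   IN_PROGRESS / ERRORED --(fatal or too many errors)--> FAILED
     */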
    @Override
    @Transactional(propagation = Propagation.REQUIRED)
    public Optional<WorkChunk> onWorkChunkDequeue(String theChunkId) {
        // take a lock on the chunk id to ensure that the maintenance run isn't doing anything.
        Batch2WorkChunkEntity chunkLock =
                myEntityManager.find(Batch2WorkChunkEntity.class, theChunkId, LockModeType.PESSIMISTIC_WRITE);
        // remove from the current state to avoid stale data.
        myEntityManager.detach(chunkLock);

        // NOTE: Ideally, IN_PROGRESS wouldn't be allowed here, and neither would restarting after a
        // chunk failure. But how would a re-run happen if k8s kills a processor mid-run?
        List<WorkChunkStatusEnum> priorStates =
                List.of(WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.ERRORED, WorkChunkStatusEnum.IN_PROGRESS);
        int rowsModified = myWorkChunkRepository.updateChunkStatusForStart(
                theChunkId, new Date(), WorkChunkStatusEnum.IN_PROGRESS, priorStates);

        if (rowsModified == 0) {
            ourLog.info("Attempting to start chunk {} but it was already started.", theChunkId);
            return Optional.empty();
        } else {
            Optional<Batch2WorkChunkEntity> chunk = myWorkChunkRepository.findById(theChunkId);
            return chunk.map(this::toChunk);
        }
    }

    @Override
    @Transactional(propagation = Propagation.REQUIRED)
    public String storeNewInstance(JobInstance theInstance) {
        Validate.isTrue(isBlank(theInstance.getInstanceId()));

        invokePreStorageBatchHooks(theInstance);

        Batch2JobInstanceEntity entity = new Batch2JobInstanceEntity();
        entity.setId(UUID.randomUUID().toString());
        entity.setDefinitionId(theInstance.getJobDefinitionId());
        entity.setDefinitionVersion(theInstance.getJobDefinitionVersion());
        entity.setStatus(theInstance.getStatus());
        entity.setParams(theInstance.getParameters());
        entity.setCurrentGatedStepId(theInstance.getCurrentGatedStepId());
        entity.setFastTracking(theInstance.isFastTracking());
        entity.setCreateTime(new Date());
        entity.setStartTime(new Date());
        entity.setReport(theInstance.getReport());
        entity.setTriggeringUsername(theInstance.getTriggeringUsername());
        entity.setTriggeringClientId(theInstance.getTriggeringClientId());

        entity = myJobInstanceRepository.save(entity);
        return entity.getId();
    }

    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public List<JobInstance> fetchInstances(
            String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) {
        return toInstanceList(myJobInstanceRepository.findInstancesByJobIdAndStatusAndExpiry(
                theJobDefinitionId, theStatuses, theCutoff, thePageable));
    }

    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(
            String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) {
        PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
        return toInstanceList(myJobInstanceRepository.fetchInstancesByJobDefinitionIdAndStatus(
                theJobDefinitionId, theRequestedStatuses, pageRequest));
    }
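    // Note on paging for the fetch methods below: thePageIndex is zero-based, and unless a
    // request supplies its own sort, results are ordered by myCreateTime (ascending here,
    // descending for fetchRecentInstances).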
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public List<JobInstance> fetchInstancesByJobDefinitionId(
            String theJobDefinitionId, int thePageSize, int thePageIndex) {
        PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
        return toInstanceList(myJobInstanceRepository.findInstancesByJobDefinitionId(theJobDefinitionId, pageRequest));
    }

    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest) {
        PageRequest pageRequest =
                PageRequest.of(theRequest.getPageStart(), theRequest.getBatchSize(), theRequest.getSort());

        String jobStatus = theRequest.getJobStatus();
        if (Objects.equals(jobStatus, "")) {
            Page<Batch2JobInstanceEntity> pageOfEntities = myJobInstanceRepository.findAll(pageRequest);
            return pageOfEntities.map(this::toInstance);
        }

        StatusEnum status = StatusEnum.valueOf(jobStatus);
        List<JobInstance> jobs = toInstanceList(myJobInstanceRepository.findInstancesByJobStatus(status, pageRequest));
        Integer jobsOfStatus = myJobInstanceRepository.findTotalJobsOfStatus(status);
        return new PageImpl<>(jobs, pageRequest, jobsOfStatus);
    }

    private List<JobInstance> toInstanceList(List<Batch2JobInstanceEntity> theInstancesByJobDefinitionId) {
        return theInstancesByJobDefinitionId.stream().map(this::toInstance).collect(Collectors.toList());
    }

    @Override
    @Nonnull
    public Optional<JobInstance> fetchInstance(String theInstanceId) {
        return myTransactionService
                .withSystemRequestOnDefaultPartition()
                .execute(() -> myJobInstanceRepository.findById(theInstanceId).map(this::toInstance));
    }

    @Nonnull
    @Override
    public List<BatchWorkChunkStatusDTO> fetchWorkChunkStatusForInstance(String theInstanceId) {
        return myTransactionService
                .withSystemRequestOnDefaultPartition()
                .execute(() -> myWorkChunkRepository.fetchWorkChunkStatusForInstance(theInstanceId));
    }

    @Nonnull
    @Override
    public BatchInstanceStatusDTO fetchBatchInstanceStatus(String theInstanceId) {
        return myTransactionService
                .withSystemRequestOnDefaultPartition()
                .execute(() -> myJobInstanceRepository.fetchBatchInstanceStatus(theInstanceId));
    }

    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int thePage, int theBatchSize) {
        String definitionId = theRequest.getJobDefinition();
        String params = theRequest.getParameters();
        Set<StatusEnum> statuses = theRequest.getStatuses();

        Pageable pageable = PageRequest.of(thePage, theBatchSize);

        List<Batch2JobInstanceEntity> instanceEntities;

        if (statuses != null && !statuses.isEmpty()) {
            if (Batch2JobDefinitionConstants.BULK_EXPORT.equals(definitionId)) {
                String truncatedParams = originalRequestUrlTruncation(params);
                if (truncatedParams != null) {
                    params = truncatedParams;
                }
            }
            instanceEntities = myJobInstanceRepository.findInstancesByJobIdParamsAndStatus(
                    definitionId, params, statuses, pageable);
        } else {
            instanceEntities = myJobInstanceRepository.findInstancesByJobIdAndParams(definitionId, params, pageable);
        }
        return toInstanceList(instanceEntities);
    }
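    /**
     * Strips the query string from the {@code originalRequestUrl} field of a bulk-export
     * parameters JSON document so that instance lookups match on the base URL only. For example
     * (hypothetical values), {@code {"originalRequestUrl":"/$export?_type=Patient"}} becomes
     * {@code {"originalRequestUrl":"/$export"}}. Returns {@code null} if the parameters cannot
     * be parsed as a JSON object.
     */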
    private String originalRequestUrlTruncation(String theParams) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
            mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
            JsonNode rootNode = mapper.readTree(theParams);
            String originalUrl = "originalRequestUrl";

            if (rootNode instanceof ObjectNode) {
                ObjectNode objectNode = (ObjectNode) rootNode;

                if (objectNode.has(originalUrl)) {
                    String url = objectNode.get(originalUrl).asText();
                    if (url.contains("?")) {
                        objectNode.put(originalUrl, url.split("\\?")[0]);
                    }
                }
                return mapper.writeValueAsString(objectNode);
            }
        } catch (Exception e) {
            ourLog.info("Error truncating original request URL", e);
        }
        return null;
    }

    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public List<JobInstance> fetchInstances(int thePageSize, int thePageIndex) {
        // default sort is myCreateTime ascending
        PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
        return myTransactionService
                .withSystemRequestOnDefaultPartition()
                .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
                        .map(this::toInstance)
                        .collect(Collectors.toList()));
    }

    @Override
    public void enqueueWorkChunkForProcessing(String theChunkId, Consumer<Integer> theCallback) {
        int updated = myWorkChunkRepository.updateChunkStatus(
                theChunkId, WorkChunkStatusEnum.READY, WorkChunkStatusEnum.QUEUED);
        theCallback.accept(updated);
    }

    @Override
    public int updatePollWaitingChunksForJobIfReady(String theInstanceId) {
        return myWorkChunkRepository.updateWorkChunksForPollWaiting(
                theInstanceId,
                Date.from(Instant.now()),
                Set.of(WorkChunkStatusEnum.POLL_WAITING),
                WorkChunkStatusEnum.READY);
    }

    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex) {
        PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.DESC, CREATE_TIME);
        return myTransactionService
                .withSystemRequestOnDefaultPartition()
                .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
                        .map(this::toInstance)
                        .collect(Collectors.toList()));
    }

    private WorkChunk toChunk(Batch2WorkChunkEntity theEntity) {
        return JobInstanceUtil.fromEntityToWorkChunk(theEntity);
    }

    private JobInstance toInstance(Batch2JobInstanceEntity theEntity) {
        return JobInstanceUtil.fromEntityToInstance(theEntity);
    }
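    /**
     * Records a recoverable error on a chunk: stores the (truncated) error message, increments
     * the error count, and moves the chunk to ERRORED so it can be retried. A follow-up update
     * in the same transaction flips the chunk to FAILED once its error count exceeds
     * MAX_CHUNK_ERROR_COUNT.
     */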
    @Override
    public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) {
        String chunkId = theParameters.getChunkId();
        String errorMessage = truncateErrorMessage(theParameters.getErrorMsg());

        return myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> {
            int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
                    chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED);
            Validate.isTrue(changeCount > 0, "changed chunk matching %s", chunkId);

            Query query = myEntityManager.createQuery("update Batch2WorkChunkEntity "
                    + "set myStatus = :failed "
                    + ",myErrorMessage = CONCAT('Too many errors: ', CAST(myErrorCount as string), '. Last error msg was ', myErrorMessage) "
                    + "where myId = :chunkId and myErrorCount > :maxCount");
            query.setParameter("chunkId", chunkId);
            query.setParameter("failed", WorkChunkStatusEnum.FAILED);
            query.setParameter("maxCount", MAX_CHUNK_ERROR_COUNT);
            int failChangeCount = query.executeUpdate();

            if (failChangeCount > 0) {
                return WorkChunkStatusEnum.FAILED;
            } else {
                return WorkChunkStatusEnum.ERRORED;
            }
        });
    }

    @Override
    public void onWorkChunkPollDelay(String theChunkId, Date theDeadline) {
        int updated = myWorkChunkRepository.updateWorkChunkNextPollTime(
                theChunkId, WorkChunkStatusEnum.POLL_WAITING, Set.of(WorkChunkStatusEnum.IN_PROGRESS), theDeadline);

        if (updated != 1) {
            ourLog.warn("Expected to update 1 work chunk's poll delay but found {}", updated);
        }
    }

    @Override
    public void onWorkChunkFailed(String theChunkId, String theErrorMessage) {
        ourLog.info("Marking chunk {} as failed with message: {}", theChunkId, theErrorMessage);
        String errorMessage = truncateErrorMessage(theErrorMessage);
        myTransactionService
                .withSystemRequestOnDefaultPartition()
                .execute(() -> myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
                        theChunkId, new Date(), errorMessage, WorkChunkStatusEnum.FAILED));
    }

    @Override
    public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) {
        myTransactionService
                .withSystemRequestOnDefaultPartition()
                .execute(() -> myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(
                        theEvent.getChunkId(),
                        new Date(),
                        theEvent.getRecordsProcessed(),
                        theEvent.getRecoveredErrorCount(),
                        WorkChunkStatusEnum.COMPLETED,
                        theEvent.getRecoveredWarningMessage()));
    }

    @Nullable
    private static String truncateErrorMessage(String theErrorMessage) {
        String errorMessage;
        if (theErrorMessage != null && theErrorMessage.length() > ERROR_MSG_MAX_LENGTH) {
            ourLog.warn("Truncating error message that is too long to store in database: {}", theErrorMessage);
            errorMessage = theErrorMessage.substring(0, ERROR_MSG_MAX_LENGTH);
        } else {
            errorMessage = theErrorMessage;
        }
        return errorMessage;
    }

    @Override
    public void markWorkChunksWithStatusAndWipeData(
            String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMessage) {
        assert TransactionSynchronizationManager.isActualTransactionActive();

        ourLog.debug("Marking all chunks for instance {} to status {}", theInstanceId, theStatus);
        String errorMessage = truncateErrorMessage(theErrorMessage);
        // update in batches of 100 ids to keep the IN clause a manageable size
        List<List<String>> listOfListOfIds = ListUtils.partition(theChunkIds, 100);
        for (List<String> idList : listOfListOfIds) {
            myWorkChunkRepository.updateAllChunksForInstanceStatusClearDataAndSetError(
                    idList, new Date(), theStatus, errorMessage);
        }
    }

    @Override
    public Set<WorkChunkStatusEnum> getDistinctWorkChunkStatesForJobAndStep(
            String theInstanceId, String theCurrentStepId) {
        if (getRunningJob(theInstanceId) == null) {
            return Collections.unmodifiableSet(new HashSet<>());
        }
        return myWorkChunkRepository.getDistinctStatusesForStep(theInstanceId, theCurrentStepId);
    }
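    /**
     * Returns the instance entity, or {@code null} if it does not exist or has already
     * reached an ended status.
     */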
    private Batch2JobInstanceEntity getRunningJob(String theInstanceId) {
        Optional<Batch2JobInstanceEntity> instance = myJobInstanceRepository.findById(theInstanceId);
        if (instance.isEmpty()) {
            return null;
        }
        if (instance.get().getStatus().isEnded()) {
            return null;
        }
        return instance.get();
    }

    private void fetchChunks(
            String theInstanceId,
            boolean theIncludeData,
            int thePageSize,
            int thePageIndex,
            Consumer<WorkChunk> theConsumer) {
        myTransactionService
                .withSystemRequestOnDefaultPartition()
                .withPropagation(Propagation.REQUIRES_NEW)
                .execute(() -> {
                    List<Batch2WorkChunkEntity> chunks;
                    if (theIncludeData) {
                        chunks = myWorkChunkRepository.fetchChunks(
                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
                    } else {
                        chunks = myWorkChunkRepository.fetchChunksNoData(
                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
                    }
                    for (Batch2WorkChunkEntity chunk : chunks) {
                        theConsumer.accept(toChunk(chunk));
                    }
                });
    }

    @Override
    public void updateInstanceUpdateTime(String theInstanceId) {
        myJobInstanceRepository.updateInstanceUpdateTime(theInstanceId, new Date());
    }

    @Override
    public WorkChunk createWorkChunk(WorkChunk theWorkChunk) {
        if (theWorkChunk.getId() == null) {
            theWorkChunk.setId(UUID.randomUUID().toString());
        }
        return toChunk(myWorkChunkRepository.save(Batch2WorkChunkEntity.fromWorkChunk(theWorkChunk)));
    }

    /**
     * Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
     */
    @Override
    public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
        return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) ->
                fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
    }

    @Override
    public Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) {
        return myWorkChunkRepository
                .fetchChunksForStep(theInstanceId, theStepId)
                .map(this::toChunk);
    }

    @Override
    public Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(
            Pageable thePageable, String theInstanceId, Set<WorkChunkStatusEnum> theStates) {
        Page<Batch2WorkChunkMetadataView> page =
                myWorkChunkMetadataViewRepo.fetchWorkChunkMetadataForJobInStates(thePageable, theInstanceId, theStates);

        return page.map(Batch2WorkChunkMetadataView::toChunkMetadata);
    }
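    /**
     * Loads the instance under a pessimistic write lock, hands a mutable {@link JobInstance}
     * copy to the callback, and copies changed fields back onto the managed entity for flush
     * if the callback reports a modification. A minimal usage sketch (hypothetical step id,
     * {@code persistence} being the injected IJobPersistence):
     *
     * <pre>{@code
     * boolean changed = persistence.updateInstance(instanceId, instance -> {
     *     instance.setCurrentGatedStepId("step-2");
     *     return true; // persist the change
     * });
     * }</pre>
     */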
    @Override
    public boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier) {
        Batch2JobInstanceEntity instanceEntity =
                myEntityManager.find(Batch2JobInstanceEntity.class, theInstanceId, LockModeType.PESSIMISTIC_WRITE);
        if (null == instanceEntity) {
            ourLog.error("No instance found with Id {}", theInstanceId);
            return false;
        }
        // convert to JobInstance for public api
        JobInstance jobInstance = JobInstanceUtil.fromEntityToInstance(instanceEntity);

        // run the modification callback
        boolean wasModified = theModifier.doUpdate(jobInstance);

        if (wasModified) {
            // copy fields back for flush.
            JobInstanceUtil.fromInstanceToEntity(jobInstance, instanceEntity);
        }

        return wasModified;
    }

    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void deleteInstanceAndChunks(String theInstanceId) {
        ourLog.info("Deleting instance and chunks: {}", theInstanceId);
        myWorkChunkRepository.deleteAllForInstance(theInstanceId);
        myJobInstanceRepository.deleteById(theInstanceId);
    }

    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void deleteChunksAndMarkInstanceAsChunksPurged(String theInstanceId) {
        ourLog.info("Deleting all chunks for instance ID: {}", theInstanceId);
        int updateCount = myJobInstanceRepository.updateWorkChunksPurgedTrue(theInstanceId);
        int deleteCount = myWorkChunkRepository.deleteAllForInstance(theInstanceId);
        ourLog.debug("Purged {} chunks and updated {} instance(s).", deleteCount, updateCount);
    }

    @Override
    public boolean markInstanceAsStatusWhenStatusIn(
            String theInstanceId, StatusEnum theStatusEnum, Set<StatusEnum> thePriorStates) {
        int recordsChanged =
                myJobInstanceRepository.updateInstanceStatusIfIn(theInstanceId, theStatusEnum, thePriorStates);
        ourLog.debug(
                "Update job {} to status {} if in status {}: {}",
                theInstanceId,
                theStatusEnum,
                thePriorStates,
                recordsChanged > 0);
        return recordsChanged > 0;
    }

    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public JobOperationResultJson cancelInstance(String theInstanceId) {
        int recordsChanged = myJobInstanceRepository.updateInstanceCancelled(theInstanceId, true);
        String operationString = "Cancel job instance " + theInstanceId;

        // wipmb For 6.8 - This is too detailed to be down here - this should be up at the api layer.
        // Replace with a boolean result or a ResourceNotFound exception, and build the message up at the UI.
        String messagePrefix = "Job instance <" + theInstanceId + ">";
        if (recordsChanged > 0) {
            return JobOperationResultJson.newSuccess(operationString, messagePrefix + " successfully cancelled.");
        } else {
            Optional<JobInstance> instance = fetchInstance(theInstanceId);
            if (instance.isPresent()) {
                return JobOperationResultJson.newFailure(
                        operationString, messagePrefix + " was already cancelled. Nothing to do.");
            } else {
                return JobOperationResultJson.newFailure(operationString, messagePrefix + " not found.");
            }
        }
    }

    private void invokePreStorageBatchHooks(JobInstance theJobInstance) {
        if (myInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE)) {
            HookParams params = new HookParams()
                    .add(JobInstance.class, theJobInstance)
                    .add(RequestDetails.class, new SystemRequestDetails());

            myInterceptorBroadcaster.callHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE, params);
        }
    }
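    /**
     * Advances a gated job to {@code theNextStepId}. The gated step pointer is updated under
     * the {@link #updateInstance} lock (returning {@code false} if another process advanced it
     * first), and the new step's GATE_WAITING chunks (plus legacy QUEUED chunks, see the TODO
     * below) are flipped to READY, or to REDUCTION_READY for a reduction step.
     */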
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public boolean advanceJobStepAndUpdateChunkStatus(
            String theJobInstanceId, String theNextStepId, boolean theIsReductionStep) {
        boolean changed = updateInstance(theJobInstanceId, instance -> {
            if (instance.getCurrentGatedStepId().equals(theNextStepId)) {
                // someone else beat us here. No changes
                return false;
            }
            ourLog.debug("Moving gated instance {} to the next step {}.", theJobInstanceId, theNextStepId);
            instance.setCurrentGatedStepId(theNextStepId);
            return true;
        });

        if (changed) {
            ourLog.debug(
                    "Updating chunk status from GATE_WAITING to READY for gated instance {} in step {}.",
                    theJobInstanceId,
                    theNextStepId);
            WorkChunkStatusEnum nextStep =
                    theIsReductionStep ? WorkChunkStatusEnum.REDUCTION_READY : WorkChunkStatusEnum.READY;
            // when we reach here, the current step id is equal to theNextStepId
            // Up to 7.1, gated jobs' work chunks were created in status QUEUED but were not actually
            // queued for the workers. To stay compatible with those chunks, turn QUEUED chunks into
            // READY, too.
            // TODO: 'QUEUED' will be removed from the IN clause after 7.6.0.
            int numChanged = myWorkChunkRepository.updateAllChunksForStepWithStatus(
                    theJobInstanceId,
                    theNextStepId,
                    List.of(WorkChunkStatusEnum.GATE_WAITING, WorkChunkStatusEnum.QUEUED),
                    nextStep);
            ourLog.debug(
                    "Updated {} chunks of gated instance {} for step {} from fake QUEUED to READY.",
                    numChanged,
                    theJobInstanceId,
                    theNextStepId);
        }

        return changed;
    }
}