001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.batch2; 021 022import ca.uhn.fhir.batch2.api.IJobPersistence; 023import ca.uhn.fhir.batch2.api.JobOperationResultJson; 024import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest; 025import ca.uhn.fhir.batch2.model.JobInstance; 026import ca.uhn.fhir.batch2.model.StatusEnum; 027import ca.uhn.fhir.batch2.model.WorkChunk; 028import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; 029import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; 030import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent; 031import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; 032import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest; 033import ca.uhn.fhir.interceptor.api.HookParams; 034import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 035import ca.uhn.fhir.interceptor.api.Pointcut; 036import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository; 037import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository; 038import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; 039import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity; 040import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity; 041import ca.uhn.fhir.model.api.PagingIterator; 042import ca.uhn.fhir.rest.api.server.RequestDetails; 043import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 044import ca.uhn.fhir.util.Batch2JobDefinitionConstants; 045import ca.uhn.fhir.util.Logs; 046import com.fasterxml.jackson.core.JsonParser; 047import com.fasterxml.jackson.databind.JsonNode; 048import com.fasterxml.jackson.databind.ObjectMapper; 049import com.fasterxml.jackson.databind.node.ObjectNode; 050import jakarta.annotation.Nonnull; 051import jakarta.annotation.Nullable; 052import jakarta.persistence.EntityManager; 053import jakarta.persistence.LockModeType; 054import jakarta.persistence.Query; 055import org.apache.commons.collections4.ListUtils; 056import org.apache.commons.lang3.Validate; 057import org.slf4j.Logger; 058import org.springframework.data.domain.Page; 059import org.springframework.data.domain.PageImpl; 060import org.springframework.data.domain.PageRequest; 061import org.springframework.data.domain.Pageable; 062import org.springframework.data.domain.Sort; 063import org.springframework.transaction.annotation.Propagation; 064import org.springframework.transaction.annotation.Transactional; 065import org.springframework.transaction.support.TransactionSynchronizationManager; 066 067import java.util.Date; 068import java.util.Iterator; 069import java.util.List; 070import java.util.Objects; 071import java.util.Optional; 072import java.util.Set; 073import java.util.UUID; 074import java.util.function.Consumer; 075import java.util.stream.Collectors; 076import java.util.stream.Stream; 077 078import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT; 079import static ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity.ERROR_MSG_MAX_LENGTH; 080import static org.apache.commons.lang3.StringUtils.isBlank; 081 082public class JpaJobPersistenceImpl implements IJobPersistence { 083 private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); 084 public static final String CREATE_TIME = "myCreateTime"; 085 086 private final IBatch2JobInstanceRepository myJobInstanceRepository; 087 private final IBatch2WorkChunkRepository myWorkChunkRepository; 088 private final EntityManager myEntityManager; 089 private final IHapiTransactionService myTransactionService; 090 private final IInterceptorBroadcaster myInterceptorBroadcaster; 091 092 /** 093 * Constructor 094 */ 095 public JpaJobPersistenceImpl( 096 IBatch2JobInstanceRepository theJobInstanceRepository, 097 IBatch2WorkChunkRepository theWorkChunkRepository, 098 IHapiTransactionService theTransactionService, 099 EntityManager theEntityManager, 100 IInterceptorBroadcaster theInterceptorBroadcaster) { 101 Validate.notNull(theJobInstanceRepository); 102 Validate.notNull(theWorkChunkRepository); 103 myJobInstanceRepository = theJobInstanceRepository; 104 myWorkChunkRepository = theWorkChunkRepository; 105 myTransactionService = theTransactionService; 106 myEntityManager = theEntityManager; 107 myInterceptorBroadcaster = theInterceptorBroadcaster; 108 } 109 110 @Override 111 @Transactional(propagation = Propagation.REQUIRED) 112 public String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) { 113 Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity(); 114 entity.setId(UUID.randomUUID().toString()); 115 entity.setSequence(theBatchWorkChunk.sequence); 116 entity.setJobDefinitionId(theBatchWorkChunk.jobDefinitionId); 117 entity.setJobDefinitionVersion(theBatchWorkChunk.jobDefinitionVersion); 118 entity.setTargetStepId(theBatchWorkChunk.targetStepId); 119 entity.setInstanceId(theBatchWorkChunk.instanceId); 120 entity.setSerializedData(theBatchWorkChunk.serializedData); 121 entity.setCreateTime(new Date()); 122 entity.setStartTime(new Date()); 123 entity.setStatus(WorkChunkStatusEnum.QUEUED); 124 ourLog.debug("Create work chunk {}/{}/{}", entity.getInstanceId(), entity.getId(), entity.getTargetStepId()); 125 ourLog.trace( 126 "Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData()); 127 myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> myWorkChunkRepository.save(entity)); 128 return entity.getId(); 129 } 130 131 @Override 132 @Transactional(propagation = Propagation.REQUIRED) 133 public Optional<WorkChunk> onWorkChunkDequeue(String theChunkId) { 134 // NOTE: Ideally, IN_PROGRESS wouldn't be allowed here. On chunk failure, we probably shouldn't be allowed. 135 // But how does re-run happen if k8s kills a processor mid run? 136 List<WorkChunkStatusEnum> priorStates = 137 List.of(WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.ERRORED, WorkChunkStatusEnum.IN_PROGRESS); 138 int rowsModified = myWorkChunkRepository.updateChunkStatusForStart( 139 theChunkId, new Date(), WorkChunkStatusEnum.IN_PROGRESS, priorStates); 140 if (rowsModified == 0) { 141 ourLog.info("Attempting to start chunk {} but it was already started.", theChunkId); 142 return Optional.empty(); 143 } else { 144 Optional<Batch2WorkChunkEntity> chunk = myWorkChunkRepository.findById(theChunkId); 145 return chunk.map(this::toChunk); 146 } 147 } 148 149 @Override 150 @Transactional(propagation = Propagation.REQUIRED) 151 public String storeNewInstance(JobInstance theInstance) { 152 Validate.isTrue(isBlank(theInstance.getInstanceId())); 153 154 invokePreStorageBatchHooks(theInstance); 155 156 Batch2JobInstanceEntity entity = new Batch2JobInstanceEntity(); 157 entity.setId(UUID.randomUUID().toString()); 158 entity.setDefinitionId(theInstance.getJobDefinitionId()); 159 entity.setDefinitionVersion(theInstance.getJobDefinitionVersion()); 160 entity.setStatus(theInstance.getStatus()); 161 entity.setParams(theInstance.getParameters()); 162 entity.setCurrentGatedStepId(theInstance.getCurrentGatedStepId()); 163 entity.setFastTracking(theInstance.isFastTracking()); 164 entity.setCreateTime(new Date()); 165 entity.setStartTime(new Date()); 166 entity.setReport(theInstance.getReport()); 167 entity.setTriggeringUsername(theInstance.getTriggeringUsername()); 168 entity.setTriggeringClientId(theInstance.getTriggeringClientId()); 169 170 entity = myJobInstanceRepository.save(entity); 171 return entity.getId(); 172 } 173 174 @Override 175 @Transactional(propagation = Propagation.REQUIRES_NEW) 176 public List<JobInstance> fetchInstances( 177 String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) { 178 return toInstanceList(myJobInstanceRepository.findInstancesByJobIdAndStatusAndExpiry( 179 theJobDefinitionId, theStatuses, theCutoff, thePageable)); 180 } 181 182 @Override 183 @Transactional(propagation = Propagation.REQUIRES_NEW) 184 public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus( 185 String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) { 186 PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME); 187 return toInstanceList(myJobInstanceRepository.fetchInstancesByJobDefinitionIdAndStatus( 188 theJobDefinitionId, theRequestedStatuses, pageRequest)); 189 } 190 191 @Override 192 @Transactional(propagation = Propagation.REQUIRES_NEW) 193 public List<JobInstance> fetchInstancesByJobDefinitionId( 194 String theJobDefinitionId, int thePageSize, int thePageIndex) { 195 PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME); 196 return toInstanceList(myJobInstanceRepository.findInstancesByJobDefinitionId(theJobDefinitionId, pageRequest)); 197 } 198 199 @Override 200 @Transactional(propagation = Propagation.REQUIRES_NEW) 201 public Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest) { 202 PageRequest pageRequest = 203 PageRequest.of(theRequest.getPageStart(), theRequest.getBatchSize(), theRequest.getSort()); 204 205 String jobStatus = theRequest.getJobStatus(); 206 if (Objects.equals(jobStatus, "")) { 207 Page<Batch2JobInstanceEntity> pageOfEntities = myJobInstanceRepository.findAll(pageRequest); 208 return pageOfEntities.map(this::toInstance); 209 } 210 211 StatusEnum status = StatusEnum.valueOf(jobStatus); 212 List<JobInstance> jobs = toInstanceList(myJobInstanceRepository.findInstancesByJobStatus(status, pageRequest)); 213 Integer jobsOfStatus = myJobInstanceRepository.findTotalJobsOfStatus(status); 214 return new PageImpl<>(jobs, pageRequest, jobsOfStatus); 215 } 216 217 private List<JobInstance> toInstanceList(List<Batch2JobInstanceEntity> theInstancesByJobDefinitionId) { 218 return theInstancesByJobDefinitionId.stream().map(this::toInstance).collect(Collectors.toList()); 219 } 220 221 @Override 222 @Nonnull 223 public Optional<JobInstance> fetchInstance(String theInstanceId) { 224 return myTransactionService 225 .withSystemRequestOnDefaultPartition() 226 .execute(() -> myJobInstanceRepository.findById(theInstanceId).map(this::toInstance)); 227 } 228 229 @Override 230 @Transactional(propagation = Propagation.REQUIRES_NEW) 231 public List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int thePage, int theBatchSize) { 232 String definitionId = theRequest.getJobDefinition(); 233 String params = theRequest.getParameters(); 234 Set<StatusEnum> statuses = theRequest.getStatuses(); 235 236 Pageable pageable = PageRequest.of(thePage, theBatchSize); 237 238 List<Batch2JobInstanceEntity> instanceEntities; 239 240 if (statuses != null && !statuses.isEmpty()) { 241 if (Batch2JobDefinitionConstants.BULK_EXPORT.equals(definitionId)) { 242 if (originalRequestUrlTruncation(params) != null) { 243 params = originalRequestUrlTruncation(params); 244 } 245 } 246 instanceEntities = myJobInstanceRepository.findInstancesByJobIdParamsAndStatus( 247 definitionId, params, statuses, pageable); 248 } else { 249 instanceEntities = myJobInstanceRepository.findInstancesByJobIdAndParams(definitionId, params, pageable); 250 } 251 return toInstanceList(instanceEntities); 252 } 253 254 private String originalRequestUrlTruncation(String theParams) { 255 try { 256 ObjectMapper mapper = new ObjectMapper(); 257 mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); 258 mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true); 259 JsonNode rootNode = mapper.readTree(theParams); 260 String originalUrl = "originalRequestUrl"; 261 262 if (rootNode instanceof ObjectNode) { 263 ObjectNode objectNode = (ObjectNode) rootNode; 264 265 if (objectNode.has(originalUrl)) { 266 String url = objectNode.get(originalUrl).asText(); 267 if (url.contains("?")) { 268 objectNode.put(originalUrl, url.split("\\?")[0]); 269 } 270 } 271 return mapper.writeValueAsString(objectNode); 272 } 273 } catch (Exception e) { 274 ourLog.info("Error Truncating Original Request Url", e); 275 } 276 return null; 277 } 278 279 @Override 280 @Transactional(propagation = Propagation.REQUIRES_NEW) 281 public List<JobInstance> fetchInstances(int thePageSize, int thePageIndex) { 282 // default sort is myCreateTime Asc 283 PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME); 284 return myTransactionService 285 .withSystemRequestOnDefaultPartition() 286 .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream() 287 .map(this::toInstance) 288 .collect(Collectors.toList())); 289 } 290 291 @Override 292 @Transactional(propagation = Propagation.REQUIRES_NEW) 293 public List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex) { 294 PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.DESC, CREATE_TIME); 295 return myTransactionService 296 .withSystemRequestOnDefaultPartition() 297 .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream() 298 .map(this::toInstance) 299 .collect(Collectors.toList())); 300 } 301 302 private WorkChunk toChunk(Batch2WorkChunkEntity theEntity) { 303 return JobInstanceUtil.fromEntityToWorkChunk(theEntity); 304 } 305 306 private JobInstance toInstance(Batch2JobInstanceEntity theEntity) { 307 return JobInstanceUtil.fromEntityToInstance(theEntity); 308 } 309 310 @Override 311 public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) { 312 String chunkId = theParameters.getChunkId(); 313 String errorMessage = truncateErrorMessage(theParameters.getErrorMsg()); 314 315 return myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> { 316 int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError( 317 chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED); 318 Validate.isTrue(changeCount > 0, "changed chunk matching %s", chunkId); 319 320 Query query = myEntityManager.createQuery("update Batch2WorkChunkEntity " + "set myStatus = :failed " 321 + ",myErrorMessage = CONCAT('Too many errors: ', CAST(myErrorCount as string), '. Last error msg was ', myErrorMessage) " 322 + "where myId = :chunkId and myErrorCount > :maxCount"); 323 query.setParameter("chunkId", chunkId); 324 query.setParameter("failed", WorkChunkStatusEnum.FAILED); 325 query.setParameter("maxCount", MAX_CHUNK_ERROR_COUNT); 326 int failChangeCount = query.executeUpdate(); 327 328 if (failChangeCount > 0) { 329 return WorkChunkStatusEnum.FAILED; 330 } else { 331 return WorkChunkStatusEnum.ERRORED; 332 } 333 }); 334 } 335 336 @Override 337 public void onWorkChunkFailed(String theChunkId, String theErrorMessage) { 338 ourLog.info("Marking chunk {} as failed with message: {}", theChunkId, theErrorMessage); 339 String errorMessage = truncateErrorMessage(theErrorMessage); 340 myTransactionService 341 .withSystemRequestOnDefaultPartition() 342 .execute(() -> myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError( 343 theChunkId, new Date(), errorMessage, WorkChunkStatusEnum.FAILED)); 344 } 345 346 @Override 347 public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) { 348 myTransactionService 349 .withSystemRequestOnDefaultPartition() 350 .execute(() -> myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess( 351 theEvent.getChunkId(), 352 new Date(), 353 theEvent.getRecordsProcessed(), 354 theEvent.getRecoveredErrorCount(), 355 WorkChunkStatusEnum.COMPLETED, 356 theEvent.getRecoveredWarningMessage())); 357 } 358 359 @Nullable 360 private static String truncateErrorMessage(String theErrorMessage) { 361 String errorMessage; 362 if (theErrorMessage != null && theErrorMessage.length() > ERROR_MSG_MAX_LENGTH) { 363 ourLog.warn("Truncating error message that is too long to store in database: {}", theErrorMessage); 364 errorMessage = theErrorMessage.substring(0, ERROR_MSG_MAX_LENGTH); 365 } else { 366 errorMessage = theErrorMessage; 367 } 368 return errorMessage; 369 } 370 371 @Override 372 public void markWorkChunksWithStatusAndWipeData( 373 String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMessage) { 374 assert TransactionSynchronizationManager.isActualTransactionActive(); 375 376 ourLog.debug("Marking all chunks for instance {} to status {}", theInstanceId, theStatus); 377 String errorMessage = truncateErrorMessage(theErrorMessage); 378 List<List<String>> listOfListOfIds = ListUtils.partition(theChunkIds, 100); 379 for (List<String> idList : listOfListOfIds) { 380 myWorkChunkRepository.updateAllChunksForInstanceStatusClearDataAndSetError( 381 idList, new Date(), theStatus, errorMessage); 382 } 383 } 384 385 @Override 386 @Transactional(propagation = Propagation.REQUIRES_NEW) 387 public boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId) { 388 Optional<Batch2JobInstanceEntity> instance = myJobInstanceRepository.findById(theInstanceId); 389 if (instance.isEmpty()) { 390 return false; 391 } 392 if (instance.get().getStatus().isEnded()) { 393 return false; 394 } 395 Set<WorkChunkStatusEnum> statusesForStep = 396 myWorkChunkRepository.getDistinctStatusesForStep(theInstanceId, theCurrentStepId); 397 398 ourLog.debug( 399 "Checking whether gated job can advanced to next step. [instanceId={}, currentStepId={}, statusesForStep={}]", 400 theInstanceId, 401 theCurrentStepId, 402 statusesForStep); 403 return statusesForStep.isEmpty() || statusesForStep.equals(Set.of(WorkChunkStatusEnum.COMPLETED)); 404 } 405 406 private void fetchChunks( 407 String theInstanceId, 408 boolean theIncludeData, 409 int thePageSize, 410 int thePageIndex, 411 Consumer<WorkChunk> theConsumer) { 412 myTransactionService 413 .withSystemRequestOnDefaultPartition() 414 .withPropagation(Propagation.REQUIRES_NEW) 415 .execute(() -> { 416 List<Batch2WorkChunkEntity> chunks; 417 if (theIncludeData) { 418 chunks = myWorkChunkRepository.fetchChunks( 419 PageRequest.of(thePageIndex, thePageSize), theInstanceId); 420 } else { 421 chunks = myWorkChunkRepository.fetchChunksNoData( 422 PageRequest.of(thePageIndex, thePageSize), theInstanceId); 423 } 424 for (Batch2WorkChunkEntity chunk : chunks) { 425 theConsumer.accept(toChunk(chunk)); 426 } 427 }); 428 } 429 430 @Override 431 public List<String> fetchAllChunkIdsForStepWithStatus( 432 String theInstanceId, String theStepId, WorkChunkStatusEnum theStatusEnum) { 433 return myTransactionService 434 .withSystemRequest() 435 .withPropagation(Propagation.REQUIRES_NEW) 436 .execute(() -> myWorkChunkRepository.fetchAllChunkIdsForStepWithStatus( 437 theInstanceId, theStepId, theStatusEnum)); 438 } 439 440 @Override 441 public void updateInstanceUpdateTime(String theInstanceId) { 442 myJobInstanceRepository.updateInstanceUpdateTime(theInstanceId, new Date()); 443 } 444 445 /** 446 * Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope 447 */ 448 @Override 449 public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) { 450 return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> 451 fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer)); 452 } 453 454 @Override 455 public Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) { 456 return myWorkChunkRepository 457 .fetchChunksForStep(theInstanceId, theStepId) 458 .map(this::toChunk); 459 } 460 461 @Override 462 public boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier) { 463 Batch2JobInstanceEntity instanceEntity = 464 myEntityManager.find(Batch2JobInstanceEntity.class, theInstanceId, LockModeType.PESSIMISTIC_WRITE); 465 if (null == instanceEntity) { 466 ourLog.error("No instance found with Id {}", theInstanceId); 467 return false; 468 } 469 // convert to JobInstance for public api 470 JobInstance jobInstance = JobInstanceUtil.fromEntityToInstance(instanceEntity); 471 472 // run the modification callback 473 boolean wasModified = theModifier.doUpdate(jobInstance); 474 475 if (wasModified) { 476 // copy fields back for flush. 477 JobInstanceUtil.fromInstanceToEntity(jobInstance, instanceEntity); 478 } 479 480 return wasModified; 481 } 482 483 @Override 484 @Transactional(propagation = Propagation.REQUIRES_NEW) 485 public void deleteInstanceAndChunks(String theInstanceId) { 486 ourLog.info("Deleting instance and chunks: {}", theInstanceId); 487 myWorkChunkRepository.deleteAllForInstance(theInstanceId); 488 myJobInstanceRepository.deleteById(theInstanceId); 489 } 490 491 @Override 492 @Transactional(propagation = Propagation.REQUIRES_NEW) 493 public void deleteChunksAndMarkInstanceAsChunksPurged(String theInstanceId) { 494 ourLog.info("Deleting all chunks for instance ID: {}", theInstanceId); 495 int updateCount = myJobInstanceRepository.updateWorkChunksPurgedTrue(theInstanceId); 496 int deleteCount = myWorkChunkRepository.deleteAllForInstance(theInstanceId); 497 ourLog.debug("Purged {} chunks, and updated {} instance.", deleteCount, updateCount); 498 } 499 500 @Override 501 public boolean markInstanceAsStatusWhenStatusIn( 502 String theInstanceId, StatusEnum theStatusEnum, Set<StatusEnum> thePriorStates) { 503 int recordsChanged = 504 myJobInstanceRepository.updateInstanceStatusIfIn(theInstanceId, theStatusEnum, thePriorStates); 505 ourLog.debug( 506 "Update job {} to status {} if in status {}: {}", 507 theInstanceId, 508 theStatusEnum, 509 thePriorStates, 510 recordsChanged > 0); 511 return recordsChanged > 0; 512 } 513 514 @Override 515 @Transactional(propagation = Propagation.REQUIRES_NEW) 516 public JobOperationResultJson cancelInstance(String theInstanceId) { 517 int recordsChanged = myJobInstanceRepository.updateInstanceCancelled(theInstanceId, true); 518 String operationString = "Cancel job instance " + theInstanceId; 519 520 // wipmb For 6.8 - This is too detailed to be down here - this should be up at the api layer. 521 // Replace with boolean result or ResourceNotFound exception. Build the message up at the ui. 522 String messagePrefix = "Job instance <" + theInstanceId + ">"; 523 if (recordsChanged > 0) { 524 return JobOperationResultJson.newSuccess(operationString, messagePrefix + " successfully cancelled."); 525 } else { 526 Optional<JobInstance> instance = fetchInstance(theInstanceId); 527 if (instance.isPresent()) { 528 return JobOperationResultJson.newFailure( 529 operationString, messagePrefix + " was already cancelled. Nothing to do."); 530 } else { 531 return JobOperationResultJson.newFailure(operationString, messagePrefix + " not found."); 532 } 533 } 534 } 535 536 private void invokePreStorageBatchHooks(JobInstance theJobInstance) { 537 if (myInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE)) { 538 HookParams params = new HookParams() 539 .add(JobInstance.class, theJobInstance) 540 .add(RequestDetails.class, new SystemRequestDetails()); 541 542 myInterceptorBroadcaster.callHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE, params); 543 } 544 } 545}