001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.batch2;
021
022import ca.uhn.fhir.batch2.api.IJobPersistence;
023import ca.uhn.fhir.batch2.api.JobOperationResultJson;
024import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
025import ca.uhn.fhir.batch2.model.JobInstance;
026import ca.uhn.fhir.batch2.model.StatusEnum;
027import ca.uhn.fhir.batch2.model.WorkChunk;
028import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
029import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
030import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
031import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
032import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
033import ca.uhn.fhir.interceptor.api.HookParams;
034import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
035import ca.uhn.fhir.interceptor.api.Pointcut;
036import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
037import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
038import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
039import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
040import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
041import ca.uhn.fhir.model.api.PagingIterator;
042import ca.uhn.fhir.rest.api.server.RequestDetails;
043import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
044import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
045import ca.uhn.fhir.util.Logs;
046import com.fasterxml.jackson.core.JsonParser;
047import com.fasterxml.jackson.databind.JsonNode;
048import com.fasterxml.jackson.databind.ObjectMapper;
049import com.fasterxml.jackson.databind.node.ObjectNode;
050import jakarta.annotation.Nonnull;
051import jakarta.annotation.Nullable;
052import jakarta.persistence.EntityManager;
053import jakarta.persistence.LockModeType;
054import jakarta.persistence.Query;
055import org.apache.commons.collections4.ListUtils;
056import org.apache.commons.lang3.Validate;
057import org.slf4j.Logger;
058import org.springframework.data.domain.Page;
059import org.springframework.data.domain.PageImpl;
060import org.springframework.data.domain.PageRequest;
061import org.springframework.data.domain.Pageable;
062import org.springframework.data.domain.Sort;
063import org.springframework.transaction.annotation.Propagation;
064import org.springframework.transaction.annotation.Transactional;
065import org.springframework.transaction.support.TransactionSynchronizationManager;
066
067import java.util.Date;
068import java.util.Iterator;
069import java.util.List;
070import java.util.Objects;
071import java.util.Optional;
072import java.util.Set;
073import java.util.UUID;
074import java.util.function.Consumer;
075import java.util.stream.Collectors;
076import java.util.stream.Stream;
077
078import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
079import static ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity.ERROR_MSG_MAX_LENGTH;
080import static org.apache.commons.lang3.StringUtils.isBlank;
081
082public class JpaJobPersistenceImpl implements IJobPersistence {
083        private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
084        public static final String CREATE_TIME = "myCreateTime";
085
086        private final IBatch2JobInstanceRepository myJobInstanceRepository;
087        private final IBatch2WorkChunkRepository myWorkChunkRepository;
088        private final EntityManager myEntityManager;
089        private final IHapiTransactionService myTransactionService;
090        private final IInterceptorBroadcaster myInterceptorBroadcaster;
091
092        /**
093         * Constructor
094         */
095        public JpaJobPersistenceImpl(
096                        IBatch2JobInstanceRepository theJobInstanceRepository,
097                        IBatch2WorkChunkRepository theWorkChunkRepository,
098                        IHapiTransactionService theTransactionService,
099                        EntityManager theEntityManager,
100                        IInterceptorBroadcaster theInterceptorBroadcaster) {
101                Validate.notNull(theJobInstanceRepository);
102                Validate.notNull(theWorkChunkRepository);
103                myJobInstanceRepository = theJobInstanceRepository;
104                myWorkChunkRepository = theWorkChunkRepository;
105                myTransactionService = theTransactionService;
106                myEntityManager = theEntityManager;
107                myInterceptorBroadcaster = theInterceptorBroadcaster;
108        }
109
110        @Override
111        @Transactional(propagation = Propagation.REQUIRED)
112        public String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) {
113                Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity();
114                entity.setId(UUID.randomUUID().toString());
115                entity.setSequence(theBatchWorkChunk.sequence);
116                entity.setJobDefinitionId(theBatchWorkChunk.jobDefinitionId);
117                entity.setJobDefinitionVersion(theBatchWorkChunk.jobDefinitionVersion);
118                entity.setTargetStepId(theBatchWorkChunk.targetStepId);
119                entity.setInstanceId(theBatchWorkChunk.instanceId);
120                entity.setSerializedData(theBatchWorkChunk.serializedData);
121                entity.setCreateTime(new Date());
122                entity.setStartTime(new Date());
123                entity.setStatus(WorkChunkStatusEnum.QUEUED);
124                ourLog.debug("Create work chunk {}/{}/{}", entity.getInstanceId(), entity.getId(), entity.getTargetStepId());
125                ourLog.trace(
126                                "Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData());
127                myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> myWorkChunkRepository.save(entity));
128                return entity.getId();
129        }
130
131        @Override
132        @Transactional(propagation = Propagation.REQUIRED)
133        public Optional<WorkChunk> onWorkChunkDequeue(String theChunkId) {
134                // NOTE: Ideally, IN_PROGRESS wouldn't be allowed here.  On chunk failure, we probably shouldn't be allowed.
135                // But how does re-run happen if k8s kills a processor mid run?
136                List<WorkChunkStatusEnum> priorStates =
137                                List.of(WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.ERRORED, WorkChunkStatusEnum.IN_PROGRESS);
138                int rowsModified = myWorkChunkRepository.updateChunkStatusForStart(
139                                theChunkId, new Date(), WorkChunkStatusEnum.IN_PROGRESS, priorStates);
140                if (rowsModified == 0) {
141                        ourLog.info("Attempting to start chunk {} but it was already started.", theChunkId);
142                        return Optional.empty();
143                } else {
144                        Optional<Batch2WorkChunkEntity> chunk = myWorkChunkRepository.findById(theChunkId);
145                        return chunk.map(this::toChunk);
146                }
147        }
148
149        @Override
150        @Transactional(propagation = Propagation.REQUIRED)
151        public String storeNewInstance(JobInstance theInstance) {
152                Validate.isTrue(isBlank(theInstance.getInstanceId()));
153
154                invokePreStorageBatchHooks(theInstance);
155
156                Batch2JobInstanceEntity entity = new Batch2JobInstanceEntity();
157                entity.setId(UUID.randomUUID().toString());
158                entity.setDefinitionId(theInstance.getJobDefinitionId());
159                entity.setDefinitionVersion(theInstance.getJobDefinitionVersion());
160                entity.setStatus(theInstance.getStatus());
161                entity.setParams(theInstance.getParameters());
162                entity.setCurrentGatedStepId(theInstance.getCurrentGatedStepId());
163                entity.setFastTracking(theInstance.isFastTracking());
164                entity.setCreateTime(new Date());
165                entity.setStartTime(new Date());
166                entity.setReport(theInstance.getReport());
167                entity.setTriggeringUsername(theInstance.getTriggeringUsername());
168                entity.setTriggeringClientId(theInstance.getTriggeringClientId());
169
170                entity = myJobInstanceRepository.save(entity);
171                return entity.getId();
172        }
173
174        @Override
175        @Transactional(propagation = Propagation.REQUIRES_NEW)
176        public List<JobInstance> fetchInstances(
177                        String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) {
178                return toInstanceList(myJobInstanceRepository.findInstancesByJobIdAndStatusAndExpiry(
179                                theJobDefinitionId, theStatuses, theCutoff, thePageable));
180        }
181
182        @Override
183        @Transactional(propagation = Propagation.REQUIRES_NEW)
184        public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(
185                        String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) {
186                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
187                return toInstanceList(myJobInstanceRepository.fetchInstancesByJobDefinitionIdAndStatus(
188                                theJobDefinitionId, theRequestedStatuses, pageRequest));
189        }
190
191        @Override
192        @Transactional(propagation = Propagation.REQUIRES_NEW)
193        public List<JobInstance> fetchInstancesByJobDefinitionId(
194                        String theJobDefinitionId, int thePageSize, int thePageIndex) {
195                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
196                return toInstanceList(myJobInstanceRepository.findInstancesByJobDefinitionId(theJobDefinitionId, pageRequest));
197        }
198
199        @Override
200        @Transactional(propagation = Propagation.REQUIRES_NEW)
201        public Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest) {
202                PageRequest pageRequest =
203                                PageRequest.of(theRequest.getPageStart(), theRequest.getBatchSize(), theRequest.getSort());
204
205                String jobStatus = theRequest.getJobStatus();
206                if (Objects.equals(jobStatus, "")) {
207                        Page<Batch2JobInstanceEntity> pageOfEntities = myJobInstanceRepository.findAll(pageRequest);
208                        return pageOfEntities.map(this::toInstance);
209                }
210
211                StatusEnum status = StatusEnum.valueOf(jobStatus);
212                List<JobInstance> jobs = toInstanceList(myJobInstanceRepository.findInstancesByJobStatus(status, pageRequest));
213                Integer jobsOfStatus = myJobInstanceRepository.findTotalJobsOfStatus(status);
214                return new PageImpl<>(jobs, pageRequest, jobsOfStatus);
215        }
216
217        private List<JobInstance> toInstanceList(List<Batch2JobInstanceEntity> theInstancesByJobDefinitionId) {
218                return theInstancesByJobDefinitionId.stream().map(this::toInstance).collect(Collectors.toList());
219        }
220
221        @Override
222        @Nonnull
223        public Optional<JobInstance> fetchInstance(String theInstanceId) {
224                return myTransactionService
225                                .withSystemRequestOnDefaultPartition()
226                                .execute(() -> myJobInstanceRepository.findById(theInstanceId).map(this::toInstance));
227        }
228
229        @Override
230        @Transactional(propagation = Propagation.REQUIRES_NEW)
231        public List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int thePage, int theBatchSize) {
232                String definitionId = theRequest.getJobDefinition();
233                String params = theRequest.getParameters();
234                Set<StatusEnum> statuses = theRequest.getStatuses();
235
236                Pageable pageable = PageRequest.of(thePage, theBatchSize);
237
238                List<Batch2JobInstanceEntity> instanceEntities;
239
240                if (statuses != null && !statuses.isEmpty()) {
241                        if (Batch2JobDefinitionConstants.BULK_EXPORT.equals(definitionId)) {
242                                if (originalRequestUrlTruncation(params) != null) {
243                                        params = originalRequestUrlTruncation(params);
244                                }
245                        }
246                        instanceEntities = myJobInstanceRepository.findInstancesByJobIdParamsAndStatus(
247                                        definitionId, params, statuses, pageable);
248                } else {
249                        instanceEntities = myJobInstanceRepository.findInstancesByJobIdAndParams(definitionId, params, pageable);
250                }
251                return toInstanceList(instanceEntities);
252        }
253
254        private String originalRequestUrlTruncation(String theParams) {
255                try {
256                        ObjectMapper mapper = new ObjectMapper();
257                        mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
258                        mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
259                        JsonNode rootNode = mapper.readTree(theParams);
260                        String originalUrl = "originalRequestUrl";
261
262                        if (rootNode instanceof ObjectNode) {
263                                ObjectNode objectNode = (ObjectNode) rootNode;
264
265                                if (objectNode.has(originalUrl)) {
266                                        String url = objectNode.get(originalUrl).asText();
267                                        if (url.contains("?")) {
268                                                objectNode.put(originalUrl, url.split("\\?")[0]);
269                                        }
270                                }
271                                return mapper.writeValueAsString(objectNode);
272                        }
273                } catch (Exception e) {
274                        ourLog.info("Error Truncating Original Request Url", e);
275                }
276                return null;
277        }
278
279        @Override
280        @Transactional(propagation = Propagation.REQUIRES_NEW)
281        public List<JobInstance> fetchInstances(int thePageSize, int thePageIndex) {
282                // default sort is myCreateTime Asc
283                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
284                return myTransactionService
285                                .withSystemRequestOnDefaultPartition()
286                                .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
287                                                .map(this::toInstance)
288                                                .collect(Collectors.toList()));
289        }
290
291        @Override
292        @Transactional(propagation = Propagation.REQUIRES_NEW)
293        public List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex) {
294                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.DESC, CREATE_TIME);
295                return myTransactionService
296                                .withSystemRequestOnDefaultPartition()
297                                .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
298                                                .map(this::toInstance)
299                                                .collect(Collectors.toList()));
300        }
301
302        private WorkChunk toChunk(Batch2WorkChunkEntity theEntity) {
303                return JobInstanceUtil.fromEntityToWorkChunk(theEntity);
304        }
305
306        private JobInstance toInstance(Batch2JobInstanceEntity theEntity) {
307                return JobInstanceUtil.fromEntityToInstance(theEntity);
308        }
309
310        @Override
311        public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) {
312                String chunkId = theParameters.getChunkId();
313                String errorMessage = truncateErrorMessage(theParameters.getErrorMsg());
314
315                return myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> {
316                        int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
317                                        chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED);
318                        Validate.isTrue(changeCount > 0, "changed chunk matching %s", chunkId);
319
320                        Query query = myEntityManager.createQuery("update Batch2WorkChunkEntity " + "set myStatus = :failed "
321                                        + ",myErrorMessage = CONCAT('Too many errors: ', CAST(myErrorCount as string), '. Last error msg was ', myErrorMessage) "
322                                        + "where myId = :chunkId and myErrorCount > :maxCount");
323                        query.setParameter("chunkId", chunkId);
324                        query.setParameter("failed", WorkChunkStatusEnum.FAILED);
325                        query.setParameter("maxCount", MAX_CHUNK_ERROR_COUNT);
326                        int failChangeCount = query.executeUpdate();
327
328                        if (failChangeCount > 0) {
329                                return WorkChunkStatusEnum.FAILED;
330                        } else {
331                                return WorkChunkStatusEnum.ERRORED;
332                        }
333                });
334        }
335
336        @Override
337        public void onWorkChunkFailed(String theChunkId, String theErrorMessage) {
338                ourLog.info("Marking chunk {} as failed with message: {}", theChunkId, theErrorMessage);
339                String errorMessage = truncateErrorMessage(theErrorMessage);
340                myTransactionService
341                                .withSystemRequestOnDefaultPartition()
342                                .execute(() -> myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
343                                                theChunkId, new Date(), errorMessage, WorkChunkStatusEnum.FAILED));
344        }
345
346        @Override
347        public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) {
348                myTransactionService
349                                .withSystemRequestOnDefaultPartition()
350                                .execute(() -> myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(
351                                                theEvent.getChunkId(),
352                                                new Date(),
353                                                theEvent.getRecordsProcessed(),
354                                                theEvent.getRecoveredErrorCount(),
355                                                WorkChunkStatusEnum.COMPLETED,
356                                                theEvent.getRecoveredWarningMessage()));
357        }
358
359        @Nullable
360        private static String truncateErrorMessage(String theErrorMessage) {
361                String errorMessage;
362                if (theErrorMessage != null && theErrorMessage.length() > ERROR_MSG_MAX_LENGTH) {
363                        ourLog.warn("Truncating error message that is too long to store in database: {}", theErrorMessage);
364                        errorMessage = theErrorMessage.substring(0, ERROR_MSG_MAX_LENGTH);
365                } else {
366                        errorMessage = theErrorMessage;
367                }
368                return errorMessage;
369        }
370
371        @Override
372        public void markWorkChunksWithStatusAndWipeData(
373                        String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMessage) {
374                assert TransactionSynchronizationManager.isActualTransactionActive();
375
376                ourLog.debug("Marking all chunks for instance {} to status {}", theInstanceId, theStatus);
377                String errorMessage = truncateErrorMessage(theErrorMessage);
378                List<List<String>> listOfListOfIds = ListUtils.partition(theChunkIds, 100);
379                for (List<String> idList : listOfListOfIds) {
380                        myWorkChunkRepository.updateAllChunksForInstanceStatusClearDataAndSetError(
381                                        idList, new Date(), theStatus, errorMessage);
382                }
383        }
384
385        @Override
386        @Transactional(propagation = Propagation.REQUIRES_NEW)
387        public boolean canAdvanceInstanceToNextStep(String theInstanceId, String theCurrentStepId) {
388                Optional<Batch2JobInstanceEntity> instance = myJobInstanceRepository.findById(theInstanceId);
389                if (instance.isEmpty()) {
390                        return false;
391                }
392                if (instance.get().getStatus().isEnded()) {
393                        return false;
394                }
395                Set<WorkChunkStatusEnum> statusesForStep =
396                                myWorkChunkRepository.getDistinctStatusesForStep(theInstanceId, theCurrentStepId);
397
398                ourLog.debug(
399                                "Checking whether gated job can advanced to next step. [instanceId={}, currentStepId={}, statusesForStep={}]",
400                                theInstanceId,
401                                theCurrentStepId,
402                                statusesForStep);
403                return statusesForStep.isEmpty() || statusesForStep.equals(Set.of(WorkChunkStatusEnum.COMPLETED));
404        }
405
406        private void fetchChunks(
407                        String theInstanceId,
408                        boolean theIncludeData,
409                        int thePageSize,
410                        int thePageIndex,
411                        Consumer<WorkChunk> theConsumer) {
412                myTransactionService
413                                .withSystemRequestOnDefaultPartition()
414                                .withPropagation(Propagation.REQUIRES_NEW)
415                                .execute(() -> {
416                                        List<Batch2WorkChunkEntity> chunks;
417                                        if (theIncludeData) {
418                                                chunks = myWorkChunkRepository.fetchChunks(
419                                                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
420                                        } else {
421                                                chunks = myWorkChunkRepository.fetchChunksNoData(
422                                                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
423                                        }
424                                        for (Batch2WorkChunkEntity chunk : chunks) {
425                                                theConsumer.accept(toChunk(chunk));
426                                        }
427                                });
428        }
429
430        @Override
431        public List<String> fetchAllChunkIdsForStepWithStatus(
432                        String theInstanceId, String theStepId, WorkChunkStatusEnum theStatusEnum) {
433                return myTransactionService
434                                .withSystemRequest()
435                                .withPropagation(Propagation.REQUIRES_NEW)
436                                .execute(() -> myWorkChunkRepository.fetchAllChunkIdsForStepWithStatus(
437                                                theInstanceId, theStepId, theStatusEnum));
438        }
439
440        @Override
441        public void updateInstanceUpdateTime(String theInstanceId) {
442                myJobInstanceRepository.updateInstanceUpdateTime(theInstanceId, new Date());
443        }
444
445        /**
446         * Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
447         */
448        @Override
449        public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
450                return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) ->
451                                fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
452        }
453
454        @Override
455        public Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) {
456                return myWorkChunkRepository
457                                .fetchChunksForStep(theInstanceId, theStepId)
458                                .map(this::toChunk);
459        }
460
461        @Override
462        public boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier) {
463                Batch2JobInstanceEntity instanceEntity =
464                                myEntityManager.find(Batch2JobInstanceEntity.class, theInstanceId, LockModeType.PESSIMISTIC_WRITE);
465                if (null == instanceEntity) {
466                        ourLog.error("No instance found with Id {}", theInstanceId);
467                        return false;
468                }
469                // convert to JobInstance for public api
470                JobInstance jobInstance = JobInstanceUtil.fromEntityToInstance(instanceEntity);
471
472                // run the modification callback
473                boolean wasModified = theModifier.doUpdate(jobInstance);
474
475                if (wasModified) {
476                        // copy fields back for flush.
477                        JobInstanceUtil.fromInstanceToEntity(jobInstance, instanceEntity);
478                }
479
480                return wasModified;
481        }
482
483        @Override
484        @Transactional(propagation = Propagation.REQUIRES_NEW)
485        public void deleteInstanceAndChunks(String theInstanceId) {
486                ourLog.info("Deleting instance and chunks: {}", theInstanceId);
487                myWorkChunkRepository.deleteAllForInstance(theInstanceId);
488                myJobInstanceRepository.deleteById(theInstanceId);
489        }
490
491        @Override
492        @Transactional(propagation = Propagation.REQUIRES_NEW)
493        public void deleteChunksAndMarkInstanceAsChunksPurged(String theInstanceId) {
494                ourLog.info("Deleting all chunks for instance ID: {}", theInstanceId);
495                int updateCount = myJobInstanceRepository.updateWorkChunksPurgedTrue(theInstanceId);
496                int deleteCount = myWorkChunkRepository.deleteAllForInstance(theInstanceId);
497                ourLog.debug("Purged {} chunks, and updated {} instance.", deleteCount, updateCount);
498        }
499
500        @Override
501        public boolean markInstanceAsStatusWhenStatusIn(
502                        String theInstanceId, StatusEnum theStatusEnum, Set<StatusEnum> thePriorStates) {
503                int recordsChanged =
504                                myJobInstanceRepository.updateInstanceStatusIfIn(theInstanceId, theStatusEnum, thePriorStates);
505                ourLog.debug(
506                                "Update job {} to status {} if in status {}: {}",
507                                theInstanceId,
508                                theStatusEnum,
509                                thePriorStates,
510                                recordsChanged > 0);
511                return recordsChanged > 0;
512        }
513
514        @Override
515        @Transactional(propagation = Propagation.REQUIRES_NEW)
516        public JobOperationResultJson cancelInstance(String theInstanceId) {
517                int recordsChanged = myJobInstanceRepository.updateInstanceCancelled(theInstanceId, true);
518                String operationString = "Cancel job instance " + theInstanceId;
519
520                // wipmb For 6.8 - This is too detailed to be down here - this should be up at the api layer.
521                // Replace with boolean result or ResourceNotFound exception.  Build the message up at the ui.
522                String messagePrefix = "Job instance <" + theInstanceId + ">";
523                if (recordsChanged > 0) {
524                        return JobOperationResultJson.newSuccess(operationString, messagePrefix + " successfully cancelled.");
525                } else {
526                        Optional<JobInstance> instance = fetchInstance(theInstanceId);
527                        if (instance.isPresent()) {
528                                return JobOperationResultJson.newFailure(
529                                                operationString, messagePrefix + " was already cancelled.  Nothing to do.");
530                        } else {
531                                return JobOperationResultJson.newFailure(operationString, messagePrefix + " not found.");
532                        }
533                }
534        }
535
536        private void invokePreStorageBatchHooks(JobInstance theJobInstance) {
537                if (myInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE)) {
538                        HookParams params = new HookParams()
539                                        .add(JobInstance.class, theJobInstance)
540                                        .add(RequestDetails.class, new SystemRequestDetails());
541
542                        myInterceptorBroadcaster.callHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE, params);
543                }
544        }
545}