001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.batch2;
021
022import ca.uhn.fhir.batch2.api.IJobPersistence;
023import ca.uhn.fhir.batch2.api.JobOperationResultJson;
024import ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO;
025import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO;
026import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
027import ca.uhn.fhir.batch2.model.JobInstance;
028import ca.uhn.fhir.batch2.model.StatusEnum;
029import ca.uhn.fhir.batch2.model.WorkChunk;
030import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
031import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
032import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
033import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
034import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
035import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
036import ca.uhn.fhir.interceptor.api.HookParams;
037import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
038import ca.uhn.fhir.interceptor.api.Pointcut;
039import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
040import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkMetadataViewRepository;
041import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
042import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
043import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
044import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
045import ca.uhn.fhir.jpa.entity.Batch2WorkChunkMetadataView;
046import ca.uhn.fhir.model.api.PagingIterator;
047import ca.uhn.fhir.rest.api.server.RequestDetails;
048import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
049import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
050import ca.uhn.fhir.util.Logs;
051import com.fasterxml.jackson.core.JsonParser;
052import com.fasterxml.jackson.databind.JsonNode;
053import com.fasterxml.jackson.databind.ObjectMapper;
054import com.fasterxml.jackson.databind.node.ObjectNode;
055import jakarta.annotation.Nonnull;
056import jakarta.annotation.Nullable;
057import jakarta.persistence.EntityManager;
058import jakarta.persistence.LockModeType;
059import org.apache.commons.collections4.ListUtils;
060import org.apache.commons.lang3.Validate;
061import org.slf4j.Logger;
062import org.springframework.data.domain.Page;
063import org.springframework.data.domain.PageImpl;
064import org.springframework.data.domain.PageRequest;
065import org.springframework.data.domain.Pageable;
066import org.springframework.data.domain.Sort;
067import org.springframework.transaction.annotation.Propagation;
068import org.springframework.transaction.annotation.Transactional;
069import org.springframework.transaction.support.TransactionSynchronizationManager;
070
071import java.time.Instant;
072import java.util.Collections;
073import java.util.Date;
074import java.util.HashSet;
075import java.util.Iterator;
076import java.util.List;
077import java.util.Objects;
078import java.util.Optional;
079import java.util.Set;
080import java.util.UUID;
081import java.util.function.Consumer;
082import java.util.stream.Collectors;
083import java.util.stream.Stream;
084
085import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
086import static ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity.ERROR_MSG_MAX_LENGTH;
087import static org.apache.commons.lang3.StringUtils.isBlank;
088
089public class JpaJobPersistenceImpl implements IJobPersistence {
090        private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
091        public static final String CREATE_TIME = "myCreateTime";
092
093        private final IBatch2JobInstanceRepository myJobInstanceRepository;
094        private final IBatch2WorkChunkRepository myWorkChunkRepository;
095        private final IBatch2WorkChunkMetadataViewRepository myWorkChunkMetadataViewRepo;
096        private final EntityManager myEntityManager;
097        private final IHapiTransactionService myTransactionService;
098        private final IInterceptorBroadcaster myInterceptorBroadcaster;
099
100        /**
101         * Constructor
102         */
103        public JpaJobPersistenceImpl(
104                        IBatch2JobInstanceRepository theJobInstanceRepository,
105                        IBatch2WorkChunkRepository theWorkChunkRepository,
106                        IBatch2WorkChunkMetadataViewRepository theWorkChunkMetadataViewRepo,
107                        IHapiTransactionService theTransactionService,
108                        EntityManager theEntityManager,
109                        IInterceptorBroadcaster theInterceptorBroadcaster) {
110                Validate.notNull(theJobInstanceRepository, "theJobInstanceRepository");
111                Validate.notNull(theWorkChunkRepository, "theWorkChunkRepository");
112                myJobInstanceRepository = theJobInstanceRepository;
113                myWorkChunkRepository = theWorkChunkRepository;
114                myWorkChunkMetadataViewRepo = theWorkChunkMetadataViewRepo;
115                myTransactionService = theTransactionService;
116                myEntityManager = theEntityManager;
117                myInterceptorBroadcaster = theInterceptorBroadcaster;
118        }
119
120        @Override
121        @Transactional(propagation = Propagation.REQUIRED)
122        public String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) {
123                Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity();
124                entity.setId(UUID.randomUUID().toString());
125                entity.setSequence(theBatchWorkChunk.sequence);
126                entity.setJobDefinitionId(theBatchWorkChunk.jobDefinitionId);
127                entity.setJobDefinitionVersion(theBatchWorkChunk.jobDefinitionVersion);
128                entity.setTargetStepId(theBatchWorkChunk.targetStepId);
129                entity.setInstanceId(theBatchWorkChunk.instanceId);
130                entity.setSerializedData(theBatchWorkChunk.serializedData);
131                entity.setCreateTime(new Date());
132                entity.setStartTime(new Date());
133                entity.setStatus(getOnCreateStatus(theBatchWorkChunk));
134
135                ourLog.debug("Create work chunk {}/{}/{}", entity.getInstanceId(), entity.getId(), entity.getTargetStepId());
136                ourLog.trace(
137                                "Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData());
138                myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> myWorkChunkRepository.save(entity));
139
140                return entity.getId();
141        }
142
143        /**
144         * Gets the initial onCreate state for the given workchunk.
145         * Gated job chunks start in GATE_WAITING; they will be transitioned to READY during maintenance pass when all
146         * chunks in the previous step are COMPLETED.
147         * Non gated job chunks start in READY
148         */
149        private static WorkChunkStatusEnum getOnCreateStatus(WorkChunkCreateEvent theBatchWorkChunk) {
150                if (theBatchWorkChunk.isGatedExecution) {
151                        return WorkChunkStatusEnum.GATE_WAITING;
152                } else {
153                        return WorkChunkStatusEnum.READY;
154                }
155        }
156
157        @Override
158        @Transactional(propagation = Propagation.REQUIRED)
159        public Optional<WorkChunk> onWorkChunkDequeue(String theChunkId) {
160                // take a lock on the chunk id to ensure that the maintenance run isn't doing anything.
161                Batch2WorkChunkEntity chunkLock =
162                                myEntityManager.find(Batch2WorkChunkEntity.class, theChunkId, LockModeType.PESSIMISTIC_WRITE);
163                // remove from the current state to avoid stale data.
164                myEntityManager.detach(chunkLock);
165
166                // NOTE: Ideally, IN_PROGRESS wouldn't be allowed here.  On chunk failure, we probably shouldn't be allowed.
167                // But how does re-run happen if k8s kills a processor mid run?
168                List<WorkChunkStatusEnum> priorStates =
169                                List.of(WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.ERRORED, WorkChunkStatusEnum.IN_PROGRESS);
170                int rowsModified = myWorkChunkRepository.updateChunkStatusForStart(
171                                theChunkId, new Date(), WorkChunkStatusEnum.IN_PROGRESS, priorStates);
172
173                if (rowsModified == 0) {
174                        ourLog.info("Attempting to start chunk {} but it was already started.", theChunkId);
175                        return Optional.empty();
176                } else {
177                        Optional<Batch2WorkChunkEntity> chunk = myWorkChunkRepository.findById(theChunkId);
178                        return chunk.map(this::toChunk);
179                }
180        }
181
182        @Override
183        @Transactional(propagation = Propagation.REQUIRED)
184        public String storeNewInstance(JobInstance theInstance) {
185                Validate.isTrue(isBlank(theInstance.getInstanceId()));
186
187                invokePreStorageBatchHooks(theInstance);
188
189                Batch2JobInstanceEntity entity = new Batch2JobInstanceEntity();
190                entity.setId(UUID.randomUUID().toString());
191                entity.setDefinitionId(theInstance.getJobDefinitionId());
192                entity.setDefinitionVersion(theInstance.getJobDefinitionVersion());
193                entity.setStatus(theInstance.getStatus());
194                entity.setParams(theInstance.getParameters());
195                entity.setCurrentGatedStepId(theInstance.getCurrentGatedStepId());
196                entity.setFastTracking(theInstance.isFastTracking());
197                entity.setCreateTime(new Date());
198                entity.setStartTime(new Date());
199                entity.setReport(theInstance.getReport());
200                entity.setTriggeringUsername(theInstance.getTriggeringUsername());
201                entity.setTriggeringClientId(theInstance.getTriggeringClientId());
202                entity.setUserDataJson(theInstance.getUserDataAsString());
203
204                entity = myJobInstanceRepository.save(entity);
205                return entity.getId();
206        }
207
208        @Override
209        @Transactional(propagation = Propagation.REQUIRES_NEW)
210        public List<JobInstance> fetchInstances(
211                        String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) {
212                return toInstanceList(myJobInstanceRepository.findInstancesByJobIdAndStatusAndExpiry(
213                                theJobDefinitionId, theStatuses, theCutoff, thePageable));
214        }
215
216        @Override
217        @Transactional(propagation = Propagation.REQUIRES_NEW)
218        public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(
219                        String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) {
220                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
221                return toInstanceList(myJobInstanceRepository.fetchInstancesByJobDefinitionIdAndStatus(
222                                theJobDefinitionId, theRequestedStatuses, pageRequest));
223        }
224
225        @Override
226        @Transactional(propagation = Propagation.REQUIRES_NEW)
227        public List<JobInstance> fetchInstancesByJobDefinitionId(
228                        String theJobDefinitionId, int thePageSize, int thePageIndex) {
229                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
230                return toInstanceList(myJobInstanceRepository.findInstancesByJobDefinitionId(theJobDefinitionId, pageRequest));
231        }
232
233        @Override
234        @Transactional(propagation = Propagation.REQUIRES_NEW)
235        public Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest) {
236                PageRequest pageRequest =
237                                PageRequest.of(theRequest.getPageStart(), theRequest.getBatchSize(), theRequest.getSort());
238
239                String jobStatus = theRequest.getJobStatus();
240                if (Objects.equals(jobStatus, "")) {
241                        Page<Batch2JobInstanceEntity> pageOfEntities = myJobInstanceRepository.findAll(pageRequest);
242                        return pageOfEntities.map(this::toInstance);
243                }
244
245                StatusEnum status = StatusEnum.valueOf(jobStatus);
246                List<JobInstance> jobs = toInstanceList(myJobInstanceRepository.findInstancesByJobStatus(status, pageRequest));
247                Integer jobsOfStatus = myJobInstanceRepository.findTotalJobsOfStatus(status);
248                return new PageImpl<>(jobs, pageRequest, jobsOfStatus);
249        }
250
251        private List<JobInstance> toInstanceList(List<Batch2JobInstanceEntity> theInstancesByJobDefinitionId) {
252                return theInstancesByJobDefinitionId.stream().map(this::toInstance).collect(Collectors.toList());
253        }
254
255        @Override
256        @Nonnull
257        public Optional<JobInstance> fetchInstance(String theInstanceId) {
258                return myTransactionService
259                                .withSystemRequestOnDefaultPartition()
260                                .execute(() -> myJobInstanceRepository.findById(theInstanceId).map(this::toInstance));
261        }
262
263        @Nonnull
264        @Override
265        public List<BatchWorkChunkStatusDTO> fetchWorkChunkStatusForInstance(String theInstanceId) {
266                return myTransactionService
267                                .withSystemRequestOnDefaultPartition()
268                                .execute(() -> myWorkChunkRepository.fetchWorkChunkStatusForInstance(theInstanceId));
269        }
270
271        @Nonnull
272        @Override
273        public BatchInstanceStatusDTO fetchBatchInstanceStatus(String theInstanceId) {
274                return myTransactionService
275                                .withSystemRequestOnDefaultPartition()
276                                .execute(() -> myJobInstanceRepository.fetchBatchInstanceStatus(theInstanceId));
277        }
278
279        @Override
280        @Transactional(propagation = Propagation.REQUIRES_NEW)
281        public List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int thePage, int theBatchSize) {
282                String definitionId = theRequest.getJobDefinition();
283                String params = theRequest.getParameters();
284                Set<StatusEnum> statuses = theRequest.getStatuses();
285
286                Pageable pageable = PageRequest.of(thePage, theBatchSize);
287
288                List<Batch2JobInstanceEntity> instanceEntities;
289
290                if (statuses != null && !statuses.isEmpty()) {
291                        if (Batch2JobDefinitionConstants.BULK_EXPORT.equals(definitionId)) {
292                                if (originalRequestUrlTruncation(params) != null) {
293                                        params = originalRequestUrlTruncation(params);
294                                }
295                        }
296                        instanceEntities = myJobInstanceRepository.findInstancesByJobIdParamsAndStatus(
297                                        definitionId, params, statuses, pageable);
298                } else {
299                        instanceEntities = myJobInstanceRepository.findInstancesByJobIdAndParams(definitionId, params, pageable);
300                }
301                return toInstanceList(instanceEntities);
302        }
303
304        private String originalRequestUrlTruncation(String theParams) {
305                try {
306                        ObjectMapper mapper = new ObjectMapper();
307                        mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
308                        mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
309                        JsonNode rootNode = mapper.readTree(theParams);
310                        String originalUrl = "originalRequestUrl";
311
312                        if (rootNode instanceof ObjectNode) {
313                                ObjectNode objectNode = (ObjectNode) rootNode;
314
315                                if (objectNode.has(originalUrl)) {
316                                        String url = objectNode.get(originalUrl).asText();
317                                        if (url.contains("?")) {
318                                                objectNode.put(originalUrl, url.split("\\?")[0]);
319                                        }
320                                }
321                                return mapper.writeValueAsString(objectNode);
322                        }
323                } catch (Exception e) {
324                        ourLog.info("Error Truncating Original Request Url", e);
325                }
326                return null;
327        }
328
329        @Override
330        @Transactional(propagation = Propagation.REQUIRES_NEW)
331        public List<JobInstance> fetchInstances(int thePageSize, int thePageIndex) {
332                // default sort is myCreateTime Asc
333                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
334                return myTransactionService
335                                .withSystemRequestOnDefaultPartition()
336                                .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
337                                                .map(this::toInstance)
338                                                .collect(Collectors.toList()));
339        }
340
341        @Override
342        public void enqueueWorkChunkForProcessing(String theChunkId, Consumer<Integer> theCallback) {
343                int updated = myWorkChunkRepository.updateChunkStatus(
344                                theChunkId, WorkChunkStatusEnum.READY, WorkChunkStatusEnum.QUEUED);
345                theCallback.accept(updated);
346        }
347
348        @Override
349        public int updatePollWaitingChunksForJobIfReady(String theInstanceId) {
350                return myWorkChunkRepository.updateWorkChunksForPollWaiting(
351                                theInstanceId,
352                                Date.from(Instant.now()),
353                                Set.of(WorkChunkStatusEnum.POLL_WAITING),
354                                WorkChunkStatusEnum.READY);
355        }
356
357        @Override
358        @Transactional(propagation = Propagation.REQUIRES_NEW)
359        public List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex) {
360                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.DESC, CREATE_TIME);
361                return myTransactionService
362                                .withSystemRequestOnDefaultPartition()
363                                .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
364                                                .map(this::toInstance)
365                                                .collect(Collectors.toList()));
366        }
367
368        private WorkChunk toChunk(Batch2WorkChunkEntity theEntity) {
369                return JobInstanceUtil.fromEntityToWorkChunk(theEntity);
370        }
371
372        private JobInstance toInstance(Batch2JobInstanceEntity theEntity) {
373                return JobInstanceUtil.fromEntityToInstance(theEntity);
374        }
375
376        @Override
377        public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) {
378                String chunkId = theParameters.getChunkId();
379                String errorMessage = truncateErrorMessage(theParameters.getErrorMsg());
380
381                return myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> {
382                        int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
383                                        chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED);
384                        Validate.isTrue(changeCount > 0, "No changed chunk matching %s", chunkId);
385
386                        int failChangeCount = myWorkChunkRepository.updateChunkForTooManyErrors(
387                                        WorkChunkStatusEnum.FAILED, chunkId, MAX_CHUNK_ERROR_COUNT, ERROR_MSG_MAX_LENGTH);
388
389                        if (failChangeCount > 0) {
390                                return WorkChunkStatusEnum.FAILED;
391                        } else {
392                                return WorkChunkStatusEnum.ERRORED;
393                        }
394                });
395        }
396
397        @Override
398        public void onWorkChunkPollDelay(String theChunkId, Date theDeadline) {
399                int updated = myWorkChunkRepository.updateWorkChunkNextPollTime(
400                                theChunkId, WorkChunkStatusEnum.POLL_WAITING, Set.of(WorkChunkStatusEnum.IN_PROGRESS), theDeadline);
401
402                if (updated != 1) {
403                        ourLog.warn("Expected to update 1 work chunk's poll delay; but found {}", updated);
404                }
405        }
406
407        @Override
408        public void onWorkChunkFailed(String theChunkId, String theErrorMessage) {
409                ourLog.info("Marking chunk {} as failed with message: {}", theChunkId, theErrorMessage);
410                String errorMessage = truncateErrorMessage(theErrorMessage);
411                myTransactionService
412                                .withSystemRequestOnDefaultPartition()
413                                .execute(() -> myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
414                                                theChunkId, new Date(), errorMessage, WorkChunkStatusEnum.FAILED));
415        }
416
417        @Override
418        public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) {
419                myTransactionService
420                                .withSystemRequestOnDefaultPartition()
421                                .execute(() -> myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(
422                                                theEvent.getChunkId(),
423                                                new Date(),
424                                                theEvent.getRecordsProcessed(),
425                                                theEvent.getRecoveredErrorCount(),
426                                                WorkChunkStatusEnum.COMPLETED,
427                                                theEvent.getRecoveredWarningMessage()));
428        }
429
430        @Nullable
431        private static String truncateErrorMessage(String theErrorMessage) {
432                String errorMessage;
433                if (theErrorMessage != null && theErrorMessage.length() > ERROR_MSG_MAX_LENGTH) {
434                        ourLog.warn("Truncating error message that is too long to store in database: {}", theErrorMessage);
435                        errorMessage = theErrorMessage.substring(0, ERROR_MSG_MAX_LENGTH);
436                } else {
437                        errorMessage = theErrorMessage;
438                }
439                return errorMessage;
440        }
441
442        @Override
443        public void markWorkChunksWithStatusAndWipeData(
444                        String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMessage) {
445                assert TransactionSynchronizationManager.isActualTransactionActive();
446
447                ourLog.debug("Marking all chunks for instance {} to status {}", theInstanceId, theStatus);
448                String errorMessage = truncateErrorMessage(theErrorMessage);
449                List<List<String>> listOfListOfIds = ListUtils.partition(theChunkIds, 100);
450                for (List<String> idList : listOfListOfIds) {
451                        myWorkChunkRepository.updateAllChunksForInstanceStatusClearDataAndSetError(
452                                        idList, new Date(), theStatus, errorMessage);
453                }
454        }
455
456        @Override
457        public Set<WorkChunkStatusEnum> getDistinctWorkChunkStatesForJobAndStep(
458                        String theInstanceId, String theCurrentStepId) {
459                if (getRunningJob(theInstanceId) == null) {
460                        return Collections.unmodifiableSet(new HashSet<>());
461                }
462                return myWorkChunkRepository.getDistinctStatusesForStep(theInstanceId, theCurrentStepId);
463        }
464
465        private Batch2JobInstanceEntity getRunningJob(String theInstanceId) {
466                Optional<Batch2JobInstanceEntity> instance = myJobInstanceRepository.findById(theInstanceId);
467                if (instance.isEmpty()) {
468                        return null;
469                }
470                if (instance.get().getStatus().isEnded()) {
471                        return null;
472                }
473                return instance.get();
474        }
475
476        private void fetchChunks(
477                        String theInstanceId,
478                        boolean theIncludeData,
479                        int thePageSize,
480                        int thePageIndex,
481                        Consumer<WorkChunk> theConsumer) {
482                myTransactionService
483                                .withSystemRequestOnDefaultPartition()
484                                .withPropagation(Propagation.REQUIRES_NEW)
485                                .execute(() -> {
486                                        List<Batch2WorkChunkEntity> chunks;
487                                        if (theIncludeData) {
488                                                chunks = myWorkChunkRepository.fetchChunks(
489                                                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
490                                        } else {
491                                                chunks = myWorkChunkRepository.fetchChunksNoData(
492                                                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
493                                        }
494                                        for (Batch2WorkChunkEntity chunk : chunks) {
495                                                theConsumer.accept(toChunk(chunk));
496                                        }
497                                });
498        }
499
500        @Override
501        public void updateInstanceUpdateTime(String theInstanceId) {
502                myJobInstanceRepository.updateInstanceUpdateTime(theInstanceId, new Date());
503        }
504
505        @Override
506        public WorkChunk createWorkChunk(WorkChunk theWorkChunk) {
507                if (theWorkChunk.getId() == null) {
508                        theWorkChunk.setId(UUID.randomUUID().toString());
509                }
510                return toChunk(myWorkChunkRepository.save(Batch2WorkChunkEntity.fromWorkChunk(theWorkChunk)));
511        }
512
513        /**
514         * Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
515         */
516        @Override
517        public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
518                return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) ->
519                                fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
520        }
521
522        @Override
523        public Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) {
524                return myWorkChunkRepository
525                                .fetchChunksForStep(theInstanceId, theStepId)
526                                .map(this::toChunk);
527        }
528
529        @Override
530        public Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(
531                        Pageable thePageable, String theInstanceId, Set<WorkChunkStatusEnum> theStates) {
532                Page<Batch2WorkChunkMetadataView> page =
533                                myWorkChunkMetadataViewRepo.fetchWorkChunkMetadataForJobInStates(thePageable, theInstanceId, theStates);
534
535                return page.map(Batch2WorkChunkMetadataView::toChunkMetadata);
536        }
537
538        @Override
539        public boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier) {
540                /*
541                 * We may already have a copy of the entity in the L1 cache, and it may be
542                 * stale if the scheduled maintenance service has touched it recently. So
543                 * we fetch it and then refresh-lock it so that we don't fail if someone
544                 * else has touched it.
545                 */
546                Batch2JobInstanceEntity instanceEntity = myEntityManager.find(Batch2JobInstanceEntity.class, theInstanceId);
547                myEntityManager.refresh(instanceEntity, LockModeType.PESSIMISTIC_WRITE);
548
549                if (null == instanceEntity) {
550                        ourLog.error("No instance found with Id {}", theInstanceId);
551                        return false;
552                }
553                // convert to JobInstance for public api
554                JobInstance jobInstance = JobInstanceUtil.fromEntityToInstance(instanceEntity);
555
556                // run the modification callback
557                boolean wasModified = theModifier.doUpdate(jobInstance);
558
559                if (wasModified) {
560                        // copy fields back for flush.
561                        JobInstanceUtil.fromInstanceToEntity(jobInstance, instanceEntity);
562                }
563
564                return wasModified;
565        }
566
567        @Override
568        @Transactional(propagation = Propagation.REQUIRES_NEW)
569        public void deleteInstanceAndChunks(String theInstanceId) {
570                ourLog.info("Deleting instance and chunks: {}", theInstanceId);
571                myWorkChunkRepository.deleteAllForInstance(theInstanceId);
572                myJobInstanceRepository.deleteById(theInstanceId);
573        }
574
575        @Override
576        @Transactional(propagation = Propagation.REQUIRES_NEW)
577        public void deleteChunksAndMarkInstanceAsChunksPurged(String theInstanceId) {
578                ourLog.info("Deleting all chunks for instance ID: {}", theInstanceId);
579                int updateCount = myJobInstanceRepository.updateWorkChunksPurgedTrue(theInstanceId);
580                int deleteCount = myWorkChunkRepository.deleteAllForInstance(theInstanceId);
581                ourLog.debug("Purged {} chunks, and updated {} instance.", deleteCount, updateCount);
582        }
583
584        @Override
585        public boolean markInstanceAsStatusWhenStatusIn(
586                        String theInstanceId, StatusEnum theStatusEnum, Set<StatusEnum> thePriorStates) {
587                int recordsChanged =
588                                myJobInstanceRepository.updateInstanceStatusIfIn(theInstanceId, theStatusEnum, thePriorStates);
589                ourLog.debug(
590                                "Update job {} to status {} if in status {}: {}",
591                                theInstanceId,
592                                theStatusEnum,
593                                thePriorStates,
594                                recordsChanged > 0);
595                return recordsChanged > 0;
596        }
597
598        @Override
599        @Transactional(propagation = Propagation.REQUIRES_NEW)
600        public JobOperationResultJson cancelInstance(String theInstanceId) {
601                int recordsChanged = myJobInstanceRepository.updateInstanceCancelled(theInstanceId, true);
602                String operationString = "Cancel job instance " + theInstanceId;
603
604                // wipmb For 6.8 - This is too detailed to be down here - this should be up at the api layer.
605                // Replace with boolean result or ResourceNotFound exception.  Build the message up at the ui.
606                String messagePrefix = "Job instance <" + theInstanceId + ">";
607                if (recordsChanged > 0) {
608                        return JobOperationResultJson.newSuccess(operationString, messagePrefix + " successfully cancelled.");
609                } else {
610                        Optional<JobInstance> instance = fetchInstance(theInstanceId);
611                        if (instance.isPresent()) {
612                                return JobOperationResultJson.newFailure(
613                                                operationString, messagePrefix + " was already cancelled.  Nothing to do.");
614                        } else {
615                                return JobOperationResultJson.newFailure(operationString, messagePrefix + " not found.");
616                        }
617                }
618        }
619
620        private void invokePreStorageBatchHooks(JobInstance theJobInstance) {
621                if (myInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE)) {
622                        HookParams params = new HookParams()
623                                        .add(JobInstance.class, theJobInstance)
624                                        .add(RequestDetails.class, new SystemRequestDetails());
625
626                        myInterceptorBroadcaster.callHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE, params);
627                }
628        }
629
630        @Override
631        @Transactional(propagation = Propagation.REQUIRES_NEW)
632        public boolean advanceJobStepAndUpdateChunkStatus(
633                        String theJobInstanceId, String theNextStepId, boolean theIsReductionStep) {
634                boolean changed = updateInstance(theJobInstanceId, instance -> {
635                        if (instance.getCurrentGatedStepId().equals(theNextStepId)) {
636                                // someone else beat us here.  No changes
637                                return false;
638                        }
639                        ourLog.debug("Moving gated instance {} to the next step {}.", theJobInstanceId, theNextStepId);
640                        instance.setCurrentGatedStepId(theNextStepId);
641                        return true;
642                });
643
644                if (changed) {
645                        ourLog.debug(
646                                        "Updating chunk status from GATE_WAITING to READY for gated instance {} in step {}.",
647                                        theJobInstanceId,
648                                        theNextStepId);
649                        WorkChunkStatusEnum nextStep =
650                                        theIsReductionStep ? WorkChunkStatusEnum.REDUCTION_READY : WorkChunkStatusEnum.READY;
651                        // when we reach here, the current step id is equal to theNextStepId
652                        // Up to 7.1, gated jobs' work chunks are created in status QUEUED but not actually queued for the
653                        // workers.
654                        // In order to keep them compatible, turn QUEUED chunks into READY, too.
655                        // TODO: 'QUEUED' from the IN clause will be removed after 7.6.0.
656                        int numChanged = myWorkChunkRepository.updateAllChunksForStepWithStatus(
657                                        theJobInstanceId,
658                                        theNextStepId,
659                                        List.of(WorkChunkStatusEnum.GATE_WAITING, WorkChunkStatusEnum.QUEUED),
660                                        nextStep);
661                        ourLog.debug(
662                                        "Updated {} chunks of gated instance {} for step {} from fake QUEUED to READY.",
663                                        numChanged,
664                                        theJobInstanceId,
665                                        theNextStepId);
666                }
667
668                return changed;
669        }
670}