001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.batch2;
021
022import ca.uhn.fhir.batch2.api.IJobPersistence;
023import ca.uhn.fhir.batch2.api.JobOperationResultJson;
024import ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO;
025import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO;
026import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
027import ca.uhn.fhir.batch2.model.JobInstance;
028import ca.uhn.fhir.batch2.model.StatusEnum;
029import ca.uhn.fhir.batch2.model.WorkChunk;
030import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
031import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
032import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
033import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
034import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
035import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
036import ca.uhn.fhir.interceptor.api.HookParams;
037import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
038import ca.uhn.fhir.interceptor.api.Pointcut;
039import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
040import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkMetadataViewRepository;
041import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
042import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
043import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
044import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
045import ca.uhn.fhir.jpa.entity.Batch2WorkChunkMetadataView;
046import ca.uhn.fhir.model.api.PagingIterator;
047import ca.uhn.fhir.rest.api.server.RequestDetails;
048import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
049import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
050import ca.uhn.fhir.util.Logs;
051import com.fasterxml.jackson.core.JsonParser;
052import com.fasterxml.jackson.databind.JsonNode;
053import com.fasterxml.jackson.databind.ObjectMapper;
054import com.fasterxml.jackson.databind.node.ObjectNode;
055import jakarta.annotation.Nonnull;
056import jakarta.annotation.Nullable;
057import jakarta.persistence.EntityManager;
058import jakarta.persistence.LockModeType;
059import org.apache.commons.collections4.ListUtils;
060import org.apache.commons.lang3.StringUtils;
061import org.apache.commons.lang3.Validate;
062import org.slf4j.Logger;
063import org.springframework.data.domain.Page;
064import org.springframework.data.domain.PageRequest;
065import org.springframework.data.domain.Pageable;
066import org.springframework.data.domain.Sort;
067import org.springframework.transaction.annotation.Propagation;
068import org.springframework.transaction.annotation.Transactional;
069import org.springframework.transaction.support.TransactionSynchronizationManager;
070
071import java.time.Instant;
072import java.util.Collections;
073import java.util.Date;
074import java.util.HashSet;
075import java.util.Iterator;
076import java.util.List;
077import java.util.Optional;
078import java.util.Set;
079import java.util.UUID;
080import java.util.function.Consumer;
081import java.util.stream.Collectors;
082import java.util.stream.Stream;
083
084import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
085import static ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity.ERROR_MSG_MAX_LENGTH;
086import static org.apache.commons.lang3.StringUtils.isBlank;
087
088public class JpaJobPersistenceImpl implements IJobPersistence {
        /** Batch troubleshooting logger (obtained from {@link Logs#getBatchTroubleshootingLog()}). */
        private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
        /** Entity property name used as the sort key when paging job instances by creation time. */
        public static final String CREATE_TIME = "myCreateTime";

        // Repositories for job instance rows, work chunk rows, and the work chunk metadata view.
        private final IBatch2JobInstanceRepository myJobInstanceRepository;
        private final IBatch2WorkChunkRepository myWorkChunkRepository;
        private final IBatch2WorkChunkMetadataViewRepository myWorkChunkMetadataViewRepo;
        // Used directly for pessimistic-lock lookups, detach and refresh of entities.
        private final EntityManager myEntityManager;
        // Runs repository calls inside transactions (mostly via withSystemRequestOnDefaultPartition()).
        private final IHapiTransactionService myTransactionService;
        private final IInterceptorBroadcaster myInterceptorBroadcaster;
098
099        /**
100         * Constructor
101         */
102        public JpaJobPersistenceImpl(
103                        IBatch2JobInstanceRepository theJobInstanceRepository,
104                        IBatch2WorkChunkRepository theWorkChunkRepository,
105                        IBatch2WorkChunkMetadataViewRepository theWorkChunkMetadataViewRepo,
106                        IHapiTransactionService theTransactionService,
107                        EntityManager theEntityManager,
108                        IInterceptorBroadcaster theInterceptorBroadcaster) {
109                Validate.notNull(theJobInstanceRepository, "theJobInstanceRepository");
110                Validate.notNull(theWorkChunkRepository, "theWorkChunkRepository");
111                myJobInstanceRepository = theJobInstanceRepository;
112                myWorkChunkRepository = theWorkChunkRepository;
113                myWorkChunkMetadataViewRepo = theWorkChunkMetadataViewRepo;
114                myTransactionService = theTransactionService;
115                myEntityManager = theEntityManager;
116                myInterceptorBroadcaster = theInterceptorBroadcaster;
117        }
118
119        @Override
120        @Transactional(propagation = Propagation.REQUIRED)
121        public String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) {
122                Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity();
123                entity.setId(UUID.randomUUID().toString());
124                entity.setSequence(theBatchWorkChunk.sequence);
125                entity.setJobDefinitionId(theBatchWorkChunk.jobDefinitionId);
126                entity.setJobDefinitionVersion(theBatchWorkChunk.jobDefinitionVersion);
127                entity.setTargetStepId(theBatchWorkChunk.targetStepId);
128                entity.setInstanceId(theBatchWorkChunk.instanceId);
129                entity.setSerializedData(theBatchWorkChunk.serializedData);
130                entity.setCreateTime(new Date());
131                entity.setStartTime(new Date());
132                entity.setStatus(getOnCreateStatus(theBatchWorkChunk));
133
134                ourLog.debug("Create work chunk {}/{}/{}", entity.getInstanceId(), entity.getId(), entity.getTargetStepId());
135                ourLog.trace(
136                                "Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData());
137                myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> myWorkChunkRepository.save(entity));
138
139                return entity.getId();
140        }
141
142        /**
143         * Gets the initial onCreate state for the given workchunk.
144         * Gated job chunks start in GATE_WAITING; they will be transitioned to READY during maintenance pass when all
145         * chunks in the previous step are COMPLETED.
146         * Non gated job chunks start in READY
147         */
148        private static WorkChunkStatusEnum getOnCreateStatus(WorkChunkCreateEvent theBatchWorkChunk) {
149                if (theBatchWorkChunk.isGatedExecution) {
150                        return WorkChunkStatusEnum.GATE_WAITING;
151                } else {
152                        return WorkChunkStatusEnum.READY;
153                }
154        }
155
156        @Override
157        @Transactional(propagation = Propagation.REQUIRED)
158        public Optional<WorkChunk> onWorkChunkDequeue(String theChunkId) {
159                // take a lock on the chunk id to ensure that the maintenance run isn't doing anything.
160                Batch2WorkChunkEntity chunkLock =
161                                myEntityManager.find(Batch2WorkChunkEntity.class, theChunkId, LockModeType.PESSIMISTIC_WRITE);
162
163                if (chunkLock == null) {
164                        ourLog.warn("Unknown chunk id {} encountered. Message will be discarded.", theChunkId);
165                        return Optional.empty();
166                }
167
168                // remove from the current state to avoid stale data.
169                myEntityManager.detach(chunkLock);
170
171                // NOTE: Ideally, IN_PROGRESS wouldn't be allowed here.  On chunk failure, we probably shouldn't be allowed.
172                // But how does re-run happen if k8s kills a processor mid run?
173                List<WorkChunkStatusEnum> priorStates =
174                                List.of(WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.ERRORED, WorkChunkStatusEnum.IN_PROGRESS);
175                int rowsModified = myWorkChunkRepository.updateChunkStatusForStart(
176                                theChunkId, new Date(), WorkChunkStatusEnum.IN_PROGRESS, priorStates);
177
178                if (rowsModified == 0) {
179                        ourLog.info("Attempting to start chunk {} but it was already started.", theChunkId);
180                        return Optional.empty();
181                } else {
182                        Optional<Batch2WorkChunkEntity> chunk = myWorkChunkRepository.findById(theChunkId);
183                        return chunk.map(this::toChunk);
184                }
185        }
186
187        @Override
188        @Transactional(propagation = Propagation.REQUIRED)
189        public String storeNewInstance(JobInstance theInstance) {
190                Validate.isTrue(isBlank(theInstance.getInstanceId()));
191
192                invokePreStorageBatchHooks(theInstance);
193
194                Batch2JobInstanceEntity entity = new Batch2JobInstanceEntity();
195                entity.setId(UUID.randomUUID().toString());
196                entity.setDefinitionId(theInstance.getJobDefinitionId());
197                entity.setDefinitionVersion(theInstance.getJobDefinitionVersion());
198                entity.setStatus(theInstance.getStatus());
199                entity.setParams(theInstance.getParameters());
200                entity.setCurrentGatedStepId(theInstance.getCurrentGatedStepId());
201                entity.setFastTracking(theInstance.isFastTracking());
202                entity.setCreateTime(new Date());
203                entity.setStartTime(new Date());
204                entity.setReport(theInstance.getReport());
205                entity.setTriggeringUsername(theInstance.getTriggeringUsername());
206                entity.setTriggeringClientId(theInstance.getTriggeringClientId());
207                entity.setUserDataJson(theInstance.getUserDataAsString());
208
209                entity = myJobInstanceRepository.save(entity);
210                return entity.getId();
211        }
212
213        @Override
214        @Transactional(propagation = Propagation.REQUIRES_NEW)
215        public List<JobInstance> fetchInstances(
216                        String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) {
217                return toInstanceList(myJobInstanceRepository.findInstancesByJobIdAndStatusAndExpiry(
218                                theJobDefinitionId, theStatuses, theCutoff, thePageable));
219        }
220
221        @Override
222        @Transactional(propagation = Propagation.REQUIRES_NEW)
223        public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(
224                        String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) {
225                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
226                return toInstanceList(myJobInstanceRepository.fetchInstancesByJobDefinitionIdAndStatus(
227                                theJobDefinitionId, theRequestedStatuses, pageRequest));
228        }
229
230        @Override
231        @Transactional(propagation = Propagation.REQUIRES_NEW)
232        public List<JobInstance> fetchInstancesByJobDefinitionId(
233                        String theJobDefinitionId, int thePageSize, int thePageIndex) {
234                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
235                return toInstanceList(myJobInstanceRepository.findInstancesByJobDefinitionId(theJobDefinitionId, pageRequest));
236        }
237
238        @Override
239        @Transactional(propagation = Propagation.REQUIRES_NEW)
240        public Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest) {
241                PageRequest pageRequest =
242                                PageRequest.of(theRequest.getPageStart(), theRequest.getBatchSize(), theRequest.getSort());
243
244                return myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> myJobInstanceRepository
245                                .findByJobDefinitionIdOrStatusOrIdOrCreateTime(
246                                                theRequest.getJobDefinitionId(),
247                                                StringUtils.isNotEmpty(theRequest.getJobStatus())
248                                                                ? StatusEnum.valueOf(theRequest.getJobStatus())
249                                                                : null,
250                                                theRequest.getJobId(),
251                                                theRequest.getJobCreateTimeFrom(),
252                                                theRequest.getJobCreateTimeTo(),
253                                                pageRequest)
254                                .map(this::toInstance));
255        }
256
257        private List<JobInstance> toInstanceList(List<Batch2JobInstanceEntity> theInstancesByJobDefinitionId) {
258                return theInstancesByJobDefinitionId.stream().map(this::toInstance).collect(Collectors.toList());
259        }
260
261        @Override
262        @Nonnull
263        public Optional<JobInstance> fetchInstance(String theInstanceId) {
264                return myTransactionService
265                                .withSystemRequestOnDefaultPartition()
266                                .execute(() -> myJobInstanceRepository.findById(theInstanceId).map(this::toInstance));
267        }
268
269        @Nonnull
270        @Override
271        public List<BatchWorkChunkStatusDTO> fetchWorkChunkStatusForInstance(String theInstanceId) {
272                return myTransactionService
273                                .withSystemRequestOnDefaultPartition()
274                                .execute(() -> myWorkChunkRepository.fetchWorkChunkStatusForInstance(theInstanceId));
275        }
276
277        @Nonnull
278        @Override
279        public BatchInstanceStatusDTO fetchBatchInstanceStatus(String theInstanceId) {
280                return myTransactionService
281                                .withSystemRequestOnDefaultPartition()
282                                .execute(() -> myJobInstanceRepository.fetchBatchInstanceStatus(theInstanceId));
283        }
284
285        @Override
286        @Transactional(propagation = Propagation.REQUIRES_NEW)
287        public List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int thePage, int theBatchSize) {
288                String definitionId = theRequest.getJobDefinition();
289                String params = theRequest.getParameters();
290                Set<StatusEnum> statuses = theRequest.getStatuses();
291
292                Pageable pageable = PageRequest.of(thePage, theBatchSize);
293
294                List<Batch2JobInstanceEntity> instanceEntities;
295
296                if (statuses != null && !statuses.isEmpty()) {
297                        // if we're not looking for cancelled jobs, we don't want jobs that
298                        // are in the process of being cancelled
299                        // (cancelling isn't instantaneous, but uses this flag)
300                        boolean isCancelled = statuses.contains(StatusEnum.CANCELLED);
301
302                        if (Batch2JobDefinitionConstants.BULK_EXPORT.equals(definitionId)) {
303                                if (originalRequestUrlTruncation(params) != null) {
304                                        params = originalRequestUrlTruncation(params);
305                                }
306                        }
307                        instanceEntities = myJobInstanceRepository.findInstancesByJobIdParamsAndStatus(
308                                        definitionId, params, statuses, isCancelled, pageable);
309                } else {
310                        instanceEntities = myJobInstanceRepository.findInstancesByJobIdAndParams(definitionId, params, pageable);
311                }
312                return toInstanceList(instanceEntities);
313        }
314
315        private String originalRequestUrlTruncation(String theParams) {
316                try {
317                        ObjectMapper mapper = new ObjectMapper();
318                        mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
319                        mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
320                        JsonNode rootNode = mapper.readTree(theParams);
321                        String originalUrl = "originalRequestUrl";
322
323                        if (rootNode instanceof ObjectNode) {
324                                ObjectNode objectNode = (ObjectNode) rootNode;
325
326                                if (objectNode.has(originalUrl)) {
327                                        String url = objectNode.get(originalUrl).asText();
328                                        if (url.contains("?")) {
329                                                objectNode.put(originalUrl, url.split("\\?")[0]);
330                                        }
331                                }
332                                return mapper.writeValueAsString(objectNode);
333                        }
334                } catch (Exception e) {
335                        ourLog.info("Error Truncating Original Request Url", e);
336                }
337                return null;
338        }
339
340        @Override
341        @Transactional(propagation = Propagation.REQUIRES_NEW)
342        public List<JobInstance> fetchInstances(int thePageSize, int thePageIndex) {
343                // default sort is myCreateTime Asc
344                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
345                return myTransactionService
346                                .withSystemRequestOnDefaultPartition()
347                                .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
348                                                .map(this::toInstance)
349                                                .collect(Collectors.toList()));
350        }
351
352        @Override
353        public void enqueueWorkChunkForProcessing(String theChunkId, Consumer<Integer> theCallback) {
354                int updated = myWorkChunkRepository.updateChunkStatus(
355                                theChunkId, WorkChunkStatusEnum.READY, WorkChunkStatusEnum.QUEUED);
356                theCallback.accept(updated);
357        }
358
359        @Override
360        public int updatePollWaitingChunksForJobIfReady(String theInstanceId) {
361                return myWorkChunkRepository.updateWorkChunksForPollWaiting(
362                                theInstanceId,
363                                Date.from(Instant.now()),
364                                Set.of(WorkChunkStatusEnum.POLL_WAITING),
365                                WorkChunkStatusEnum.READY);
366        }
367
368        @Override
369        @Transactional(propagation = Propagation.REQUIRES_NEW)
370        public List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex) {
371                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.DESC, CREATE_TIME);
372                return myTransactionService
373                                .withSystemRequestOnDefaultPartition()
374                                .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
375                                                .map(this::toInstance)
376                                                .collect(Collectors.toList()));
377        }
378
379        private WorkChunk toChunk(Batch2WorkChunkEntity theEntity) {
380                return JobInstanceUtil.fromEntityToWorkChunk(theEntity);
381        }
382
383        private JobInstance toInstance(Batch2JobInstanceEntity theEntity) {
384                return JobInstanceUtil.fromEntityToInstance(theEntity);
385        }
386
387        @Override
388        public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) {
389                String chunkId = theParameters.getChunkId();
390                String errorMessage = truncateErrorMessage(theParameters.getErrorMsg());
391
392                return myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> {
393                        int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
394                                        chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED);
395                        Validate.isTrue(changeCount > 0, "No changed chunk matching %s", chunkId);
396
397                        int failChangeCount = myWorkChunkRepository.updateChunkForTooManyErrors(
398                                        WorkChunkStatusEnum.FAILED, chunkId, MAX_CHUNK_ERROR_COUNT, ERROR_MSG_MAX_LENGTH);
399
400                        if (failChangeCount > 0) {
401                                return WorkChunkStatusEnum.FAILED;
402                        } else {
403                                return WorkChunkStatusEnum.ERRORED;
404                        }
405                });
406        }
407
408        @Override
409        public void onWorkChunkPollDelay(String theChunkId, Date theDeadline) {
410                int updated = myWorkChunkRepository.updateWorkChunkNextPollTime(
411                                theChunkId, WorkChunkStatusEnum.POLL_WAITING, Set.of(WorkChunkStatusEnum.IN_PROGRESS), theDeadline);
412
413                if (updated != 1) {
414                        ourLog.warn("Expected to update 1 work chunk's poll delay; but found {}", updated);
415                }
416        }
417
418        @Override
419        public void onWorkChunkFailed(String theChunkId, String theErrorMessage) {
420                ourLog.info("Marking chunk {} as failed with message: {}", theChunkId, theErrorMessage);
421                String errorMessage = truncateErrorMessage(theErrorMessage);
422                myTransactionService
423                                .withSystemRequestOnDefaultPartition()
424                                .execute(() -> myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
425                                                theChunkId, new Date(), errorMessage, WorkChunkStatusEnum.FAILED));
426        }
427
428        @Override
429        public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) {
430                myTransactionService
431                                .withSystemRequestOnDefaultPartition()
432                                .execute(() -> myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(
433                                                theEvent.getChunkId(),
434                                                new Date(),
435                                                theEvent.getRecordsProcessed(),
436                                                theEvent.getRecoveredErrorCount(),
437                                                WorkChunkStatusEnum.COMPLETED,
438                                                theEvent.getRecoveredWarningMessage()));
439        }
440
441        @Nullable
442        private static String truncateErrorMessage(String theErrorMessage) {
443                String errorMessage;
444                if (theErrorMessage != null && theErrorMessage.length() > ERROR_MSG_MAX_LENGTH) {
445                        ourLog.warn("Truncating error message that is too long to store in database: {}", theErrorMessage);
446                        errorMessage = theErrorMessage.substring(0, ERROR_MSG_MAX_LENGTH);
447                } else {
448                        errorMessage = theErrorMessage;
449                }
450                return errorMessage;
451        }
452
453        @Override
454        public void markWorkChunksWithStatusAndWipeData(
455                        String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMessage) {
456                assert TransactionSynchronizationManager.isActualTransactionActive();
457
458                ourLog.debug("Marking all chunks for instance {} to status {}", theInstanceId, theStatus);
459                String errorMessage = truncateErrorMessage(theErrorMessage);
460                List<List<String>> listOfListOfIds = ListUtils.partition(theChunkIds, 100);
461                for (List<String> idList : listOfListOfIds) {
462                        myWorkChunkRepository.updateAllChunksForInstanceStatusClearDataAndSetError(
463                                        idList, new Date(), theStatus, errorMessage);
464                }
465        }
466
467        @Override
468        public Set<WorkChunkStatusEnum> getDistinctWorkChunkStatesForJobAndStep(
469                        String theInstanceId, String theCurrentStepId) {
470                if (getRunningJob(theInstanceId) == null) {
471                        return Collections.unmodifiableSet(new HashSet<>());
472                }
473                return myWorkChunkRepository.getDistinctStatusesForStep(theInstanceId, theCurrentStepId);
474        }
475
476        private Batch2JobInstanceEntity getRunningJob(String theInstanceId) {
477                Optional<Batch2JobInstanceEntity> instance = myJobInstanceRepository.findById(theInstanceId);
478                if (instance.isEmpty()) {
479                        return null;
480                }
481                if (instance.get().getStatus().isEnded()) {
482                        return null;
483                }
484                return instance.get();
485        }
486
487        private void fetchChunks(
488                        String theInstanceId,
489                        boolean theIncludeData,
490                        int thePageSize,
491                        int thePageIndex,
492                        Consumer<WorkChunk> theConsumer) {
493                myTransactionService
494                                .withSystemRequestOnDefaultPartition()
495                                .withPropagation(Propagation.REQUIRES_NEW)
496                                .execute(() -> {
497                                        List<Batch2WorkChunkEntity> chunks;
498                                        if (theIncludeData) {
499                                                chunks = myWorkChunkRepository.fetchChunks(
500                                                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
501                                        } else {
502                                                chunks = myWorkChunkRepository.fetchChunksNoData(
503                                                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
504                                        }
505                                        for (Batch2WorkChunkEntity chunk : chunks) {
506                                                theConsumer.accept(toChunk(chunk));
507                                        }
508                                });
509        }
510
511        @Override
512        public void updateInstanceUpdateTime(String theInstanceId) {
513                myJobInstanceRepository.updateInstanceUpdateTime(theInstanceId, new Date());
514        }
515
516        @Override
517        public WorkChunk createWorkChunk(WorkChunk theWorkChunk) {
518                if (theWorkChunk.getId() == null) {
519                        theWorkChunk.setId(UUID.randomUUID().toString());
520                }
521                return toChunk(myWorkChunkRepository.save(Batch2WorkChunkEntity.fromWorkChunk(theWorkChunk)));
522        }
523
524        /**
525         * Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
526         */
527        @Override
528        public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
529                return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) ->
530                                fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
531        }
532
533        @Override
534        public Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) {
535                return myWorkChunkRepository
536                                .fetchChunksForStep(theInstanceId, theStepId)
537                                .map(this::toChunk);
538        }
539
540        @Override
541        public Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(
542                        Pageable thePageable, String theInstanceId, Set<WorkChunkStatusEnum> theStates) {
543                Page<Batch2WorkChunkMetadataView> page =
544                                myWorkChunkMetadataViewRepo.fetchWorkChunkMetadataForJobInStates(thePageable, theInstanceId, theStates);
545
546                return page.map(Batch2WorkChunkMetadataView::toChunkMetadata);
547        }
548
549        @Override
550        public boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier) {
551                /*
552                 * We may already have a copy of the entity in the L1 cache, and it may be
553                 * stale if the scheduled maintenance service has touched it recently. So
554                 * we fetch it and then refresh-lock it so that we don't fail if someone
555                 * else has touched it.
556                 */
557                Batch2JobInstanceEntity instanceEntity = myEntityManager.find(Batch2JobInstanceEntity.class, theInstanceId);
558                myEntityManager.refresh(instanceEntity, LockModeType.PESSIMISTIC_WRITE);
559
560                if (null == instanceEntity) {
561                        ourLog.error("No instance found with Id {}", theInstanceId);
562                        return false;
563                }
564                // convert to JobInstance for public api
565                JobInstance jobInstance = JobInstanceUtil.fromEntityToInstance(instanceEntity);
566
567                // run the modification callback
568                boolean wasModified = theModifier.doUpdate(jobInstance);
569
570                if (wasModified) {
571                        // copy fields back for flush.
572                        JobInstanceUtil.fromInstanceToEntity(jobInstance, instanceEntity);
573                }
574
575                return wasModified;
576        }
577
578        @Override
579        @Transactional(propagation = Propagation.REQUIRES_NEW)
580        public void deleteInstanceAndChunks(String theInstanceId) {
581                ourLog.info("Deleting instance and chunks: {}", theInstanceId);
582                myWorkChunkRepository.deleteAllForInstance(theInstanceId);
583                myJobInstanceRepository.deleteById(theInstanceId);
584        }
585
586        @Override
587        @Transactional(propagation = Propagation.REQUIRES_NEW)
588        public void deleteChunksAndMarkInstanceAsChunksPurged(String theInstanceId) {
589                ourLog.info("Deleting all chunks for instance ID: {}", theInstanceId);
590                int updateCount = myJobInstanceRepository.updateWorkChunksPurgedTrue(theInstanceId);
591                int deleteCount = myWorkChunkRepository.deleteAllForInstance(theInstanceId);
592                ourLog.debug("Purged {} chunks, and updated {} instance.", deleteCount, updateCount);
593        }
594
595        @Override
596        public boolean markInstanceAsStatusWhenStatusIn(
597                        String theInstanceId, StatusEnum theStatusEnum, Set<StatusEnum> thePriorStates) {
598                int recordsChanged =
599                                myJobInstanceRepository.updateInstanceStatusIfIn(theInstanceId, theStatusEnum, thePriorStates);
600                ourLog.debug(
601                                "Update job {} to status {} if in status {}: {}",
602                                theInstanceId,
603                                theStatusEnum,
604                                thePriorStates,
605                                recordsChanged > 0);
606                return recordsChanged > 0;
607        }
608
609        @Override
610        @Transactional(propagation = Propagation.REQUIRES_NEW)
611        public JobOperationResultJson cancelInstance(String theInstanceId) {
612                int recordsChanged = myJobInstanceRepository.updateInstanceCancelled(theInstanceId, true);
613                String operationString = "Cancel job instance " + theInstanceId;
614
615                // wipmb For 6.8 - This is too detailed to be down here - this should be up at the api layer.
616                // Replace with boolean result or ResourceNotFound exception.  Build the message up at the ui.
617                String messagePrefix = "Job instance <" + theInstanceId + ">";
618                if (recordsChanged > 0) {
619                        return JobOperationResultJson.newSuccess(operationString, messagePrefix + " successfully cancelled.");
620                } else {
621                        Optional<JobInstance> instance = fetchInstance(theInstanceId);
622                        if (instance.isPresent()) {
623                                return JobOperationResultJson.newFailure(
624                                                operationString, messagePrefix + " was already cancelled.  Nothing to do.");
625                        } else {
626                                return JobOperationResultJson.newFailure(operationString, messagePrefix + " not found.");
627                        }
628                }
629        }
630
631        private void invokePreStorageBatchHooks(JobInstance theJobInstance) {
632                if (myInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE)) {
633                        HookParams params = new HookParams()
634                                        .add(JobInstance.class, theJobInstance)
635                                        .add(RequestDetails.class, new SystemRequestDetails());
636
637                        myInterceptorBroadcaster.callHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE, params);
638                }
639        }
640
641        @Override
642        @Transactional(propagation = Propagation.REQUIRES_NEW)
643        public boolean advanceJobStepAndUpdateChunkStatus(
644                        String theJobInstanceId, String theNextStepId, boolean theIsReductionStep) {
645                boolean changed = updateInstance(theJobInstanceId, instance -> {
646                        if (instance.getCurrentGatedStepId().equals(theNextStepId)) {
647                                // someone else beat us here.  No changes
648                                return false;
649                        }
650                        ourLog.debug("Moving gated instance {} to the next step {}.", theJobInstanceId, theNextStepId);
651                        instance.setCurrentGatedStepId(theNextStepId);
652                        return true;
653                });
654
655                if (changed) {
656                        ourLog.debug(
657                                        "Updating chunk status from GATE_WAITING to READY for gated instance {} in step {}.",
658                                        theJobInstanceId,
659                                        theNextStepId);
660                        WorkChunkStatusEnum nextStep =
661                                        theIsReductionStep ? WorkChunkStatusEnum.REDUCTION_READY : WorkChunkStatusEnum.READY;
662                        // when we reach here, the current step id is equal to theNextStepId
663                        // Up to 7.1, gated jobs' work chunks are created in status QUEUED but not actually queued for the
664                        // workers.
665                        // In order to keep them compatible, turn QUEUED chunks into READY, too.
666                        // TODO: 'QUEUED' from the IN clause will be removed after 7.6.0.
667                        int numChanged = myWorkChunkRepository.updateAllChunksForStepWithStatus(
668                                        theJobInstanceId,
669                                        theNextStepId,
670                                        List.of(WorkChunkStatusEnum.GATE_WAITING, WorkChunkStatusEnum.QUEUED),
671                                        nextStep);
672                        ourLog.debug(
673                                        "Updated {} chunks of gated instance {} for step {} from fake QUEUED to READY.",
674                                        numChanged,
675                                        theJobInstanceId,
676                                        theNextStepId);
677                }
678
679                return changed;
680        }
681}