001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.batch2;
021
022import ca.uhn.fhir.batch2.api.IJobPersistence;
023import ca.uhn.fhir.batch2.api.JobOperationResultJson;
024import ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO;
025import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO;
026import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
027import ca.uhn.fhir.batch2.model.JobInstance;
028import ca.uhn.fhir.batch2.model.StatusEnum;
029import ca.uhn.fhir.batch2.model.WorkChunk;
030import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
031import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
032import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
033import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
034import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
035import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
036import ca.uhn.fhir.interceptor.api.HookParams;
037import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
038import ca.uhn.fhir.interceptor.api.Pointcut;
039import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
040import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkMetadataViewRepository;
041import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
042import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
043import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
044import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
045import ca.uhn.fhir.jpa.entity.Batch2WorkChunkMetadataView;
046import ca.uhn.fhir.model.api.PagingIterator;
047import ca.uhn.fhir.rest.api.server.RequestDetails;
048import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
049import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
050import ca.uhn.fhir.util.Logs;
051import com.fasterxml.jackson.core.JsonParser;
052import com.fasterxml.jackson.databind.JsonNode;
053import com.fasterxml.jackson.databind.ObjectMapper;
054import com.fasterxml.jackson.databind.node.ObjectNode;
055import jakarta.annotation.Nonnull;
056import jakarta.annotation.Nullable;
057import jakarta.persistence.EntityManager;
058import jakarta.persistence.LockModeType;
059import org.apache.commons.collections4.ListUtils;
060import org.apache.commons.lang3.Validate;
061import org.slf4j.Logger;
062import org.springframework.data.domain.Page;
063import org.springframework.data.domain.PageImpl;
064import org.springframework.data.domain.PageRequest;
065import org.springframework.data.domain.Pageable;
066import org.springframework.data.domain.Sort;
067import org.springframework.transaction.annotation.Propagation;
068import org.springframework.transaction.annotation.Transactional;
069import org.springframework.transaction.support.TransactionSynchronizationManager;
070
071import java.time.Instant;
072import java.util.Collections;
073import java.util.Date;
074import java.util.HashSet;
075import java.util.Iterator;
076import java.util.List;
077import java.util.Objects;
078import java.util.Optional;
079import java.util.Set;
080import java.util.UUID;
081import java.util.function.Consumer;
082import java.util.stream.Collectors;
083import java.util.stream.Stream;
084
085import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
086import static ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity.ERROR_MSG_MAX_LENGTH;
087import static org.apache.commons.lang3.StringUtils.isBlank;
088
089public class JpaJobPersistenceImpl implements IJobPersistence {
090        private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
091        public static final String CREATE_TIME = "myCreateTime";
092
093        private final IBatch2JobInstanceRepository myJobInstanceRepository;
094        private final IBatch2WorkChunkRepository myWorkChunkRepository;
095        private final IBatch2WorkChunkMetadataViewRepository myWorkChunkMetadataViewRepo;
096        private final EntityManager myEntityManager;
097        private final IHapiTransactionService myTransactionService;
098        private final IInterceptorBroadcaster myInterceptorBroadcaster;
099
100        /**
101         * Constructor
102         */
103        public JpaJobPersistenceImpl(
104                        IBatch2JobInstanceRepository theJobInstanceRepository,
105                        IBatch2WorkChunkRepository theWorkChunkRepository,
106                        IBatch2WorkChunkMetadataViewRepository theWorkChunkMetadataViewRepo,
107                        IHapiTransactionService theTransactionService,
108                        EntityManager theEntityManager,
109                        IInterceptorBroadcaster theInterceptorBroadcaster) {
110                Validate.notNull(theJobInstanceRepository, "theJobInstanceRepository");
111                Validate.notNull(theWorkChunkRepository, "theWorkChunkRepository");
112                myJobInstanceRepository = theJobInstanceRepository;
113                myWorkChunkRepository = theWorkChunkRepository;
114                myWorkChunkMetadataViewRepo = theWorkChunkMetadataViewRepo;
115                myTransactionService = theTransactionService;
116                myEntityManager = theEntityManager;
117                myInterceptorBroadcaster = theInterceptorBroadcaster;
118        }
119
120        @Override
121        @Transactional(propagation = Propagation.REQUIRED)
122        public String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) {
123                Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity();
124                entity.setId(UUID.randomUUID().toString());
125                entity.setSequence(theBatchWorkChunk.sequence);
126                entity.setJobDefinitionId(theBatchWorkChunk.jobDefinitionId);
127                entity.setJobDefinitionVersion(theBatchWorkChunk.jobDefinitionVersion);
128                entity.setTargetStepId(theBatchWorkChunk.targetStepId);
129                entity.setInstanceId(theBatchWorkChunk.instanceId);
130                entity.setSerializedData(theBatchWorkChunk.serializedData);
131                entity.setCreateTime(new Date());
132                entity.setStartTime(new Date());
133                entity.setStatus(getOnCreateStatus(theBatchWorkChunk));
134
135                ourLog.debug("Create work chunk {}/{}/{}", entity.getInstanceId(), entity.getId(), entity.getTargetStepId());
136                ourLog.trace(
137                                "Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData());
138                myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> myWorkChunkRepository.save(entity));
139
140                return entity.getId();
141        }
142
143        /**
144         * Gets the initial onCreate state for the given workchunk.
145         * Gated job chunks start in GATE_WAITING; they will be transitioned to READY during maintenance pass when all
146         * chunks in the previous step are COMPLETED.
147         * Non gated job chunks start in READY
148         */
149        private static WorkChunkStatusEnum getOnCreateStatus(WorkChunkCreateEvent theBatchWorkChunk) {
150                if (theBatchWorkChunk.isGatedExecution) {
151                        return WorkChunkStatusEnum.GATE_WAITING;
152                } else {
153                        return WorkChunkStatusEnum.READY;
154                }
155        }
156
157        @Override
158        @Transactional(propagation = Propagation.REQUIRED)
159        public Optional<WorkChunk> onWorkChunkDequeue(String theChunkId) {
160                // take a lock on the chunk id to ensure that the maintenance run isn't doing anything.
161                Batch2WorkChunkEntity chunkLock =
162                                myEntityManager.find(Batch2WorkChunkEntity.class, theChunkId, LockModeType.PESSIMISTIC_WRITE);
163
164                if (chunkLock == null) {
165                        ourLog.warn("Unknown chunk id {} encountered. Message will be discarded.", theChunkId);
166                        return Optional.empty();
167                }
168
169                // remove from the current state to avoid stale data.
170                myEntityManager.detach(chunkLock);
171
172                // NOTE: Ideally, IN_PROGRESS wouldn't be allowed here.  On chunk failure, we probably shouldn't be allowed.
173                // But how does re-run happen if k8s kills a processor mid run?
174                List<WorkChunkStatusEnum> priorStates =
175                                List.of(WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.ERRORED, WorkChunkStatusEnum.IN_PROGRESS);
176                int rowsModified = myWorkChunkRepository.updateChunkStatusForStart(
177                                theChunkId, new Date(), WorkChunkStatusEnum.IN_PROGRESS, priorStates);
178
179                if (rowsModified == 0) {
180                        ourLog.info("Attempting to start chunk {} but it was already started.", theChunkId);
181                        return Optional.empty();
182                } else {
183                        Optional<Batch2WorkChunkEntity> chunk = myWorkChunkRepository.findById(theChunkId);
184                        return chunk.map(this::toChunk);
185                }
186        }
187
188        @Override
189        @Transactional(propagation = Propagation.REQUIRED)
190        public String storeNewInstance(JobInstance theInstance) {
191                Validate.isTrue(isBlank(theInstance.getInstanceId()));
192
193                invokePreStorageBatchHooks(theInstance);
194
195                Batch2JobInstanceEntity entity = new Batch2JobInstanceEntity();
196                entity.setId(UUID.randomUUID().toString());
197                entity.setDefinitionId(theInstance.getJobDefinitionId());
198                entity.setDefinitionVersion(theInstance.getJobDefinitionVersion());
199                entity.setStatus(theInstance.getStatus());
200                entity.setParams(theInstance.getParameters());
201                entity.setCurrentGatedStepId(theInstance.getCurrentGatedStepId());
202                entity.setFastTracking(theInstance.isFastTracking());
203                entity.setCreateTime(new Date());
204                entity.setStartTime(new Date());
205                entity.setReport(theInstance.getReport());
206                entity.setTriggeringUsername(theInstance.getTriggeringUsername());
207                entity.setTriggeringClientId(theInstance.getTriggeringClientId());
208                entity.setUserDataJson(theInstance.getUserDataAsString());
209
210                entity = myJobInstanceRepository.save(entity);
211                return entity.getId();
212        }
213
214        @Override
215        @Transactional(propagation = Propagation.REQUIRES_NEW)
216        public List<JobInstance> fetchInstances(
217                        String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) {
218                return toInstanceList(myJobInstanceRepository.findInstancesByJobIdAndStatusAndExpiry(
219                                theJobDefinitionId, theStatuses, theCutoff, thePageable));
220        }
221
222        @Override
223        @Transactional(propagation = Propagation.REQUIRES_NEW)
224        public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(
225                        String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) {
226                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
227                return toInstanceList(myJobInstanceRepository.fetchInstancesByJobDefinitionIdAndStatus(
228                                theJobDefinitionId, theRequestedStatuses, pageRequest));
229        }
230
231        @Override
232        @Transactional(propagation = Propagation.REQUIRES_NEW)
233        public List<JobInstance> fetchInstancesByJobDefinitionId(
234                        String theJobDefinitionId, int thePageSize, int thePageIndex) {
235                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
236                return toInstanceList(myJobInstanceRepository.findInstancesByJobDefinitionId(theJobDefinitionId, pageRequest));
237        }
238
239        @Override
240        @Transactional(propagation = Propagation.REQUIRES_NEW)
241        public Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest) {
242                PageRequest pageRequest =
243                                PageRequest.of(theRequest.getPageStart(), theRequest.getBatchSize(), theRequest.getSort());
244
245                String jobStatus = theRequest.getJobStatus();
246                if (Objects.equals(jobStatus, "")) {
247                        Page<Batch2JobInstanceEntity> pageOfEntities = myJobInstanceRepository.findAll(pageRequest);
248                        return pageOfEntities.map(this::toInstance);
249                }
250
251                StatusEnum status = StatusEnum.valueOf(jobStatus);
252                List<JobInstance> jobs = toInstanceList(myJobInstanceRepository.findInstancesByJobStatus(status, pageRequest));
253                Integer jobsOfStatus = myJobInstanceRepository.findTotalJobsOfStatus(status);
254                return new PageImpl<>(jobs, pageRequest, jobsOfStatus);
255        }
256
257        private List<JobInstance> toInstanceList(List<Batch2JobInstanceEntity> theInstancesByJobDefinitionId) {
258                return theInstancesByJobDefinitionId.stream().map(this::toInstance).collect(Collectors.toList());
259        }
260
261        @Override
262        @Nonnull
263        public Optional<JobInstance> fetchInstance(String theInstanceId) {
264                return myTransactionService
265                                .withSystemRequestOnDefaultPartition()
266                                .execute(() -> myJobInstanceRepository.findById(theInstanceId).map(this::toInstance));
267        }
268
269        @Nonnull
270        @Override
271        public List<BatchWorkChunkStatusDTO> fetchWorkChunkStatusForInstance(String theInstanceId) {
272                return myTransactionService
273                                .withSystemRequestOnDefaultPartition()
274                                .execute(() -> myWorkChunkRepository.fetchWorkChunkStatusForInstance(theInstanceId));
275        }
276
277        @Nonnull
278        @Override
279        public BatchInstanceStatusDTO fetchBatchInstanceStatus(String theInstanceId) {
280                return myTransactionService
281                                .withSystemRequestOnDefaultPartition()
282                                .execute(() -> myJobInstanceRepository.fetchBatchInstanceStatus(theInstanceId));
283        }
284
285        @Override
286        @Transactional(propagation = Propagation.REQUIRES_NEW)
287        public List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int thePage, int theBatchSize) {
288                String definitionId = theRequest.getJobDefinition();
289                String params = theRequest.getParameters();
290                Set<StatusEnum> statuses = theRequest.getStatuses();
291
292                Pageable pageable = PageRequest.of(thePage, theBatchSize);
293
294                List<Batch2JobInstanceEntity> instanceEntities;
295
296                if (statuses != null && !statuses.isEmpty()) {
297                        if (Batch2JobDefinitionConstants.BULK_EXPORT.equals(definitionId)) {
298                                if (originalRequestUrlTruncation(params) != null) {
299                                        params = originalRequestUrlTruncation(params);
300                                }
301                        }
302                        instanceEntities = myJobInstanceRepository.findInstancesByJobIdParamsAndStatus(
303                                        definitionId, params, statuses, pageable);
304                } else {
305                        instanceEntities = myJobInstanceRepository.findInstancesByJobIdAndParams(definitionId, params, pageable);
306                }
307                return toInstanceList(instanceEntities);
308        }
309
310        private String originalRequestUrlTruncation(String theParams) {
311                try {
312                        ObjectMapper mapper = new ObjectMapper();
313                        mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
314                        mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
315                        JsonNode rootNode = mapper.readTree(theParams);
316                        String originalUrl = "originalRequestUrl";
317
318                        if (rootNode instanceof ObjectNode) {
319                                ObjectNode objectNode = (ObjectNode) rootNode;
320
321                                if (objectNode.has(originalUrl)) {
322                                        String url = objectNode.get(originalUrl).asText();
323                                        if (url.contains("?")) {
324                                                objectNode.put(originalUrl, url.split("\\?")[0]);
325                                        }
326                                }
327                                return mapper.writeValueAsString(objectNode);
328                        }
329                } catch (Exception e) {
330                        ourLog.info("Error Truncating Original Request Url", e);
331                }
332                return null;
333        }
334
335        @Override
336        @Transactional(propagation = Propagation.REQUIRES_NEW)
337        public List<JobInstance> fetchInstances(int thePageSize, int thePageIndex) {
338                // default sort is myCreateTime Asc
339                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
340                return myTransactionService
341                                .withSystemRequestOnDefaultPartition()
342                                .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
343                                                .map(this::toInstance)
344                                                .collect(Collectors.toList()));
345        }
346
347        @Override
348        public void enqueueWorkChunkForProcessing(String theChunkId, Consumer<Integer> theCallback) {
349                int updated = myWorkChunkRepository.updateChunkStatus(
350                                theChunkId, WorkChunkStatusEnum.READY, WorkChunkStatusEnum.QUEUED);
351                theCallback.accept(updated);
352        }
353
354        @Override
355        public int updatePollWaitingChunksForJobIfReady(String theInstanceId) {
356                return myWorkChunkRepository.updateWorkChunksForPollWaiting(
357                                theInstanceId,
358                                Date.from(Instant.now()),
359                                Set.of(WorkChunkStatusEnum.POLL_WAITING),
360                                WorkChunkStatusEnum.READY);
361        }
362
363        @Override
364        @Transactional(propagation = Propagation.REQUIRES_NEW)
365        public List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex) {
366                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.DESC, CREATE_TIME);
367                return myTransactionService
368                                .withSystemRequestOnDefaultPartition()
369                                .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
370                                                .map(this::toInstance)
371                                                .collect(Collectors.toList()));
372        }
373
374        private WorkChunk toChunk(Batch2WorkChunkEntity theEntity) {
375                return JobInstanceUtil.fromEntityToWorkChunk(theEntity);
376        }
377
378        private JobInstance toInstance(Batch2JobInstanceEntity theEntity) {
379                return JobInstanceUtil.fromEntityToInstance(theEntity);
380        }
381
382        @Override
383        public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) {
384                String chunkId = theParameters.getChunkId();
385                String errorMessage = truncateErrorMessage(theParameters.getErrorMsg());
386
387                return myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> {
388                        int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
389                                        chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED);
390                        Validate.isTrue(changeCount > 0, "No changed chunk matching %s", chunkId);
391
392                        int failChangeCount = myWorkChunkRepository.updateChunkForTooManyErrors(
393                                        WorkChunkStatusEnum.FAILED, chunkId, MAX_CHUNK_ERROR_COUNT, ERROR_MSG_MAX_LENGTH);
394
395                        if (failChangeCount > 0) {
396                                return WorkChunkStatusEnum.FAILED;
397                        } else {
398                                return WorkChunkStatusEnum.ERRORED;
399                        }
400                });
401        }
402
403        @Override
404        public void onWorkChunkPollDelay(String theChunkId, Date theDeadline) {
405                int updated = myWorkChunkRepository.updateWorkChunkNextPollTime(
406                                theChunkId, WorkChunkStatusEnum.POLL_WAITING, Set.of(WorkChunkStatusEnum.IN_PROGRESS), theDeadline);
407
408                if (updated != 1) {
409                        ourLog.warn("Expected to update 1 work chunk's poll delay; but found {}", updated);
410                }
411        }
412
413        @Override
414        public void onWorkChunkFailed(String theChunkId, String theErrorMessage) {
415                ourLog.info("Marking chunk {} as failed with message: {}", theChunkId, theErrorMessage);
416                String errorMessage = truncateErrorMessage(theErrorMessage);
417                myTransactionService
418                                .withSystemRequestOnDefaultPartition()
419                                .execute(() -> myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
420                                                theChunkId, new Date(), errorMessage, WorkChunkStatusEnum.FAILED));
421        }
422
423        @Override
424        public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) {
425                myTransactionService
426                                .withSystemRequestOnDefaultPartition()
427                                .execute(() -> myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(
428                                                theEvent.getChunkId(),
429                                                new Date(),
430                                                theEvent.getRecordsProcessed(),
431                                                theEvent.getRecoveredErrorCount(),
432                                                WorkChunkStatusEnum.COMPLETED,
433                                                theEvent.getRecoveredWarningMessage()));
434        }
435
436        @Nullable
437        private static String truncateErrorMessage(String theErrorMessage) {
438                String errorMessage;
439                if (theErrorMessage != null && theErrorMessage.length() > ERROR_MSG_MAX_LENGTH) {
440                        ourLog.warn("Truncating error message that is too long to store in database: {}", theErrorMessage);
441                        errorMessage = theErrorMessage.substring(0, ERROR_MSG_MAX_LENGTH);
442                } else {
443                        errorMessage = theErrorMessage;
444                }
445                return errorMessage;
446        }
447
448        @Override
449        public void markWorkChunksWithStatusAndWipeData(
450                        String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMessage) {
451                assert TransactionSynchronizationManager.isActualTransactionActive();
452
453                ourLog.debug("Marking all chunks for instance {} to status {}", theInstanceId, theStatus);
454                String errorMessage = truncateErrorMessage(theErrorMessage);
455                List<List<String>> listOfListOfIds = ListUtils.partition(theChunkIds, 100);
456                for (List<String> idList : listOfListOfIds) {
457                        myWorkChunkRepository.updateAllChunksForInstanceStatusClearDataAndSetError(
458                                        idList, new Date(), theStatus, errorMessage);
459                }
460        }
461
462        @Override
463        public Set<WorkChunkStatusEnum> getDistinctWorkChunkStatesForJobAndStep(
464                        String theInstanceId, String theCurrentStepId) {
465                if (getRunningJob(theInstanceId) == null) {
466                        return Collections.unmodifiableSet(new HashSet<>());
467                }
468                return myWorkChunkRepository.getDistinctStatusesForStep(theInstanceId, theCurrentStepId);
469        }
470
471        private Batch2JobInstanceEntity getRunningJob(String theInstanceId) {
472                Optional<Batch2JobInstanceEntity> instance = myJobInstanceRepository.findById(theInstanceId);
473                if (instance.isEmpty()) {
474                        return null;
475                }
476                if (instance.get().getStatus().isEnded()) {
477                        return null;
478                }
479                return instance.get();
480        }
481
482        private void fetchChunks(
483                        String theInstanceId,
484                        boolean theIncludeData,
485                        int thePageSize,
486                        int thePageIndex,
487                        Consumer<WorkChunk> theConsumer) {
488                myTransactionService
489                                .withSystemRequestOnDefaultPartition()
490                                .withPropagation(Propagation.REQUIRES_NEW)
491                                .execute(() -> {
492                                        List<Batch2WorkChunkEntity> chunks;
493                                        if (theIncludeData) {
494                                                chunks = myWorkChunkRepository.fetchChunks(
495                                                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
496                                        } else {
497                                                chunks = myWorkChunkRepository.fetchChunksNoData(
498                                                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
499                                        }
500                                        for (Batch2WorkChunkEntity chunk : chunks) {
501                                                theConsumer.accept(toChunk(chunk));
502                                        }
503                                });
504        }
505
506        @Override
507        public void updateInstanceUpdateTime(String theInstanceId) {
508                myJobInstanceRepository.updateInstanceUpdateTime(theInstanceId, new Date());
509        }
510
511        @Override
512        public WorkChunk createWorkChunk(WorkChunk theWorkChunk) {
513                if (theWorkChunk.getId() == null) {
514                        theWorkChunk.setId(UUID.randomUUID().toString());
515                }
516                return toChunk(myWorkChunkRepository.save(Batch2WorkChunkEntity.fromWorkChunk(theWorkChunk)));
517        }
518
519        /**
520         * Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
521         */
522        @Override
523        public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
524                return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) ->
525                                fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
526        }
527
528        @Override
529        public Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) {
530                return myWorkChunkRepository
531                                .fetchChunksForStep(theInstanceId, theStepId)
532                                .map(this::toChunk);
533        }
534
535        @Override
536        public Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(
537                        Pageable thePageable, String theInstanceId, Set<WorkChunkStatusEnum> theStates) {
538                Page<Batch2WorkChunkMetadataView> page =
539                                myWorkChunkMetadataViewRepo.fetchWorkChunkMetadataForJobInStates(thePageable, theInstanceId, theStates);
540
541                return page.map(Batch2WorkChunkMetadataView::toChunkMetadata);
542        }
543
544        @Override
545        public boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier) {
546                /*
547                 * We may already have a copy of the entity in the L1 cache, and it may be
548                 * stale if the scheduled maintenance service has touched it recently. So
549                 * we fetch it and then refresh-lock it so that we don't fail if someone
550                 * else has touched it.
551                 */
552                Batch2JobInstanceEntity instanceEntity = myEntityManager.find(Batch2JobInstanceEntity.class, theInstanceId);
553                myEntityManager.refresh(instanceEntity, LockModeType.PESSIMISTIC_WRITE);
554
555                if (null == instanceEntity) {
556                        ourLog.error("No instance found with Id {}", theInstanceId);
557                        return false;
558                }
559                // convert to JobInstance for public api
560                JobInstance jobInstance = JobInstanceUtil.fromEntityToInstance(instanceEntity);
561
562                // run the modification callback
563                boolean wasModified = theModifier.doUpdate(jobInstance);
564
565                if (wasModified) {
566                        // copy fields back for flush.
567                        JobInstanceUtil.fromInstanceToEntity(jobInstance, instanceEntity);
568                }
569
570                return wasModified;
571        }
572
573        @Override
574        @Transactional(propagation = Propagation.REQUIRES_NEW)
575        public void deleteInstanceAndChunks(String theInstanceId) {
576                ourLog.info("Deleting instance and chunks: {}", theInstanceId);
577                myWorkChunkRepository.deleteAllForInstance(theInstanceId);
578                myJobInstanceRepository.deleteById(theInstanceId);
579        }
580
581        @Override
582        @Transactional(propagation = Propagation.REQUIRES_NEW)
583        public void deleteChunksAndMarkInstanceAsChunksPurged(String theInstanceId) {
584                ourLog.info("Deleting all chunks for instance ID: {}", theInstanceId);
585                int updateCount = myJobInstanceRepository.updateWorkChunksPurgedTrue(theInstanceId);
586                int deleteCount = myWorkChunkRepository.deleteAllForInstance(theInstanceId);
587                ourLog.debug("Purged {} chunks, and updated {} instance.", deleteCount, updateCount);
588        }
589
590        @Override
591        public boolean markInstanceAsStatusWhenStatusIn(
592                        String theInstanceId, StatusEnum theStatusEnum, Set<StatusEnum> thePriorStates) {
593                int recordsChanged =
594                                myJobInstanceRepository.updateInstanceStatusIfIn(theInstanceId, theStatusEnum, thePriorStates);
595                ourLog.debug(
596                                "Update job {} to status {} if in status {}: {}",
597                                theInstanceId,
598                                theStatusEnum,
599                                thePriorStates,
600                                recordsChanged > 0);
601                return recordsChanged > 0;
602        }
603
604        @Override
605        @Transactional(propagation = Propagation.REQUIRES_NEW)
606        public JobOperationResultJson cancelInstance(String theInstanceId) {
607                int recordsChanged = myJobInstanceRepository.updateInstanceCancelled(theInstanceId, true);
608                String operationString = "Cancel job instance " + theInstanceId;
609
610                // wipmb For 6.8 - This is too detailed to be down here - this should be up at the api layer.
611                // Replace with boolean result or ResourceNotFound exception.  Build the message up at the ui.
612                String messagePrefix = "Job instance <" + theInstanceId + ">";
613                if (recordsChanged > 0) {
614                        return JobOperationResultJson.newSuccess(operationString, messagePrefix + " successfully cancelled.");
615                } else {
616                        Optional<JobInstance> instance = fetchInstance(theInstanceId);
617                        if (instance.isPresent()) {
618                                return JobOperationResultJson.newFailure(
619                                                operationString, messagePrefix + " was already cancelled.  Nothing to do.");
620                        } else {
621                                return JobOperationResultJson.newFailure(operationString, messagePrefix + " not found.");
622                        }
623                }
624        }
625
626        private void invokePreStorageBatchHooks(JobInstance theJobInstance) {
627                if (myInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE)) {
628                        HookParams params = new HookParams()
629                                        .add(JobInstance.class, theJobInstance)
630                                        .add(RequestDetails.class, new SystemRequestDetails());
631
632                        myInterceptorBroadcaster.callHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE, params);
633                }
634        }
635
636        @Override
637        @Transactional(propagation = Propagation.REQUIRES_NEW)
638        public boolean advanceJobStepAndUpdateChunkStatus(
639                        String theJobInstanceId, String theNextStepId, boolean theIsReductionStep) {
640                boolean changed = updateInstance(theJobInstanceId, instance -> {
641                        if (instance.getCurrentGatedStepId().equals(theNextStepId)) {
642                                // someone else beat us here.  No changes
643                                return false;
644                        }
645                        ourLog.debug("Moving gated instance {} to the next step {}.", theJobInstanceId, theNextStepId);
646                        instance.setCurrentGatedStepId(theNextStepId);
647                        return true;
648                });
649
650                if (changed) {
651                        ourLog.debug(
652                                        "Updating chunk status from GATE_WAITING to READY for gated instance {} in step {}.",
653                                        theJobInstanceId,
654                                        theNextStepId);
655                        WorkChunkStatusEnum nextStep =
656                                        theIsReductionStep ? WorkChunkStatusEnum.REDUCTION_READY : WorkChunkStatusEnum.READY;
657                        // when we reach here, the current step id is equal to theNextStepId
658                        // Up to 7.1, gated jobs' work chunks are created in status QUEUED but not actually queued for the
659                        // workers.
660                        // In order to keep them compatible, turn QUEUED chunks into READY, too.
661                        // TODO: 'QUEUED' from the IN clause will be removed after 7.6.0.
662                        int numChanged = myWorkChunkRepository.updateAllChunksForStepWithStatus(
663                                        theJobInstanceId,
664                                        theNextStepId,
665                                        List.of(WorkChunkStatusEnum.GATE_WAITING, WorkChunkStatusEnum.QUEUED),
666                                        nextStep);
667                        ourLog.debug(
668                                        "Updated {} chunks of gated instance {} for step {} from fake QUEUED to READY.",
669                                        numChanged,
670                                        theJobInstanceId,
671                                        theNextStepId);
672                }
673
674                return changed;
675        }
676}