001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.batch2;
021
022import ca.uhn.fhir.batch2.api.IJobPersistence;
023import ca.uhn.fhir.batch2.api.JobOperationResultJson;
024import ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO;
025import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO;
026import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
027import ca.uhn.fhir.batch2.model.JobInstance;
028import ca.uhn.fhir.batch2.model.StatusEnum;
029import ca.uhn.fhir.batch2.model.WorkChunk;
030import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
031import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
032import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
033import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
034import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
035import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
036import ca.uhn.fhir.interceptor.api.HookParams;
037import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
038import ca.uhn.fhir.interceptor.api.Pointcut;
039import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
040import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkMetadataViewRepository;
041import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
042import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
043import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
044import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
045import ca.uhn.fhir.jpa.entity.Batch2WorkChunkMetadataView;
046import ca.uhn.fhir.model.api.PagingIterator;
047import ca.uhn.fhir.rest.api.server.RequestDetails;
048import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
049import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
050import ca.uhn.fhir.util.Logs;
051import com.fasterxml.jackson.core.JsonParser;
052import com.fasterxml.jackson.databind.JsonNode;
053import com.fasterxml.jackson.databind.ObjectMapper;
054import com.fasterxml.jackson.databind.node.ObjectNode;
055import jakarta.annotation.Nonnull;
056import jakarta.annotation.Nullable;
057import jakarta.persistence.EntityManager;
058import jakarta.persistence.LockModeType;
059import org.apache.commons.collections4.ListUtils;
060import org.apache.commons.lang3.Validate;
061import org.slf4j.Logger;
062import org.springframework.data.domain.Page;
063import org.springframework.data.domain.PageImpl;
064import org.springframework.data.domain.PageRequest;
065import org.springframework.data.domain.Pageable;
066import org.springframework.data.domain.Sort;
067import org.springframework.transaction.annotation.Propagation;
068import org.springframework.transaction.annotation.Transactional;
069import org.springframework.transaction.support.TransactionSynchronizationManager;
070
071import java.time.Instant;
072import java.util.Collections;
073import java.util.Date;
074import java.util.HashSet;
075import java.util.Iterator;
076import java.util.List;
077import java.util.Objects;
078import java.util.Optional;
079import java.util.Set;
080import java.util.UUID;
081import java.util.function.Consumer;
082import java.util.stream.Collectors;
083import java.util.stream.Stream;
084
085import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
086import static ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity.ERROR_MSG_MAX_LENGTH;
087import static org.apache.commons.lang3.StringUtils.isBlank;
088
089public class JpaJobPersistenceImpl implements IJobPersistence {
090        private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
091        public static final String CREATE_TIME = "myCreateTime";
092
093        private final IBatch2JobInstanceRepository myJobInstanceRepository;
094        private final IBatch2WorkChunkRepository myWorkChunkRepository;
095        private final IBatch2WorkChunkMetadataViewRepository myWorkChunkMetadataViewRepo;
096        private final EntityManager myEntityManager;
097        private final IHapiTransactionService myTransactionService;
098        private final IInterceptorBroadcaster myInterceptorBroadcaster;
099
100        /**
101         * Constructor
102         */
103        public JpaJobPersistenceImpl(
104                        IBatch2JobInstanceRepository theJobInstanceRepository,
105                        IBatch2WorkChunkRepository theWorkChunkRepository,
106                        IBatch2WorkChunkMetadataViewRepository theWorkChunkMetadataViewRepo,
107                        IHapiTransactionService theTransactionService,
108                        EntityManager theEntityManager,
109                        IInterceptorBroadcaster theInterceptorBroadcaster) {
110                Validate.notNull(theJobInstanceRepository, "theJobInstanceRepository");
111                Validate.notNull(theWorkChunkRepository, "theWorkChunkRepository");
112                myJobInstanceRepository = theJobInstanceRepository;
113                myWorkChunkRepository = theWorkChunkRepository;
114                myWorkChunkMetadataViewRepo = theWorkChunkMetadataViewRepo;
115                myTransactionService = theTransactionService;
116                myEntityManager = theEntityManager;
117                myInterceptorBroadcaster = theInterceptorBroadcaster;
118        }
119
120        @Override
121        @Transactional(propagation = Propagation.REQUIRED)
122        public String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) {
123                Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity();
124                entity.setId(UUID.randomUUID().toString());
125                entity.setSequence(theBatchWorkChunk.sequence);
126                entity.setJobDefinitionId(theBatchWorkChunk.jobDefinitionId);
127                entity.setJobDefinitionVersion(theBatchWorkChunk.jobDefinitionVersion);
128                entity.setTargetStepId(theBatchWorkChunk.targetStepId);
129                entity.setInstanceId(theBatchWorkChunk.instanceId);
130                entity.setSerializedData(theBatchWorkChunk.serializedData);
131                entity.setCreateTime(new Date());
132                entity.setStartTime(new Date());
133                entity.setStatus(getOnCreateStatus(theBatchWorkChunk));
134
135                ourLog.debug("Create work chunk {}/{}/{}", entity.getInstanceId(), entity.getId(), entity.getTargetStepId());
136                ourLog.trace(
137                                "Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData());
138                myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> myWorkChunkRepository.save(entity));
139
140                return entity.getId();
141        }
142
143        /**
144         * Gets the initial onCreate state for the given workchunk.
145         * Gated job chunks start in GATE_WAITING; they will be transitioned to READY during maintenance pass when all
146         * chunks in the previous step are COMPLETED.
147         * Non gated job chunks start in READY
148         */
149        private static WorkChunkStatusEnum getOnCreateStatus(WorkChunkCreateEvent theBatchWorkChunk) {
150                if (theBatchWorkChunk.isGatedExecution) {
151                        return WorkChunkStatusEnum.GATE_WAITING;
152                } else {
153                        return WorkChunkStatusEnum.READY;
154                }
155        }
156
157        @Override
158        @Transactional(propagation = Propagation.REQUIRED)
159        public Optional<WorkChunk> onWorkChunkDequeue(String theChunkId) {
160                // take a lock on the chunk id to ensure that the maintenance run isn't doing anything.
161                Batch2WorkChunkEntity chunkLock =
162                                myEntityManager.find(Batch2WorkChunkEntity.class, theChunkId, LockModeType.PESSIMISTIC_WRITE);
163                // remove from the current state to avoid stale data.
164                myEntityManager.detach(chunkLock);
165
166                // NOTE: Ideally, IN_PROGRESS wouldn't be allowed here.  On chunk failure, we probably shouldn't be allowed.
167                // But how does re-run happen if k8s kills a processor mid run?
168                List<WorkChunkStatusEnum> priorStates =
169                                List.of(WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.ERRORED, WorkChunkStatusEnum.IN_PROGRESS);
170                int rowsModified = myWorkChunkRepository.updateChunkStatusForStart(
171                                theChunkId, new Date(), WorkChunkStatusEnum.IN_PROGRESS, priorStates);
172
173                if (rowsModified == 0) {
174                        ourLog.info("Attempting to start chunk {} but it was already started.", theChunkId);
175                        return Optional.empty();
176                } else {
177                        Optional<Batch2WorkChunkEntity> chunk = myWorkChunkRepository.findById(theChunkId);
178                        return chunk.map(this::toChunk);
179                }
180        }
181
182        @Override
183        @Transactional(propagation = Propagation.REQUIRED)
184        public String storeNewInstance(JobInstance theInstance) {
185                Validate.isTrue(isBlank(theInstance.getInstanceId()));
186
187                invokePreStorageBatchHooks(theInstance);
188
189                Batch2JobInstanceEntity entity = new Batch2JobInstanceEntity();
190                entity.setId(UUID.randomUUID().toString());
191                entity.setDefinitionId(theInstance.getJobDefinitionId());
192                entity.setDefinitionVersion(theInstance.getJobDefinitionVersion());
193                entity.setStatus(theInstance.getStatus());
194                entity.setParams(theInstance.getParameters());
195                entity.setCurrentGatedStepId(theInstance.getCurrentGatedStepId());
196                entity.setFastTracking(theInstance.isFastTracking());
197                entity.setCreateTime(new Date());
198                entity.setStartTime(new Date());
199                entity.setReport(theInstance.getReport());
200                entity.setTriggeringUsername(theInstance.getTriggeringUsername());
201                entity.setTriggeringClientId(theInstance.getTriggeringClientId());
202
203                entity = myJobInstanceRepository.save(entity);
204                return entity.getId();
205        }
206
207        @Override
208        @Transactional(propagation = Propagation.REQUIRES_NEW)
209        public List<JobInstance> fetchInstances(
210                        String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) {
211                return toInstanceList(myJobInstanceRepository.findInstancesByJobIdAndStatusAndExpiry(
212                                theJobDefinitionId, theStatuses, theCutoff, thePageable));
213        }
214
215        @Override
216        @Transactional(propagation = Propagation.REQUIRES_NEW)
217        public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(
218                        String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) {
219                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
220                return toInstanceList(myJobInstanceRepository.fetchInstancesByJobDefinitionIdAndStatus(
221                                theJobDefinitionId, theRequestedStatuses, pageRequest));
222        }
223
224        @Override
225        @Transactional(propagation = Propagation.REQUIRES_NEW)
226        public List<JobInstance> fetchInstancesByJobDefinitionId(
227                        String theJobDefinitionId, int thePageSize, int thePageIndex) {
228                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
229                return toInstanceList(myJobInstanceRepository.findInstancesByJobDefinitionId(theJobDefinitionId, pageRequest));
230        }
231
232        @Override
233        @Transactional(propagation = Propagation.REQUIRES_NEW)
234        public Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest) {
235                PageRequest pageRequest =
236                                PageRequest.of(theRequest.getPageStart(), theRequest.getBatchSize(), theRequest.getSort());
237
238                String jobStatus = theRequest.getJobStatus();
239                if (Objects.equals(jobStatus, "")) {
240                        Page<Batch2JobInstanceEntity> pageOfEntities = myJobInstanceRepository.findAll(pageRequest);
241                        return pageOfEntities.map(this::toInstance);
242                }
243
244                StatusEnum status = StatusEnum.valueOf(jobStatus);
245                List<JobInstance> jobs = toInstanceList(myJobInstanceRepository.findInstancesByJobStatus(status, pageRequest));
246                Integer jobsOfStatus = myJobInstanceRepository.findTotalJobsOfStatus(status);
247                return new PageImpl<>(jobs, pageRequest, jobsOfStatus);
248        }
249
250        private List<JobInstance> toInstanceList(List<Batch2JobInstanceEntity> theInstancesByJobDefinitionId) {
251                return theInstancesByJobDefinitionId.stream().map(this::toInstance).collect(Collectors.toList());
252        }
253
254        @Override
255        @Nonnull
256        public Optional<JobInstance> fetchInstance(String theInstanceId) {
257                return myTransactionService
258                                .withSystemRequestOnDefaultPartition()
259                                .execute(() -> myJobInstanceRepository.findById(theInstanceId).map(this::toInstance));
260        }
261
262        @Nonnull
263        @Override
264        public List<BatchWorkChunkStatusDTO> fetchWorkChunkStatusForInstance(String theInstanceId) {
265                return myTransactionService
266                                .withSystemRequestOnDefaultPartition()
267                                .execute(() -> myWorkChunkRepository.fetchWorkChunkStatusForInstance(theInstanceId));
268        }
269
270        @Nonnull
271        @Override
272        public BatchInstanceStatusDTO fetchBatchInstanceStatus(String theInstanceId) {
273                return myTransactionService
274                                .withSystemRequestOnDefaultPartition()
275                                .execute(() -> myJobInstanceRepository.fetchBatchInstanceStatus(theInstanceId));
276        }
277
278        @Override
279        @Transactional(propagation = Propagation.REQUIRES_NEW)
280        public List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int thePage, int theBatchSize) {
281                String definitionId = theRequest.getJobDefinition();
282                String params = theRequest.getParameters();
283                Set<StatusEnum> statuses = theRequest.getStatuses();
284
285                Pageable pageable = PageRequest.of(thePage, theBatchSize);
286
287                List<Batch2JobInstanceEntity> instanceEntities;
288
289                if (statuses != null && !statuses.isEmpty()) {
290                        if (Batch2JobDefinitionConstants.BULK_EXPORT.equals(definitionId)) {
291                                if (originalRequestUrlTruncation(params) != null) {
292                                        params = originalRequestUrlTruncation(params);
293                                }
294                        }
295                        instanceEntities = myJobInstanceRepository.findInstancesByJobIdParamsAndStatus(
296                                        definitionId, params, statuses, pageable);
297                } else {
298                        instanceEntities = myJobInstanceRepository.findInstancesByJobIdAndParams(definitionId, params, pageable);
299                }
300                return toInstanceList(instanceEntities);
301        }
302
303        private String originalRequestUrlTruncation(String theParams) {
304                try {
305                        ObjectMapper mapper = new ObjectMapper();
306                        mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
307                        mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
308                        JsonNode rootNode = mapper.readTree(theParams);
309                        String originalUrl = "originalRequestUrl";
310
311                        if (rootNode instanceof ObjectNode) {
312                                ObjectNode objectNode = (ObjectNode) rootNode;
313
314                                if (objectNode.has(originalUrl)) {
315                                        String url = objectNode.get(originalUrl).asText();
316                                        if (url.contains("?")) {
317                                                objectNode.put(originalUrl, url.split("\\?")[0]);
318                                        }
319                                }
320                                return mapper.writeValueAsString(objectNode);
321                        }
322                } catch (Exception e) {
323                        ourLog.info("Error Truncating Original Request Url", e);
324                }
325                return null;
326        }
327
328        @Override
329        @Transactional(propagation = Propagation.REQUIRES_NEW)
330        public List<JobInstance> fetchInstances(int thePageSize, int thePageIndex) {
331                // default sort is myCreateTime Asc
332                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
333                return myTransactionService
334                                .withSystemRequestOnDefaultPartition()
335                                .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
336                                                .map(this::toInstance)
337                                                .collect(Collectors.toList()));
338        }
339
340        @Override
341        public void enqueueWorkChunkForProcessing(String theChunkId, Consumer<Integer> theCallback) {
342                int updated = myWorkChunkRepository.updateChunkStatus(
343                                theChunkId, WorkChunkStatusEnum.READY, WorkChunkStatusEnum.QUEUED);
344                theCallback.accept(updated);
345        }
346
347        @Override
348        public int updatePollWaitingChunksForJobIfReady(String theInstanceId) {
349                return myWorkChunkRepository.updateWorkChunksForPollWaiting(
350                                theInstanceId,
351                                Date.from(Instant.now()),
352                                Set.of(WorkChunkStatusEnum.POLL_WAITING),
353                                WorkChunkStatusEnum.READY);
354        }
355
356        @Override
357        @Transactional(propagation = Propagation.REQUIRES_NEW)
358        public List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex) {
359                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.DESC, CREATE_TIME);
360                return myTransactionService
361                                .withSystemRequestOnDefaultPartition()
362                                .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
363                                                .map(this::toInstance)
364                                                .collect(Collectors.toList()));
365        }
366
367        private WorkChunk toChunk(Batch2WorkChunkEntity theEntity) {
368                return JobInstanceUtil.fromEntityToWorkChunk(theEntity);
369        }
370
371        private JobInstance toInstance(Batch2JobInstanceEntity theEntity) {
372                return JobInstanceUtil.fromEntityToInstance(theEntity);
373        }
374
375        @Override
376        public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) {
377                String chunkId = theParameters.getChunkId();
378                String errorMessage = truncateErrorMessage(theParameters.getErrorMsg());
379
380                return myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> {
381                        int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
382                                        chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED);
383                        Validate.isTrue(changeCount > 0, "No changed chunk matching %s", chunkId);
384
385                        int failChangeCount = myWorkChunkRepository.updateChunkForTooManyErrors(
386                                        WorkChunkStatusEnum.FAILED, chunkId, MAX_CHUNK_ERROR_COUNT, ERROR_MSG_MAX_LENGTH);
387
388                        if (failChangeCount > 0) {
389                                return WorkChunkStatusEnum.FAILED;
390                        } else {
391                                return WorkChunkStatusEnum.ERRORED;
392                        }
393                });
394        }
395
396        @Override
397        public void onWorkChunkPollDelay(String theChunkId, Date theDeadline) {
398                int updated = myWorkChunkRepository.updateWorkChunkNextPollTime(
399                                theChunkId, WorkChunkStatusEnum.POLL_WAITING, Set.of(WorkChunkStatusEnum.IN_PROGRESS), theDeadline);
400
401                if (updated != 1) {
402                        ourLog.warn("Expected to update 1 work chunk's poll delay; but found {}", updated);
403                }
404        }
405
406        @Override
407        public void onWorkChunkFailed(String theChunkId, String theErrorMessage) {
408                ourLog.info("Marking chunk {} as failed with message: {}", theChunkId, theErrorMessage);
409                String errorMessage = truncateErrorMessage(theErrorMessage);
410                myTransactionService
411                                .withSystemRequestOnDefaultPartition()
412                                .execute(() -> myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
413                                                theChunkId, new Date(), errorMessage, WorkChunkStatusEnum.FAILED));
414        }
415
416        @Override
417        public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) {
418                myTransactionService
419                                .withSystemRequestOnDefaultPartition()
420                                .execute(() -> myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(
421                                                theEvent.getChunkId(),
422                                                new Date(),
423                                                theEvent.getRecordsProcessed(),
424                                                theEvent.getRecoveredErrorCount(),
425                                                WorkChunkStatusEnum.COMPLETED,
426                                                theEvent.getRecoveredWarningMessage()));
427        }
428
429        @Nullable
430        private static String truncateErrorMessage(String theErrorMessage) {
431                String errorMessage;
432                if (theErrorMessage != null && theErrorMessage.length() > ERROR_MSG_MAX_LENGTH) {
433                        ourLog.warn("Truncating error message that is too long to store in database: {}", theErrorMessage);
434                        errorMessage = theErrorMessage.substring(0, ERROR_MSG_MAX_LENGTH);
435                } else {
436                        errorMessage = theErrorMessage;
437                }
438                return errorMessage;
439        }
440
441        @Override
442        public void markWorkChunksWithStatusAndWipeData(
443                        String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMessage) {
444                assert TransactionSynchronizationManager.isActualTransactionActive();
445
446                ourLog.debug("Marking all chunks for instance {} to status {}", theInstanceId, theStatus);
447                String errorMessage = truncateErrorMessage(theErrorMessage);
448                List<List<String>> listOfListOfIds = ListUtils.partition(theChunkIds, 100);
449                for (List<String> idList : listOfListOfIds) {
450                        myWorkChunkRepository.updateAllChunksForInstanceStatusClearDataAndSetError(
451                                        idList, new Date(), theStatus, errorMessage);
452                }
453        }
454
455        @Override
456        public Set<WorkChunkStatusEnum> getDistinctWorkChunkStatesForJobAndStep(
457                        String theInstanceId, String theCurrentStepId) {
458                if (getRunningJob(theInstanceId) == null) {
459                        return Collections.unmodifiableSet(new HashSet<>());
460                }
461                return myWorkChunkRepository.getDistinctStatusesForStep(theInstanceId, theCurrentStepId);
462        }
463
464        private Batch2JobInstanceEntity getRunningJob(String theInstanceId) {
465                Optional<Batch2JobInstanceEntity> instance = myJobInstanceRepository.findById(theInstanceId);
466                if (instance.isEmpty()) {
467                        return null;
468                }
469                if (instance.get().getStatus().isEnded()) {
470                        return null;
471                }
472                return instance.get();
473        }
474
475        private void fetchChunks(
476                        String theInstanceId,
477                        boolean theIncludeData,
478                        int thePageSize,
479                        int thePageIndex,
480                        Consumer<WorkChunk> theConsumer) {
481                myTransactionService
482                                .withSystemRequestOnDefaultPartition()
483                                .withPropagation(Propagation.REQUIRES_NEW)
484                                .execute(() -> {
485                                        List<Batch2WorkChunkEntity> chunks;
486                                        if (theIncludeData) {
487                                                chunks = myWorkChunkRepository.fetchChunks(
488                                                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
489                                        } else {
490                                                chunks = myWorkChunkRepository.fetchChunksNoData(
491                                                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
492                                        }
493                                        for (Batch2WorkChunkEntity chunk : chunks) {
494                                                theConsumer.accept(toChunk(chunk));
495                                        }
496                                });
497        }
498
499        @Override
500        public void updateInstanceUpdateTime(String theInstanceId) {
501                myJobInstanceRepository.updateInstanceUpdateTime(theInstanceId, new Date());
502        }
503
504        @Override
505        public WorkChunk createWorkChunk(WorkChunk theWorkChunk) {
506                if (theWorkChunk.getId() == null) {
507                        theWorkChunk.setId(UUID.randomUUID().toString());
508                }
509                return toChunk(myWorkChunkRepository.save(Batch2WorkChunkEntity.fromWorkChunk(theWorkChunk)));
510        }
511
512        /**
513         * Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
514         */
515        @Override
516        public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
517                return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) ->
518                                fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
519        }
520
521        @Override
522        public Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) {
523                return myWorkChunkRepository
524                                .fetchChunksForStep(theInstanceId, theStepId)
525                                .map(this::toChunk);
526        }
527
528        @Override
529        public Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(
530                        Pageable thePageable, String theInstanceId, Set<WorkChunkStatusEnum> theStates) {
531                Page<Batch2WorkChunkMetadataView> page =
532                                myWorkChunkMetadataViewRepo.fetchWorkChunkMetadataForJobInStates(thePageable, theInstanceId, theStates);
533
534                return page.map(Batch2WorkChunkMetadataView::toChunkMetadata);
535        }
536
537        @Override
538        public boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier) {
539                /*
540                 * We may already have a copy of the entity in the L1 cache, and it may be
541                 * stale if the scheduled maintenance service has touched it recently. So
542                 * we fetch it and then refresh-lock it so that we don't fail if someone
543                 * else has touched it.
544                 */
545                Batch2JobInstanceEntity instanceEntity = myEntityManager.find(Batch2JobInstanceEntity.class, theInstanceId);
546                myEntityManager.refresh(instanceEntity, LockModeType.PESSIMISTIC_WRITE);
547
548                if (null == instanceEntity) {
549                        ourLog.error("No instance found with Id {}", theInstanceId);
550                        return false;
551                }
552                // convert to JobInstance for public api
553                JobInstance jobInstance = JobInstanceUtil.fromEntityToInstance(instanceEntity);
554
555                // run the modification callback
556                boolean wasModified = theModifier.doUpdate(jobInstance);
557
558                if (wasModified) {
559                        // copy fields back for flush.
560                        JobInstanceUtil.fromInstanceToEntity(jobInstance, instanceEntity);
561                }
562
563                return wasModified;
564        }
565
566        @Override
567        @Transactional(propagation = Propagation.REQUIRES_NEW)
568        public void deleteInstanceAndChunks(String theInstanceId) {
569                ourLog.info("Deleting instance and chunks: {}", theInstanceId);
570                myWorkChunkRepository.deleteAllForInstance(theInstanceId);
571                myJobInstanceRepository.deleteById(theInstanceId);
572        }
573
574        @Override
575        @Transactional(propagation = Propagation.REQUIRES_NEW)
576        public void deleteChunksAndMarkInstanceAsChunksPurged(String theInstanceId) {
577                ourLog.info("Deleting all chunks for instance ID: {}", theInstanceId);
578                int updateCount = myJobInstanceRepository.updateWorkChunksPurgedTrue(theInstanceId);
579                int deleteCount = myWorkChunkRepository.deleteAllForInstance(theInstanceId);
580                ourLog.debug("Purged {} chunks, and updated {} instance.", deleteCount, updateCount);
581        }
582
583        @Override
584        public boolean markInstanceAsStatusWhenStatusIn(
585                        String theInstanceId, StatusEnum theStatusEnum, Set<StatusEnum> thePriorStates) {
586                int recordsChanged =
587                                myJobInstanceRepository.updateInstanceStatusIfIn(theInstanceId, theStatusEnum, thePriorStates);
588                ourLog.debug(
589                                "Update job {} to status {} if in status {}: {}",
590                                theInstanceId,
591                                theStatusEnum,
592                                thePriorStates,
593                                recordsChanged > 0);
594                return recordsChanged > 0;
595        }
596
597        @Override
598        @Transactional(propagation = Propagation.REQUIRES_NEW)
599        public JobOperationResultJson cancelInstance(String theInstanceId) {
600                int recordsChanged = myJobInstanceRepository.updateInstanceCancelled(theInstanceId, true);
601                String operationString = "Cancel job instance " + theInstanceId;
602
603                // wipmb For 6.8 - This is too detailed to be down here - this should be up at the api layer.
604                // Replace with boolean result or ResourceNotFound exception.  Build the message up at the ui.
605                String messagePrefix = "Job instance <" + theInstanceId + ">";
606                if (recordsChanged > 0) {
607                        return JobOperationResultJson.newSuccess(operationString, messagePrefix + " successfully cancelled.");
608                } else {
609                        Optional<JobInstance> instance = fetchInstance(theInstanceId);
610                        if (instance.isPresent()) {
611                                return JobOperationResultJson.newFailure(
612                                                operationString, messagePrefix + " was already cancelled.  Nothing to do.");
613                        } else {
614                                return JobOperationResultJson.newFailure(operationString, messagePrefix + " not found.");
615                        }
616                }
617        }
618
619        private void invokePreStorageBatchHooks(JobInstance theJobInstance) {
620                if (myInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE)) {
621                        HookParams params = new HookParams()
622                                        .add(JobInstance.class, theJobInstance)
623                                        .add(RequestDetails.class, new SystemRequestDetails());
624
625                        myInterceptorBroadcaster.callHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE, params);
626                }
627        }
628
629        @Override
630        @Transactional(propagation = Propagation.REQUIRES_NEW)
631        public boolean advanceJobStepAndUpdateChunkStatus(
632                        String theJobInstanceId, String theNextStepId, boolean theIsReductionStep) {
633                boolean changed = updateInstance(theJobInstanceId, instance -> {
634                        if (instance.getCurrentGatedStepId().equals(theNextStepId)) {
635                                // someone else beat us here.  No changes
636                                return false;
637                        }
638                        ourLog.debug("Moving gated instance {} to the next step {}.", theJobInstanceId, theNextStepId);
639                        instance.setCurrentGatedStepId(theNextStepId);
640                        return true;
641                });
642
643                if (changed) {
644                        ourLog.debug(
645                                        "Updating chunk status from GATE_WAITING to READY for gated instance {} in step {}.",
646                                        theJobInstanceId,
647                                        theNextStepId);
648                        WorkChunkStatusEnum nextStep =
649                                        theIsReductionStep ? WorkChunkStatusEnum.REDUCTION_READY : WorkChunkStatusEnum.READY;
650                        // when we reach here, the current step id is equal to theNextStepId
651                        // Up to 7.1, gated jobs' work chunks are created in status QUEUED but not actually queued for the
652                        // workers.
653                        // In order to keep them compatible, turn QUEUED chunks into READY, too.
654                        // TODO: 'QUEUED' from the IN clause will be removed after 7.6.0.
655                        int numChanged = myWorkChunkRepository.updateAllChunksForStepWithStatus(
656                                        theJobInstanceId,
657                                        theNextStepId,
658                                        List.of(WorkChunkStatusEnum.GATE_WAITING, WorkChunkStatusEnum.QUEUED),
659                                        nextStep);
660                        ourLog.debug(
661                                        "Updated {} chunks of gated instance {} for step {} from fake QUEUED to READY.",
662                                        numChanged,
663                                        theJobInstanceId,
664                                        theNextStepId);
665                }
666
667                return changed;
668        }
669}