001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.batch2;
021
022import ca.uhn.fhir.batch2.api.IJobPersistence;
023import ca.uhn.fhir.batch2.api.JobOperationResultJson;
024import ca.uhn.fhir.batch2.model.BatchInstanceStatusDTO;
025import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO;
026import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
027import ca.uhn.fhir.batch2.model.JobInstance;
028import ca.uhn.fhir.batch2.model.StatusEnum;
029import ca.uhn.fhir.batch2.model.WorkChunk;
030import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
031import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
032import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
033import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
034import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
035import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
036import ca.uhn.fhir.interceptor.api.HookParams;
037import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
038import ca.uhn.fhir.interceptor.api.Pointcut;
039import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
040import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkMetadataViewRepository;
041import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
042import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
043import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
044import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
045import ca.uhn.fhir.jpa.entity.Batch2WorkChunkMetadataView;
046import ca.uhn.fhir.model.api.PagingIterator;
047import ca.uhn.fhir.rest.api.server.RequestDetails;
048import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
049import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
050import ca.uhn.fhir.util.Logs;
051import com.fasterxml.jackson.core.JsonParser;
052import com.fasterxml.jackson.databind.JsonNode;
053import com.fasterxml.jackson.databind.ObjectMapper;
054import com.fasterxml.jackson.databind.node.ObjectNode;
055import jakarta.annotation.Nonnull;
056import jakarta.annotation.Nullable;
057import jakarta.persistence.EntityManager;
058import jakarta.persistence.LockModeType;
059import jakarta.persistence.Query;
060import org.apache.commons.collections4.ListUtils;
061import org.apache.commons.lang3.Validate;
062import org.slf4j.Logger;
063import org.springframework.data.domain.Page;
064import org.springframework.data.domain.PageImpl;
065import org.springframework.data.domain.PageRequest;
066import org.springframework.data.domain.Pageable;
067import org.springframework.data.domain.Sort;
068import org.springframework.transaction.annotation.Propagation;
069import org.springframework.transaction.annotation.Transactional;
070import org.springframework.transaction.support.TransactionSynchronizationManager;
071
072import java.time.Instant;
073import java.util.Collections;
074import java.util.Date;
075import java.util.HashSet;
076import java.util.Iterator;
077import java.util.List;
078import java.util.Objects;
079import java.util.Optional;
080import java.util.Set;
081import java.util.UUID;
082import java.util.function.Consumer;
083import java.util.stream.Collectors;
084import java.util.stream.Stream;
085
086import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
087import static ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity.ERROR_MSG_MAX_LENGTH;
088import static org.apache.commons.lang3.StringUtils.isBlank;
089
090public class JpaJobPersistenceImpl implements IJobPersistence {
091        private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
092        public static final String CREATE_TIME = "myCreateTime";
093
094        private final IBatch2JobInstanceRepository myJobInstanceRepository;
095        private final IBatch2WorkChunkRepository myWorkChunkRepository;
096        private final IBatch2WorkChunkMetadataViewRepository myWorkChunkMetadataViewRepo;
097        private final EntityManager myEntityManager;
098        private final IHapiTransactionService myTransactionService;
099        private final IInterceptorBroadcaster myInterceptorBroadcaster;
100
101        /**
102         * Constructor
103         */
104        public JpaJobPersistenceImpl(
105                        IBatch2JobInstanceRepository theJobInstanceRepository,
106                        IBatch2WorkChunkRepository theWorkChunkRepository,
107                        IBatch2WorkChunkMetadataViewRepository theWorkChunkMetadataViewRepo,
108                        IHapiTransactionService theTransactionService,
109                        EntityManager theEntityManager,
110                        IInterceptorBroadcaster theInterceptorBroadcaster) {
111                Validate.notNull(theJobInstanceRepository, "theJobInstanceRepository");
112                Validate.notNull(theWorkChunkRepository, "theWorkChunkRepository");
113                myJobInstanceRepository = theJobInstanceRepository;
114                myWorkChunkRepository = theWorkChunkRepository;
115                myWorkChunkMetadataViewRepo = theWorkChunkMetadataViewRepo;
116                myTransactionService = theTransactionService;
117                myEntityManager = theEntityManager;
118                myInterceptorBroadcaster = theInterceptorBroadcaster;
119        }
120
121        @Override
122        @Transactional(propagation = Propagation.REQUIRED)
123        public String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) {
124                Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity();
125                entity.setId(UUID.randomUUID().toString());
126                entity.setSequence(theBatchWorkChunk.sequence);
127                entity.setJobDefinitionId(theBatchWorkChunk.jobDefinitionId);
128                entity.setJobDefinitionVersion(theBatchWorkChunk.jobDefinitionVersion);
129                entity.setTargetStepId(theBatchWorkChunk.targetStepId);
130                entity.setInstanceId(theBatchWorkChunk.instanceId);
131                entity.setSerializedData(theBatchWorkChunk.serializedData);
132                entity.setCreateTime(new Date());
133                entity.setStartTime(new Date());
134                entity.setStatus(getOnCreateStatus(theBatchWorkChunk));
135
136                ourLog.debug("Create work chunk {}/{}/{}", entity.getInstanceId(), entity.getId(), entity.getTargetStepId());
137                ourLog.trace(
138                                "Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData());
139                myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> myWorkChunkRepository.save(entity));
140
141                return entity.getId();
142        }
143
144        /**
145         * Gets the initial onCreate state for the given workchunk.
146         * Gated job chunks start in GATE_WAITING; they will be transitioned to READY during maintenance pass when all
147         * chunks in the previous step are COMPLETED.
148         * Non gated job chunks start in READY
149         */
150        private static WorkChunkStatusEnum getOnCreateStatus(WorkChunkCreateEvent theBatchWorkChunk) {
151                if (theBatchWorkChunk.isGatedExecution) {
152                        return WorkChunkStatusEnum.GATE_WAITING;
153                } else {
154                        return WorkChunkStatusEnum.READY;
155                }
156        }
157
158        @Override
159        @Transactional(propagation = Propagation.REQUIRED)
160        public Optional<WorkChunk> onWorkChunkDequeue(String theChunkId) {
161                // take a lock on the chunk id to ensure that the maintenance run isn't doing anything.
162                Batch2WorkChunkEntity chunkLock =
163                                myEntityManager.find(Batch2WorkChunkEntity.class, theChunkId, LockModeType.PESSIMISTIC_WRITE);
164                // remove from the current state to avoid stale data.
165                myEntityManager.detach(chunkLock);
166
167                // NOTE: Ideally, IN_PROGRESS wouldn't be allowed here.  On chunk failure, we probably shouldn't be allowed.
168                // But how does re-run happen if k8s kills a processor mid run?
169                List<WorkChunkStatusEnum> priorStates =
170                                List.of(WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.ERRORED, WorkChunkStatusEnum.IN_PROGRESS);
171                int rowsModified = myWorkChunkRepository.updateChunkStatusForStart(
172                                theChunkId, new Date(), WorkChunkStatusEnum.IN_PROGRESS, priorStates);
173
174                if (rowsModified == 0) {
175                        ourLog.info("Attempting to start chunk {} but it was already started.", theChunkId);
176                        return Optional.empty();
177                } else {
178                        Optional<Batch2WorkChunkEntity> chunk = myWorkChunkRepository.findById(theChunkId);
179                        return chunk.map(this::toChunk);
180                }
181        }
182
183        @Override
184        @Transactional(propagation = Propagation.REQUIRED)
185        public String storeNewInstance(JobInstance theInstance) {
186                Validate.isTrue(isBlank(theInstance.getInstanceId()));
187
188                invokePreStorageBatchHooks(theInstance);
189
190                Batch2JobInstanceEntity entity = new Batch2JobInstanceEntity();
191                entity.setId(UUID.randomUUID().toString());
192                entity.setDefinitionId(theInstance.getJobDefinitionId());
193                entity.setDefinitionVersion(theInstance.getJobDefinitionVersion());
194                entity.setStatus(theInstance.getStatus());
195                entity.setParams(theInstance.getParameters());
196                entity.setCurrentGatedStepId(theInstance.getCurrentGatedStepId());
197                entity.setFastTracking(theInstance.isFastTracking());
198                entity.setCreateTime(new Date());
199                entity.setStartTime(new Date());
200                entity.setReport(theInstance.getReport());
201                entity.setTriggeringUsername(theInstance.getTriggeringUsername());
202                entity.setTriggeringClientId(theInstance.getTriggeringClientId());
203
204                entity = myJobInstanceRepository.save(entity);
205                return entity.getId();
206        }
207
208        @Override
209        @Transactional(propagation = Propagation.REQUIRES_NEW)
210        public List<JobInstance> fetchInstances(
211                        String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) {
212                return toInstanceList(myJobInstanceRepository.findInstancesByJobIdAndStatusAndExpiry(
213                                theJobDefinitionId, theStatuses, theCutoff, thePageable));
214        }
215
216        @Override
217        @Transactional(propagation = Propagation.REQUIRES_NEW)
218        public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(
219                        String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) {
220                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
221                return toInstanceList(myJobInstanceRepository.fetchInstancesByJobDefinitionIdAndStatus(
222                                theJobDefinitionId, theRequestedStatuses, pageRequest));
223        }
224
225        @Override
226        @Transactional(propagation = Propagation.REQUIRES_NEW)
227        public List<JobInstance> fetchInstancesByJobDefinitionId(
228                        String theJobDefinitionId, int thePageSize, int thePageIndex) {
229                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
230                return toInstanceList(myJobInstanceRepository.findInstancesByJobDefinitionId(theJobDefinitionId, pageRequest));
231        }
232
233        @Override
234        @Transactional(propagation = Propagation.REQUIRES_NEW)
235        public Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest) {
236                PageRequest pageRequest =
237                                PageRequest.of(theRequest.getPageStart(), theRequest.getBatchSize(), theRequest.getSort());
238
239                String jobStatus = theRequest.getJobStatus();
240                if (Objects.equals(jobStatus, "")) {
241                        Page<Batch2JobInstanceEntity> pageOfEntities = myJobInstanceRepository.findAll(pageRequest);
242                        return pageOfEntities.map(this::toInstance);
243                }
244
245                StatusEnum status = StatusEnum.valueOf(jobStatus);
246                List<JobInstance> jobs = toInstanceList(myJobInstanceRepository.findInstancesByJobStatus(status, pageRequest));
247                Integer jobsOfStatus = myJobInstanceRepository.findTotalJobsOfStatus(status);
248                return new PageImpl<>(jobs, pageRequest, jobsOfStatus);
249        }
250
251        private List<JobInstance> toInstanceList(List<Batch2JobInstanceEntity> theInstancesByJobDefinitionId) {
252                return theInstancesByJobDefinitionId.stream().map(this::toInstance).collect(Collectors.toList());
253        }
254
255        @Override
256        @Nonnull
257        public Optional<JobInstance> fetchInstance(String theInstanceId) {
258                return myTransactionService
259                                .withSystemRequestOnDefaultPartition()
260                                .execute(() -> myJobInstanceRepository.findById(theInstanceId).map(this::toInstance));
261        }
262
263        @Nonnull
264        @Override
265        public List<BatchWorkChunkStatusDTO> fetchWorkChunkStatusForInstance(String theInstanceId) {
266                return myTransactionService
267                                .withSystemRequestOnDefaultPartition()
268                                .execute(() -> myWorkChunkRepository.fetchWorkChunkStatusForInstance(theInstanceId));
269        }
270
271        @Nonnull
272        @Override
273        public BatchInstanceStatusDTO fetchBatchInstanceStatus(String theInstanceId) {
274                return myTransactionService
275                                .withSystemRequestOnDefaultPartition()
276                                .execute(() -> myJobInstanceRepository.fetchBatchInstanceStatus(theInstanceId));
277        }
278
279        @Override
280        @Transactional(propagation = Propagation.REQUIRES_NEW)
281        public List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int thePage, int theBatchSize) {
282                String definitionId = theRequest.getJobDefinition();
283                String params = theRequest.getParameters();
284                Set<StatusEnum> statuses = theRequest.getStatuses();
285
286                Pageable pageable = PageRequest.of(thePage, theBatchSize);
287
288                List<Batch2JobInstanceEntity> instanceEntities;
289
290                if (statuses != null && !statuses.isEmpty()) {
291                        if (Batch2JobDefinitionConstants.BULK_EXPORT.equals(definitionId)) {
292                                if (originalRequestUrlTruncation(params) != null) {
293                                        params = originalRequestUrlTruncation(params);
294                                }
295                        }
296                        instanceEntities = myJobInstanceRepository.findInstancesByJobIdParamsAndStatus(
297                                        definitionId, params, statuses, pageable);
298                } else {
299                        instanceEntities = myJobInstanceRepository.findInstancesByJobIdAndParams(definitionId, params, pageable);
300                }
301                return toInstanceList(instanceEntities);
302        }
303
304        private String originalRequestUrlTruncation(String theParams) {
305                try {
306                        ObjectMapper mapper = new ObjectMapper();
307                        mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
308                        mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
309                        JsonNode rootNode = mapper.readTree(theParams);
310                        String originalUrl = "originalRequestUrl";
311
312                        if (rootNode instanceof ObjectNode) {
313                                ObjectNode objectNode = (ObjectNode) rootNode;
314
315                                if (objectNode.has(originalUrl)) {
316                                        String url = objectNode.get(originalUrl).asText();
317                                        if (url.contains("?")) {
318                                                objectNode.put(originalUrl, url.split("\\?")[0]);
319                                        }
320                                }
321                                return mapper.writeValueAsString(objectNode);
322                        }
323                } catch (Exception e) {
324                        ourLog.info("Error Truncating Original Request Url", e);
325                }
326                return null;
327        }
328
329        @Override
330        @Transactional(propagation = Propagation.REQUIRES_NEW)
331        public List<JobInstance> fetchInstances(int thePageSize, int thePageIndex) {
332                // default sort is myCreateTime Asc
333                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
334                return myTransactionService
335                                .withSystemRequestOnDefaultPartition()
336                                .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
337                                                .map(this::toInstance)
338                                                .collect(Collectors.toList()));
339        }
340
341        @Override
342        public void enqueueWorkChunkForProcessing(String theChunkId, Consumer<Integer> theCallback) {
343                int updated = myWorkChunkRepository.updateChunkStatus(
344                                theChunkId, WorkChunkStatusEnum.READY, WorkChunkStatusEnum.QUEUED);
345                theCallback.accept(updated);
346        }
347
348        @Override
349        public int updatePollWaitingChunksForJobIfReady(String theInstanceId) {
350                return myWorkChunkRepository.updateWorkChunksForPollWaiting(
351                                theInstanceId,
352                                Date.from(Instant.now()),
353                                Set.of(WorkChunkStatusEnum.POLL_WAITING),
354                                WorkChunkStatusEnum.READY);
355        }
356
357        @Override
358        @Transactional(propagation = Propagation.REQUIRES_NEW)
359        public List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex) {
360                PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.DESC, CREATE_TIME);
361                return myTransactionService
362                                .withSystemRequestOnDefaultPartition()
363                                .execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
364                                                .map(this::toInstance)
365                                                .collect(Collectors.toList()));
366        }
367
368        private WorkChunk toChunk(Batch2WorkChunkEntity theEntity) {
369                return JobInstanceUtil.fromEntityToWorkChunk(theEntity);
370        }
371
372        private JobInstance toInstance(Batch2JobInstanceEntity theEntity) {
373                return JobInstanceUtil.fromEntityToInstance(theEntity);
374        }
375
376        @Override
377        public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) {
378                String chunkId = theParameters.getChunkId();
379                String errorMessage = truncateErrorMessage(theParameters.getErrorMsg());
380
381                return myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> {
382                        int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
383                                        chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED);
384                        Validate.isTrue(changeCount > 0, "changed chunk matching %s", chunkId);
385
386                        Query query = myEntityManager.createQuery("update Batch2WorkChunkEntity " + "set myStatus = :failed "
387                                        + ",myErrorMessage = CONCAT('Too many errors: ', CAST(myErrorCount as string), '. Last error msg was ', myErrorMessage) "
388                                        + "where myId = :chunkId and myErrorCount > :maxCount");
389                        query.setParameter("chunkId", chunkId);
390                        query.setParameter("failed", WorkChunkStatusEnum.FAILED);
391                        query.setParameter("maxCount", MAX_CHUNK_ERROR_COUNT);
392                        int failChangeCount = query.executeUpdate();
393
394                        if (failChangeCount > 0) {
395                                return WorkChunkStatusEnum.FAILED;
396                        } else {
397                                return WorkChunkStatusEnum.ERRORED;
398                        }
399                });
400        }
401
402        @Override
403        public void onWorkChunkPollDelay(String theChunkId, Date theDeadline) {
404                int updated = myWorkChunkRepository.updateWorkChunkNextPollTime(
405                                theChunkId, WorkChunkStatusEnum.POLL_WAITING, Set.of(WorkChunkStatusEnum.IN_PROGRESS), theDeadline);
406
407                if (updated != 1) {
408                        ourLog.warn("Expected to update 1 work chunk's poll delay; but found {}", updated);
409                }
410        }
411
412        @Override
413        public void onWorkChunkFailed(String theChunkId, String theErrorMessage) {
414                ourLog.info("Marking chunk {} as failed with message: {}", theChunkId, theErrorMessage);
415                String errorMessage = truncateErrorMessage(theErrorMessage);
416                myTransactionService
417                                .withSystemRequestOnDefaultPartition()
418                                .execute(() -> myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
419                                                theChunkId, new Date(), errorMessage, WorkChunkStatusEnum.FAILED));
420        }
421
422        @Override
423        public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) {
424                myTransactionService
425                                .withSystemRequestOnDefaultPartition()
426                                .execute(() -> myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(
427                                                theEvent.getChunkId(),
428                                                new Date(),
429                                                theEvent.getRecordsProcessed(),
430                                                theEvent.getRecoveredErrorCount(),
431                                                WorkChunkStatusEnum.COMPLETED,
432                                                theEvent.getRecoveredWarningMessage()));
433        }
434
435        @Nullable
436        private static String truncateErrorMessage(String theErrorMessage) {
437                String errorMessage;
438                if (theErrorMessage != null && theErrorMessage.length() > ERROR_MSG_MAX_LENGTH) {
439                        ourLog.warn("Truncating error message that is too long to store in database: {}", theErrorMessage);
440                        errorMessage = theErrorMessage.substring(0, ERROR_MSG_MAX_LENGTH);
441                } else {
442                        errorMessage = theErrorMessage;
443                }
444                return errorMessage;
445        }
446
447        @Override
448        public void markWorkChunksWithStatusAndWipeData(
449                        String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMessage) {
450                assert TransactionSynchronizationManager.isActualTransactionActive();
451
452                ourLog.debug("Marking all chunks for instance {} to status {}", theInstanceId, theStatus);
453                String errorMessage = truncateErrorMessage(theErrorMessage);
454                List<List<String>> listOfListOfIds = ListUtils.partition(theChunkIds, 100);
455                for (List<String> idList : listOfListOfIds) {
456                        myWorkChunkRepository.updateAllChunksForInstanceStatusClearDataAndSetError(
457                                        idList, new Date(), theStatus, errorMessage);
458                }
459        }
460
461        @Override
462        public Set<WorkChunkStatusEnum> getDistinctWorkChunkStatesForJobAndStep(
463                        String theInstanceId, String theCurrentStepId) {
464                if (getRunningJob(theInstanceId) == null) {
465                        return Collections.unmodifiableSet(new HashSet<>());
466                }
467                return myWorkChunkRepository.getDistinctStatusesForStep(theInstanceId, theCurrentStepId);
468        }
469
470        private Batch2JobInstanceEntity getRunningJob(String theInstanceId) {
471                Optional<Batch2JobInstanceEntity> instance = myJobInstanceRepository.findById(theInstanceId);
472                if (instance.isEmpty()) {
473                        return null;
474                }
475                if (instance.get().getStatus().isEnded()) {
476                        return null;
477                }
478                return instance.get();
479        }
480
481        private void fetchChunks(
482                        String theInstanceId,
483                        boolean theIncludeData,
484                        int thePageSize,
485                        int thePageIndex,
486                        Consumer<WorkChunk> theConsumer) {
487                myTransactionService
488                                .withSystemRequestOnDefaultPartition()
489                                .withPropagation(Propagation.REQUIRES_NEW)
490                                .execute(() -> {
491                                        List<Batch2WorkChunkEntity> chunks;
492                                        if (theIncludeData) {
493                                                chunks = myWorkChunkRepository.fetchChunks(
494                                                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
495                                        } else {
496                                                chunks = myWorkChunkRepository.fetchChunksNoData(
497                                                                PageRequest.of(thePageIndex, thePageSize), theInstanceId);
498                                        }
499                                        for (Batch2WorkChunkEntity chunk : chunks) {
500                                                theConsumer.accept(toChunk(chunk));
501                                        }
502                                });
503        }
504
505        @Override
506        public void updateInstanceUpdateTime(String theInstanceId) {
507                myJobInstanceRepository.updateInstanceUpdateTime(theInstanceId, new Date());
508        }
509
510        @Override
511        public WorkChunk createWorkChunk(WorkChunk theWorkChunk) {
512                if (theWorkChunk.getId() == null) {
513                        theWorkChunk.setId(UUID.randomUUID().toString());
514                }
515                return toChunk(myWorkChunkRepository.save(Batch2WorkChunkEntity.fromWorkChunk(theWorkChunk)));
516        }
517
518        /**
519         * Note: Not @Transactional because the transaction happens in a lambda that's called outside of this method's scope
520         */
521        @Override
522        public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
523                return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) ->
524                                fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
525        }
526
527        @Override
528        public Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) {
529                return myWorkChunkRepository
530                                .fetchChunksForStep(theInstanceId, theStepId)
531                                .map(this::toChunk);
532        }
533
534        @Override
535        public Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(
536                        Pageable thePageable, String theInstanceId, Set<WorkChunkStatusEnum> theStates) {
537                Page<Batch2WorkChunkMetadataView> page =
538                                myWorkChunkMetadataViewRepo.fetchWorkChunkMetadataForJobInStates(thePageable, theInstanceId, theStates);
539
540                return page.map(Batch2WorkChunkMetadataView::toChunkMetadata);
541        }
542
543        @Override
544        public boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier) {
545                Batch2JobInstanceEntity instanceEntity =
546                                myEntityManager.find(Batch2JobInstanceEntity.class, theInstanceId, LockModeType.PESSIMISTIC_WRITE);
547                if (null == instanceEntity) {
548                        ourLog.error("No instance found with Id {}", theInstanceId);
549                        return false;
550                }
551                // convert to JobInstance for public api
552                JobInstance jobInstance = JobInstanceUtil.fromEntityToInstance(instanceEntity);
553
554                // run the modification callback
555                boolean wasModified = theModifier.doUpdate(jobInstance);
556
557                if (wasModified) {
558                        // copy fields back for flush.
559                        JobInstanceUtil.fromInstanceToEntity(jobInstance, instanceEntity);
560                }
561
562                return wasModified;
563        }
564
565        @Override
566        @Transactional(propagation = Propagation.REQUIRES_NEW)
567        public void deleteInstanceAndChunks(String theInstanceId) {
568                ourLog.info("Deleting instance and chunks: {}", theInstanceId);
569                myWorkChunkRepository.deleteAllForInstance(theInstanceId);
570                myJobInstanceRepository.deleteById(theInstanceId);
571        }
572
573        @Override
574        @Transactional(propagation = Propagation.REQUIRES_NEW)
575        public void deleteChunksAndMarkInstanceAsChunksPurged(String theInstanceId) {
576                ourLog.info("Deleting all chunks for instance ID: {}", theInstanceId);
577                int updateCount = myJobInstanceRepository.updateWorkChunksPurgedTrue(theInstanceId);
578                int deleteCount = myWorkChunkRepository.deleteAllForInstance(theInstanceId);
579                ourLog.debug("Purged {} chunks, and updated {} instance.", deleteCount, updateCount);
580        }
581
582        @Override
583        public boolean markInstanceAsStatusWhenStatusIn(
584                        String theInstanceId, StatusEnum theStatusEnum, Set<StatusEnum> thePriorStates) {
585                int recordsChanged =
586                                myJobInstanceRepository.updateInstanceStatusIfIn(theInstanceId, theStatusEnum, thePriorStates);
587                ourLog.debug(
588                                "Update job {} to status {} if in status {}: {}",
589                                theInstanceId,
590                                theStatusEnum,
591                                thePriorStates,
592                                recordsChanged > 0);
593                return recordsChanged > 0;
594        }
595
596        @Override
597        @Transactional(propagation = Propagation.REQUIRES_NEW)
598        public JobOperationResultJson cancelInstance(String theInstanceId) {
599                int recordsChanged = myJobInstanceRepository.updateInstanceCancelled(theInstanceId, true);
600                String operationString = "Cancel job instance " + theInstanceId;
601
602                // wipmb For 6.8 - This is too detailed to be down here - this should be up at the api layer.
603                // Replace with boolean result or ResourceNotFound exception.  Build the message up at the ui.
604                String messagePrefix = "Job instance <" + theInstanceId + ">";
605                if (recordsChanged > 0) {
606                        return JobOperationResultJson.newSuccess(operationString, messagePrefix + " successfully cancelled.");
607                } else {
608                        Optional<JobInstance> instance = fetchInstance(theInstanceId);
609                        if (instance.isPresent()) {
610                                return JobOperationResultJson.newFailure(
611                                                operationString, messagePrefix + " was already cancelled.  Nothing to do.");
612                        } else {
613                                return JobOperationResultJson.newFailure(operationString, messagePrefix + " not found.");
614                        }
615                }
616        }
617
618        private void invokePreStorageBatchHooks(JobInstance theJobInstance) {
619                if (myInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE)) {
620                        HookParams params = new HookParams()
621                                        .add(JobInstance.class, theJobInstance)
622                                        .add(RequestDetails.class, new SystemRequestDetails());
623
624                        myInterceptorBroadcaster.callHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE, params);
625                }
626        }
627
628        @Override
629        @Transactional(propagation = Propagation.REQUIRES_NEW)
630        public boolean advanceJobStepAndUpdateChunkStatus(
631                        String theJobInstanceId, String theNextStepId, boolean theIsReductionStep) {
632                boolean changed = updateInstance(theJobInstanceId, instance -> {
633                        if (instance.getCurrentGatedStepId().equals(theNextStepId)) {
634                                // someone else beat us here.  No changes
635                                return false;
636                        }
637                        ourLog.debug("Moving gated instance {} to the next step {}.", theJobInstanceId, theNextStepId);
638                        instance.setCurrentGatedStepId(theNextStepId);
639                        return true;
640                });
641
642                if (changed) {
643                        ourLog.debug(
644                                        "Updating chunk status from GATE_WAITING to READY for gated instance {} in step {}.",
645                                        theJobInstanceId,
646                                        theNextStepId);
647                        WorkChunkStatusEnum nextStep =
648                                        theIsReductionStep ? WorkChunkStatusEnum.REDUCTION_READY : WorkChunkStatusEnum.READY;
649                        // when we reach here, the current step id is equal to theNextStepId
650                        // Up to 7.1, gated jobs' work chunks are created in status QUEUED but not actually queued for the
651                        // workers.
652                        // In order to keep them compatible, turn QUEUED chunks into READY, too.
653                        // TODO: 'QUEUED' from the IN clause will be removed after 7.6.0.
654                        int numChanged = myWorkChunkRepository.updateAllChunksForStepWithStatus(
655                                        theJobInstanceId,
656                                        theNextStepId,
657                                        List.of(WorkChunkStatusEnum.GATE_WAITING, WorkChunkStatusEnum.QUEUED),
658                                        nextStep);
659                        ourLog.debug(
660                                        "Updated {} chunks of gated instance {} for step {} from fake QUEUED to READY.",
661                                        numChanged,
662                                        theJobInstanceId,
663                                        theNextStepId);
664                }
665
666                return changed;
667        }
668}