001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.dao.data;
021
022import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO;
023import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
024import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
025import org.springframework.data.domain.Pageable;
026import org.springframework.data.jpa.repository.JpaRepository;
027import org.springframework.data.jpa.repository.Modifying;
028import org.springframework.data.jpa.repository.Query;
029import org.springframework.data.repository.query.Param;
030
031import java.util.Collection;
032import java.util.Date;
033import java.util.List;
034import java.util.Set;
035import java.util.stream.Stream;
036
037public interface IBatch2WorkChunkRepository
038                extends JpaRepository<Batch2WorkChunkEntity, String>, IHapiFhirJpaRepository {
039
040        // NOTE we need a stable sort so paging is reliable.
041        // Warning: mySequence is not unique - it is reset for every chunk.  So we also sort by myId.
042        @Query(
043                        "SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC, e.myId ASC")
044        List<Batch2WorkChunkEntity> fetchChunks(Pageable thePageRequest, @Param("instanceId") String theInstanceId);
045
046        /**
047         * A projection query to avoid fetching the CLOB over the wire.
048         * Otherwise, the same as fetchChunks.
049         */
050        @Query("SELECT new Batch2WorkChunkEntity("
051                        + "e.myId, e.mySequence, e.myJobDefinitionId, e.myJobDefinitionVersion, e.myInstanceId, e.myTargetStepId, e.myStatus,"
052                        + "e.myCreateTime, e.myStartTime, e.myUpdateTime, e.myEndTime,"
053                        + "e.myErrorMessage, e.myErrorCount, e.myRecordsProcessed, e.myWarningMessage,"
054                        + "e.myNextPollTime, e.myPollAttempts"
055                        + ") FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC, e.myId ASC")
056        List<Batch2WorkChunkEntity> fetchChunksNoData(Pageable thePageRequest, @Param("instanceId") String theInstanceId);
057
058        @Query(
059                        "SELECT DISTINCT e.myStatus from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId")
060        Set<WorkChunkStatusEnum> getDistinctStatusesForStep(
061                        @Param("instanceId") String theInstanceId, @Param("stepId") String theStepId);
062
063        @Query(
064                        "SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId AND e.myTargetStepId = :targetStepId ORDER BY e.mySequence ASC")
065        Stream<Batch2WorkChunkEntity> fetchChunksForStep(
066                        @Param("instanceId") String theInstanceId, @Param("targetStepId") String theTargetStepId);
067
068        @Modifying
069        @Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, "
070                        + "e.myRecordsProcessed = :rp, e.myErrorCount = e.myErrorCount + :errorRetries, e.mySerializedData = null, e.mySerializedDataVc = null, "
071                        + "e.myWarningMessage = :warningMessage WHERE e.myId = :id")
072        void updateChunkStatusAndClearDataForEndSuccess(
073                        @Param("id") String theChunkId,
074                        @Param("et") Date theEndTime,
075                        @Param("rp") int theRecordsProcessed,
076                        @Param("errorRetries") int theErrorRetries,
077                        @Param("status") WorkChunkStatusEnum theInProgress,
078                        @Param("warningMessage") String theWarningMessage);
079
080        @Modifying
081        @Query(
082                        "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myNextPollTime = :nextPollTime, e.myPollAttempts = COALESCE(e.myPollAttempts, 0) + 1 WHERE e.myId = :id AND e.myStatus IN(:states)")
083        int updateWorkChunkNextPollTime(
084                        @Param("id") String theChunkId,
085                        @Param("status") WorkChunkStatusEnum theStatus,
086                        @Param("states") Set<WorkChunkStatusEnum> theInitialStates,
087                        @Param("nextPollTime") Date theNextPollTime);
088
089        @Modifying
090        @Query(
091                        "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myNextPollTime = null WHERE e.myInstanceId = :instanceId AND e.myStatus IN(:states) AND e.myNextPollTime <= :pollTime")
092        int updateWorkChunksForPollWaiting(
093                        @Param("instanceId") String theInstanceId,
094                        @Param("pollTime") Date theTime,
095                        @Param("states") Set<WorkChunkStatusEnum> theInitialStates,
096                        @Param("status") WorkChunkStatusEnum theNewStatus);
097
098        @Modifying
099        @Query(
100                        "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, e.mySerializedData = null, e.mySerializedDataVc = null, e.myErrorMessage = :em WHERE e.myId IN(:ids)")
101        void updateAllChunksForInstanceStatusClearDataAndSetError(
102                        @Param("ids") List<String> theChunkIds,
103                        @Param("et") Date theEndTime,
104                        @Param("status") WorkChunkStatusEnum theInProgress,
105                        @Param("em") String theError);
106
107        @Modifying
108        @Query(
109                        "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, e.myErrorMessage = :em, e.myErrorCount = e.myErrorCount + 1 WHERE e.myId = :id")
110        int updateChunkStatusAndIncrementErrorCountForEndError(
111                        @Param("id") String theChunkId,
112                        @Param("et") Date theEndTime,
113                        @Param("em") String theErrorMessage,
114                        @Param("status") WorkChunkStatusEnum theInProgress);
115
116        /**
117         * Updates the workchunk error count and error message for WorkChunks that have failed after multiple retries.
118         *
119         * @param theStatus - the new status of the workchunk
120         * @param theChunkId - the id of the workchunk to update
121         * @param theMaxErrorCount - maximum error count (# of errors allowed for retry)
122         * @param theMaxErrorSize - max error size (maximum number of characters)
123         * @return - the number of updated chunks (should be 1)
124         */
125        @Modifying
126        @Query("UPDATE Batch2WorkChunkEntity e "
127                        + "SET e.myStatus = :failed, "
128                        + "e.myErrorMessage = LEFT(CONCAT('Too many errors (', CAST(e.myErrorCount as string), '). Last err msg ', e.myErrorMessage), :maxErrorSize) "
129                        + "WHERE e.myId = :chunkId and e.myErrorCount > :maxCount")
130        int updateChunkForTooManyErrors(
131                        @Param("failed") WorkChunkStatusEnum theStatus,
132                        @Param("chunkId") String theChunkId,
133                        @Param("maxCount") int theMaxErrorCount,
134                        @Param("maxErrorSize") int theMaxErrorSize);
135
136        @Modifying
137        @Query(
138                        "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myStartTime = :st WHERE e.myId = :id AND e.myStatus IN :startStatuses")
139        int updateChunkStatusForStart(
140                        @Param("id") String theChunkId,
141                        @Param("st") Date theStartedTime,
142                        @Param("status") WorkChunkStatusEnum theInProgress,
143                        @Param("startStatuses") Collection<WorkChunkStatusEnum> theStartStatuses);
144
145        @Modifying
146        @Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :newStatus WHERE e.myId = :id AND e.myStatus = :oldStatus")
147        int updateChunkStatus(
148                        @Param("id") String theChunkId,
149                        @Param("oldStatus") WorkChunkStatusEnum theOldStatus,
150                        @Param("newStatus") WorkChunkStatusEnum theNewStatus);
151
152        @Modifying
153        @Query(
154                        "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :newStatus WHERE e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId AND e.myStatus IN ( :oldStatuses )")
155        int updateAllChunksForStepWithStatus(
156                        @Param("instanceId") String theInstanceId,
157                        @Param("stepId") String theStepId,
158                        @Param("oldStatuses") List<WorkChunkStatusEnum> theOldStatuses,
159                        @Param("newStatus") WorkChunkStatusEnum theNewStatus);
160
161        @Modifying
162        @Query("DELETE FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId")
163        int deleteAllForInstance(@Param("instanceId") String theInstanceId);
164
165        @Query(
166                        "SELECT e.myId from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId AND e.myStatus = :status")
167        List<String> fetchAllChunkIdsForStepWithStatus(
168                        @Param("instanceId") String theInstanceId,
169                        @Param("stepId") String theStepId,
170                        @Param("status") WorkChunkStatusEnum theStatus);
171
172        @Query(
173                        "SELECT new ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO(e.myTargetStepId, e.myStatus, min(e.myStartTime), max(e.myEndTime), avg(cast((e.myEndTime - e.myStartTime) as long)), count(*)) FROM Batch2WorkChunkEntity e WHERE e.myInstanceId=:instanceId GROUP BY e.myTargetStepId, e.myStatus")
174        List<BatchWorkChunkStatusDTO> fetchWorkChunkStatusForInstance(@Param("instanceId") String theInstanceId);
175}