001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.dao.data;
021
022import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO;
023import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
024import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
025import org.springframework.data.domain.Pageable;
026import org.springframework.data.jpa.repository.JpaRepository;
027import org.springframework.data.jpa.repository.Modifying;
028import org.springframework.data.jpa.repository.Query;
029import org.springframework.data.repository.query.Param;
030
031import java.util.Collection;
032import java.util.Date;
033import java.util.List;
034import java.util.Set;
035import java.util.stream.Stream;
036
037public interface IBatch2WorkChunkRepository
038                extends JpaRepository<Batch2WorkChunkEntity, String>, IHapiFhirJpaRepository {
039
040        // NOTE we need a stable sort so paging is reliable.
041        // Warning: mySequence is not unique - it is reset for every chunk.  So we also sort by myId.
042        @Query(
043                        "SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC, e.myId ASC")
044        List<Batch2WorkChunkEntity> fetchChunks(Pageable thePageRequest, @Param("instanceId") String theInstanceId);
045
046        /**
047         * A projection query to avoid fetching the CLOB over the wire.
048         * Otherwise, the same as fetchChunks.
049         */
050        @Query("SELECT new Batch2WorkChunkEntity("
051                        + "e.myId, e.mySequence, e.myJobDefinitionId, e.myJobDefinitionVersion, e.myInstanceId, e.myTargetStepId, e.myStatus,"
052                        + "e.myCreateTime, e.myStartTime, e.myUpdateTime, e.myEndTime,"
053                        + "e.myErrorMessage, e.myErrorCount, e.myRecordsProcessed, e.myWarningMessage,"
054                        + "e.myNextPollTime, e.myPollAttempts"
055                        + ") FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC, e.myId ASC")
056        List<Batch2WorkChunkEntity> fetchChunksNoData(Pageable thePageRequest, @Param("instanceId") String theInstanceId);
057
058        @Query(
059                        "SELECT DISTINCT e.myStatus from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId")
060        Set<WorkChunkStatusEnum> getDistinctStatusesForStep(
061                        @Param("instanceId") String theInstanceId, @Param("stepId") String theStepId);
062
063        @Query(
064                        "SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId AND e.myTargetStepId = :targetStepId ORDER BY e.mySequence ASC")
065        Stream<Batch2WorkChunkEntity> fetchChunksForStep(
066                        @Param("instanceId") String theInstanceId, @Param("targetStepId") String theTargetStepId);
067
068        @Modifying
069        @Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, "
070                        + "e.myRecordsProcessed = :rp, e.myErrorCount = e.myErrorCount + :errorRetries, e.mySerializedData = null, e.mySerializedDataVc = null, "
071                        + "e.myWarningMessage = :warningMessage WHERE e.myId = :id")
072        void updateChunkStatusAndClearDataForEndSuccess(
073                        @Param("id") String theChunkId,
074                        @Param("et") Date theEndTime,
075                        @Param("rp") int theRecordsProcessed,
076                        @Param("errorRetries") int theErrorRetries,
077                        @Param("status") WorkChunkStatusEnum theInProgress,
078                        @Param("warningMessage") String theWarningMessage);
079
080        @Modifying
081        @Query(
082                        "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myNextPollTime = :nextPollTime, e.myPollAttempts = COALESCE(e.myPollAttempts, 0) + 1 WHERE e.myId = :id AND e.myStatus IN(:states)")
083        int updateWorkChunkNextPollTime(
084                        @Param("id") String theChunkId,
085                        @Param("status") WorkChunkStatusEnum theStatus,
086                        @Param("states") Set<WorkChunkStatusEnum> theInitialStates,
087                        @Param("nextPollTime") Date theNextPollTime);
088
089        @Modifying
090        @Query(
091                        "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myNextPollTime = null WHERE e.myInstanceId = :instanceId AND e.myStatus IN(:states) AND e.myNextPollTime <= :pollTime")
092        int updateWorkChunksForPollWaiting(
093                        @Param("instanceId") String theInstanceId,
094                        @Param("pollTime") Date theTime,
095                        @Param("states") Set<WorkChunkStatusEnum> theInitialStates,
096                        @Param("status") WorkChunkStatusEnum theNewStatus);
097
098        @Modifying
099        @Query(
100                        "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, e.mySerializedData = null, e.mySerializedDataVc = null, e.myErrorMessage = :em WHERE e.myId IN(:ids)")
101        void updateAllChunksForInstanceStatusClearDataAndSetError(
102                        @Param("ids") List<String> theChunkIds,
103                        @Param("et") Date theEndTime,
104                        @Param("status") WorkChunkStatusEnum theInProgress,
105                        @Param("em") String theError);
106
107        @Modifying
108        @Query(
109                        "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, e.myErrorMessage = :em, e.myErrorCount = e.myErrorCount + 1 WHERE e.myId = :id")
110        int updateChunkStatusAndIncrementErrorCountForEndError(
111                        @Param("id") String theChunkId,
112                        @Param("et") Date theEndTime,
113                        @Param("em") String theErrorMessage,
114                        @Param("status") WorkChunkStatusEnum theInProgress);
115
116        @Modifying
117        @Query(
118                        "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myStartTime = :st WHERE e.myId = :id AND e.myStatus IN :startStatuses")
119        int updateChunkStatusForStart(
120                        @Param("id") String theChunkId,
121                        @Param("st") Date theStartedTime,
122                        @Param("status") WorkChunkStatusEnum theInProgress,
123                        @Param("startStatuses") Collection<WorkChunkStatusEnum> theStartStatuses);
124
125        @Modifying
126        @Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :newStatus WHERE e.myId = :id AND e.myStatus = :oldStatus")
127        int updateChunkStatus(
128                        @Param("id") String theChunkId,
129                        @Param("oldStatus") WorkChunkStatusEnum theOldStatus,
130                        @Param("newStatus") WorkChunkStatusEnum theNewStatus);
131
132        @Modifying
133        @Query(
134                        "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :newStatus WHERE e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId AND e.myStatus IN ( :oldStatuses )")
135        int updateAllChunksForStepWithStatus(
136                        @Param("instanceId") String theInstanceId,
137                        @Param("stepId") String theStepId,
138                        @Param("oldStatuses") List<WorkChunkStatusEnum> theOldStatuses,
139                        @Param("newStatus") WorkChunkStatusEnum theNewStatus);
140
141        @Modifying
142        @Query("DELETE FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId")
143        int deleteAllForInstance(@Param("instanceId") String theInstanceId);
144
145        @Query(
146                        "SELECT e.myId from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId AND e.myStatus = :status")
147        List<String> fetchAllChunkIdsForStepWithStatus(
148                        @Param("instanceId") String theInstanceId,
149                        @Param("stepId") String theStepId,
150                        @Param("status") WorkChunkStatusEnum theStatus);
151
152        @Query(
153                        "SELECT new ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO(e.myTargetStepId, e.myStatus, min(e.myStartTime), max(e.myEndTime), avg(e.myEndTime - e.myStartTime), count(*)) FROM Batch2WorkChunkEntity e WHERE e.myInstanceId=:instanceId GROUP BY e.myTargetStepId, e.myStatus")
154        List<BatchWorkChunkStatusDTO> fetchWorkChunkStatusForInstance(@Param("instanceId") String theInstanceId);
155}