001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.dao.data; 021 022import ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO; 023import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; 024import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity; 025import org.springframework.data.domain.Pageable; 026import org.springframework.data.jpa.repository.JpaRepository; 027import org.springframework.data.jpa.repository.Modifying; 028import org.springframework.data.jpa.repository.Query; 029import org.springframework.data.repository.query.Param; 030 031import java.util.Collection; 032import java.util.Date; 033import java.util.List; 034import java.util.Set; 035import java.util.stream.Stream; 036 037public interface IBatch2WorkChunkRepository 038 extends JpaRepository<Batch2WorkChunkEntity, String>, IHapiFhirJpaRepository { 039 040 // NOTE we need a stable sort so paging is reliable. 041 // Warning: mySequence is not unique - it is reset for every chunk. So we also sort by myId. 042 @Query( 043 "SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC, e.myId ASC") 044 List<Batch2WorkChunkEntity> fetchChunks(Pageable thePageRequest, @Param("instanceId") String theInstanceId); 045 046 /** 047 * A projection query to avoid fetching the CLOB over the wire. 048 * Otherwise, the same as fetchChunks. 049 */ 050 @Query("SELECT new Batch2WorkChunkEntity(" 051 + "e.myId, e.mySequence, e.myJobDefinitionId, e.myJobDefinitionVersion, e.myInstanceId, e.myTargetStepId, e.myStatus," 052 + "e.myCreateTime, e.myStartTime, e.myUpdateTime, e.myEndTime," 053 + "e.myErrorMessage, e.myErrorCount, e.myRecordsProcessed, e.myWarningMessage," 054 + "e.myNextPollTime, e.myPollAttempts" 055 + ") FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC, e.myId ASC") 056 List<Batch2WorkChunkEntity> fetchChunksNoData(Pageable thePageRequest, @Param("instanceId") String theInstanceId); 057 058 @Query( 059 "SELECT DISTINCT e.myStatus from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId") 060 Set<WorkChunkStatusEnum> getDistinctStatusesForStep( 061 @Param("instanceId") String theInstanceId, @Param("stepId") String theStepId); 062 063 @Query( 064 "SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId AND e.myTargetStepId = :targetStepId ORDER BY e.mySequence ASC") 065 Stream<Batch2WorkChunkEntity> fetchChunksForStep( 066 @Param("instanceId") String theInstanceId, @Param("targetStepId") String theTargetStepId); 067 068 @Modifying 069 @Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, " 070 + "e.myRecordsProcessed = :rp, e.myErrorCount = e.myErrorCount + :errorRetries, e.mySerializedData = null, e.mySerializedDataVc = null, " 071 + "e.myWarningMessage = :warningMessage WHERE e.myId = :id") 072 void updateChunkStatusAndClearDataForEndSuccess( 073 @Param("id") String theChunkId, 074 @Param("et") Date theEndTime, 075 @Param("rp") int theRecordsProcessed, 076 @Param("errorRetries") int theErrorRetries, 077 @Param("status") WorkChunkStatusEnum theInProgress, 078 @Param("warningMessage") String theWarningMessage); 079 080 @Modifying 081 @Query( 082 "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myNextPollTime = :nextPollTime, e.myPollAttempts = COALESCE(e.myPollAttempts, 0) + 1 WHERE e.myId = :id AND e.myStatus IN(:states)") 083 int updateWorkChunkNextPollTime( 084 @Param("id") String theChunkId, 085 @Param("status") WorkChunkStatusEnum theStatus, 086 @Param("states") Set<WorkChunkStatusEnum> theInitialStates, 087 @Param("nextPollTime") Date theNextPollTime); 088 089 @Modifying 090 @Query( 091 "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myNextPollTime = null WHERE e.myInstanceId = :instanceId AND e.myStatus IN(:states) AND e.myNextPollTime <= :pollTime") 092 int updateWorkChunksForPollWaiting( 093 @Param("instanceId") String theInstanceId, 094 @Param("pollTime") Date theTime, 095 @Param("states") Set<WorkChunkStatusEnum> theInitialStates, 096 @Param("status") WorkChunkStatusEnum theNewStatus); 097 098 @Modifying 099 @Query( 100 "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, e.mySerializedData = null, e.mySerializedDataVc = null, e.myErrorMessage = :em WHERE e.myId IN(:ids)") 101 void updateAllChunksForInstanceStatusClearDataAndSetError( 102 @Param("ids") List<String> theChunkIds, 103 @Param("et") Date theEndTime, 104 @Param("status") WorkChunkStatusEnum theInProgress, 105 @Param("em") String theError); 106 107 @Modifying 108 @Query( 109 "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, e.myErrorMessage = :em, e.myErrorCount = e.myErrorCount + 1 WHERE e.myId = :id") 110 int updateChunkStatusAndIncrementErrorCountForEndError( 111 @Param("id") String theChunkId, 112 @Param("et") Date theEndTime, 113 @Param("em") String theErrorMessage, 114 @Param("status") WorkChunkStatusEnum theInProgress); 115 116 @Modifying 117 @Query( 118 "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myStartTime = :st WHERE e.myId = :id AND e.myStatus IN :startStatuses") 119 int updateChunkStatusForStart( 120 @Param("id") String theChunkId, 121 @Param("st") Date theStartedTime, 122 @Param("status") WorkChunkStatusEnum theInProgress, 123 @Param("startStatuses") Collection<WorkChunkStatusEnum> theStartStatuses); 124 125 @Modifying 126 @Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :newStatus WHERE e.myId = :id AND e.myStatus = :oldStatus") 127 int updateChunkStatus( 128 @Param("id") String theChunkId, 129 @Param("oldStatus") WorkChunkStatusEnum theOldStatus, 130 @Param("newStatus") WorkChunkStatusEnum theNewStatus); 131 132 @Modifying 133 @Query( 134 "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :newStatus WHERE e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId AND e.myStatus IN ( :oldStatuses )") 135 int updateAllChunksForStepWithStatus( 136 @Param("instanceId") String theInstanceId, 137 @Param("stepId") String theStepId, 138 @Param("oldStatuses") List<WorkChunkStatusEnum> theOldStatuses, 139 @Param("newStatus") WorkChunkStatusEnum theNewStatus); 140 141 @Modifying 142 @Query("DELETE FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId") 143 int deleteAllForInstance(@Param("instanceId") String theInstanceId); 144 145 @Query( 146 "SELECT e.myId from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId AND e.myStatus = :status") 147 List<String> fetchAllChunkIdsForStepWithStatus( 148 @Param("instanceId") String theInstanceId, 149 @Param("stepId") String theStepId, 150 @Param("status") WorkChunkStatusEnum theStatus); 151 152 @Query( 153 "SELECT new ca.uhn.fhir.batch2.model.BatchWorkChunkStatusDTO(e.myTargetStepId, e.myStatus, min(e.myStartTime), max(e.myEndTime), avg(e.myEndTime - e.myStartTime), count(*)) FROM Batch2WorkChunkEntity e WHERE e.myInstanceId=:instanceId GROUP BY e.myTargetStepId, e.myStatus") 154 List<BatchWorkChunkStatusDTO> fetchWorkChunkStatusForInstance(@Param("instanceId") String theInstanceId); 155}