001package ca.uhn.fhir.jpa.batch.reader;
002
003/*-
004 * #%L
005 * HAPI FHIR JPA Server
006 * %%
007 * Copyright (C) 2014 - 2022 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.interceptor.model.RequestPartitionId;
024import ca.uhn.fhir.jpa.api.config.DaoConfig;
025import ca.uhn.fhir.jpa.batch.CommonBatchJobConfig;
026import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
027import ca.uhn.fhir.jpa.model.entity.ResourceTable;
028import com.fasterxml.jackson.core.JsonProcessingException;
029import org.apache.commons.lang3.time.DateUtils;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032import org.springframework.batch.core.JobParameter;
033import org.springframework.batch.core.JobParameters;
034import org.springframework.batch.item.ExecutionContext;
035import org.springframework.batch.item.ItemReader;
036import org.springframework.batch.item.ItemStream;
037import org.springframework.batch.item.ItemStreamException;
038import org.springframework.beans.factory.annotation.Autowired;
039import org.springframework.beans.factory.annotation.Value;
040import org.springframework.data.domain.PageRequest;
041import org.springframework.data.domain.Slice;
042
043import java.util.ArrayList;
044import java.util.Date;
045import java.util.HashMap;
046import java.util.HashSet;
047import java.util.List;
048import java.util.Map;
049import java.util.Set;
050
051/**
052 * This Spring Batch reader takes 3 parameters:
053 * {@link #JOB_PARAM_BATCH_SIZE}: The number of resources to return with each search.
054 * {@link #JOB_PARAM_START_TIME}: The latest timestamp of resources to search for
055 * {@link #JOB_PARAM_REQUEST_PARTITION}: (optional) The partition of resources to read
056 * <p>
057 * The reader will return at most {@link #JOB_PARAM_BATCH_SIZE} pids every time it is called, or null
058 * once no more matching resources are available.  It returns the resources in reverse chronological order
059 * appended with "." and the index number of the url list item it has gotten up to.  This is to permit
060 * restarting jobs that use this reader so it can pick up where it left off.
061 */
062public class CronologicalBatchAllResourcePidReader implements ItemReader<List<Long>>, ItemStream {
063        public static final String JOB_PARAM_BATCH_SIZE = "batch-size";
064        public static final String JOB_PARAM_START_TIME = "start-time";
065        public static final String JOB_PARAM_REQUEST_PARTITION = "request-partition";
066        public static final String CURRENT_THRESHOLD_LOW = "current.threshold-low";
067
068        private static final Logger ourLog = LoggerFactory.getLogger(CronologicalBatchAllResourcePidReader.class);
069        private static final Date BEGINNING_OF_TIME = new Date(0);
070
071        @Autowired
072        private IResourceTableDao myResourceTableDao;
073        @Autowired
074        private DaoConfig myDaoConfig;
075
076        private Integer myBatchSize;
077        private Date myThresholdLow = BEGINNING_OF_TIME;
078        private final BatchDateThresholdUpdater myBatchDateThresholdUpdater = new BatchDateThresholdUpdater(this::dateFromPid);
079        private final Set<Long> myAlreadyProcessedPidsWithLowDate = new HashSet<>();
080        private Date myStartTime;
081        private RequestPartitionId myRequestPartitionId;
082
083        @Autowired
084        public void setBatchSize(@Value("#{jobParameters['" + JOB_PARAM_BATCH_SIZE + "']}") Integer theBatchSize) {
085                myBatchSize = theBatchSize;
086        }
087
088        @Autowired
089        public void setStartTime(@Value("#{jobParameters['" + JOB_PARAM_START_TIME + "']}") Date theStartTime) {
090                myStartTime = theStartTime;
091        }
092
093        public static JobParameters buildJobParameters(Integer theBatchSize, RequestPartitionId theRequestPartitionId) {
094                Map<String, JobParameter> map = new HashMap<>();
095                map.put(CronologicalBatchAllResourcePidReader.JOB_PARAM_REQUEST_PARTITION, new JobParameter(theRequestPartitionId.toJson()));
096                map.put(CronologicalBatchAllResourcePidReader.JOB_PARAM_START_TIME, new JobParameter(DateUtils.addMinutes(new Date(), CommonBatchJobConfig.MINUTES_IN_FUTURE_TO_PROCESS_FROM)));
097                if (theBatchSize != null) {
098                        map.put(CronologicalBatchAllResourcePidReader.JOB_PARAM_BATCH_SIZE, new JobParameter(theBatchSize.longValue()));
099                }
100                JobParameters parameters = new JobParameters(map);
101                return parameters;
102        }
103
104        @Override
105        public List<Long> read() throws Exception {
106                List<Long> nextBatch = getNextBatch();
107                return nextBatch.isEmpty() ? null : nextBatch;
108        }
109
110        private Date dateFromPid(Long thePid) {
111                ResourceTable entity = myResourceTableDao.findById(thePid).orElseThrow(IllegalStateException::new);
112                return entity.getUpdatedDate();
113        }
114
115        @Override
116        public void open(ExecutionContext executionContext) throws ItemStreamException {
117                if (myBatchSize == null) {
118                        myBatchSize = myDaoConfig.getExpungeBatchSize();
119                }
120                if (executionContext.containsKey(CURRENT_THRESHOLD_LOW)) {
121                        myThresholdLow = new Date(executionContext.getLong(CURRENT_THRESHOLD_LOW));
122                }
123        }
124
125        @Override
126        public void update(ExecutionContext executionContext) throws ItemStreamException {
127                executionContext.putLong(CURRENT_THRESHOLD_LOW, myThresholdLow.getTime());
128        }
129
130        @Override
131        public void close() throws ItemStreamException {
132        }
133
134        private List<Long> getNextBatch() {
135                PageRequest page = PageRequest.of(0, myBatchSize);
136                List<Long> retval = new ArrayList<>();
137                Slice<Long> slice;
138                do {
139                        if (myRequestPartitionId == null || myRequestPartitionId.isAllPartitions()) {
140                                slice = myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(page, myThresholdLow, myStartTime);
141                        } else {
142                                slice = myResourceTableDao.findIdsOfPartitionedResourcesWithinUpdatedRangeOrderedFromOldest(page, myThresholdLow, myStartTime, myRequestPartitionId.getFirstPartitionIdOrNull());
143                        }
144                        retval.addAll(slice.getContent());
145                        retval.removeAll(myAlreadyProcessedPidsWithLowDate);
146                        page = page.next();
147                } while (retval.size() < myBatchSize && slice.hasNext());
148
149                if (ourLog.isDebugEnabled()) {
150                        ourLog.debug("Results: {}", retval);
151                }
152                myThresholdLow = myBatchDateThresholdUpdater.updateThresholdAndCache(myThresholdLow, myAlreadyProcessedPidsWithLowDate, retval);
153                return retval;
154        }
155
156        @Autowired
157        public void setRequestPartitionId(@Value("#{jobParameters['" + JOB_PARAM_REQUEST_PARTITION + "']}") String theRequestPartitionIdJson) throws JsonProcessingException {
158                if (theRequestPartitionIdJson == null) {
159                        return;
160                }
161                myRequestPartitionId = RequestPartitionId.fromJson(theRequestPartitionIdJson);
162        }
163}