001package ca.uhn.fhir.jpa.batch.reader;
002
003/*-
004 * #%L
005 * HAPI FHIR JPA Server
006 * %%
007 * Copyright (C) 2014 - 2022 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.context.FhirContext;
024import ca.uhn.fhir.interceptor.model.RequestPartitionId;
025import ca.uhn.fhir.jpa.api.config.DaoConfig;
026import ca.uhn.fhir.jpa.batch.CommonBatchJobConfig;
027import ca.uhn.fhir.jpa.batch.config.BatchConstants;
028import ca.uhn.fhir.jpa.batch.job.MultiUrlJobParameterValidator;
029import ca.uhn.fhir.jpa.batch.job.model.PartitionedUrl;
030import ca.uhn.fhir.jpa.batch.job.model.RequestListJson;
031import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
032import ca.uhn.fhir.jpa.searchparam.ResourceSearch;
033import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
034import ca.uhn.fhir.rest.api.Constants;
035import ca.uhn.fhir.rest.api.SortOrderEnum;
036import ca.uhn.fhir.rest.api.SortSpec;
037import ca.uhn.fhir.rest.param.DateRangeParam;
038import ca.uhn.fhir.rest.param.ParamPrefixEnum;
039import org.apache.commons.lang3.time.DateUtils;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042import org.springframework.batch.core.JobParameter;
043import org.springframework.batch.core.JobParameters;
044import org.springframework.batch.item.ExecutionContext;
045import org.springframework.batch.item.ItemReader;
046import org.springframework.batch.item.ItemStream;
047import org.springframework.batch.item.ItemStreamException;
048import org.springframework.beans.factory.annotation.Autowired;
049import org.springframework.beans.factory.annotation.Value;
050
051import javax.annotation.Nonnull;
052import java.util.ArrayList;
053import java.util.Date;
054import java.util.HashMap;
055import java.util.HashSet;
056import java.util.List;
057import java.util.Map;
058import java.util.Set;
059import java.util.function.Function;
060
061/**
062 * This Spring Batch reader takes 4 parameters:
063 * {@link BatchConstants#JOB_PARAM_REQUEST_LIST}: A list of URLs to search for along with the partitions those searches should be performed on
064 * {@link BatchConstants#JOB_PARAM_BATCH_SIZE}: The number of resources to return with each search.  If ommitted, {@link DaoConfig#getExpungeBatchSize} will be used.
065 * {@link BatchConstants#JOB_PARAM_START_TIME}: The latest timestamp of entities to search for
066 * <p>
067 * The reader will return at most {@link BatchConstants#JOB_PARAM_BATCH_SIZE} pids every time it is called, or null
068 * once no more matching entities are available.  It returns the resources in reverse chronological order
069 * and stores where it's at in the Spring Batch execution context with the key {@link BatchConstants#CURRENT_THRESHOLD_HIGH}
070 * appended with "." and the index number of the url list item it has gotten up to.  This is to permit
071 * restarting jobs that use this reader so it can pick up where it left off.
072 */
073public abstract class BaseReverseCronologicalBatchPidReader implements ItemReader<List<Long>>, ItemStream {
074        private static final Logger ourLog = LoggerFactory.getLogger(ReverseCronologicalBatchResourcePidReader.class);
075        private final BatchDateThresholdUpdater myBatchDateThresholdUpdater = new BatchDateThresholdUpdater();
076        private final Map<Integer, Date> myThresholdHighByUrlIndex = new HashMap<>();
077        private final Map<Integer, Set<Long>> myAlreadyProcessedPidsWithHighDate = new HashMap<>();
078        @Autowired
079        private FhirContext myFhirContext;
080        @Autowired
081        private MatchUrlService myMatchUrlService;
082        private List<PartitionedUrl> myPartitionedUrls;
083        private Integer myBatchSize;
084        private int myUrlIndex = 0;
085        private Date myStartTime;
086
087        private static String highKey(int theIndex) {
088                return BatchConstants.CURRENT_THRESHOLD_HIGH + "." + theIndex;
089        }
090
091        @Nonnull
092        public static JobParameters buildJobParameters(String theOperationName, Integer theBatchSize, RequestListJson theRequestListJson) {
093                Map<String, JobParameter> map = new HashMap<>();
094                map.put(MultiUrlJobParameterValidator.JOB_PARAM_OPERATION_NAME, new JobParameter(theOperationName));
095                map.put(BatchConstants.JOB_PARAM_REQUEST_LIST, new JobParameter(theRequestListJson.toJson()));
096                map.put(BatchConstants.JOB_PARAM_START_TIME, new JobParameter(DateUtils.addMinutes(new Date(), CommonBatchJobConfig.MINUTES_IN_FUTURE_TO_PROCESS_FROM)));
097                if (theBatchSize != null) {
098                        map.put(BatchConstants.JOB_PARAM_BATCH_SIZE, new JobParameter(theBatchSize.longValue()));
099                }
100                JobParameters parameters = new JobParameters(map);
101                return parameters;
102        }
103
104        @Autowired
105        public void setRequestListJson(@Value("#{jobParameters['" + BatchConstants.JOB_PARAM_REQUEST_LIST + "']}") String theRequestListJson) {
106                RequestListJson requestListJson = RequestListJson.fromJson(theRequestListJson);
107                myPartitionedUrls = requestListJson.getPartitionedUrls();
108        }
109
110        @Autowired
111        public void setStartTime(@Value("#{jobParameters['" + BatchConstants.JOB_PARAM_START_TIME + "']}") Date theStartTime) {
112                myStartTime = theStartTime;
113        }
114
115        @Override
116        public List<Long> read() throws Exception {
117                while (myUrlIndex < myPartitionedUrls.size()) {
118                        List<Long> nextBatch = getNextBatch();
119                        if (nextBatch.isEmpty()) {
120                                ++myUrlIndex;
121                                continue;
122                        }
123
124                        return nextBatch;
125                }
126                return null;
127        }
128
129        protected List<Long> getNextBatch() {
130                RequestPartitionId requestPartitionId = myPartitionedUrls.get(myUrlIndex).getRequestPartitionId();
131                ResourceSearch resourceSearch = myMatchUrlService.getResourceSearch(myPartitionedUrls.get(myUrlIndex).getUrl(), requestPartitionId);
132                myAlreadyProcessedPidsWithHighDate.putIfAbsent(myUrlIndex, new HashSet<>());
133                Set<Long> newPids = getNextPidBatch(resourceSearch);
134
135                if (ourLog.isDebugEnabled()) {
136                        ourLog.debug("Search for {}{} returned {} results", resourceSearch.getResourceName(), resourceSearch.getSearchParameterMap().toNormalizedQueryString(myFhirContext), newPids.size());
137                        ourLog.debug("Results: {}", newPids);
138                }
139
140                setDateFromPidFunction(resourceSearch);
141
142                List<Long> retval = new ArrayList<>(newPids);
143                Date newThreshold = myBatchDateThresholdUpdater.updateThresholdAndCache(getCurrentHighThreshold(), myAlreadyProcessedPidsWithHighDate.get(myUrlIndex), retval);
144                myThresholdHighByUrlIndex.put(myUrlIndex, newThreshold);
145
146                return retval;
147        }
148
149        protected Date getCurrentHighThreshold() {
150                return myThresholdHighByUrlIndex.get(myUrlIndex);
151        }
152
153        protected void setDateExtractorFunction(Function<Long, Date> theDateExtractorFunction) {
154                myBatchDateThresholdUpdater.setDateFromPid(theDateExtractorFunction);
155        }
156
157        protected void addDateCountAndSortToSearch(ResourceSearch resourceSearch) {
158                SearchParameterMap map = resourceSearch.getSearchParameterMap();
159                DateRangeParam rangeParam = getDateRangeParam(resourceSearch);
160                map.setLastUpdated(rangeParam);
161                map.setLoadSynchronousUpTo(myBatchSize);
162                map.setSort(new SortSpec(Constants.PARAM_LASTUPDATED, SortOrderEnum.DESC));
163        }
164
165        /**
166         * Evaluates the passed in {@link ResourceSearch} to see if it contains a non-null {@link DateRangeParam}.
167         *
168         * If one such {@link DateRangeParam} exists, we use that to determine the upper and lower bounds for the returned
169         * {@link DateRangeParam}. The {@link DateRangeParam#getUpperBound()} is compared to the
170         * {@link BaseReverseCronologicalBatchPidReader#getCurrentHighThreshold()}, and the lower of the two date values
171         * is used.
172         *
173         * If no {@link DateRangeParam} is set, we use the local {@link BaseReverseCronologicalBatchPidReader#getCurrentHighThreshold()}
174         * to create a {@link DateRangeParam}.
175         * @param resourceSearch The {@link ResourceSearch} to check.
176         * @return {@link DateRangeParam}
177         */
178        private DateRangeParam getDateRangeParam(ResourceSearch resourceSearch) {
179                DateRangeParam rangeParam = resourceSearch.getSearchParameterMap().getLastUpdated();
180                if (rangeParam != null) {
181                        if (rangeParam.getUpperBound() == null) {
182                                rangeParam.setUpperBoundInclusive(getCurrentHighThreshold());
183                        } else {
184                                Date theUpperBound = (getCurrentHighThreshold() == null || rangeParam.getUpperBound().getValue().before(getCurrentHighThreshold()))
185                                        ? rangeParam.getUpperBound().getValue() : getCurrentHighThreshold();
186                                rangeParam.setUpperBoundInclusive(theUpperBound);
187                        }
188                } else {
189                        rangeParam = new DateRangeParam().setUpperBoundInclusive(getCurrentHighThreshold());
190                }
191                return rangeParam;
192        }
193
194        @Override
195        public void open(ExecutionContext executionContext) throws ItemStreamException {
196                if (executionContext.containsKey(BatchConstants.CURRENT_URL_INDEX)) {
197                        myUrlIndex = new Long(executionContext.getLong(BatchConstants.CURRENT_URL_INDEX)).intValue();
198                }
199                for (int index = 0; index < myPartitionedUrls.size(); ++index) {
200                        String key = highKey(index);
201                        if (executionContext.containsKey(key)) {
202                                myThresholdHighByUrlIndex.put(index, new Date(executionContext.getLong(key)));
203                        } else {
204                                myThresholdHighByUrlIndex.put(index, myStartTime);
205                        }
206                }
207        }
208
209        @Override
210        public void update(ExecutionContext executionContext) throws ItemStreamException {
211                executionContext.putLong(BatchConstants.CURRENT_URL_INDEX, myUrlIndex);
212                for (int index = 0; index < myPartitionedUrls.size(); ++index) {
213                        Date date = myThresholdHighByUrlIndex.get(index);
214                        if (date != null) {
215                                executionContext.putLong(highKey(index), date.getTime());
216                        }
217                }
218        }
219
220        @Override
221        public void close() throws ItemStreamException {
222        }
223
224        protected Integer getBatchSize() {
225                return myBatchSize;
226        }
227
228        @Autowired
229        public void setBatchSize(@Value("#{jobParameters['" + BatchConstants.JOB_PARAM_BATCH_SIZE + "']}") Integer theBatchSize) {
230                myBatchSize = theBatchSize;
231        }
232
233        protected Set<Long> getAlreadySeenPids() {
234                return myAlreadyProcessedPidsWithHighDate.get(myUrlIndex);
235        }
236
237        protected abstract Set<Long> getNextPidBatch(ResourceSearch resourceSearch);
238
239        protected abstract void setDateFromPidFunction(ResourceSearch resourceSearch);
240}