
001package ca.uhn.fhir.jpa.batch.reader; 002 003/*- 004 * #%L 005 * HAPI FHIR JPA Server 006 * %% 007 * Copyright (C) 2014 - 2022 Smile CDR, Inc. 008 * %% 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 * #L% 021 */ 022 023import ca.uhn.fhir.context.FhirContext; 024import ca.uhn.fhir.interceptor.model.RequestPartitionId; 025import ca.uhn.fhir.jpa.api.config.DaoConfig; 026import ca.uhn.fhir.jpa.batch.CommonBatchJobConfig; 027import ca.uhn.fhir.jpa.batch.config.BatchConstants; 028import ca.uhn.fhir.jpa.batch.job.MultiUrlJobParameterValidator; 029import ca.uhn.fhir.jpa.batch.job.model.PartitionedUrl; 030import ca.uhn.fhir.jpa.batch.job.model.RequestListJson; 031import ca.uhn.fhir.jpa.searchparam.MatchUrlService; 032import ca.uhn.fhir.jpa.searchparam.ResourceSearch; 033import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 034import ca.uhn.fhir.rest.api.Constants; 035import ca.uhn.fhir.rest.api.SortOrderEnum; 036import ca.uhn.fhir.rest.api.SortSpec; 037import ca.uhn.fhir.rest.param.DateRangeParam; 038import ca.uhn.fhir.rest.param.ParamPrefixEnum; 039import org.apache.commons.lang3.time.DateUtils; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042import org.springframework.batch.core.JobParameter; 043import org.springframework.batch.core.JobParameters; 044import org.springframework.batch.item.ExecutionContext; 045import org.springframework.batch.item.ItemReader; 046import org.springframework.batch.item.ItemStream; 047import 
org.springframework.batch.item.ItemStreamException; 048import org.springframework.beans.factory.annotation.Autowired; 049import org.springframework.beans.factory.annotation.Value; 050 051import javax.annotation.Nonnull; 052import java.util.ArrayList; 053import java.util.Date; 054import java.util.HashMap; 055import java.util.HashSet; 056import java.util.List; 057import java.util.Map; 058import java.util.Set; 059import java.util.function.Function; 060 061/** 062 * This Spring Batch reader takes 3 parameters: 063 * {@link BatchConstants#JOB_PARAM_REQUEST_LIST}: A list of URLs to search for along with the partitions those searches should be performed on 064 * {@link BatchConstants#JOB_PARAM_BATCH_SIZE}: The number of resources to return with each search. If omitted, {@link DaoConfig#getExpungeBatchSize} will be used. 065 * {@link BatchConstants#JOB_PARAM_START_TIME}: The latest timestamp of entities to search for 066 * <p> 067 * The reader will return at most {@link BatchConstants#JOB_PARAM_BATCH_SIZE} pids every time it is called, or null 068 * once no more matching entities are available. It returns the resources in reverse chronological order 069 * and stores its progress in the Spring Batch execution context with the key {@link BatchConstants#CURRENT_THRESHOLD_HIGH} 070 * appended with "." and the index number of the URL list item it has gotten up to; the index of the URL currently being processed is stored under {@link BatchConstants#CURRENT_URL_INDEX}. This is to permit 071 * restarting jobs that use this reader so it can pick up where it left off. 
072 */ 073public abstract class BaseReverseCronologicalBatchPidReader implements ItemReader<List<Long>>, ItemStream { 074 private static final Logger ourLog = LoggerFactory.getLogger(ReverseCronologicalBatchResourcePidReader.class); 075 private final BatchDateThresholdUpdater myBatchDateThresholdUpdater = new BatchDateThresholdUpdater(); 076 private final Map<Integer, Date> myThresholdHighByUrlIndex = new HashMap<>(); 077 private final Map<Integer, Set<Long>> myAlreadyProcessedPidsWithHighDate = new HashMap<>(); 078 @Autowired 079 private FhirContext myFhirContext; 080 @Autowired 081 private MatchUrlService myMatchUrlService; 082 private List<PartitionedUrl> myPartitionedUrls; 083 private Integer myBatchSize; 084 private int myUrlIndex = 0; 085 private Date myStartTime; 086 087 private static String highKey(int theIndex) { 088 return BatchConstants.CURRENT_THRESHOLD_HIGH + "." + theIndex; 089 } 090 091 @Nonnull 092 public static JobParameters buildJobParameters(String theOperationName, Integer theBatchSize, RequestListJson theRequestListJson) { 093 Map<String, JobParameter> map = new HashMap<>(); 094 map.put(MultiUrlJobParameterValidator.JOB_PARAM_OPERATION_NAME, new JobParameter(theOperationName)); 095 map.put(BatchConstants.JOB_PARAM_REQUEST_LIST, new JobParameter(theRequestListJson.toJson())); 096 map.put(BatchConstants.JOB_PARAM_START_TIME, new JobParameter(DateUtils.addMinutes(new Date(), CommonBatchJobConfig.MINUTES_IN_FUTURE_TO_PROCESS_FROM))); 097 if (theBatchSize != null) { 098 map.put(BatchConstants.JOB_PARAM_BATCH_SIZE, new JobParameter(theBatchSize.longValue())); 099 } 100 JobParameters parameters = new JobParameters(map); 101 return parameters; 102 } 103 104 @Autowired 105 public void setRequestListJson(@Value("#{jobParameters['" + BatchConstants.JOB_PARAM_REQUEST_LIST + "']}") String theRequestListJson) { 106 RequestListJson requestListJson = RequestListJson.fromJson(theRequestListJson); 107 myPartitionedUrls = requestListJson.getPartitionedUrls(); 
108 } 109 110 @Autowired 111 public void setStartTime(@Value("#{jobParameters['" + BatchConstants.JOB_PARAM_START_TIME + "']}") Date theStartTime) { 112 myStartTime = theStartTime; 113 } 114 115 @Override 116 public List<Long> read() throws Exception { 117 while (myUrlIndex < myPartitionedUrls.size()) { 118 List<Long> nextBatch = getNextBatch(); 119 if (nextBatch.isEmpty()) { 120 ++myUrlIndex; 121 continue; 122 } 123 124 return nextBatch; 125 } 126 return null; 127 } 128 129 protected List<Long> getNextBatch() { 130 RequestPartitionId requestPartitionId = myPartitionedUrls.get(myUrlIndex).getRequestPartitionId(); 131 ResourceSearch resourceSearch = myMatchUrlService.getResourceSearch(myPartitionedUrls.get(myUrlIndex).getUrl(), requestPartitionId); 132 myAlreadyProcessedPidsWithHighDate.putIfAbsent(myUrlIndex, new HashSet<>()); 133 Set<Long> newPids = getNextPidBatch(resourceSearch); 134 135 if (ourLog.isDebugEnabled()) { 136 ourLog.debug("Search for {}{} returned {} results", resourceSearch.getResourceName(), resourceSearch.getSearchParameterMap().toNormalizedQueryString(myFhirContext), newPids.size()); 137 ourLog.debug("Results: {}", newPids); 138 } 139 140 setDateFromPidFunction(resourceSearch); 141 142 List<Long> retval = new ArrayList<>(newPids); 143 Date newThreshold = myBatchDateThresholdUpdater.updateThresholdAndCache(getCurrentHighThreshold(), myAlreadyProcessedPidsWithHighDate.get(myUrlIndex), retval); 144 myThresholdHighByUrlIndex.put(myUrlIndex, newThreshold); 145 146 return retval; 147 } 148 149 protected Date getCurrentHighThreshold() { 150 return myThresholdHighByUrlIndex.get(myUrlIndex); 151 } 152 153 protected void setDateExtractorFunction(Function<Long, Date> theDateExtractorFunction) { 154 myBatchDateThresholdUpdater.setDateFromPid(theDateExtractorFunction); 155 } 156 157 protected void addDateCountAndSortToSearch(ResourceSearch resourceSearch) { 158 SearchParameterMap map = resourceSearch.getSearchParameterMap(); 159 DateRangeParam rangeParam = 
getDateRangeParam(resourceSearch); 160 map.setLastUpdated(rangeParam); 161 map.setLoadSynchronousUpTo(myBatchSize); 162 map.setSort(new SortSpec(Constants.PARAM_LASTUPDATED, SortOrderEnum.DESC)); 163 } 164 165 /** 166 * Evaluates the passed in {@link ResourceSearch} to see if it contains a non-null {@link DateRangeParam}. 167 * 168 * If one such {@link DateRangeParam} exists, we use that to determine the upper and lower bounds for the returned 169 * {@link DateRangeParam}. The {@link DateRangeParam#getUpperBound()} is compared to the 170 * {@link BaseReverseCronologicalBatchPidReader#getCurrentHighThreshold()}, and the lower of the two date values 171 * is used. 172 * 173 * If no {@link DateRangeParam} is set, we use the local {@link BaseReverseCronologicalBatchPidReader#getCurrentHighThreshold()} 174 * to create a {@link DateRangeParam}. 175 * @param resourceSearch The {@link ResourceSearch} to check. 176 * @return {@link DateRangeParam} 177 */ 178 private DateRangeParam getDateRangeParam(ResourceSearch resourceSearch) { 179 DateRangeParam rangeParam = resourceSearch.getSearchParameterMap().getLastUpdated(); 180 if (rangeParam != null) { 181 if (rangeParam.getUpperBound() == null) { 182 rangeParam.setUpperBoundInclusive(getCurrentHighThreshold()); 183 } else { 184 Date theUpperBound = (getCurrentHighThreshold() == null || rangeParam.getUpperBound().getValue().before(getCurrentHighThreshold())) 185 ? 
rangeParam.getUpperBound().getValue() : getCurrentHighThreshold(); 186 rangeParam.setUpperBoundInclusive(theUpperBound); 187 } 188 } else { 189 rangeParam = new DateRangeParam().setUpperBoundInclusive(getCurrentHighThreshold()); 190 } 191 return rangeParam; 192 } 193 194 @Override 195 public void open(ExecutionContext executionContext) throws ItemStreamException { 196 if (executionContext.containsKey(BatchConstants.CURRENT_URL_INDEX)) { 197 myUrlIndex = new Long(executionContext.getLong(BatchConstants.CURRENT_URL_INDEX)).intValue(); 198 } 199 for (int index = 0; index < myPartitionedUrls.size(); ++index) { 200 String key = highKey(index); 201 if (executionContext.containsKey(key)) { 202 myThresholdHighByUrlIndex.put(index, new Date(executionContext.getLong(key))); 203 } else { 204 myThresholdHighByUrlIndex.put(index, myStartTime); 205 } 206 } 207 } 208 209 @Override 210 public void update(ExecutionContext executionContext) throws ItemStreamException { 211 executionContext.putLong(BatchConstants.CURRENT_URL_INDEX, myUrlIndex); 212 for (int index = 0; index < myPartitionedUrls.size(); ++index) { 213 Date date = myThresholdHighByUrlIndex.get(index); 214 if (date != null) { 215 executionContext.putLong(highKey(index), date.getTime()); 216 } 217 } 218 } 219 220 @Override 221 public void close() throws ItemStreamException { 222 } 223 224 protected Integer getBatchSize() { 225 return myBatchSize; 226 } 227 228 @Autowired 229 public void setBatchSize(@Value("#{jobParameters['" + BatchConstants.JOB_PARAM_BATCH_SIZE + "']}") Integer theBatchSize) { 230 myBatchSize = theBatchSize; 231 } 232 233 protected Set<Long> getAlreadySeenPids() { 234 return myAlreadyProcessedPidsWithHighDate.get(myUrlIndex); 235 } 236 237 protected abstract Set<Long> getNextPidBatch(ResourceSearch resourceSearch); 238 239 protected abstract void setDateFromPidFunction(ResourceSearch resourceSearch); 240}