/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.bulk.export.svc;

import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
import ca.uhn.fhir.util.JsonUtil;
import jakarta.annotation.Nonnull;
import jakarta.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Binary;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.slf4j.LoggerFactory.getLogger;

public class BulkDataExportJobSchedulingHelperImpl
		implements IBulkDataExportJobSchedulingHelper, IHasScheduledJobs {
	private static final Logger ourLog = getLogger(BulkDataExportJobSchedulingHelperImpl.class);

	private final DaoRegistry myDaoRegistry;

	private final PlatformTransactionManager myTxManager;
	private final JpaStorageSettings myDaoConfig;
	private final BulkExportHelperService myBulkExportHelperSvc;
	private final IJobPersistence myJpaJobPersistence;
	private TransactionTemplate myTxTemplate;

	public BulkDataExportJobSchedulingHelperImpl(
			DaoRegistry theDaoRegistry,
			PlatformTransactionManager theTxManager,
			JpaStorageSettings theDaoConfig,
			BulkExportHelperService theBulkExportHelperSvc,
			IJobPersistence theJpaJobPersistence,
			TransactionTemplate theTxTemplate) {
		myDaoRegistry = theDaoRegistry;
		myTxManager = theTxManager;
		myDaoConfig = theDaoConfig;
		myBulkExportHelperSvc = theBulkExportHelperSvc;
		myJpaJobPersistence = theJpaJobPersistence;
		myTxTemplate = theTxTemplate;
	}

	@PostConstruct
	public void start() {
		myTxTemplate = new TransactionTemplate(myTxManager);
	}
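
	/**
	 * Registers a clustered job that runs {@link PurgeExpiredFilesJob} once per
	 * hour to purge expired bulk export jobs and the Binary files they produced.
	 */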
	@Override
	public void scheduleJobs(ISchedulerService theSchedulerService) {
		// Job to clean up unneeded BulkExportJobEntities that are persisted but unwanted
		ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
		jobDetail.setId(PurgeExpiredFilesJob.class.getName());
		jobDetail.setJobClass(PurgeExpiredFilesJob.class);
		theSchedulerService.scheduleClusteredJob(DateUtils.MILLIS_PER_HOUR, jobDetail);
	}

	@Override
	@Transactional(propagation = Propagation.NEVER)
	public synchronized void cancelAndPurgeAllJobs() {
		// This is called by unit test code that also calls ExpungeEverythingService,
		// which explicitly deletes both Batch2WorkChunkEntity and Batch2JobInstanceEntity,
		// as well as ResourceTable, in which Binaries are stored.
		// Long story short, this method no longer needs to do anything.
	}

	/**
	 * This method is called by the scheduler to run a pass of the purge job:
	 * it deletes expired bulk export job instances along with the Binary
	 * resources they produced.
	 */
	@Transactional(propagation = Propagation.NEVER)
	@Override
	public void purgeExpiredFiles() {
		if (!myDaoConfig.isEnableTaskBulkExportJobExecution()) {
			ourLog.debug("Bulk export disabled: doing nothing");
			return;
		}

		final List<JobInstance> jobInstancesToDelete = myTxTemplate.execute(t -> myJpaJobPersistence.fetchInstances(
				Batch2JobDefinitionConstants.BULK_EXPORT,
				StatusEnum.getEndedStatuses(),
				computeCutoffFromConfig(),
				PageRequest.of(0, 50)));

		if (jobInstancesToDelete == null || jobInstancesToDelete.isEmpty()) {
			ourLog.debug("No batch 2 bulk export jobs found! Nothing to do!");
			ourLog.info("Finished bulk export job deletion with nothing to do");
			return;
		}
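
		// For each expired job: re-fetch it inside its own transaction, skip FAILED jobs
		// (kept around for investigation), purge the Binary resources listed in the job's
		// report, then delete the job instance and its work chunks.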
Nothing to do!"); 131 ourLog.info("Finished bulk export job deletion with nothing to do"); 132 return; 133 } 134 135 for (JobInstance jobInstance : jobInstancesToDelete) { 136 ourLog.info("Deleting batch 2 bulk export job: {}", jobInstance); 137 138 myTxTemplate.execute(t -> { 139 final Optional<JobInstance> optJobInstanceForInstanceId = 140 myJpaJobPersistence.fetchInstance(jobInstance.getInstanceId()); 141 142 if (optJobInstanceForInstanceId.isEmpty()) { 143 ourLog.error( 144 "Can't find job instance for ID: {} despite having retrieved it in the first step", 145 jobInstance.getInstanceId()); 146 return null; 147 } 148 149 final JobInstance jobInstanceForInstanceId = optJobInstanceForInstanceId.get(); 150 ourLog.info("Deleting bulk export job: {}", jobInstanceForInstanceId); 151 152 // We need to keep these for investigation but we also need a process to manually delete these jobs once 153 // we're done investigating 154 if (StatusEnum.FAILED == jobInstanceForInstanceId.getStatus()) { 155 ourLog.info("skipping because the status is FAILED for ID: {}" 156 + jobInstanceForInstanceId.getInstanceId()); 157 return null; 158 } 159 160 purgeBinariesIfNeeded(jobInstanceForInstanceId, jobInstanceForInstanceId.getReport()); 161 162 final String batch2BulkExportJobInstanceId = jobInstanceForInstanceId.getInstanceId(); 163 ourLog.debug("*** About to delete batch 2 bulk export job with ID {}", batch2BulkExportJobInstanceId); 164 165 myJpaJobPersistence.deleteInstanceAndChunks(batch2BulkExportJobInstanceId); 166 167 ourLog.info("Finished deleting bulk export job: {}", jobInstance.getInstanceId()); 168 169 return null; 170 }); 171 172 ourLog.info("Finished deleting bulk export jobs"); 173 } 174 } 175 176 private void purgeBinariesIfNeeded(JobInstance theJobInstanceForInstanceId, String theJobInstanceReportString) { 177 final Optional<BulkExportJobResults> optBulkExportJobResults = 178 getBulkExportJobResults(theJobInstanceReportString); 179 180 if (optBulkExportJobResults.isPresent()) { 181 final BulkExportJobResults bulkExportJobResults = optBulkExportJobResults.get(); 182 ourLog.debug( 183 "job: {} resource type to binary ID: {}", 184 theJobInstanceForInstanceId.getInstanceId(), 185 bulkExportJobResults.getResourceTypeToBinaryIds()); 186 187 final Map<String, List<String>> resourceTypeToBinaryIds = bulkExportJobResults.getResourceTypeToBinaryIds(); 188 for (String resourceType : resourceTypeToBinaryIds.keySet()) { 189 final List<String> binaryIds = resourceTypeToBinaryIds.get(resourceType); 190 for (String binaryId : binaryIds) { 191 ourLog.info("Purging batch 2 bulk export binary: {}", binaryId); 192 IIdType id = myBulkExportHelperSvc.toId(binaryId); 193 getBinaryDao().delete(id, new SystemRequestDetails()); 194 } 195 } 196 } // else we can't know what the binary IDs are, so delete this job and move on 197 } 198 199 @SuppressWarnings("unchecked") 200 private IFhirResourceDao<IBaseBinary> getBinaryDao() { 201 return myDaoRegistry.getResourceDao(Binary.class.getSimpleName()); 202 } 203 204 @Nonnull 205 private Optional<BulkExportJobResults> getBulkExportJobResults(String theJobInstanceReportString) { 206 if (StringUtils.isBlank(theJobInstanceReportString)) { 207 ourLog.error(String.format( 208 "Cannot parse job report string because it's null or blank: %s", theJobInstanceReportString)); 209 return Optional.empty(); 210 } 211 212 try { 213 return Optional.of(JsonUtil.deserialize(theJobInstanceReportString, BulkExportJobResults.class)); 214 } catch (Exception theException) { 215 
ourLog.error(String.format("Cannot parse job report string: %s", theJobInstanceReportString), theException); 216 return Optional.empty(); 217 } 218 } 219 220 @Nonnull 221 private Date computeCutoffFromConfig() { 222 final int bulkExportFileRetentionPeriodHours = myDaoConfig.getBulkExportFileRetentionPeriodHours(); 223 224 final LocalDateTime cutoffLocalDateTime = LocalDateTime.now().minusHours(bulkExportFileRetentionPeriodHours); 225 226 return Date.from(cutoffLocalDateTime.atZone(ZoneId.systemDefault()).toInstant()); 227 } 228 229 public static class PurgeExpiredFilesJob implements HapiJob { 230 @Autowired 231 private IBulkDataExportJobSchedulingHelper myTarget; 232 233 @Override 234 public void execute(JobExecutionContext theContext) { 235 myTarget.purgeExpiredFiles(); 236 } 237 } 238}