001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.bulk.export.svc;
021
022import ca.uhn.fhir.batch2.api.IJobPersistence;
023import ca.uhn.fhir.batch2.model.JobInstance;
024import ca.uhn.fhir.batch2.model.StatusEnum;
025import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
026import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
027import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
028import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
029import ca.uhn.fhir.jpa.bulk.export.api.IBulkDataExportJobSchedulingHelper;
030import ca.uhn.fhir.jpa.model.sched.HapiJob;
031import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
032import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
033import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
034import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
035import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
036import ca.uhn.fhir.util.JsonUtil;
037import jakarta.annotation.Nonnull;
038import jakarta.annotation.PostConstruct;
039import org.apache.commons.lang3.StringUtils;
040import org.apache.commons.lang3.time.DateUtils;
041import org.hl7.fhir.instance.model.api.IBaseBinary;
042import org.hl7.fhir.instance.model.api.IIdType;
043import org.hl7.fhir.r4.model.Binary;
044import org.quartz.JobExecutionContext;
045import org.slf4j.Logger;
046import org.springframework.beans.factory.annotation.Autowired;
047import org.springframework.data.domain.PageRequest;
048import org.springframework.transaction.PlatformTransactionManager;
049import org.springframework.transaction.annotation.Propagation;
050import org.springframework.transaction.annotation.Transactional;
051import org.springframework.transaction.support.TransactionTemplate;
052
053import java.time.LocalDateTime;
054import java.time.ZoneId;
055import java.util.Date;
056import java.util.List;
057import java.util.Map;
058import java.util.Optional;
059
060import static org.slf4j.LoggerFactory.getLogger;
061
062public class BulkDataExportJobSchedulingHelperImpl implements IBulkDataExportJobSchedulingHelper, IHasScheduledJobs {
063        private static final Logger ourLog = getLogger(BulkDataExportJobSchedulingHelperImpl.class);
064
065        private final DaoRegistry myDaoRegistry;
066
067        private final PlatformTransactionManager myTxManager;
068        private final JpaStorageSettings myDaoConfig;
069        private final BulkExportHelperService myBulkExportHelperSvc;
070        private final IJobPersistence myJpaJobPersistence;
071        private TransactionTemplate myTxTemplate;
072
073        public BulkDataExportJobSchedulingHelperImpl(
074                        DaoRegistry theDaoRegistry,
075                        PlatformTransactionManager theTxManager,
076                        JpaStorageSettings theDaoConfig,
077                        BulkExportHelperService theBulkExportHelperSvc,
078                        IJobPersistence theJpaJobPersistence,
079                        TransactionTemplate theTxTemplate) {
080                myDaoRegistry = theDaoRegistry;
081                myTxManager = theTxManager;
082                myDaoConfig = theDaoConfig;
083                myBulkExportHelperSvc = theBulkExportHelperSvc;
084                myJpaJobPersistence = theJpaJobPersistence;
085                myTxTemplate = theTxTemplate;
086        }
087
088        @PostConstruct
089        public void start() {
090                myTxTemplate = new TransactionTemplate(myTxManager);
091        }
092
093        @Override
094        public void scheduleJobs(ISchedulerService theSchedulerService) {
095                // job to cleanup unneeded BulkExportJobEntities that are persisted, but unwanted
096                ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
097                jobDetail.setId(PurgeExpiredFilesJob.class.getName());
098                jobDetail.setJobClass(PurgeExpiredFilesJob.class);
099                theSchedulerService.scheduleClusteredJob(DateUtils.MILLIS_PER_HOUR, jobDetail);
100        }
101
102        @Override
103        @Transactional(propagation = Propagation.NEVER)
104        public synchronized void cancelAndPurgeAllJobs() {
105                // This is called by unit test code that also calls ExpungeEverythingService,
106                // which explicitly deletes both Batch2WorkChunkEntity and Batch2JobInstanceEntity, as well as ResourceTable, in
107                // which Binary's are stored
108                // Long story short, this method no longer needs to do anything
109        }
110
111        /**
112         * This method is called by the scheduler to run a pass of the
113         * generator
114         */
115        @Transactional(propagation = Propagation.NEVER)
116        @Override
117        public void purgeExpiredFiles() {
118                if (!myDaoConfig.isEnableTaskBulkExportJobExecution()) {
119                        ourLog.debug("bulk export disabled:  doing nothing");
120                        return;
121                }
122
123                final List<JobInstance> jobInstancesToDelete = myTxTemplate.execute(t -> myJpaJobPersistence.fetchInstances(
124                                Batch2JobDefinitionConstants.BULK_EXPORT,
125                                StatusEnum.getEndedStatuses(),
126                                computeCutoffFromConfig(),
127                                PageRequest.of(0, 50)));
128
129                if (jobInstancesToDelete == null || jobInstancesToDelete.isEmpty()) {
130                        ourLog.debug("No batch 2 bulk export jobs found!  Nothing to do!");
131                        ourLog.info("Finished bulk export job deletion with nothing to do");
132                        return;
133                }
134
135                for (JobInstance jobInstance : jobInstancesToDelete) {
136                        ourLog.info("Deleting batch 2 bulk export job: {}", jobInstance);
137
138                        myTxTemplate.execute(t -> {
139                                final Optional<JobInstance> optJobInstanceForInstanceId =
140                                                myJpaJobPersistence.fetchInstance(jobInstance.getInstanceId());
141
142                                if (optJobInstanceForInstanceId.isEmpty()) {
143                                        ourLog.error(
144                                                        "Can't find job instance for ID: {} despite having retrieved it in the first step",
145                                                        jobInstance.getInstanceId());
146                                        return null;
147                                }
148
149                                final JobInstance jobInstanceForInstanceId = optJobInstanceForInstanceId.get();
150                                ourLog.info("Deleting bulk export job: {}", jobInstanceForInstanceId);
151
152                                // We need to keep these for investigation but we also need a process to manually delete these jobs once
153                                // we're done investigating
154                                if (StatusEnum.FAILED == jobInstanceForInstanceId.getStatus()) {
155                                        ourLog.info("skipping because the status is FAILED for ID: {}"
156                                                        + jobInstanceForInstanceId.getInstanceId());
157                                        return null;
158                                }
159
160                                purgeBinariesIfNeeded(jobInstanceForInstanceId, jobInstanceForInstanceId.getReport());
161
162                                final String batch2BulkExportJobInstanceId = jobInstanceForInstanceId.getInstanceId();
163                                ourLog.debug("*** About to delete batch 2 bulk export job with ID {}", batch2BulkExportJobInstanceId);
164
165                                myJpaJobPersistence.deleteInstanceAndChunks(batch2BulkExportJobInstanceId);
166
167                                ourLog.info("Finished deleting bulk export job: {}", jobInstance.getInstanceId());
168
169                                return null;
170                        });
171
172                        ourLog.info("Finished deleting bulk export jobs");
173                }
174        }
175
176        private void purgeBinariesIfNeeded(JobInstance theJobInstanceForInstanceId, String theJobInstanceReportString) {
177                final Optional<BulkExportJobResults> optBulkExportJobResults =
178                                getBulkExportJobResults(theJobInstanceReportString);
179
180                if (optBulkExportJobResults.isPresent()) {
181                        final BulkExportJobResults bulkExportJobResults = optBulkExportJobResults.get();
182                        ourLog.debug(
183                                        "job: {} resource type to binary ID: {}",
184                                        theJobInstanceForInstanceId.getInstanceId(),
185                                        bulkExportJobResults.getResourceTypeToBinaryIds());
186
187                        final Map<String, List<String>> resourceTypeToBinaryIds = bulkExportJobResults.getResourceTypeToBinaryIds();
188                        for (String resourceType : resourceTypeToBinaryIds.keySet()) {
189                                final List<String> binaryIds = resourceTypeToBinaryIds.get(resourceType);
190                                for (String binaryId : binaryIds) {
191                                        ourLog.info("Purging batch 2 bulk export binary: {}", binaryId);
192                                        IIdType id = myBulkExportHelperSvc.toId(binaryId);
193                                        getBinaryDao().delete(id, new SystemRequestDetails());
194                                }
195                        }
196                } // else we can't know what the binary IDs are, so delete this job and move on
197        }
198
199        @SuppressWarnings("unchecked")
200        private IFhirResourceDao<IBaseBinary> getBinaryDao() {
201                return myDaoRegistry.getResourceDao(Binary.class.getSimpleName());
202        }
203
204        @Nonnull
205        private Optional<BulkExportJobResults> getBulkExportJobResults(String theJobInstanceReportString) {
206                if (StringUtils.isBlank(theJobInstanceReportString)) {
207                        ourLog.error(String.format(
208                                        "Cannot parse job report string because it's null or blank: %s", theJobInstanceReportString));
209                        return Optional.empty();
210                }
211
212                try {
213                        return Optional.of(JsonUtil.deserialize(theJobInstanceReportString, BulkExportJobResults.class));
214                } catch (Exception theException) {
215                        ourLog.error(String.format("Cannot parse job report string: %s", theJobInstanceReportString), theException);
216                        return Optional.empty();
217                }
218        }
219
220        @Nonnull
221        private Date computeCutoffFromConfig() {
222                final int bulkExportFileRetentionPeriodHours = myDaoConfig.getBulkExportFileRetentionPeriodHours();
223
224                final LocalDateTime cutoffLocalDateTime = LocalDateTime.now().minusHours(bulkExportFileRetentionPeriodHours);
225
226                return Date.from(cutoffLocalDateTime.atZone(ZoneId.systemDefault()).toInstant());
227        }
228
229        public static class PurgeExpiredFilesJob implements HapiJob {
230                @Autowired
231                private IBulkDataExportJobSchedulingHelper myTarget;
232
233                @Override
234                public void execute(JobExecutionContext theContext) {
235                        myTarget.purgeExpiredFiles();
236                }
237        }
238}