/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.bulk.imprt.svc;

import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.importpull.models.Batch2BulkImportPullJobParameters;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.model.ActivateJobResult;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkImportJobDao;
import ca.uhn.fhir.jpa.dao.data.IBulkImportJobFileDao;
import ca.uhn.fhir.jpa.entity.BulkImportJobEntity;
import ca.uhn.fhir.jpa.entity.BulkImportJobFileEntity;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.ValidateUtil;
import jakarta.annotation.Nonnull;
import jakarta.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Semaphore;

import static ca.uhn.fhir.batch2.jobs.importpull.BulkImportPullConfig.BULK_IMPORT_JOB_NAME;

public class BulkDataImportSvcImpl implements IBulkDataImportSvc, IHasScheduledJobs {
	private static final Logger ourLog = LoggerFactory.getLogger(BulkDataImportSvcImpl.class);
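	// Permits at most one job-activation pass at a time on this node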
	private final Semaphore myRunningJobSemaphore = new Semaphore(1);

	@Autowired
	private IBulkImportJobDao myJobDao;

	@Autowired
	private IBulkImportJobFileDao myJobFileDao;

	@Autowired
	private PlatformTransactionManager myTxManager;

	private TransactionTemplate myTxTemplate;

	@Autowired
	private IJobCoordinator myJobCoordinator;

	@Autowired
	private JpaStorageSettings myStorageSettings;

	@PostConstruct
	public void start() {
		myTxTemplate = new TransactionTemplate(myTxManager);
	}

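	/**
	 * Registers the {@link ActivationJob} with the local scheduler so that each
	 * node polls for READY jobs every ten seconds.
	 */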
	@Override
	public void scheduleJobs(ISchedulerService theSchedulerService) {
		// This job should be local so that each node in the cluster can pick up jobs
		ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
		jobDetail.setId(ActivationJob.class.getName());
		jobDetail.setJobClass(ActivationJob.class);
		theSchedulerService.scheduleLocalJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
	}

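	/**
	 * Creates a new bulk import job in {@link BulkImportJobStatusEnum#STAGING STAGING}
	 * status, persists it along with its initial files, and returns the generated
	 * bijob ID. A minimal caller sketch (variable names are illustrative):
	 *
	 * <pre>{@code
	 * String biJobId = svc.createNewJob(jobDescription, List.of(file1, file2));
	 * svc.markJobAsReadyForActivation(biJobId);
	 * }</pre>
	 */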
	@Override
	@Transactional
	public String createNewJob(
			BulkImportJobJson theJobDescription, @Nonnull List<BulkImportJobFileJson> theInitialFiles) {
		ValidateUtil.isNotNullOrThrowUnprocessableEntity(theJobDescription, "Job must not be null");
		ValidateUtil.isNotNullOrThrowUnprocessableEntity(
				theJobDescription.getProcessingMode(), "Job File Processing mode must not be null");
		ValidateUtil.isTrueOrThrowInvalidRequest(
				theJobDescription.getBatchSize() > 0, "Job File Batch Size must be > 0");

		String biJobId = UUID.randomUUID().toString();

		ourLog.info(
				"Creating new Bulk Import job with {} files, assigning bijob ID: {}", theInitialFiles.size(), biJobId);

		BulkImportJobEntity job = new BulkImportJobEntity();
		job.setJobId(biJobId);
		job.setFileCount(theInitialFiles.size());
		job.setStatus(BulkImportJobStatusEnum.STAGING);
		job.setJobDescription(theJobDescription.getJobDescription());
		job.setBatchSize(theJobDescription.getBatchSize());
		job.setRowProcessingMode(theJobDescription.getProcessingMode());
		job = myJobDao.save(job);

		addFilesToJob(theInitialFiles, job, 0);

		return biJobId;
	}

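	/**
	 * Appends files to an existing job. Files may only be added while the job is
	 * still in {@link BulkImportJobStatusEnum#STAGING STAGING} status.
	 */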
	@Override
	@Transactional
	public void addFilesToJob(String theBiJobId, List<BulkImportJobFileJson> theFiles) {
		ourLog.info("Adding {} files to bulk import job with bijob id {}", theFiles.size(), theBiJobId);

		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);

		ValidateUtil.isTrueOrThrowInvalidRequest(
				job.getStatus() == BulkImportJobStatusEnum.STAGING,
				"bijob id %s has status %s and cannot be added to",
				theBiJobId,
				job.getStatus());

		addFilesToJob(theFiles, job, job.getFileCount());

		job.setFileCount(job.getFileCount() + theFiles.size());
		myJobDao.save(job);
	}

	private BulkImportJobEntity findJobByBiJobId(String theBiJobId) {
		return myJobDao
				.findByJobId(theBiJobId)
				.orElseThrow(() -> new InvalidRequestException("Unknown bijob id: " + theBiJobId));
	}

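	/**
	 * Transitions a job from STAGING to READY, making it eligible for pickup on
	 * the next {@link ActivationJob} run.
	 */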
	@Override
	@Transactional
	public void markJobAsReadyForActivation(String theBiJobId) {
		ourLog.info("Activating bulk import bijob {}", theBiJobId);

		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);
		ValidateUtil.isTrueOrThrowInvalidRequest(
				job.getStatus() == BulkImportJobStatusEnum.STAGING,
				"Bulk import bijob %s cannot be activated in status: %s",
				theBiJobId,
				job.getStatus());

		job.setStatus(BulkImportJobStatusEnum.READY);
		myJobDao.save(job);
	}

	/**
	 * Called by the job scheduler. Activates at most one READY job per invocation,
	 * returning early if bulk import execution is disabled on this server or if
	 * another activation pass is already running on this node.
	 */
	@Transactional(propagation = Propagation.NEVER)
	@Override
	public ActivateJobResult activateNextReadyJob() {
		if (!myStorageSettings.isEnableTaskBulkImportJobExecution()) {
			Logs.getBatchTroubleshootingLog()
					.trace("Bulk import job execution is not enabled on this server. No action taken.");
			return new ActivateJobResult(false, null);
		}

		if (!myRunningJobSemaphore.tryAcquire()) {
			Logs.getBatchTroubleshootingLog().trace("Already have a running batch job, not going to check for more");
			return new ActivateJobResult(false, null);
		}

		try {
			ActivateJobResult retval = doActivateNextReadyJob();
			if (!StringUtils.isBlank(retval.jobId)) {
				ourLog.info("Batch job submitted with batch job id {}", retval.jobId);
			}
			return retval;
		} finally {
			myRunningJobSemaphore.release();
		}
	}

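	/**
	 * Fetches a single READY job (if any), submits it to the batch2 job
	 * coordinator, and marks it RUNNING. On failure the job is marked ERROR and
	 * the exception message is recorded as its status message.
	 */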
	private ActivateJobResult doActivateNextReadyJob() {
		Optional<BulkImportJobEntity> jobToProcessOpt = Objects.requireNonNull(myTxTemplate.execute(t -> {
			Pageable page = PageRequest.of(0, 1);
			Slice<BulkImportJobEntity> submittedJobs = myJobDao.findByStatus(page, BulkImportJobStatusEnum.READY);
			if (submittedJobs.isEmpty()) {
				return Optional.empty();
			}
			return Optional.of(submittedJobs.getContent().get(0));
		}));

		if (!jobToProcessOpt.isPresent()) {
			return new ActivateJobResult(false, null);
		}

		BulkImportJobEntity bulkImportJobEntity = jobToProcessOpt.get();

		String jobUuid = bulkImportJobEntity.getJobId();
		String biJobId = null;
		try {
			biJobId = processJob(bulkImportJobEntity);
			// set job status to RUNNING so it will not be processed again
			myTxTemplate.execute(t -> {
				bulkImportJobEntity.setStatus(BulkImportJobStatusEnum.RUNNING);
				myJobDao.save(bulkImportJobEntity);
				return null;
			});
		} catch (Exception e) {
			ourLog.error("Failure while preparing bulk import job", e);
			myTxTemplate.execute(t -> {
				Optional<BulkImportJobEntity> submittedJobs = myJobDao.findByJobId(jobUuid);
				if (submittedJobs.isPresent()) {
					BulkImportJobEntity jobEntity = submittedJobs.get();
					jobEntity.setStatus(BulkImportJobStatusEnum.ERROR);
					jobEntity.setStatusMessage(e.getMessage());
					myJobDao.save(jobEntity);
				}
				return null;
			});
			return new ActivateJobResult(false, null);
		}

		return new ActivateJobResult(true, biJobId);
	}

	@Override
	@Transactional
	public void setJobToStatus(String theBiJobId, BulkImportJobStatusEnum theStatus) {
		setJobToStatus(theBiJobId, theStatus, null);
	}

	@Override
	public void setJobToStatus(String theBiJobId, BulkImportJobStatusEnum theStatus, String theStatusMessage) {
		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);
		job.setStatus(theStatus);
		job.setStatusMessage(theStatusMessage);
		myJobDao.save(job);
	}

	@Override
	@Transactional
	public BulkImportJobJson fetchJob(String theBiJobId) {
		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);
		return job.toJson();
	}

	@Override
	@Transactional
	public JobInfo getJobStatus(String theBiJobId) {
		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);
		return new JobInfo()
				.setStatus(job.getStatus())
				.setStatusMessage(job.getStatusMessage())
				.setStatusTime(job.getStatusTime());
	}

	@Transactional
	@Override
	public BulkImportJobFileJson fetchFile(String theBiJobId, int theFileIndex) {
		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);

		return myJobFileDao
				.findForJob(job, theFileIndex)
				.map(BulkImportJobFileEntity::toJson)
				.orElseThrow(() ->
						new IllegalArgumentException("Invalid index " + theFileIndex + " for bijob " + theBiJobId));
	}

	@Transactional
	@Override
	public String getFileDescription(String theBiJobId, int theFileIndex) {
		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);

		return myJobFileDao.findFileDescriptionForJob(job, theFileIndex).orElse("");
	}

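	/**
	 * Deletes every stored file row belonging to the job, then the job entity
	 * itself.
	 */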
	@Override
	@Transactional
	public void deleteJobFiles(String theBiJobId) {
		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);
		List<Long> files = myJobFileDao.findAllIdsForJob(theBiJobId);
		for (Long next : files) {
			myJobFileDao.deleteById(next);
		}
		myJobDao.delete(job);
	}

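	/**
	 * Builds the batch2 bulk import pull parameters from the job entity and
	 * starts a new job instance, returning the batch2 instance ID.
	 */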
	private String processJob(BulkImportJobEntity theBulkImportJobEntity) {
		String biJobId = theBulkImportJobEntity.getJobId();
		int batchSize = theBulkImportJobEntity.getBatchSize();

		Batch2BulkImportPullJobParameters jobParameters = new Batch2BulkImportPullJobParameters();
		jobParameters.setJobId(biJobId);
		jobParameters.setBatchSize(batchSize);

		JobInstanceStartRequest request = new JobInstanceStartRequest();
		request.setJobDefinitionId(BULK_IMPORT_JOB_NAME);
		request.setParameters(jobParameters);

		ourLog.info("Submitting bulk import with bijob id {} to job scheduler", biJobId);

		return myJobCoordinator.startInstance(request).getInstanceId();
	}

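	/**
	 * Persists each file as a {@link BulkImportJobFileEntity}, assigning file
	 * sequence numbers starting at the given value.
	 */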
	private void addFilesToJob(
			@Nonnull List<BulkImportJobFileJson> theFiles, BulkImportJobEntity theJob, int theNextSequence) {
		int nextSequence = theNextSequence;
		for (BulkImportJobFileJson nextFile : theFiles) {
			ValidateUtil.isNotBlankOrThrowUnprocessableEntity(
					nextFile.getContents(), "Job File Contents must not be blank");

			BulkImportJobFileEntity jobFile = new BulkImportJobFileEntity();
			jobFile.setJob(theJob);
			jobFile.setContents(nextFile.getContents());
			jobFile.setTenantName(nextFile.getTenantName());
			jobFile.setFileDescription(nextFile.getDescription());
			jobFile.setFileSequence(nextSequence++);
			myJobFileDao.save(jobFile);
		}
	}

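	/**
	 * Quartz job registered in {@link #scheduleJobs(ISchedulerService)}; it
	 * simply delegates to {@link IBulkDataImportSvc#activateNextReadyJob()}.
	 */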
	public static class ActivationJob implements HapiJob {
		@Autowired
		private IBulkDataImportSvc myTarget;

		@Override
		public void execute(JobExecutionContext theContext) {
			myTarget.activateNextReadyJob();
		}
	}
}