/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.bulk.imprt.svc;

import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.importpull.models.Batch2BulkImportPullJobParameters;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.bulk.imprt.api.IBulkDataImportSvc;
import ca.uhn.fhir.jpa.bulk.imprt.model.ActivateJobResult;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobFileJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobJson;
import ca.uhn.fhir.jpa.bulk.imprt.model.BulkImportJobStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkImportJobDao;
import ca.uhn.fhir.jpa.dao.data.IBulkImportJobFileDao;
import ca.uhn.fhir.jpa.entity.BulkImportJobEntity;
import ca.uhn.fhir.jpa.entity.BulkImportJobFileEntity;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.ValidateUtil;
import jakarta.annotation.Nonnull;
import jakarta.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Semaphore;

import static ca.uhn.fhir.batch2.jobs.importpull.BulkImportPullConfig.BULK_IMPORT_JOB_NAME;

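/**
 * Database-backed implementation of {@link IBulkDataImportSvc}. Jobs are staged with
 * {@link #createNewJob}, optionally given more files via {@link #addFilesToJob}, and
 * marked READY with {@link #markJobAsReadyForActivation}. A locally scheduled
 * {@link ActivationJob} then polls for READY jobs and hands them off to the Batch2
 * framework for processing.
 *
 * <p>A minimal staging sketch (the job-description values and the {@code files} list
 * below are illustrative assumptions, not prescribed values):</p>
 *
 * <pre>{@code
 * BulkImportJobJson description = new BulkImportJobJson();
 * description.setProcessingMode(JobFileRowProcessingModeEnum.FHIR_TRANSACTION);
 * description.setBatchSize(100);
 *
 * String biJobId = svc.createNewJob(description, files);
 * svc.markJobAsReadyForActivation(biJobId);
 * }</pre>
 */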
public class BulkDataImportSvcImpl implements IBulkDataImportSvc, IHasScheduledJobs {
	private static final Logger ourLog = LoggerFactory.getLogger(BulkDataImportSvcImpl.class);
	private final Semaphore myRunningJobSemaphore = new Semaphore(1);

	@Autowired
	private IBulkImportJobDao myJobDao;

	@Autowired
	private IBulkImportJobFileDao myJobFileDao;

	@Autowired
	private PlatformTransactionManager myTxManager;

	private TransactionTemplate myTxTemplate;

	@Autowired
	private IJobCoordinator myJobCoordinator;

	@Autowired
	private JpaStorageSettings myStorageSettings;

	@PostConstruct
	public void start() {
		myTxTemplate = new TransactionTemplate(myTxManager);
	}

	@Override
	public void scheduleJobs(ISchedulerService theSchedulerService) {
		// This job should be local so that each node in the cluster can pick up jobs
		ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
		jobDetail.setId(ActivationJob.class.getName());
		jobDetail.setJobClass(ActivationJob.class);
		theSchedulerService.scheduleLocalJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
	}

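	/**
	 * Creates a new bulk import job in {@link BulkImportJobStatusEnum#STAGING STAGING} status,
	 * persists the supplied initial files, and returns the generated "bijob" ID.
	 */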
	@Override
	@Transactional
	public String createNewJob(
			BulkImportJobJson theJobDescription, @Nonnull List<BulkImportJobFileJson> theInitialFiles) {
		ValidateUtil.isNotNullOrThrowUnprocessableEntity(theJobDescription, "Job must not be null");
		ValidateUtil.isNotNullOrThrowUnprocessableEntity(
				theJobDescription.getProcessingMode(), "Job File Processing mode must not be null");
		ValidateUtil.isTrueOrThrowInvalidRequest(
				theJobDescription.getBatchSize() > 0, "Job File Batch Size must be > 0");

		String biJobId = UUID.randomUUID().toString();

		ourLog.info(
				"Creating new Bulk Import job with {} files, assigning bijob ID: {}", theInitialFiles.size(), biJobId);

		BulkImportJobEntity job = new BulkImportJobEntity();
		job.setJobId(biJobId);
		job.setFileCount(theInitialFiles.size());
		job.setStatus(BulkImportJobStatusEnum.STAGING);
		job.setJobDescription(theJobDescription.getJobDescription());
		job.setBatchSize(theJobDescription.getBatchSize());
		job.setRowProcessingMode(theJobDescription.getProcessingMode());
		job = myJobDao.save(job);

		int nextSequence = 0;
		addFilesToJob(theInitialFiles, job, nextSequence);

		return biJobId;
	}

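	/**
	 * Appends files to an existing job. Only legal while the job is still in
	 * {@link BulkImportJobStatusEnum#STAGING STAGING} status.
	 */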
	@Override
	@Transactional
	public void addFilesToJob(String theBiJobId, List<BulkImportJobFileJson> theFiles) {
		ourLog.info("Adding {} files to bulk import job with bijob id {}", theFiles.size(), theBiJobId);

		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);

		ValidateUtil.isTrueOrThrowInvalidRequest(
				job.getStatus() == BulkImportJobStatusEnum.STAGING,
				"bijob id %s has status %s and cannot have files added",
				theBiJobId,
				job.getStatus());

		addFilesToJob(theFiles, job, job.getFileCount());

		job.setFileCount(job.getFileCount() + theFiles.size());
		myJobDao.save(job);
	}

	private BulkImportJobEntity findJobByBiJobId(String theBiJobId) {
		return myJobDao
				.findByJobId(theBiJobId)
				.orElseThrow(() -> new InvalidRequestException("Unknown bijob id: " + theBiJobId));
	}

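	/**
	 * Transitions a staged job to {@link BulkImportJobStatusEnum#READY READY}, making it
	 * eligible for pickup by the next run of the {@link ActivationJob}.
	 */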
	@Override
	@Transactional
	public void markJobAsReadyForActivation(String theBiJobId) {
		ourLog.info("Marking bulk import bijob {} as ready for activation", theBiJobId);

		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);
		ValidateUtil.isTrueOrThrowInvalidRequest(
				job.getStatus() == BulkImportJobStatusEnum.STAGING,
				"Bulk import bijob %s cannot be activated in status: %s",
				theBiJobId,
				job.getStatus());

		job.setStatus(BulkImportJobStatusEnum.READY);
		myJobDao.save(job);
	}

	/**
	 * To be called by the job scheduler
	 */
	@Transactional(propagation = Propagation.NEVER)
	@Override
	public ActivateJobResult activateNextReadyJob() {
		if (!myStorageSettings.isEnableTaskBulkImportJobExecution()) {
			Logs.getBatchTroubleshootingLog()
					.trace("Bulk import job execution is not enabled on this server. No action taken.");
			return new ActivateJobResult(false, null);
		}

		if (!myRunningJobSemaphore.tryAcquire()) {
			Logs.getBatchTroubleshootingLog().trace("Already have a running batch job, not going to check for more");
			return new ActivateJobResult(false, null);
		}

		try {
			ActivateJobResult retVal = doActivateNextReadyJob();
			if (StringUtils.isNotBlank(retVal.jobId)) {
				ourLog.info("Batch job submitted with batch job id {}", retVal.jobId);
			}
			return retVal;
		} finally {
			myRunningJobSemaphore.release();
		}
	}

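	/**
	 * Fetches the oldest READY job (if any), submits it to the Batch2 job coordinator, and
	 * marks it as ERROR if submission fails.
	 */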
	private ActivateJobResult doActivateNextReadyJob() {
		Optional<BulkImportJobEntity> jobToProcessOpt = Objects.requireNonNull(myTxTemplate.execute(t -> {
			Pageable page = PageRequest.of(0, 1);
			Slice<BulkImportJobEntity> submittedJobs = myJobDao.findByStatus(page, BulkImportJobStatusEnum.READY);
			if (submittedJobs.isEmpty()) {
				return Optional.empty();
			}
			return Optional.of(submittedJobs.getContent().get(0));
		}));

		if (jobToProcessOpt.isEmpty()) {
			return new ActivateJobResult(false, null);
		}

		BulkImportJobEntity bulkImportJobEntity = jobToProcessOpt.get();

		String jobUuid = bulkImportJobEntity.getJobId();
		String biJobId;
		try {
			biJobId = processJob(bulkImportJobEntity);
		} catch (Exception e) {
			ourLog.error("Failure while preparing bulk import job", e);
			myTxTemplate.execute(t -> {
				Optional<BulkImportJobEntity> submittedJobs = myJobDao.findByJobId(jobUuid);
				if (submittedJobs.isPresent()) {
					BulkImportJobEntity jobEntity = submittedJobs.get();
					jobEntity.setStatus(BulkImportJobStatusEnum.ERROR);
					jobEntity.setStatusMessage(e.getMessage());
					myJobDao.save(jobEntity);
				}
				return null;
			});
			return new ActivateJobResult(false, null);
		}

		return new ActivateJobResult(true, biJobId);
	}

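	/**
	 * Updates the persisted status (and optional status message) of a job.
	 */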
	@Override
	@Transactional
	public void setJobToStatus(String theBiJobId, BulkImportJobStatusEnum theStatus) {
		setJobToStatus(theBiJobId, theStatus, null);
	}

	@Override
	@Transactional
	public void setJobToStatus(String theBiJobId, BulkImportJobStatusEnum theStatus, String theStatusMessage) {
		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);
		job.setStatus(theStatus);
		job.setStatusMessage(theStatusMessage);
		myJobDao.save(job);
	}

	@Override
	@Transactional
	public BulkImportJobJson fetchJob(String theBiJobId) {
		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);
		return job.toJson();
	}

	@Override
	public JobInfo getJobStatus(String theBiJobId) {
		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);
		return new JobInfo()
				.setStatus(job.getStatus())
				.setStatusMessage(job.getStatusMessage())
				.setStatusTime(job.getStatusTime());
	}

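	/**
	 * Returns the contents of the file at the given index for a job, throwing
	 * {@link IllegalArgumentException} if no file exists at that index.
	 */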
	@Transactional
	@Override
	public BulkImportJobFileJson fetchFile(String theBiJobId, int theFileIndex) {
		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);

		return myJobFileDao
				.findForJob(job, theFileIndex)
				.map(BulkImportJobFileEntity::toJson)
				.orElseThrow(() ->
						new IllegalArgumentException("Invalid index " + theFileIndex + " for bijob " + theBiJobId));
	}

	@Transactional
	@Override
	public String getFileDescription(String theBiJobId, int theFileIndex) {
		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);

		return myJobFileDao.findFileDescriptionForJob(job, theFileIndex).orElse("");
	}

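	/**
	 * Deletes all staged files for a job, then the job itself.
	 */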
	@Override
	@Transactional
	public void deleteJobFiles(String theBiJobId) {
		BulkImportJobEntity job = findJobByBiJobId(theBiJobId);
		List<Long> files = myJobFileDao.findAllIdsForJob(theBiJobId);
		for (Long next : files) {
			myJobFileDao.deleteById(next);
		}
		myJobDao.delete(job);
	}

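	/**
	 * Builds the Batch2 job parameters for a READY job and submits it to the job
	 * coordinator, returning the Batch2 instance ID.
	 */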
	private String processJob(BulkImportJobEntity theBulkImportJobEntity) {
		String biJobId = theBulkImportJobEntity.getJobId();
		int batchSize = theBulkImportJobEntity.getBatchSize();

		Batch2BulkImportPullJobParameters jobParameters = new Batch2BulkImportPullJobParameters();
		jobParameters.setJobId(biJobId);
		jobParameters.setBatchSize(batchSize);

		JobInstanceStartRequest request = new JobInstanceStartRequest();
		request.setJobDefinitionId(BULK_IMPORT_JOB_NAME);
		request.setParameters(jobParameters);

		ourLog.info("Submitting bulk import with bijob id {} to job scheduler", biJobId);

		return myJobCoordinator.startInstance(request).getInstanceId();
	}

	private void addFilesToJob(
			@Nonnull List<BulkImportJobFileJson> theInitialFiles, BulkImportJobEntity job, int nextSequence) {
		for (BulkImportJobFileJson nextFile : theInitialFiles) {
			ValidateUtil.isNotBlankOrThrowUnprocessableEntity(
					nextFile.getContents(), "Job File Contents must not be blank");

			BulkImportJobFileEntity jobFile = new BulkImportJobFileEntity();
			jobFile.setJob(job);
			jobFile.setContents(nextFile.getContents());
			jobFile.setTenantName(nextFile.getTenantName());
			jobFile.setFileDescription(nextFile.getDescription());
			jobFile.setFileSequence(nextSequence++);
			myJobFileDao.save(jobFile);
		}
	}

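	/**
	 * Quartz job that periodically invokes {@link IBulkDataImportSvc#activateNextReadyJob()}.
	 * Scheduled locally (see {@link BulkDataImportSvcImpl#scheduleJobs(ISchedulerService)})
	 * so that any node in the cluster can pick up READY jobs.
	 */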
	public static class ActivationJob implements HapiJob {
		@Autowired
		private IBulkDataImportSvc myTarget;

		@Override
		public void execute(JobExecutionContext theContext) {
			myTarget.activateNextReadyJob();
		}
	}
}