001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.term;
021
022import ca.uhn.fhir.batch2.api.IJobCoordinator;
023import ca.uhn.fhir.batch2.model.JobInstance;
024import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
025import ca.uhn.fhir.batch2.model.StatusEnum;
026import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
027import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao;
028import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemVersionDao;
029import ca.uhn.fhir.jpa.dao.data.ITermConceptDao;
030import ca.uhn.fhir.jpa.dao.data.ITermConceptParentChildLinkDao;
031import ca.uhn.fhir.jpa.entity.TermCodeSystem;
032import ca.uhn.fhir.jpa.entity.TermCodeSystemVersion;
033import ca.uhn.fhir.jpa.entity.TermConcept;
034import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink;
035import ca.uhn.fhir.jpa.model.entity.ResourceTable;
036import ca.uhn.fhir.jpa.model.sched.HapiJob;
037import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
038import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
039import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
040import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc;
041import ca.uhn.fhir.jpa.term.api.ITermVersionAdapterSvc;
042import ca.uhn.fhir.jpa.term.models.TermCodeSystemDeleteJobParameters;
043import ca.uhn.fhir.jpa.term.models.TermCodeSystemDeleteVersionJobParameters;
044import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
045import ca.uhn.fhir.util.StopWatch;
046import ca.uhn.fhir.util.TimeoutManager;
047import com.google.common.annotations.VisibleForTesting;
048import org.apache.commons.lang3.Validate;
049import org.apache.commons.lang3.builder.ToStringBuilder;
050import org.hl7.fhir.r4.model.ConceptMap;
051import org.hl7.fhir.r4.model.ValueSet;
052import org.quartz.JobExecutionContext;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055import org.springframework.beans.factory.annotation.Autowired;
056import org.springframework.transaction.PlatformTransactionManager;
057import org.springframework.transaction.annotation.Propagation;
058import org.springframework.transaction.annotation.Transactional;
059import org.springframework.transaction.support.TransactionSynchronizationManager;
060import org.springframework.transaction.support.TransactionTemplate;
061
062import java.time.Duration;
063import java.time.temporal.ChronoUnit;
064import java.util.ArrayList;
065import java.util.Collections;
066import java.util.List;
067import java.util.Queue;
068import java.util.UUID;
069import java.util.concurrent.ConcurrentLinkedQueue;
070import java.util.concurrent.TimeUnit;
071import java.util.function.Supplier;
072
073import static ca.uhn.fhir.batch2.jobs.termcodesystem.TermCodeSystemJobConfig.TERM_CODE_SYSTEM_DELETE_JOB_NAME;
074import static ca.uhn.fhir.batch2.jobs.termcodesystem.TermCodeSystemJobConfig.TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME;
075
076public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc, IHasScheduledJobs {
077
078        private static final Logger ourLog = LoggerFactory.getLogger(TermDeferredStorageSvcImpl.class);
079        private static final long SAVE_ALL_DEFERRED_WARN_MINUTES = 1;
080        private static final long SAVE_ALL_DEFERRED_ERROR_MINUTES = 5;
081        private boolean myAllowDeferredTasksTimeout = true;
082        private final List<TermCodeSystem> myDeferredCodeSystemsDeletions = Collections.synchronizedList(new ArrayList<>());
083        private final Queue<TermCodeSystemVersion> myDeferredCodeSystemVersionsDeletions = new ConcurrentLinkedQueue<>();
084        private final List<TermConcept> myDeferredConcepts = Collections.synchronizedList(new ArrayList<>());
085        private final List<ValueSet> myDeferredValueSets = Collections.synchronizedList(new ArrayList<>());
086        private final List<ConceptMap> myDeferredConceptMaps = Collections.synchronizedList(new ArrayList<>());
087        private final List<TermConceptParentChildLink> myConceptLinksToSaveLater =
088                        Collections.synchronizedList(new ArrayList<>());
089
090        // TODO - why is this needed? it's cumbersome to maintain; consider removing it
091        /**
092         * A list of job ids for CodeSydstemDelete and CodeSystemVersionDelete jobs that
093         * have been scheduled (but not completed)
094         */
095        private final List<String> myJobExecutions = Collections.synchronizedList(new ArrayList<>());
096
097        @Autowired
098        protected ITermConceptDao myConceptDao;
099
100        @Autowired
101        protected ITermCodeSystemDao myCodeSystemDao;
102
103        @Autowired
104        protected ITermCodeSystemVersionDao myCodeSystemVersionDao;
105
106        @Autowired
107        protected PlatformTransactionManager myTransactionMgr;
108
109        private boolean myProcessDeferred = true;
110
111        @Autowired
112        private ITermConceptParentChildLinkDao myConceptParentChildLinkDao;
113
114        @Autowired
115        private ITermVersionAdapterSvc myTerminologyVersionAdapterSvc;
116
117        @Autowired
118        private TermConceptDaoSvc myTermConceptDaoSvc;
119
120        @Autowired
121        private IJobCoordinator myJobCoordinator;
122
123        @Override
124        public void addConceptToStorageQueue(TermConcept theConcept) {
125                Validate.notNull(theConcept);
126                myDeferredConcepts.add(theConcept);
127        }
128
129        @Override
130        public void addConceptLinkToStorageQueue(TermConceptParentChildLink theConceptLink) {
131                Validate.notNull(theConceptLink);
132                myConceptLinksToSaveLater.add(theConceptLink);
133        }
134
135        @Override
136        public void addConceptMapsToStorageQueue(List<ConceptMap> theConceptMaps) {
137                Validate.notNull(theConceptMaps);
138                myDeferredConceptMaps.addAll(theConceptMaps);
139        }
140
141        @Override
142        public void addValueSetsToStorageQueue(List<ValueSet> theValueSets) {
143                Validate.notNull(theValueSets);
144                myDeferredValueSets.addAll(theValueSets);
145        }
146
147        @Override
148        public void deleteCodeSystemForResource(ResourceTable theCodeSystemToDelete) {
149                // there are use cases (at least in tests) where the code system is not present for the resource but versions
150                // are,
151                // so, as code system deletion also deletes versions, we try the system first but if not present we also try
152                // versions
153                TermCodeSystem termCodeSystemToDelete =
154                                myCodeSystemDao.findByResourcePid(theCodeSystemToDelete.getResourceId());
155                if (termCodeSystemToDelete != null) {
156                        termCodeSystemToDelete.setCodeSystemUri("urn:uuid:" + UUID.randomUUID());
157                        myCodeSystemDao.save(termCodeSystemToDelete);
158                        myDeferredCodeSystemsDeletions.add(termCodeSystemToDelete);
159                        return;
160                }
161
162                List<TermCodeSystemVersion> codeSystemVersionsToDelete =
163                                myCodeSystemVersionDao.findByCodeSystemResourcePid(theCodeSystemToDelete.getResourceId());
164                for (TermCodeSystemVersion codeSystemVersionToDelete : codeSystemVersionsToDelete) {
165                        if (codeSystemVersionToDelete != null) {
166                                myDeferredCodeSystemVersionsDeletions.add(codeSystemVersionToDelete);
167                        }
168                }
169        }
170
171        @Override
172        public void setProcessDeferred(boolean theProcessDeferred) {
173                myProcessDeferred = theProcessDeferred;
174        }
175
176        private void processDeferredConceptMaps() {
177                int count = Math.min(myDeferredConceptMaps.size(), 20);
178                for (ConceptMap nextConceptMap : new ArrayList<>(myDeferredConceptMaps.subList(0, count))) {
179                        ourLog.info("Creating ConceptMap: {}", nextConceptMap.getId());
180                        myTerminologyVersionAdapterSvc.createOrUpdateConceptMap(nextConceptMap);
181                        myDeferredConceptMaps.remove(nextConceptMap);
182                }
183                ourLog.info("Saved {} deferred ConceptMap resources, have {} remaining", count, myDeferredConceptMaps.size());
184        }
185
186        private void processDeferredConcepts() {
187                int codeCount = 0, relCount = 0;
188                StopWatch stopwatch = new StopWatch();
189
190                int count = Math.min(1000, myDeferredConcepts.size());
191                ourLog.debug("Saving {} deferred concepts...", count);
192                while (codeCount < count && myDeferredConcepts.size() > 0) {
193                        TermConcept next = myDeferredConcepts.remove(0);
194                        if (myCodeSystemVersionDao
195                                        .findById(next.getCodeSystemVersion().getPid())
196                                        .isPresent()) {
197                                try {
198                                        codeCount += myTermConceptDaoSvc.saveConcept(next);
199                                } catch (Exception theE) {
200                                        ourLog.error(
201                                                        "Exception thrown when attempting to save TermConcept {} in Code System {}",
202                                                        next.getCode(),
203                                                        next.getCodeSystemVersion().getCodeSystemDisplayName(),
204                                                        theE);
205                                }
206                        } else {
207                                ourLog.warn(
208                                                "Unable to save deferred TermConcept {} because Code System {} version PID {} is no longer valid. Code system may have since been replaced.",
209                                                next.getCode(),
210                                                next.getCodeSystemVersion().getCodeSystemDisplayName(),
211                                                next.getCodeSystemVersion().getPid());
212                        }
213                }
214
215                if (codeCount > 0) {
216                        ourLog.info(
217                                        "Saved {} deferred concepts ({} codes remain and {} relationships remain) in {}ms ({} codes/sec)",
218                                        codeCount,
219                                        myDeferredConcepts.size(),
220                                        myConceptLinksToSaveLater.size(),
221                                        stopwatch.getMillis(),
222                                        stopwatch.formatThroughput(codeCount, TimeUnit.SECONDS));
223                }
224
225                if (codeCount == 0) {
226                        count = Math.min(1000, myConceptLinksToSaveLater.size());
227                        ourLog.info("Saving {} deferred concept relationships...", count);
228                        while (relCount < count && myConceptLinksToSaveLater.size() > 0) {
229                                TermConceptParentChildLink next = myConceptLinksToSaveLater.remove(0);
230                                assert next.getChild() != null;
231                                assert next.getParent() != null;
232
233                                if ((next.getChild().getId() == null
234                                                                || !myConceptDao
235                                                                                .findById(next.getChild().getId())
236                                                                                .isPresent())
237                                                || (next.getParent().getId() == null
238                                                                || !myConceptDao
239                                                                                .findById(next.getParent().getId())
240                                                                                .isPresent())) {
241                                        ourLog.warn(
242                                                        "Not inserting link from child {} to parent {} because it appears to have been deleted",
243                                                        next.getParent().getCode(),
244                                                        next.getChild().getCode());
245                                        continue;
246                                }
247
248                                saveConceptLink(next);
249                                relCount++;
250                        }
251                }
252
253                if (relCount > 0) {
254                        ourLog.info(
255                                        "Saved {} deferred relationships ({} remain) in {}ms ({} entries/sec)",
256                                        relCount,
257                                        myConceptLinksToSaveLater.size(),
258                                        stopwatch.getMillis(),
259                                        stopwatch.formatThroughput(relCount, TimeUnit.SECONDS));
260                }
261
262                if ((myDeferredConcepts.size() + myConceptLinksToSaveLater.size()) == 0) {
263                        ourLog.info("All deferred concepts and relationships have now been synchronized to the database");
264                }
265        }
266
267        private void processDeferredValueSets() {
268                int count = Math.min(myDeferredValueSets.size(), 200);
269                for (ValueSet nextValueSet : new ArrayList<>(myDeferredValueSets.subList(0, count))) {
270                        ourLog.info("Creating ValueSet: {}", nextValueSet.getId());
271                        myTerminologyVersionAdapterSvc.createOrUpdateValueSet(nextValueSet);
272                        myDeferredValueSets.remove(nextValueSet);
273                }
274                ourLog.info("Saved {} deferred ValueSet resources, have {} remaining", count, myDeferredValueSets.size());
275        }
276
277        /**
278         * This method is present only for unit tests, do not call from client code
279         */
280        @VisibleForTesting
281        public synchronized void clearDeferred() {
282                myProcessDeferred = true;
283                myDeferredValueSets.clear();
284                myDeferredConceptMaps.clear();
285                myDeferredConcepts.clear();
286                myDeferredCodeSystemsDeletions.clear();
287                myConceptLinksToSaveLater.clear();
288                myDeferredCodeSystemVersionsDeletions.clear();
289                clearJobExecutions();
290        }
291
292        private void clearJobExecutions() {
293                for (String id : new ArrayList<>(myJobExecutions)) {
294                        myJobCoordinator.cancelInstance(id);
295                }
296                myJobExecutions.clear();
297        }
298
299        @Override
300        public void notifyJobEnded(String theId) {
301                myJobExecutions.remove(theId);
302        }
303
304        private <T> T runInTransaction(Supplier<T> theRunnable) {
305                assert !TransactionSynchronizationManager.isActualTransactionActive();
306
307                return new TransactionTemplate(myTransactionMgr).execute(tx -> theRunnable.get());
308        }
309
310        @Override
311        public void saveAllDeferred() {
312                TimeoutManager timeoutManager = null;
313                if (myAllowDeferredTasksTimeout) {
314                        timeoutManager = new TimeoutManager(
315                                        TermDeferredStorageSvcImpl.class.getName() + ".saveAllDeferred()",
316                                        Duration.of(SAVE_ALL_DEFERRED_WARN_MINUTES, ChronoUnit.MINUTES),
317                                        Duration.of(SAVE_ALL_DEFERRED_ERROR_MINUTES, ChronoUnit.MINUTES));
318                }
319
320                // Don't include executing jobs here since there's no point in thrashing over and over
321                // in a busy wait while we wait for batch2 job processes to finish
322                while (!isStorageQueueEmpty(false)) {
323                        if (myAllowDeferredTasksTimeout) {
324                                if (timeoutManager.checkTimeout()) {
325                                        ourLog.info(toString());
326                                }
327                        }
328                        saveDeferred();
329                }
330        }
331
332        @Transactional(propagation = Propagation.NEVER)
333        @Override
334        public synchronized void saveDeferred() {
335                if (isProcessDeferredPaused()) {
336                        return;
337                }
338
339                for (int i = 0; i < 10; i++) {
340                        if (!isDeferredConcepts()
341                                        && !isConceptLinksToSaveLater()
342                                        && !isDeferredValueSets()
343                                        && !isDeferredConceptMaps()
344                                        && !isDeferredCodeSystemDeletions()) {
345                                return;
346                        }
347
348                        if (isDeferredConceptsOrConceptLinksToSaveLater()) {
349                                runInTransaction(() -> {
350                                        processDeferredConcepts();
351                                        return null;
352                                });
353
354                                continue;
355                        }
356
357                        if (isDeferredValueSets()) {
358                                runInTransaction(() -> {
359                                        processDeferredValueSets();
360                                        return null;
361                                });
362
363                                continue;
364                        }
365
366                        if (isDeferredConceptMaps()) {
367                                runInTransaction(() -> {
368                                        processDeferredConceptMaps();
369                                        return null;
370                                });
371
372                                continue;
373                        }
374
375                        if (isDeferredCodeSystemVersionDeletions()) {
376                                processDeferredCodeSystemVersionDeletions();
377                        }
378
379                        if (isDeferredCodeSystemDeletions()) {
380                                processDeferredCodeSystemDeletions();
381                        }
382                }
383        }
384
385        private boolean isDeferredCodeSystemVersionDeletions() {
386                return !myDeferredCodeSystemVersionsDeletions.isEmpty();
387        }
388
389        private void processDeferredCodeSystemDeletions() {
390                for (TermCodeSystem next : myDeferredCodeSystemsDeletions) {
391                        deleteTermCodeSystemOffline(next.getPid());
392                }
393                myDeferredCodeSystemsDeletions.clear();
394        }
395
396        private void processDeferredCodeSystemVersionDeletions() {
397                for (TermCodeSystemVersion next : myDeferredCodeSystemVersionsDeletions) {
398                        deleteTermCodeSystemVersionOffline(next.getPid());
399                }
400                myDeferredCodeSystemVersionsDeletions.clear();
401        }
402
403        private void deleteTermCodeSystemVersionOffline(Long theCodeSystemVersionPid) {
404                JobInstanceStartRequest request = new JobInstanceStartRequest();
405                request.setJobDefinitionId(TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME);
406
407                TermCodeSystemDeleteVersionJobParameters parameters = new TermCodeSystemDeleteVersionJobParameters();
408                parameters.setCodeSystemVersionPid(theCodeSystemVersionPid);
409                request.setParameters(parameters);
410
411                Batch2JobStartResponse response = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
412                myJobExecutions.add(response.getInstanceId());
413        }
414
415        private void deleteTermCodeSystemOffline(Long theCodeSystemPid) {
416                TermCodeSystemDeleteJobParameters parameters = new TermCodeSystemDeleteJobParameters();
417                parameters.setTermPid(theCodeSystemPid);
418                JobInstanceStartRequest request = new JobInstanceStartRequest();
419                request.setParameters(parameters);
420                request.setJobDefinitionId(TERM_CODE_SYSTEM_DELETE_JOB_NAME);
421                Batch2JobStartResponse response = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
422                myJobExecutions.add(response.getInstanceId());
423        }
424
425        @Override
426        public boolean isStorageQueueEmpty(boolean theIncludeExecutingJobs) {
427                boolean retVal = !isProcessDeferredPaused();
428                retVal &= !isDeferredConcepts();
429                retVal &= !isConceptLinksToSaveLater();
430                retVal &= !isDeferredValueSets();
431                retVal &= !isDeferredConceptMaps();
432                retVal &= !isDeferredCodeSystemDeletions();
433                if (theIncludeExecutingJobs) {
434                        retVal &= !isJobsExecuting();
435                }
436                return retVal;
437        }
438
439        private boolean isJobsExecuting() {
440                cleanseEndedJobs();
441
442                return !myJobExecutions.isEmpty();
443        }
444
445        private void cleanseEndedJobs() {
446                /*
447                 * Cleanse the list of completed jobs.
448                 * This is mostly a fail-safe
449                 * because "cancelled" jobs are never removed.
450                 */
451                List<String> executions = new ArrayList<>(myJobExecutions);
452                List<String> idsToDelete = new ArrayList<>();
453                for (String id : executions) {
454                        // TODO - might want to consider a "fetch all instances"
455                        JobInstance instance = myJobCoordinator.getInstance(id);
456                        if (StatusEnum.getEndedStatuses().contains(instance.getStatus())) {
457                                idsToDelete.add(instance.getInstanceId());
458                        }
459                }
460                for (String id : idsToDelete) {
461                        myJobExecutions.remove(id);
462                }
463        }
464
465        private void saveConceptLink(TermConceptParentChildLink next) {
466                if (next.getId() == null) {
467                        myConceptParentChildLinkDao.save(next);
468                }
469        }
470
471        private boolean isProcessDeferredPaused() {
472                return !myProcessDeferred;
473        }
474
475        private boolean isDeferredConceptsOrConceptLinksToSaveLater() {
476                return isDeferredConcepts() || isConceptLinksToSaveLater();
477        }
478
479        private boolean isDeferredCodeSystemDeletions() {
480                return !myDeferredCodeSystemsDeletions.isEmpty() || !myDeferredCodeSystemVersionsDeletions.isEmpty();
481        }
482
483        private boolean isDeferredConcepts() {
484                return !myDeferredConcepts.isEmpty();
485        }
486
487        private boolean isConceptLinksToSaveLater() {
488                return !myConceptLinksToSaveLater.isEmpty();
489        }
490
491        private boolean isDeferredValueSets() {
492                return !myDeferredValueSets.isEmpty();
493        }
494
495        private boolean isDeferredConceptMaps() {
496                return !myDeferredConceptMaps.isEmpty();
497        }
498
499        @VisibleForTesting
500        void setTransactionManagerForUnitTest(PlatformTransactionManager theTxManager) {
501                myTransactionMgr = theTxManager;
502        }
503
504        @VisibleForTesting
505        void setTermConceptDaoSvc(TermConceptDaoSvc theTermConceptDaoSvc) {
506                myTermConceptDaoSvc = theTermConceptDaoSvc;
507        }
508
509        @VisibleForTesting
510        void setConceptDaoForUnitTest(ITermConceptDao theConceptDao) {
511                myConceptDao = theConceptDao;
512        }
513
514        @VisibleForTesting
515        void setCodeSystemVersionDaoForUnitTest(ITermCodeSystemVersionDao theCodeSystemVersionDao) {
516                myCodeSystemVersionDao = theCodeSystemVersionDao;
517        }
518
519        @Override
520        public void disallowDeferredTaskTimeout() {
521                myAllowDeferredTasksTimeout = false;
522        }
523
524        @Override
525        @VisibleForTesting
526        public void logQueueForUnitTest() {
527                ourLog.info("isProcessDeferredPaused: {}", isProcessDeferredPaused());
528                ourLog.info("isDeferredConcepts: {}", isDeferredConcepts());
529                ourLog.info("isConceptLinksToSaveLater: {}", isConceptLinksToSaveLater());
530                ourLog.info("isDeferredValueSets: {}", isDeferredValueSets());
531                ourLog.info("isDeferredConceptMaps: {}", isDeferredConceptMaps());
532                ourLog.info("isDeferredCodeSystemDeletions: {}", isDeferredCodeSystemDeletions());
533        }
534
535        @Override
536        public void deleteCodeSystemVersion(TermCodeSystemVersion theCodeSystemVersion) {
537                myDeferredCodeSystemVersionsDeletions.add(theCodeSystemVersion);
538        }
539
540        @Override
541        public void scheduleJobs(ISchedulerService theSchedulerService) {
542                // TODO KHS what does this mean?
543                // Register scheduled job to save deferred concepts
544                // In the future it would be great to make this a cluster-aware task somehow
545                ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
546                jobDefinition.setId(Job.class.getName());
547                jobDefinition.setJobClass(Job.class);
548                theSchedulerService.scheduleLocalJob(5000, jobDefinition);
549        }
550
551        public static class Job implements HapiJob {
552                @Autowired
553                private ITermDeferredStorageSvc myTerminologySvc;
554
555                @Override
556                public void execute(JobExecutionContext theContext) {
557                        myTerminologySvc.saveDeferred();
558                }
559        }
560
561        @Override
562        public String toString() {
563                return new ToStringBuilder(this)
564                                .append("myDeferredCodeSystemsDeletions", myDeferredCodeSystemsDeletions.size())
565                                .append("myDeferredCodeSystemVersionsDeletions", myDeferredCodeSystemVersionsDeletions.size())
566                                .append("myDeferredConcepts", myDeferredConcepts.size())
567                                .append("myDeferredValueSets", myDeferredValueSets.size())
568                                .append("myDeferredConceptMaps", myDeferredConceptMaps.size())
569                                .append("myConceptLinksToSaveLater", myConceptLinksToSaveLater.size())
570                                .append("myJobExecutions", myJobExecutions.size())
571                                .append("myProcessDeferred", myProcessDeferred)
572                                .toString();
573        }
574}