/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.term;

import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao;
import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemVersionDao;
import ca.uhn.fhir.jpa.dao.data.ITermConceptDao;
import ca.uhn.fhir.jpa.dao.data.ITermConceptParentChildLinkDao;
import ca.uhn.fhir.jpa.entity.TermCodeSystem;
import ca.uhn.fhir.jpa.entity.TermCodeSystemVersion;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc;
import ca.uhn.fhir.jpa.term.api.ITermVersionAdapterSvc;
import ca.uhn.fhir.jpa.term.models.TermCodeSystemDeleteJobParameters;
import ca.uhn.fhir.jpa.term.models.TermCodeSystemDeleteVersionJobParameters;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.TimeoutManager;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.hl7.fhir.r4.model.ConceptMap;
import org.hl7.fhir.r4.model.ValueSet;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static ca.uhn.fhir.batch2.jobs.termcodesystem.TermCodeSystemJobConfig.TERM_CODE_SYSTEM_DELETE_JOB_NAME;
import static ca.uhn.fhir.batch2.jobs.termcodesystem.TermCodeSystemJobConfig.TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME;

/**
 * Holds in-memory queues of terminology work (concepts, parent/child links, ValueSets, ConceptMaps and
 * code system / code system version deletions) and flushes them in bounded batches.
 * <p>
 * Work is queued through the {@code add*}/{@code delete*} methods and flushed by {@link #saveDeferred()},
 * which is also invoked by the locally scheduled {@link Job}. Deletions are not performed inline: they are
 * submitted as batch2 jobs through the {@link IJobCoordinator}, and the started instance ids are tracked
 * in {@code myJobExecutions}.
 * <p>
 * The queues are synchronized/concurrent collections because the producer methods are not synchronized,
 * whereas {@link #saveDeferred()} and {@link #clearDeferred()} are synchronized on this instance.
 */
public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc, IHasScheduledJobs {

    private static final Logger ourLog = LoggerFactory.getLogger(TermDeferredStorageSvcImpl.class);
    private static final long SAVE_ALL_DEFERRED_WARN_MINUTES = 1;
    private static final long SAVE_ALL_DEFERRED_ERROR_MINUTES = 5;
    private boolean myAllowDeferredTasksTimeout = true;
    private static final List<String> BATCH_JOBS_TO_CARE_ABOUT =
            List.of(TERM_CODE_SYSTEM_DELETE_JOB_NAME, TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME);
    private final List<TermCodeSystem> myDeferredCodeSystemsDeletions = Collections.synchronizedList(new ArrayList<>());
    private final Queue<TermCodeSystemVersion> myDeferredCodeSystemVersionsDeletions = new ConcurrentLinkedQueue<>();
    private final List<TermConcept> myDeferredConcepts = Collections.synchronizedList(new ArrayList<>());
    private final List<ValueSet> myDeferredValueSets = Collections.synchronizedList(new ArrayList<>());
    private final List<ConceptMap> myDeferredConceptMaps = Collections.synchronizedList(new ArrayList<>());
    private final List<TermConceptParentChildLink> myConceptLinksToSaveLater =
            Collections.synchronizedList(new ArrayList<>());

    // TODO - why is this needed? it's cumbersome to maintain; consider removing it
    /**
     * A list of job ids for CodeSystemDelete and CodeSystemVersionDelete jobs that
     * have been scheduled (but not completed)
     */
    private final List<String> myJobExecutions = Collections.synchronizedList(new ArrayList<>());

    @Autowired
    protected ITermConceptDao myConceptDao;

    @Autowired
    protected ITermCodeSystemDao myCodeSystemDao;

    @Autowired
    protected ITermCodeSystemVersionDao myCodeSystemVersionDao;

    @Autowired
    protected PlatformTransactionManager myTransactionMgr;

    private boolean myProcessDeferred = true;

    @Autowired
    private ITermConceptParentChildLinkDao myConceptParentChildLinkDao;

    @Autowired
    private ITermVersionAdapterSvc myTerminologyVersionAdapterSvc;

    @Autowired
    private TermConceptDaoSvc myTermConceptDaoSvc;

    @Autowired
    private IJobCoordinator myJobCoordinator;

    /**
     * Queues a concept to be saved by a later {@link #saveDeferred()} pass.
     *
     * @param theConcept the concept to queue; must not be {@code null}
     */
    @Override
    public void addConceptToStorageQueue(TermConcept theConcept) {
        Validate.notNull(theConcept);
        myDeferredConcepts.add(theConcept);
    }

    /**
     * Queues a parent/child link to be saved by a later {@link #saveDeferred()} pass.
     *
     * @param theConceptLink the link to queue; must not be {@code null}
     */
    @Override
    public void addConceptLinkToStorageQueue(TermConceptParentChildLink theConceptLink) {
        Validate.notNull(theConceptLink);
        myConceptLinksToSaveLater.add(theConceptLink);
    }

    /**
     * Queues ConceptMap resources to be created or updated by a later {@link #saveDeferred()} pass.
     *
     * @param theConceptMaps the resources to queue; the list itself must not be {@code null}
     */
    @Override
    public void addConceptMapsToStorageQueue(List<ConceptMap> theConceptMaps) {
        Validate.notNull(theConceptMaps);
        myDeferredConceptMaps.addAll(theConceptMaps);
    }

    /**
     * Queues ValueSet resources to be created or updated by a later {@link #saveDeferred()} pass.
     *
     * @param theValueSets the resources to queue; the list itself must not be {@code null}
     */
    @Override
    public void addValueSetsToStorageQueue(List<ValueSet> theValueSets) {
        Validate.notNull(theValueSets);
        myDeferredValueSets.addAll(theValueSets);
    }

    /**
     * Queues the terminology data backing the given CodeSystem resource for deletion.
     * <p>
     * If a {@link TermCodeSystem} exists for the resource, its URI is replaced with a random
     * {@code urn:uuid:} value, it is saved, and it is queued for deletion. Otherwise every
     * {@link TermCodeSystemVersion} found for the resource is queued for deletion instead.
     *
     * @param theCodeSystemToDelete the resource entity whose terminology data should be removed
     */
    @Override
    public void deleteCodeSystemForResource(ResourceTable theCodeSystemToDelete) {
        // there are use cases (at least in tests) where the code system is not present for the resource but
        // versions are, so, as code system deletion also deletes versions, we try the system first but if
        // not present we also try versions
        TermCodeSystem termCodeSystemToDelete = myCodeSystemDao.findByResourcePid(theCodeSystemToDelete.getId());
        if (termCodeSystemToDelete != null) {
            // NOTE(review): presumably the URI is randomized so the original URI can be reused while the
            // deletion is still pending - confirm
            termCodeSystemToDelete.setCodeSystemUri("urn:uuid:" + UUID.randomUUID());
            myCodeSystemDao.save(termCodeSystemToDelete);
            myDeferredCodeSystemsDeletions.add(termCodeSystemToDelete);
            return;
        }

        List<TermCodeSystemVersion> codeSystemVersionsToDelete =
                myCodeSystemVersionDao.findByCodeSystemResourcePid(theCodeSystemToDelete.getId());
        for (TermCodeSystemVersion codeSystemVersionToDelete : codeSystemVersionsToDelete) {
            if (codeSystemVersionToDelete != null) {
                myDeferredCodeSystemVersionsDeletions.add(codeSystemVersionToDelete);
            }
        }
    }

    /**
     * Enables or pauses deferred processing. While paused, {@link #saveDeferred()} returns immediately and
     * {@link #isStorageQueueEmpty(boolean)} returns {@code false}.
     *
     * @param theProcessDeferred {@code true} to process deferred work, {@code false} to pause it
     */
    @Override
    public void setProcessDeferred(boolean theProcessDeferred) {
        myProcessDeferred = theProcessDeferred;
    }

    /**
     * Creates or updates up to 20 queued ConceptMap resources and removes them from the queue.
     */
    private void processDeferredConceptMaps() {
        int count = Math.min(myDeferredConceptMaps.size(), 20);
        // Work on a copy of the head of the queue so that entries can be removed while looping
        for (ConceptMap nextConceptMap : new ArrayList<>(myDeferredConceptMaps.subList(0, count))) {
            ourLog.info("Creating ConceptMap: {}", nextConceptMap.getId());
            myTerminologyVersionAdapterSvc.createOrUpdateConceptMap(nextConceptMap);
            myDeferredConceptMaps.remove(nextConceptMap);
        }
        ourLog.info("Saved {} deferred ConceptMap resources, have {} remaining", count, myDeferredConceptMaps.size());
    }

    /**
     * Saves one batch of queued concepts or, when no concept was saved in this pass, one batch of queued
     * parent/child links.
     * <p>
     * Concepts are taken from the head of the queue until the number of saved codes reaches the batch size
     * (at most 1000) or the queue is empty. A concept whose code system version can no longer be found is
     * dropped with a warning, and a failure to save a concept is logged and the concept is dropped.
     * Links whose child or parent concept has no id or can no longer be found are dropped with a warning.
     */
    private void processDeferredConcepts() {
        int codeCount = 0;
        int relCount = 0;
        StopWatch stopwatch = new StopWatch();

        int count = Math.min(1000, myDeferredConcepts.size());
        ourLog.debug("Saving {} deferred concepts...", count);
        while (codeCount < count && !myDeferredConcepts.isEmpty()) {
            TermConcept next = myDeferredConcepts.remove(0);
            if (myCodeSystemVersionDao
                    .findById(next.getCodeSystemVersion().getId())
                    .isPresent()) {
                try {
                    // saveConcept returns a count which is added to the running total for this batch
                    codeCount += myTermConceptDaoSvc.saveConcept(next);
                } catch (Exception theE) {
                    // Deliberate best-effort: log and carry on with the remaining concepts
                    ourLog.error(
                            "Exception thrown when attempting to save TermConcept {} in Code System {}",
                            next.getCode(),
                            next.getCodeSystemVersion().getCodeSystemDisplayName(),
                            theE);
                }
            } else {
                ourLog.warn(
                        "Unable to save deferred TermConcept {} because Code System {} version PID {} is no longer valid. Code system may have since been replaced.",
                        next.getCode(),
                        next.getCodeSystemVersion().getCodeSystemDisplayName(),
                        next.getCodeSystemVersion().getPid());
            }
        }

        if (codeCount > 0) {
            ourLog.info(
                    "Saved {} deferred concepts ({} codes remain and {} relationships remain) in {}ms ({} codes/sec)",
                    codeCount,
                    myDeferredConcepts.size(),
                    myConceptLinksToSaveLater.size(),
                    stopwatch.getMillis(),
                    stopwatch.formatThroughput(codeCount, TimeUnit.SECONDS));
        }

        // Links are only processed in a pass that saved no concepts
        if (codeCount == 0) {
            count = Math.min(1000, myConceptLinksToSaveLater.size());
            ourLog.info("Saving {} deferred concept relationships...", count);
            while (relCount < count && !myConceptLinksToSaveLater.isEmpty()) {
                TermConceptParentChildLink next = myConceptLinksToSaveLater.remove(0);
                assert next.getChild() != null;
                assert next.getParent() != null;

                if ((next.getChild().getId() == null
                                || !myConceptDao
                                        .findById(next.getChild().getPid())
                                        .isPresent())
                        || (next.getParent().getId() == null
                                || !myConceptDao
                                        .findById(next.getParent().getPid())
                                        .isPresent())) {
                    // Arguments are ordered to match the placeholders: child first, then parent
                    ourLog.warn(
                            "Not inserting link from child {} to parent {} because it appears to have been deleted",
                            next.getChild().getCode(),
                            next.getParent().getCode());
                    continue;
                }

                saveConceptLink(next);
                relCount++;
            }
        }

        if (relCount > 0) {
            ourLog.info(
                    "Saved {} deferred relationships ({} remain) in {}ms ({} entries/sec)",
                    relCount,
                    myConceptLinksToSaveLater.size(),
                    stopwatch.getMillis(),
                    stopwatch.formatThroughput(relCount, TimeUnit.SECONDS));
        }

        if ((myDeferredConcepts.size() + myConceptLinksToSaveLater.size()) == 0) {
            ourLog.info("All deferred concepts and relationships have now been synchronized to the database");
        }
    }

    /**
     * Creates or updates up to 200 queued ValueSet resources and removes them from the queue.
     */
    private void processDeferredValueSets() {
        int count = Math.min(myDeferredValueSets.size(), 200);
        // Work on a copy of the head of the queue so that entries can be removed while looping
        for (ValueSet nextValueSet : new ArrayList<>(myDeferredValueSets.subList(0, count))) {
            ourLog.info("Creating ValueSet: {}", nextValueSet.getId());
            myTerminologyVersionAdapterSvc.createOrUpdateValueSet(nextValueSet);
            myDeferredValueSets.remove(nextValueSet);
        }
        ourLog.info("Saved {} deferred ValueSet resources, have {} remaining", count, myDeferredValueSets.size());
    }

    /**
     * This method is present only for unit tests, do not call from client code
     */
    @VisibleForTesting
    public synchronized void clearDeferred() {
        myProcessDeferred = true;
        myDeferredValueSets.clear();
        myDeferredConceptMaps.clear();
        myDeferredConcepts.clear();
        myDeferredCodeSystemsDeletions.clear();
        myConceptLinksToSaveLater.clear();
        myDeferredCodeSystemVersionsDeletions.clear();
        clearJobExecutions();
    }

    /**
     * Asks the job coordinator to cancel every tracked job instance, then forgets all tracked ids.
     */
    private void clearJobExecutions() {
        for (String id : new ArrayList<>(myJobExecutions)) {
            myJobCoordinator.cancelInstance(id);
        }
        myJobExecutions.clear();
    }

    /**
     * Stops tracking the given job instance id.
     *
     * @param theId the id of the job instance that has ended
     */
    @Override
    public void notifyJobEnded(String theId) {
        myJobExecutions.remove(theId);
    }

    /**
     * Runs the supplier inside a new {@link TransactionTemplate} and returns its result.
     * Asserts that no transaction is already active on the calling thread.
     */
    private <T> T runInTransaction(Supplier<T> theRunnable) {
        assert !TransactionSynchronizationManager.isActualTransactionActive();

        return new TransactionTemplate(myTransactionMgr).execute(tx -> theRunnable.get());
    }

    /**
     * Calls {@link #saveDeferred()} repeatedly until {@link #isStorageQueueEmpty(boolean)} (excluding
     * executing jobs) reports an empty queue.
     * <p>
     * Unless {@link #disallowDeferredTaskTimeout()} has been called, a {@link TimeoutManager} is consulted
     * on every iteration and the queue sizes are logged whenever it reports a timeout.
     * <p>
     * NOTE(review): while processing is paused {@link #isStorageQueueEmpty(boolean)} returns {@code false}
     * and {@link #saveDeferred()} returns immediately, so this loop spins until processing is resumed.
     */
    @Override
    public void saveAllDeferred() {
        TimeoutManager timeoutManager = null;
        if (myAllowDeferredTasksTimeout) {
            timeoutManager = new TimeoutManager(
                    TermDeferredStorageSvcImpl.class.getName() + ".saveAllDeferred()",
                    Duration.of(SAVE_ALL_DEFERRED_WARN_MINUTES, ChronoUnit.MINUTES),
                    Duration.of(SAVE_ALL_DEFERRED_ERROR_MINUTES, ChronoUnit.MINUTES));
        }

        // Don't include executing jobs here since there's no point in thrashing over and over
        // in a busy wait while we wait for batch2 job processes to finish
        while (!isStorageQueueEmpty(false)) {
            if (timeoutManager != null && timeoutManager.checkTimeout()) {
                ourLog.info(toString());
            }
            saveDeferred();
        }
    }

    /**
     * Performs up to 10 passes over the deferred queues, unless processing is paused.
     * <p>
     * Each pass handles exactly one kind of work, in this order of priority: concepts and concept links,
     * ValueSets, ConceptMaps, and finally code system version deletions followed by code system
     * deletions. The method returns early as soon as all queues are empty.
     */
    @Transactional(propagation = Propagation.NEVER)
    @Override
    public synchronized void saveDeferred() {
        if (isProcessDeferredPaused()) {
            return;
        }

        for (int i = 0; i < 10; i++) {
            if (!isDeferredConcepts()
                    && !isConceptLinksToSaveLater()
                    && !isDeferredValueSets()
                    && !isDeferredConceptMaps()
                    && !isDeferredCodeSystemDeletions()) {
                return;
            }

            if (isDeferredConceptsOrConceptLinksToSaveLater()) {
                runInTransaction(() -> {
                    processDeferredConcepts();
                    return null;
                });

                continue;
            }

            if (isDeferredValueSets()) {
                runInTransaction(() -> {
                    processDeferredValueSets();
                    return null;
                });

                continue;
            }

            if (isDeferredConceptMaps()) {
                runInTransaction(() -> {
                    processDeferredConceptMaps();
                    return null;
                });

                continue;
            }

            if (isDeferredCodeSystemVersionDeletions()) {
                processDeferredCodeSystemVersionDeletions();
            }

            if (isDeferredCodeSystemDeletions()) {
                processDeferredCodeSystemDeletions();
            }
        }
    }

    private boolean isDeferredCodeSystemVersionDeletions() {
        return !myDeferredCodeSystemVersionsDeletions.isEmpty();
    }

    /**
     * Starts a delete job for every queued code system, removing each entry once its job has been started.
     * <p>
     * The queue is snapshotted first (the copy constructor goes through the synchronized wrapper) rather
     * than iterated directly, because producers may append concurrently. Entries appended after the
     * snapshot stay queued for the next pass, and if starting a job fails, the failing entry and everything
     * after it stay queued while already-submitted entries are not submitted again.
     */
    private void processDeferredCodeSystemDeletions() {
        List<TermCodeSystem> pending = new ArrayList<>(myDeferredCodeSystemsDeletions);
        for (TermCodeSystem next : pending) {
            deleteTermCodeSystemOffline(next.getPid());
            // Producers only append and the consumers are serialized on this instance, so the head of
            // the queue is still the entry that was just submitted
            myDeferredCodeSystemsDeletions.remove(0);
        }
    }

    /**
     * Starts a delete job for every queued code system version, removing each entry once its job has been
     * started.
     * <p>
     * Entries are peeked and only removed after the job was started, so nothing queued concurrently is
     * discarded and a failure leaves the failing entry (and the ones behind it) queued.
     */
    private void processDeferredCodeSystemVersionDeletions() {
        TermCodeSystemVersion next;
        while ((next = myDeferredCodeSystemVersionsDeletions.peek()) != null) {
            deleteTermCodeSystemVersionOffline(next.getPid());
            myDeferredCodeSystemVersionsDeletions.poll();
        }
    }

    /**
     * Starts a code system version delete job for the given PID and tracks the started instance id.
     */
    private void deleteTermCodeSystemVersionOffline(Long theCodeSystemVersionPid) {
        JobInstanceStartRequest request = new JobInstanceStartRequest();
        request.setJobDefinitionId(TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME);

        TermCodeSystemDeleteVersionJobParameters parameters = new TermCodeSystemDeleteVersionJobParameters();
        parameters.setCodeSystemVersionPid(theCodeSystemVersionPid);
        request.setParameters(parameters);

        Batch2JobStartResponse response = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
        myJobExecutions.add(response.getInstanceId());
    }

    /**
     * Starts a code system delete job for the given PID and tracks the started instance id.
     */
    private void deleteTermCodeSystemOffline(Long theCodeSystemPid) {
        TermCodeSystemDeleteJobParameters parameters = new TermCodeSystemDeleteJobParameters();
        parameters.setTermPid(theCodeSystemPid);
        JobInstanceStartRequest request = new JobInstanceStartRequest();
        request.setParameters(parameters);
        request.setJobDefinitionId(TERM_CODE_SYSTEM_DELETE_JOB_NAME);
        Batch2JobStartResponse response = myJobCoordinator.startInstance(new SystemRequestDetails(), request);
        myJobExecutions.add(response.getInstanceId());
    }

    /**
     * Reports whether there is no deferred work left.
     *
     * @param theIncludeExecutingJobs when {@code true}, tracked delete jobs that have not ended also count
     *                                as outstanding work
     * @return {@code true} only if processing is not paused and every queue (and, if requested, the list
     *         of executing jobs) is empty
     */
    @Override
    public boolean isStorageQueueEmpty(boolean theIncludeExecutingJobs) {
        boolean retVal = !isProcessDeferredPaused();
        retVal &= !isDeferredConcepts();
        retVal &= !isConceptLinksToSaveLater();
        retVal &= !isDeferredValueSets();
        retVal &= !isDeferredConceptMaps();
        retVal &= !isDeferredCodeSystemDeletions();
        if (theIncludeExecutingJobs) {
            retVal &= !isJobsExecuting();
        }
        return retVal;
    }

    /**
     * Drops tracked job ids that the job coordinator reports as ended, then reports whether any tracked
     * job remains.
     *
     * @return {@code true} if at least one tracked job id remains
     */
    @Override
    public boolean isJobsExecuting() {
        cleanseEndedJobs();

        return !myJobExecutions.isEmpty();
    }

    private void cleanseEndedJobs() {
        /*
         * Cleanse the list of completed jobs.
         * This is mostly a fail-safe
         * because "cancelled" jobs are never removed.
         */
        List<String> idsToDelete = new ArrayList<>();
        for (String jobId : BATCH_JOBS_TO_CARE_ABOUT) {
            List<JobInstance> jobInstanceInEndedState = myJobCoordinator.getInstancesbyJobDefinitionIdAndEndedStatus(
                    jobId,
                    true, // ended = true (COMPLETED, FAILED, CANCELLED jobs only)
                    Math.max(myJobExecutions.size(), 1), // at most this many
                    0);
            for (JobInstance instance : jobInstanceInEndedState) {
                idsToDelete.add(instance.getInstanceId());
            }
        }

        for (String id : idsToDelete) {
            myJobExecutions.remove(id);
        }
    }

    /**
     * Saves the link only if it has no id yet.
     */
    private void saveConceptLink(TermConceptParentChildLink next) {
        if (next.getId() == null) {
            myConceptParentChildLinkDao.save(next);
        }
    }

    private boolean isProcessDeferredPaused() {
        return !myProcessDeferred;
    }

    private boolean isDeferredConceptsOrConceptLinksToSaveLater() {
        return isDeferredConcepts() || isConceptLinksToSaveLater();
    }

    /**
     * @return {@code true} if any code system or code system version deletion is queued
     */
    private boolean isDeferredCodeSystemDeletions() {
        return !myDeferredCodeSystemsDeletions.isEmpty() || !myDeferredCodeSystemVersionsDeletions.isEmpty();
    }

    private boolean isDeferredConcepts() {
        return !myDeferredConcepts.isEmpty();
    }

    private boolean isConceptLinksToSaveLater() {
        return !myConceptLinksToSaveLater.isEmpty();
    }

    private boolean isDeferredValueSets() {
        return !myDeferredValueSets.isEmpty();
    }

    private boolean isDeferredConceptMaps() {
        return !myDeferredConceptMaps.isEmpty();
    }

    @VisibleForTesting
    void setTransactionManagerForUnitTest(PlatformTransactionManager theTxManager) {
        myTransactionMgr = theTxManager;
    }

    @VisibleForTesting
    void setTermConceptDaoSvc(TermConceptDaoSvc theTermConceptDaoSvc) {
        myTermConceptDaoSvc = theTermConceptDaoSvc;
    }

    @VisibleForTesting
    void setConceptDaoForUnitTest(ITermConceptDao theConceptDao) {
        myConceptDao = theConceptDao;
    }

    @VisibleForTesting
    void setCodeSystemVersionDaoForUnitTest(ITermCodeSystemVersionDao theCodeSystemVersionDao) {
        myCodeSystemVersionDao = theCodeSystemVersionDao;
    }

    /**
     * Disables the timeout check performed by {@link #saveAllDeferred()}.
     */
    @Override
    public void disallowDeferredTaskTimeout() {
        myAllowDeferredTasksTimeout = false;
    }

    /**
     * Logs, at info level, whether processing is paused and whether each queue has pending work.
     */
    @Override
    @VisibleForTesting
    public void logQueueForUnitTest() {
        ourLog.info("isProcessDeferredPaused: {}", isProcessDeferredPaused());
        ourLog.info("isDeferredConcepts: {}", isDeferredConcepts());
        ourLog.info("isConceptLinksToSaveLater: {}", isConceptLinksToSaveLater());
        ourLog.info("isDeferredValueSets: {}", isDeferredValueSets());
        ourLog.info("isDeferredConceptMaps: {}", isDeferredConceptMaps());
        ourLog.info("isDeferredCodeSystemDeletions: {}", isDeferredCodeSystemDeletions());
    }

    /**
     * Queues a code system version for deletion by a later {@link #saveDeferred()} pass.
     *
     * @param theCodeSystemVersion the version to delete
     */
    @Override
    public void deleteCodeSystemVersion(TermCodeSystemVersion theCodeSystemVersion) {
        myDeferredCodeSystemVersionsDeletions.add(theCodeSystemVersion);
    }

    /**
     * Registers {@link Job} as a local scheduled job with the given scheduler service.
     *
     * @param theSchedulerService the scheduler to register the job with
     */
    @Override
    public void scheduleJobs(ISchedulerService theSchedulerService) {
        // TODO KHS what does this mean?
        // Register scheduled job to save deferred concepts
        // In the future it would be great to make this a cluster-aware task somehow
        ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
        jobDefinition.setId(Job.class.getName());
        jobDefinition.setJobClass(Job.class);
        theSchedulerService.scheduleLocalJob(5000, jobDefinition);
    }

    /**
     * Scheduled job that triggers one {@link ITermDeferredStorageSvc#saveDeferred()} call per execution.
     */
    public static class Job implements HapiJob {
        @Autowired
        private ITermDeferredStorageSvc myTerminologySvc;

        @Override
        public void execute(JobExecutionContext theContext) {
            myTerminologySvc.saveDeferred();
        }
    }

    /**
     * @return a summary containing the size of every queue, the number of tracked jobs and the
     *         processing flag
     */
    @Override
    public String toString() {
        return new ToStringBuilder(this)
                .append("myDeferredCodeSystemsDeletions", myDeferredCodeSystemsDeletions.size())
                .append("myDeferredCodeSystemVersionsDeletions", myDeferredCodeSystemVersionsDeletions.size())
                .append("myDeferredConcepts", myDeferredConcepts.size())
                .append("myDeferredValueSets", myDeferredValueSets.size())
                .append("myDeferredConceptMaps", myDeferredConceptMaps.size())
                .append("myConceptLinksToSaveLater", myConceptLinksToSaveLater.size())
                .append("myJobExecutions", myJobExecutions.size())
                .append("myProcessDeferred", myProcessDeferred)
                .toString();
    }
}