001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.term; 021 022import ca.uhn.fhir.batch2.api.IJobCoordinator; 023import ca.uhn.fhir.batch2.model.JobInstance; 024import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; 025import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; 026import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao; 027import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemVersionDao; 028import ca.uhn.fhir.jpa.dao.data.ITermConceptDao; 029import ca.uhn.fhir.jpa.dao.data.ITermConceptParentChildLinkDao; 030import ca.uhn.fhir.jpa.entity.TermCodeSystem; 031import ca.uhn.fhir.jpa.entity.TermCodeSystemVersion; 032import ca.uhn.fhir.jpa.entity.TermConcept; 033import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink; 034import ca.uhn.fhir.jpa.model.entity.ResourceTable; 035import ca.uhn.fhir.jpa.model.sched.HapiJob; 036import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs; 037import ca.uhn.fhir.jpa.model.sched.ISchedulerService; 038import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; 039import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc; 040import ca.uhn.fhir.jpa.term.api.ITermVersionAdapterSvc; 041import ca.uhn.fhir.jpa.term.models.TermCodeSystemDeleteJobParameters; 042import ca.uhn.fhir.jpa.term.models.TermCodeSystemDeleteVersionJobParameters; 043import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 044import ca.uhn.fhir.util.StopWatch; 045import ca.uhn.fhir.util.TimeoutManager; 046import com.google.common.annotations.VisibleForTesting; 047import org.apache.commons.lang3.Validate; 048import org.apache.commons.lang3.builder.ToStringBuilder; 049import org.hl7.fhir.r4.model.ConceptMap; 050import org.hl7.fhir.r4.model.ValueSet; 051import org.quartz.JobExecutionContext; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054import org.springframework.beans.factory.annotation.Autowired; 055import org.springframework.transaction.PlatformTransactionManager; 056import org.springframework.transaction.annotation.Propagation; 057import org.springframework.transaction.annotation.Transactional; 058import org.springframework.transaction.support.TransactionSynchronizationManager; 059import org.springframework.transaction.support.TransactionTemplate; 060 061import java.time.Duration; 062import java.time.temporal.ChronoUnit; 063import java.util.ArrayList; 064import java.util.Collections; 065import java.util.List; 066import java.util.Queue; 067import java.util.UUID; 068import java.util.concurrent.ConcurrentLinkedQueue; 069import java.util.concurrent.TimeUnit; 070import java.util.function.Supplier; 071 072import static ca.uhn.fhir.batch2.jobs.termcodesystem.TermCodeSystemJobConfig.TERM_CODE_SYSTEM_DELETE_JOB_NAME; 073import static ca.uhn.fhir.batch2.jobs.termcodesystem.TermCodeSystemJobConfig.TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME; 074 075public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc, IHasScheduledJobs { 076 077 private static final Logger ourLog = LoggerFactory.getLogger(TermDeferredStorageSvcImpl.class); 078 private static final long SAVE_ALL_DEFERRED_WARN_MINUTES = 1; 079 private static final long SAVE_ALL_DEFERRED_ERROR_MINUTES = 5; 080 private boolean myAllowDeferredTasksTimeout = true; 081 private static final List<String> BATCH_JOBS_TO_CARE_ABOUT = 082 List.of(TERM_CODE_SYSTEM_DELETE_JOB_NAME, TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME); 083 private final List<TermCodeSystem> myDeferredCodeSystemsDeletions = Collections.synchronizedList(new ArrayList<>()); 084 private final Queue<TermCodeSystemVersion> myDeferredCodeSystemVersionsDeletions = new ConcurrentLinkedQueue<>(); 085 private final List<TermConcept> myDeferredConcepts = Collections.synchronizedList(new ArrayList<>()); 086 private final List<ValueSet> myDeferredValueSets = Collections.synchronizedList(new ArrayList<>()); 087 private final List<ConceptMap> myDeferredConceptMaps = Collections.synchronizedList(new ArrayList<>()); 088 private final List<TermConceptParentChildLink> myConceptLinksToSaveLater = 089 Collections.synchronizedList(new ArrayList<>()); 090 091 // TODO - why is this needed? it's cumbersome to maintain; consider removing it 092 /** 093 * A list of job ids for CodeSydstemDelete and CodeSystemVersionDelete jobs that 094 * have been scheduled (but not completed) 095 */ 096 private final List<String> myJobExecutions = Collections.synchronizedList(new ArrayList<>()); 097 098 @Autowired 099 protected ITermConceptDao myConceptDao; 100 101 @Autowired 102 protected ITermCodeSystemDao myCodeSystemDao; 103 104 @Autowired 105 protected ITermCodeSystemVersionDao myCodeSystemVersionDao; 106 107 @Autowired 108 protected PlatformTransactionManager myTransactionMgr; 109 110 private boolean myProcessDeferred = true; 111 112 @Autowired 113 private ITermConceptParentChildLinkDao myConceptParentChildLinkDao; 114 115 @Autowired 116 private ITermVersionAdapterSvc myTerminologyVersionAdapterSvc; 117 118 @Autowired 119 private TermConceptDaoSvc myTermConceptDaoSvc; 120 121 @Autowired 122 private IJobCoordinator myJobCoordinator; 123 124 @Override 125 public void addConceptToStorageQueue(TermConcept theConcept) { 126 Validate.notNull(theConcept); 127 myDeferredConcepts.add(theConcept); 128 } 129 130 @Override 131 public void addConceptLinkToStorageQueue(TermConceptParentChildLink theConceptLink) { 132 Validate.notNull(theConceptLink); 133 myConceptLinksToSaveLater.add(theConceptLink); 134 } 135 136 @Override 137 public void addConceptMapsToStorageQueue(List<ConceptMap> theConceptMaps) { 138 Validate.notNull(theConceptMaps); 139 myDeferredConceptMaps.addAll(theConceptMaps); 140 } 141 142 @Override 143 public void addValueSetsToStorageQueue(List<ValueSet> theValueSets) { 144 Validate.notNull(theValueSets); 145 myDeferredValueSets.addAll(theValueSets); 146 } 147 148 @Override 149 public void deleteCodeSystemForResource(ResourceTable theCodeSystemToDelete) { 150 // there are use cases (at least in tests) where the code system is not present for the resource but versions 151 // are, 152 // so, as code system deletion also deletes versions, we try the system first but if not present we also try 153 // versions 154 TermCodeSystem termCodeSystemToDelete = 155 myCodeSystemDao.findByResourcePid(theCodeSystemToDelete.getResourceId()); 156 if (termCodeSystemToDelete != null) { 157 termCodeSystemToDelete.setCodeSystemUri("urn:uuid:" + UUID.randomUUID()); 158 myCodeSystemDao.save(termCodeSystemToDelete); 159 myDeferredCodeSystemsDeletions.add(termCodeSystemToDelete); 160 return; 161 } 162 163 List<TermCodeSystemVersion> codeSystemVersionsToDelete = 164 myCodeSystemVersionDao.findByCodeSystemResourcePid(theCodeSystemToDelete.getResourceId()); 165 for (TermCodeSystemVersion codeSystemVersionToDelete : codeSystemVersionsToDelete) { 166 if (codeSystemVersionToDelete != null) { 167 myDeferredCodeSystemVersionsDeletions.add(codeSystemVersionToDelete); 168 } 169 } 170 } 171 172 @Override 173 public void setProcessDeferred(boolean theProcessDeferred) { 174 myProcessDeferred = theProcessDeferred; 175 } 176 177 private void processDeferredConceptMaps() { 178 int count = Math.min(myDeferredConceptMaps.size(), 20); 179 for (ConceptMap nextConceptMap : new ArrayList<>(myDeferredConceptMaps.subList(0, count))) { 180 ourLog.info("Creating ConceptMap: {}", nextConceptMap.getId()); 181 myTerminologyVersionAdapterSvc.createOrUpdateConceptMap(nextConceptMap); 182 myDeferredConceptMaps.remove(nextConceptMap); 183 } 184 ourLog.info("Saved {} deferred ConceptMap resources, have {} remaining", count, myDeferredConceptMaps.size()); 185 } 186 187 private void processDeferredConcepts() { 188 int codeCount = 0, relCount = 0; 189 StopWatch stopwatch = new StopWatch(); 190 191 int count = Math.min(1000, myDeferredConcepts.size()); 192 ourLog.debug("Saving {} deferred concepts...", count); 193 while (codeCount < count && myDeferredConcepts.size() > 0) { 194 TermConcept next = myDeferredConcepts.remove(0); 195 if (myCodeSystemVersionDao 196 .findById(next.getCodeSystemVersion().getPid()) 197 .isPresent()) { 198 try { 199 codeCount += myTermConceptDaoSvc.saveConcept(next); 200 } catch (Exception theE) { 201 ourLog.error( 202 "Exception thrown when attempting to save TermConcept {} in Code System {}", 203 next.getCode(), 204 next.getCodeSystemVersion().getCodeSystemDisplayName(), 205 theE); 206 } 207 } else { 208 ourLog.warn( 209 "Unable to save deferred TermConcept {} because Code System {} version PID {} is no longer valid. Code system may have since been replaced.", 210 next.getCode(), 211 next.getCodeSystemVersion().getCodeSystemDisplayName(), 212 next.getCodeSystemVersion().getPid()); 213 } 214 } 215 216 if (codeCount > 0) { 217 ourLog.info( 218 "Saved {} deferred concepts ({} codes remain and {} relationships remain) in {}ms ({} codes/sec)", 219 codeCount, 220 myDeferredConcepts.size(), 221 myConceptLinksToSaveLater.size(), 222 stopwatch.getMillis(), 223 stopwatch.formatThroughput(codeCount, TimeUnit.SECONDS)); 224 } 225 226 if (codeCount == 0) { 227 count = Math.min(1000, myConceptLinksToSaveLater.size()); 228 ourLog.info("Saving {} deferred concept relationships...", count); 229 while (relCount < count && myConceptLinksToSaveLater.size() > 0) { 230 TermConceptParentChildLink next = myConceptLinksToSaveLater.remove(0); 231 assert next.getChild() != null; 232 assert next.getParent() != null; 233 234 if ((next.getChild().getId() == null 235 || !myConceptDao 236 .findById(next.getChild().getId()) 237 .isPresent()) 238 || (next.getParent().getId() == null 239 || !myConceptDao 240 .findById(next.getParent().getId()) 241 .isPresent())) { 242 ourLog.warn( 243 "Not inserting link from child {} to parent {} because it appears to have been deleted", 244 next.getParent().getCode(), 245 next.getChild().getCode()); 246 continue; 247 } 248 249 saveConceptLink(next); 250 relCount++; 251 } 252 } 253 254 if (relCount > 0) { 255 ourLog.info( 256 "Saved {} deferred relationships ({} remain) in {}ms ({} entries/sec)", 257 relCount, 258 myConceptLinksToSaveLater.size(), 259 stopwatch.getMillis(), 260 stopwatch.formatThroughput(relCount, TimeUnit.SECONDS)); 261 } 262 263 if ((myDeferredConcepts.size() + myConceptLinksToSaveLater.size()) == 0) { 264 ourLog.info("All deferred concepts and relationships have now been synchronized to the database"); 265 } 266 } 267 268 private void processDeferredValueSets() { 269 int count = Math.min(myDeferredValueSets.size(), 200); 270 for (ValueSet nextValueSet : new ArrayList<>(myDeferredValueSets.subList(0, count))) { 271 ourLog.info("Creating ValueSet: {}", nextValueSet.getId()); 272 myTerminologyVersionAdapterSvc.createOrUpdateValueSet(nextValueSet); 273 myDeferredValueSets.remove(nextValueSet); 274 } 275 ourLog.info("Saved {} deferred ValueSet resources, have {} remaining", count, myDeferredValueSets.size()); 276 } 277 278 /** 279 * This method is present only for unit tests, do not call from client code 280 */ 281 @VisibleForTesting 282 public synchronized void clearDeferred() { 283 myProcessDeferred = true; 284 myDeferredValueSets.clear(); 285 myDeferredConceptMaps.clear(); 286 myDeferredConcepts.clear(); 287 myDeferredCodeSystemsDeletions.clear(); 288 myConceptLinksToSaveLater.clear(); 289 myDeferredCodeSystemVersionsDeletions.clear(); 290 clearJobExecutions(); 291 } 292 293 private void clearJobExecutions() { 294 for (String id : new ArrayList<>(myJobExecutions)) { 295 myJobCoordinator.cancelInstance(id); 296 } 297 myJobExecutions.clear(); 298 } 299 300 @Override 301 public void notifyJobEnded(String theId) { 302 myJobExecutions.remove(theId); 303 } 304 305 private <T> T runInTransaction(Supplier<T> theRunnable) { 306 assert !TransactionSynchronizationManager.isActualTransactionActive(); 307 308 return new TransactionTemplate(myTransactionMgr).execute(tx -> theRunnable.get()); 309 } 310 311 @Override 312 public void saveAllDeferred() { 313 TimeoutManager timeoutManager = null; 314 if (myAllowDeferredTasksTimeout) { 315 timeoutManager = new TimeoutManager( 316 TermDeferredStorageSvcImpl.class.getName() + ".saveAllDeferred()", 317 Duration.of(SAVE_ALL_DEFERRED_WARN_MINUTES, ChronoUnit.MINUTES), 318 Duration.of(SAVE_ALL_DEFERRED_ERROR_MINUTES, ChronoUnit.MINUTES)); 319 } 320 321 // Don't include executing jobs here since there's no point in thrashing over and over 322 // in a busy wait while we wait for batch2 job processes to finish 323 while (!isStorageQueueEmpty(false)) { 324 if (myAllowDeferredTasksTimeout) { 325 if (timeoutManager.checkTimeout()) { 326 ourLog.info(toString()); 327 } 328 } 329 saveDeferred(); 330 } 331 } 332 333 @Transactional(propagation = Propagation.NEVER) 334 @Override 335 public synchronized void saveDeferred() { 336 if (isProcessDeferredPaused()) { 337 return; 338 } 339 340 for (int i = 0; i < 10; i++) { 341 if (!isDeferredConcepts() 342 && !isConceptLinksToSaveLater() 343 && !isDeferredValueSets() 344 && !isDeferredConceptMaps() 345 && !isDeferredCodeSystemDeletions()) { 346 return; 347 } 348 349 if (isDeferredConceptsOrConceptLinksToSaveLater()) { 350 runInTransaction(() -> { 351 processDeferredConcepts(); 352 return null; 353 }); 354 355 continue; 356 } 357 358 if (isDeferredValueSets()) { 359 runInTransaction(() -> { 360 processDeferredValueSets(); 361 return null; 362 }); 363 364 continue; 365 } 366 367 if (isDeferredConceptMaps()) { 368 runInTransaction(() -> { 369 processDeferredConceptMaps(); 370 return null; 371 }); 372 373 continue; 374 } 375 376 if (isDeferredCodeSystemVersionDeletions()) { 377 processDeferredCodeSystemVersionDeletions(); 378 } 379 380 if (isDeferredCodeSystemDeletions()) { 381 processDeferredCodeSystemDeletions(); 382 } 383 } 384 } 385 386 private boolean isDeferredCodeSystemVersionDeletions() { 387 return !myDeferredCodeSystemVersionsDeletions.isEmpty(); 388 } 389 390 private void processDeferredCodeSystemDeletions() { 391 for (TermCodeSystem next : myDeferredCodeSystemsDeletions) { 392 deleteTermCodeSystemOffline(next.getPid()); 393 } 394 myDeferredCodeSystemsDeletions.clear(); 395 } 396 397 private void processDeferredCodeSystemVersionDeletions() { 398 for (TermCodeSystemVersion next : myDeferredCodeSystemVersionsDeletions) { 399 deleteTermCodeSystemVersionOffline(next.getPid()); 400 } 401 myDeferredCodeSystemVersionsDeletions.clear(); 402 } 403 404 private void deleteTermCodeSystemVersionOffline(Long theCodeSystemVersionPid) { 405 JobInstanceStartRequest request = new JobInstanceStartRequest(); 406 request.setJobDefinitionId(TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME); 407 408 TermCodeSystemDeleteVersionJobParameters parameters = new TermCodeSystemDeleteVersionJobParameters(); 409 parameters.setCodeSystemVersionPid(theCodeSystemVersionPid); 410 request.setParameters(parameters); 411 412 Batch2JobStartResponse response = myJobCoordinator.startInstance(new SystemRequestDetails(), request); 413 myJobExecutions.add(response.getInstanceId()); 414 } 415 416 private void deleteTermCodeSystemOffline(Long theCodeSystemPid) { 417 TermCodeSystemDeleteJobParameters parameters = new TermCodeSystemDeleteJobParameters(); 418 parameters.setTermPid(theCodeSystemPid); 419 JobInstanceStartRequest request = new JobInstanceStartRequest(); 420 request.setParameters(parameters); 421 request.setJobDefinitionId(TERM_CODE_SYSTEM_DELETE_JOB_NAME); 422 Batch2JobStartResponse response = myJobCoordinator.startInstance(new SystemRequestDetails(), request); 423 myJobExecutions.add(response.getInstanceId()); 424 } 425 426 @Override 427 public boolean isStorageQueueEmpty(boolean theIncludeExecutingJobs) { 428 boolean retVal = !isProcessDeferredPaused(); 429 retVal &= !isDeferredConcepts(); 430 retVal &= !isConceptLinksToSaveLater(); 431 retVal &= !isDeferredValueSets(); 432 retVal &= !isDeferredConceptMaps(); 433 retVal &= !isDeferredCodeSystemDeletions(); 434 if (theIncludeExecutingJobs) { 435 retVal &= !isJobsExecuting(); 436 } 437 return retVal; 438 } 439 440 public boolean isJobsExecuting() { 441 cleanseEndedJobs(); 442 443 return !myJobExecutions.isEmpty(); 444 } 445 446 private void cleanseEndedJobs() { 447 /* 448 * Cleanse the list of completed jobs. 449 * This is mostly a fail-safe 450 * because "cancelled" jobs are never removed. 451 */ 452 List<String> idsToDelete = new ArrayList<>(); 453 for (String jobId : BATCH_JOBS_TO_CARE_ABOUT) { 454 List<JobInstance> jobInstanceInEndedState = myJobCoordinator.getInstancesbyJobDefinitionIdAndEndedStatus( 455 jobId, 456 true, // ended = true (COMPLETED, FAILED, CANCELLED jobs only) 457 Math.max(myJobExecutions.size(), 1), // at most this many 458 0); 459 for (JobInstance instance : jobInstanceInEndedState) { 460 idsToDelete.add(instance.getInstanceId()); 461 } 462 } 463 464 for (String id : idsToDelete) { 465 myJobExecutions.remove(id); 466 } 467 } 468 469 private void saveConceptLink(TermConceptParentChildLink next) { 470 if (next.getId() == null) { 471 myConceptParentChildLinkDao.save(next); 472 } 473 } 474 475 private boolean isProcessDeferredPaused() { 476 return !myProcessDeferred; 477 } 478 479 private boolean isDeferredConceptsOrConceptLinksToSaveLater() { 480 return isDeferredConcepts() || isConceptLinksToSaveLater(); 481 } 482 483 private boolean isDeferredCodeSystemDeletions() { 484 return !myDeferredCodeSystemsDeletions.isEmpty() || !myDeferredCodeSystemVersionsDeletions.isEmpty(); 485 } 486 487 private boolean isDeferredConcepts() { 488 return !myDeferredConcepts.isEmpty(); 489 } 490 491 private boolean isConceptLinksToSaveLater() { 492 return !myConceptLinksToSaveLater.isEmpty(); 493 } 494 495 private boolean isDeferredValueSets() { 496 return !myDeferredValueSets.isEmpty(); 497 } 498 499 private boolean isDeferredConceptMaps() { 500 return !myDeferredConceptMaps.isEmpty(); 501 } 502 503 @VisibleForTesting 504 void setTransactionManagerForUnitTest(PlatformTransactionManager theTxManager) { 505 myTransactionMgr = theTxManager; 506 } 507 508 @VisibleForTesting 509 void setTermConceptDaoSvc(TermConceptDaoSvc theTermConceptDaoSvc) { 510 myTermConceptDaoSvc = theTermConceptDaoSvc; 511 } 512 513 @VisibleForTesting 514 void setConceptDaoForUnitTest(ITermConceptDao theConceptDao) { 515 myConceptDao = theConceptDao; 516 } 517 518 @VisibleForTesting 519 void setCodeSystemVersionDaoForUnitTest(ITermCodeSystemVersionDao theCodeSystemVersionDao) { 520 myCodeSystemVersionDao = theCodeSystemVersionDao; 521 } 522 523 @Override 524 public void disallowDeferredTaskTimeout() { 525 myAllowDeferredTasksTimeout = false; 526 } 527 528 @Override 529 @VisibleForTesting 530 public void logQueueForUnitTest() { 531 ourLog.info("isProcessDeferredPaused: {}", isProcessDeferredPaused()); 532 ourLog.info("isDeferredConcepts: {}", isDeferredConcepts()); 533 ourLog.info("isConceptLinksToSaveLater: {}", isConceptLinksToSaveLater()); 534 ourLog.info("isDeferredValueSets: {}", isDeferredValueSets()); 535 ourLog.info("isDeferredConceptMaps: {}", isDeferredConceptMaps()); 536 ourLog.info("isDeferredCodeSystemDeletions: {}", isDeferredCodeSystemDeletions()); 537 } 538 539 @Override 540 public void deleteCodeSystemVersion(TermCodeSystemVersion theCodeSystemVersion) { 541 myDeferredCodeSystemVersionsDeletions.add(theCodeSystemVersion); 542 } 543 544 @Override 545 public void scheduleJobs(ISchedulerService theSchedulerService) { 546 // TODO KHS what does this mean? 547 // Register scheduled job to save deferred concepts 548 // In the future it would be great to make this a cluster-aware task somehow 549 ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition(); 550 jobDefinition.setId(Job.class.getName()); 551 jobDefinition.setJobClass(Job.class); 552 theSchedulerService.scheduleLocalJob(5000, jobDefinition); 553 } 554 555 public static class Job implements HapiJob { 556 @Autowired 557 private ITermDeferredStorageSvc myTerminologySvc; 558 559 @Override 560 public void execute(JobExecutionContext theContext) { 561 myTerminologySvc.saveDeferred(); 562 } 563 } 564 565 @Override 566 public String toString() { 567 return new ToStringBuilder(this) 568 .append("myDeferredCodeSystemsDeletions", myDeferredCodeSystemsDeletions.size()) 569 .append("myDeferredCodeSystemVersionsDeletions", myDeferredCodeSystemVersionsDeletions.size()) 570 .append("myDeferredConcepts", myDeferredConcepts.size()) 571 .append("myDeferredValueSets", myDeferredValueSets.size()) 572 .append("myDeferredConceptMaps", myDeferredConceptMaps.size()) 573 .append("myConceptLinksToSaveLater", myConceptLinksToSaveLater.size()) 574 .append("myJobExecutions", myJobExecutions.size()) 575 .append("myProcessDeferred", myProcessDeferred) 576 .toString(); 577 } 578}