001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.term; 021 022import ca.uhn.fhir.batch2.api.IJobCoordinator; 023import ca.uhn.fhir.batch2.model.JobInstance; 024import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; 025import ca.uhn.fhir.batch2.model.StatusEnum; 026import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; 027import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao; 028import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemVersionDao; 029import ca.uhn.fhir.jpa.dao.data.ITermConceptDao; 030import ca.uhn.fhir.jpa.dao.data.ITermConceptParentChildLinkDao; 031import ca.uhn.fhir.jpa.entity.TermCodeSystem; 032import ca.uhn.fhir.jpa.entity.TermCodeSystemVersion; 033import ca.uhn.fhir.jpa.entity.TermConcept; 034import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink; 035import ca.uhn.fhir.jpa.model.entity.ResourceTable; 036import ca.uhn.fhir.jpa.model.sched.HapiJob; 037import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs; 038import ca.uhn.fhir.jpa.model.sched.ISchedulerService; 039import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; 040import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc; 041import ca.uhn.fhir.jpa.term.api.ITermVersionAdapterSvc; 042import ca.uhn.fhir.jpa.term.models.TermCodeSystemDeleteJobParameters; 043import ca.uhn.fhir.jpa.term.models.TermCodeSystemDeleteVersionJobParameters; 044import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 045import ca.uhn.fhir.util.StopWatch; 046import ca.uhn.fhir.util.TimeoutManager; 047import com.google.common.annotations.VisibleForTesting; 048import org.apache.commons.lang3.Validate; 049import org.apache.commons.lang3.builder.ToStringBuilder; 050import org.hl7.fhir.r4.model.ConceptMap; 051import org.hl7.fhir.r4.model.ValueSet; 052import org.quartz.JobExecutionContext; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055import org.springframework.beans.factory.annotation.Autowired; 056import org.springframework.transaction.PlatformTransactionManager; 057import org.springframework.transaction.annotation.Propagation; 058import org.springframework.transaction.annotation.Transactional; 059import org.springframework.transaction.support.TransactionSynchronizationManager; 060import org.springframework.transaction.support.TransactionTemplate; 061 062import java.time.Duration; 063import java.time.temporal.ChronoUnit; 064import java.util.ArrayList; 065import java.util.Collections; 066import java.util.List; 067import java.util.Queue; 068import java.util.UUID; 069import java.util.concurrent.ConcurrentLinkedQueue; 070import java.util.concurrent.TimeUnit; 071import java.util.function.Supplier; 072 073import static ca.uhn.fhir.batch2.jobs.termcodesystem.TermCodeSystemJobConfig.TERM_CODE_SYSTEM_DELETE_JOB_NAME; 074import static ca.uhn.fhir.batch2.jobs.termcodesystem.TermCodeSystemJobConfig.TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME; 075 076public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc, IHasScheduledJobs { 077 078 private static final Logger ourLog = LoggerFactory.getLogger(TermDeferredStorageSvcImpl.class); 079 private static final long SAVE_ALL_DEFERRED_WARN_MINUTES = 1; 080 private static final long SAVE_ALL_DEFERRED_ERROR_MINUTES = 5; 081 private boolean myAllowDeferredTasksTimeout = true; 082 private final List<TermCodeSystem> myDeferredCodeSystemsDeletions = Collections.synchronizedList(new ArrayList<>()); 083 private final Queue<TermCodeSystemVersion> myDeferredCodeSystemVersionsDeletions = new ConcurrentLinkedQueue<>(); 084 private final List<TermConcept> myDeferredConcepts = Collections.synchronizedList(new ArrayList<>()); 085 private final List<ValueSet> myDeferredValueSets = Collections.synchronizedList(new ArrayList<>()); 086 private final List<ConceptMap> myDeferredConceptMaps = Collections.synchronizedList(new ArrayList<>()); 087 private final List<TermConceptParentChildLink> myConceptLinksToSaveLater = 088 Collections.synchronizedList(new ArrayList<>()); 089 090 // TODO - why is this needed? it's cumbersome to maintain; consider removing it 091 /** 092 * A list of job ids for CodeSydstemDelete and CodeSystemVersionDelete jobs that 093 * have been scheduled (but not completed) 094 */ 095 private final List<String> myJobExecutions = Collections.synchronizedList(new ArrayList<>()); 096 097 @Autowired 098 protected ITermConceptDao myConceptDao; 099 100 @Autowired 101 protected ITermCodeSystemDao myCodeSystemDao; 102 103 @Autowired 104 protected ITermCodeSystemVersionDao myCodeSystemVersionDao; 105 106 @Autowired 107 protected PlatformTransactionManager myTransactionMgr; 108 109 private boolean myProcessDeferred = true; 110 111 @Autowired 112 private ITermConceptParentChildLinkDao myConceptParentChildLinkDao; 113 114 @Autowired 115 private ITermVersionAdapterSvc myTerminologyVersionAdapterSvc; 116 117 @Autowired 118 private TermConceptDaoSvc myTermConceptDaoSvc; 119 120 @Autowired 121 private IJobCoordinator myJobCoordinator; 122 123 @Override 124 public void addConceptToStorageQueue(TermConcept theConcept) { 125 Validate.notNull(theConcept); 126 myDeferredConcepts.add(theConcept); 127 } 128 129 @Override 130 public void addConceptLinkToStorageQueue(TermConceptParentChildLink theConceptLink) { 131 Validate.notNull(theConceptLink); 132 myConceptLinksToSaveLater.add(theConceptLink); 133 } 134 135 @Override 136 public void addConceptMapsToStorageQueue(List<ConceptMap> theConceptMaps) { 137 Validate.notNull(theConceptMaps); 138 myDeferredConceptMaps.addAll(theConceptMaps); 139 } 140 141 @Override 142 public void addValueSetsToStorageQueue(List<ValueSet> theValueSets) { 143 Validate.notNull(theValueSets); 144 myDeferredValueSets.addAll(theValueSets); 145 } 146 147 @Override 148 public void deleteCodeSystemForResource(ResourceTable theCodeSystemToDelete) { 149 // there are use cases (at least in tests) where the code system is not present for the resource but versions 150 // are, 151 // so, as code system deletion also deletes versions, we try the system first but if not present we also try 152 // versions 153 TermCodeSystem termCodeSystemToDelete = 154 myCodeSystemDao.findByResourcePid(theCodeSystemToDelete.getResourceId()); 155 if (termCodeSystemToDelete != null) { 156 termCodeSystemToDelete.setCodeSystemUri("urn:uuid:" + UUID.randomUUID()); 157 myCodeSystemDao.save(termCodeSystemToDelete); 158 myDeferredCodeSystemsDeletions.add(termCodeSystemToDelete); 159 return; 160 } 161 162 List<TermCodeSystemVersion> codeSystemVersionsToDelete = 163 myCodeSystemVersionDao.findByCodeSystemResourcePid(theCodeSystemToDelete.getResourceId()); 164 for (TermCodeSystemVersion codeSystemVersionToDelete : codeSystemVersionsToDelete) { 165 if (codeSystemVersionToDelete != null) { 166 myDeferredCodeSystemVersionsDeletions.add(codeSystemVersionToDelete); 167 } 168 } 169 } 170 171 @Override 172 public void setProcessDeferred(boolean theProcessDeferred) { 173 myProcessDeferred = theProcessDeferred; 174 } 175 176 private void processDeferredConceptMaps() { 177 int count = Math.min(myDeferredConceptMaps.size(), 20); 178 for (ConceptMap nextConceptMap : new ArrayList<>(myDeferredConceptMaps.subList(0, count))) { 179 ourLog.info("Creating ConceptMap: {}", nextConceptMap.getId()); 180 myTerminologyVersionAdapterSvc.createOrUpdateConceptMap(nextConceptMap); 181 myDeferredConceptMaps.remove(nextConceptMap); 182 } 183 ourLog.info("Saved {} deferred ConceptMap resources, have {} remaining", count, myDeferredConceptMaps.size()); 184 } 185 186 private void processDeferredConcepts() { 187 int codeCount = 0, relCount = 0; 188 StopWatch stopwatch = new StopWatch(); 189 190 int count = Math.min(1000, myDeferredConcepts.size()); 191 ourLog.debug("Saving {} deferred concepts...", count); 192 while (codeCount < count && myDeferredConcepts.size() > 0) { 193 TermConcept next = myDeferredConcepts.remove(0); 194 if (myCodeSystemVersionDao 195 .findById(next.getCodeSystemVersion().getPid()) 196 .isPresent()) { 197 try { 198 codeCount += myTermConceptDaoSvc.saveConcept(next); 199 } catch (Exception theE) { 200 ourLog.error( 201 "Exception thrown when attempting to save TermConcept {} in Code System {}", 202 next.getCode(), 203 next.getCodeSystemVersion().getCodeSystemDisplayName(), 204 theE); 205 } 206 } else { 207 ourLog.warn( 208 "Unable to save deferred TermConcept {} because Code System {} version PID {} is no longer valid. Code system may have since been replaced.", 209 next.getCode(), 210 next.getCodeSystemVersion().getCodeSystemDisplayName(), 211 next.getCodeSystemVersion().getPid()); 212 } 213 } 214 215 if (codeCount > 0) { 216 ourLog.info( 217 "Saved {} deferred concepts ({} codes remain and {} relationships remain) in {}ms ({} codes/sec)", 218 codeCount, 219 myDeferredConcepts.size(), 220 myConceptLinksToSaveLater.size(), 221 stopwatch.getMillis(), 222 stopwatch.formatThroughput(codeCount, TimeUnit.SECONDS)); 223 } 224 225 if (codeCount == 0) { 226 count = Math.min(1000, myConceptLinksToSaveLater.size()); 227 ourLog.info("Saving {} deferred concept relationships...", count); 228 while (relCount < count && myConceptLinksToSaveLater.size() > 0) { 229 TermConceptParentChildLink next = myConceptLinksToSaveLater.remove(0); 230 assert next.getChild() != null; 231 assert next.getParent() != null; 232 233 if ((next.getChild().getId() == null 234 || !myConceptDao 235 .findById(next.getChild().getId()) 236 .isPresent()) 237 || (next.getParent().getId() == null 238 || !myConceptDao 239 .findById(next.getParent().getId()) 240 .isPresent())) { 241 ourLog.warn( 242 "Not inserting link from child {} to parent {} because it appears to have been deleted", 243 next.getParent().getCode(), 244 next.getChild().getCode()); 245 continue; 246 } 247 248 saveConceptLink(next); 249 relCount++; 250 } 251 } 252 253 if (relCount > 0) { 254 ourLog.info( 255 "Saved {} deferred relationships ({} remain) in {}ms ({} entries/sec)", 256 relCount, 257 myConceptLinksToSaveLater.size(), 258 stopwatch.getMillis(), 259 stopwatch.formatThroughput(relCount, TimeUnit.SECONDS)); 260 } 261 262 if ((myDeferredConcepts.size() + myConceptLinksToSaveLater.size()) == 0) { 263 ourLog.info("All deferred concepts and relationships have now been synchronized to the database"); 264 } 265 } 266 267 private void processDeferredValueSets() { 268 int count = Math.min(myDeferredValueSets.size(), 200); 269 for (ValueSet nextValueSet : new ArrayList<>(myDeferredValueSets.subList(0, count))) { 270 ourLog.info("Creating ValueSet: {}", nextValueSet.getId()); 271 myTerminologyVersionAdapterSvc.createOrUpdateValueSet(nextValueSet); 272 myDeferredValueSets.remove(nextValueSet); 273 } 274 ourLog.info("Saved {} deferred ValueSet resources, have {} remaining", count, myDeferredValueSets.size()); 275 } 276 277 /** 278 * This method is present only for unit tests, do not call from client code 279 */ 280 @VisibleForTesting 281 public synchronized void clearDeferred() { 282 myProcessDeferred = true; 283 myDeferredValueSets.clear(); 284 myDeferredConceptMaps.clear(); 285 myDeferredConcepts.clear(); 286 myDeferredCodeSystemsDeletions.clear(); 287 myConceptLinksToSaveLater.clear(); 288 myDeferredCodeSystemVersionsDeletions.clear(); 289 clearJobExecutions(); 290 } 291 292 private void clearJobExecutions() { 293 for (String id : new ArrayList<>(myJobExecutions)) { 294 myJobCoordinator.cancelInstance(id); 295 } 296 myJobExecutions.clear(); 297 } 298 299 @Override 300 public void notifyJobEnded(String theId) { 301 myJobExecutions.remove(theId); 302 } 303 304 private <T> T runInTransaction(Supplier<T> theRunnable) { 305 assert !TransactionSynchronizationManager.isActualTransactionActive(); 306 307 return new TransactionTemplate(myTransactionMgr).execute(tx -> theRunnable.get()); 308 } 309 310 @Override 311 public void saveAllDeferred() { 312 TimeoutManager timeoutManager = null; 313 if (myAllowDeferredTasksTimeout) { 314 timeoutManager = new TimeoutManager( 315 TermDeferredStorageSvcImpl.class.getName() + ".saveAllDeferred()", 316 Duration.of(SAVE_ALL_DEFERRED_WARN_MINUTES, ChronoUnit.MINUTES), 317 Duration.of(SAVE_ALL_DEFERRED_ERROR_MINUTES, ChronoUnit.MINUTES)); 318 } 319 320 // Don't include executing jobs here since there's no point in thrashing over and over 321 // in a busy wait while we wait for batch2 job processes to finish 322 while (!isStorageQueueEmpty(false)) { 323 if (myAllowDeferredTasksTimeout) { 324 if (timeoutManager.checkTimeout()) { 325 ourLog.info(toString()); 326 } 327 } 328 saveDeferred(); 329 } 330 } 331 332 @Transactional(propagation = Propagation.NEVER) 333 @Override 334 public synchronized void saveDeferred() { 335 if (isProcessDeferredPaused()) { 336 return; 337 } 338 339 for (int i = 0; i < 10; i++) { 340 if (!isDeferredConcepts() 341 && !isConceptLinksToSaveLater() 342 && !isDeferredValueSets() 343 && !isDeferredConceptMaps() 344 && !isDeferredCodeSystemDeletions()) { 345 return; 346 } 347 348 if (isDeferredConceptsOrConceptLinksToSaveLater()) { 349 runInTransaction(() -> { 350 processDeferredConcepts(); 351 return null; 352 }); 353 354 continue; 355 } 356 357 if (isDeferredValueSets()) { 358 runInTransaction(() -> { 359 processDeferredValueSets(); 360 return null; 361 }); 362 363 continue; 364 } 365 366 if (isDeferredConceptMaps()) { 367 runInTransaction(() -> { 368 processDeferredConceptMaps(); 369 return null; 370 }); 371 372 continue; 373 } 374 375 if (isDeferredCodeSystemVersionDeletions()) { 376 processDeferredCodeSystemVersionDeletions(); 377 } 378 379 if (isDeferredCodeSystemDeletions()) { 380 processDeferredCodeSystemDeletions(); 381 } 382 } 383 } 384 385 private boolean isDeferredCodeSystemVersionDeletions() { 386 return !myDeferredCodeSystemVersionsDeletions.isEmpty(); 387 } 388 389 private void processDeferredCodeSystemDeletions() { 390 for (TermCodeSystem next : myDeferredCodeSystemsDeletions) { 391 deleteTermCodeSystemOffline(next.getPid()); 392 } 393 myDeferredCodeSystemsDeletions.clear(); 394 } 395 396 private void processDeferredCodeSystemVersionDeletions() { 397 for (TermCodeSystemVersion next : myDeferredCodeSystemVersionsDeletions) { 398 deleteTermCodeSystemVersionOffline(next.getPid()); 399 } 400 myDeferredCodeSystemVersionsDeletions.clear(); 401 } 402 403 private void deleteTermCodeSystemVersionOffline(Long theCodeSystemVersionPid) { 404 JobInstanceStartRequest request = new JobInstanceStartRequest(); 405 request.setJobDefinitionId(TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME); 406 407 TermCodeSystemDeleteVersionJobParameters parameters = new TermCodeSystemDeleteVersionJobParameters(); 408 parameters.setCodeSystemVersionPid(theCodeSystemVersionPid); 409 request.setParameters(parameters); 410 411 Batch2JobStartResponse response = myJobCoordinator.startInstance(new SystemRequestDetails(), request); 412 myJobExecutions.add(response.getInstanceId()); 413 } 414 415 private void deleteTermCodeSystemOffline(Long theCodeSystemPid) { 416 TermCodeSystemDeleteJobParameters parameters = new TermCodeSystemDeleteJobParameters(); 417 parameters.setTermPid(theCodeSystemPid); 418 JobInstanceStartRequest request = new JobInstanceStartRequest(); 419 request.setParameters(parameters); 420 request.setJobDefinitionId(TERM_CODE_SYSTEM_DELETE_JOB_NAME); 421 Batch2JobStartResponse response = myJobCoordinator.startInstance(new SystemRequestDetails(), request); 422 myJobExecutions.add(response.getInstanceId()); 423 } 424 425 @Override 426 public boolean isStorageQueueEmpty(boolean theIncludeExecutingJobs) { 427 boolean retVal = !isProcessDeferredPaused(); 428 retVal &= !isDeferredConcepts(); 429 retVal &= !isConceptLinksToSaveLater(); 430 retVal &= !isDeferredValueSets(); 431 retVal &= !isDeferredConceptMaps(); 432 retVal &= !isDeferredCodeSystemDeletions(); 433 if (theIncludeExecutingJobs) { 434 retVal &= !isJobsExecuting(); 435 } 436 return retVal; 437 } 438 439 private boolean isJobsExecuting() { 440 cleanseEndedJobs(); 441 442 return !myJobExecutions.isEmpty(); 443 } 444 445 private void cleanseEndedJobs() { 446 /* 447 * Cleanse the list of completed jobs. 448 * This is mostly a fail-safe 449 * because "cancelled" jobs are never removed. 450 */ 451 List<String> executions = new ArrayList<>(myJobExecutions); 452 List<String> idsToDelete = new ArrayList<>(); 453 for (String id : executions) { 454 // TODO - might want to consider a "fetch all instances" 455 JobInstance instance = myJobCoordinator.getInstance(id); 456 if (StatusEnum.getEndedStatuses().contains(instance.getStatus())) { 457 idsToDelete.add(instance.getInstanceId()); 458 } 459 } 460 for (String id : idsToDelete) { 461 myJobExecutions.remove(id); 462 } 463 } 464 465 private void saveConceptLink(TermConceptParentChildLink next) { 466 if (next.getId() == null) { 467 myConceptParentChildLinkDao.save(next); 468 } 469 } 470 471 private boolean isProcessDeferredPaused() { 472 return !myProcessDeferred; 473 } 474 475 private boolean isDeferredConceptsOrConceptLinksToSaveLater() { 476 return isDeferredConcepts() || isConceptLinksToSaveLater(); 477 } 478 479 private boolean isDeferredCodeSystemDeletions() { 480 return !myDeferredCodeSystemsDeletions.isEmpty() || !myDeferredCodeSystemVersionsDeletions.isEmpty(); 481 } 482 483 private boolean isDeferredConcepts() { 484 return !myDeferredConcepts.isEmpty(); 485 } 486 487 private boolean isConceptLinksToSaveLater() { 488 return !myConceptLinksToSaveLater.isEmpty(); 489 } 490 491 private boolean isDeferredValueSets() { 492 return !myDeferredValueSets.isEmpty(); 493 } 494 495 private boolean isDeferredConceptMaps() { 496 return !myDeferredConceptMaps.isEmpty(); 497 } 498 499 @VisibleForTesting 500 void setTransactionManagerForUnitTest(PlatformTransactionManager theTxManager) { 501 myTransactionMgr = theTxManager; 502 } 503 504 @VisibleForTesting 505 void setTermConceptDaoSvc(TermConceptDaoSvc theTermConceptDaoSvc) { 506 myTermConceptDaoSvc = theTermConceptDaoSvc; 507 } 508 509 @VisibleForTesting 510 void setConceptDaoForUnitTest(ITermConceptDao theConceptDao) { 511 myConceptDao = theConceptDao; 512 } 513 514 @VisibleForTesting 515 void setCodeSystemVersionDaoForUnitTest(ITermCodeSystemVersionDao theCodeSystemVersionDao) { 516 myCodeSystemVersionDao = theCodeSystemVersionDao; 517 } 518 519 @Override 520 public void disallowDeferredTaskTimeout() { 521 myAllowDeferredTasksTimeout = false; 522 } 523 524 @Override 525 @VisibleForTesting 526 public void logQueueForUnitTest() { 527 ourLog.info("isProcessDeferredPaused: {}", isProcessDeferredPaused()); 528 ourLog.info("isDeferredConcepts: {}", isDeferredConcepts()); 529 ourLog.info("isConceptLinksToSaveLater: {}", isConceptLinksToSaveLater()); 530 ourLog.info("isDeferredValueSets: {}", isDeferredValueSets()); 531 ourLog.info("isDeferredConceptMaps: {}", isDeferredConceptMaps()); 532 ourLog.info("isDeferredCodeSystemDeletions: {}", isDeferredCodeSystemDeletions()); 533 } 534 535 @Override 536 public void deleteCodeSystemVersion(TermCodeSystemVersion theCodeSystemVersion) { 537 myDeferredCodeSystemVersionsDeletions.add(theCodeSystemVersion); 538 } 539 540 @Override 541 public void scheduleJobs(ISchedulerService theSchedulerService) { 542 // TODO KHS what does this mean? 543 // Register scheduled job to save deferred concepts 544 // In the future it would be great to make this a cluster-aware task somehow 545 ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition(); 546 jobDefinition.setId(Job.class.getName()); 547 jobDefinition.setJobClass(Job.class); 548 theSchedulerService.scheduleLocalJob(5000, jobDefinition); 549 } 550 551 public static class Job implements HapiJob { 552 @Autowired 553 private ITermDeferredStorageSvc myTerminologySvc; 554 555 @Override 556 public void execute(JobExecutionContext theContext) { 557 myTerminologySvc.saveDeferred(); 558 } 559 } 560 561 @Override 562 public String toString() { 563 return new ToStringBuilder(this) 564 .append("myDeferredCodeSystemsDeletions", myDeferredCodeSystemsDeletions.size()) 565 .append("myDeferredCodeSystemVersionsDeletions", myDeferredCodeSystemVersionsDeletions.size()) 566 .append("myDeferredConcepts", myDeferredConcepts.size()) 567 .append("myDeferredValueSets", myDeferredValueSets.size()) 568 .append("myDeferredConceptMaps", myDeferredConceptMaps.size()) 569 .append("myConceptLinksToSaveLater", myConceptLinksToSaveLater.size()) 570 .append("myJobExecutions", myJobExecutions.size()) 571 .append("myProcessDeferred", myProcessDeferred) 572 .toString(); 573 } 574}