
001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.triggering; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.context.RuntimeResourceDefinition; 024import ca.uhn.fhir.i18n.Msg; 025import ca.uhn.fhir.interceptor.model.RequestPartitionId; 026import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 027import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 028import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; 029import ca.uhn.fhir.jpa.api.svc.ISearchSvc; 030import ca.uhn.fhir.jpa.dao.ISearchBuilder; 031import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 032import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 033import ca.uhn.fhir.jpa.model.config.SubscriptionSettings; 034import ca.uhn.fhir.jpa.model.sched.HapiJob; 035import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs; 036import ca.uhn.fhir.jpa.model.sched.ISchedulerService; 037import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; 038import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; 039import ca.uhn.fhir.jpa.searchparam.MatchUrlService; 040import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 041import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer; 042import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer; 043import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; 044import ca.uhn.fhir.rest.api.CacheControlDirective; 045import ca.uhn.fhir.rest.api.server.IBundleProvider; 046import ca.uhn.fhir.rest.api.server.RequestDetails; 047import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 048import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; 049import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 050import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 051import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException; 052import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; 053import ca.uhn.fhir.util.ParametersUtil; 054import ca.uhn.fhir.util.StopWatch; 055import ca.uhn.fhir.util.UrlUtil; 056import ca.uhn.fhir.util.ValidateUtil; 057import com.google.common.collect.Lists; 058import jakarta.annotation.Nullable; 059import jakarta.annotation.PostConstruct; 060import org.apache.commons.lang3.ObjectUtils; 061import org.apache.commons.lang3.Validate; 062import org.apache.commons.lang3.concurrent.BasicThreadFactory; 063import org.apache.commons.lang3.time.DateUtils; 064import org.hl7.fhir.dstu2.model.IdType; 065import org.hl7.fhir.instance.model.api.IBaseParameters; 066import org.hl7.fhir.instance.model.api.IBaseResource; 067import org.hl7.fhir.instance.model.api.IIdType; 068import org.hl7.fhir.instance.model.api.IPrimitiveType; 069import org.quartz.JobExecutionContext; 070import org.slf4j.Logger; 071import org.slf4j.LoggerFactory; 072import org.springframework.beans.factory.annotation.Autowired; 073 074import java.util.ArrayList; 075import java.util.Collections; 076import java.util.List; 077import java.util.UUID; 078import java.util.concurrent.ExecutorService; 079import java.util.concurrent.Future; 080import java.util.concurrent.LinkedBlockingQueue; 081import java.util.concurrent.RejectedExecutionException; 082import java.util.concurrent.RejectedExecutionHandler; 083import java.util.concurrent.ThreadPoolExecutor; 084import java.util.concurrent.TimeUnit; 085import java.util.concurrent.atomic.AtomicInteger; 086import java.util.stream.Collectors; 087 088import static ca.uhn.fhir.rest.server.provider.ProviderConstants.SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID; 089import static java.util.Objects.isNull; 090import static java.util.Objects.nonNull; 091import static org.apache.commons.collections4.CollectionUtils.isNotEmpty; 092import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; 093import static org.apache.commons.lang3.StringUtils.isBlank; 094import static org.apache.commons.lang3.StringUtils.isNotBlank; 095 096public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc, IHasScheduledJobs { 097 private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringSvcImpl.class); 098 private static final int DEFAULT_MAX_SUBMIT = 10000; 099 private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<>(); 100 101 @Autowired 102 private FhirContext myFhirContext; 103 104 @Autowired 105 private DaoRegistry myDaoRegistry; 106 107 @Autowired 108 private SubscriptionSettings mySubscriptionSettings; 109 110 @Autowired 111 private ISearchCoordinatorSvc<? extends IResourcePersistentId<?>> mySearchCoordinatorSvc; 112 113 @Autowired 114 private MatchUrlService myMatchUrlService; 115 116 @Autowired 117 private IResourceModifiedConsumer myResourceModifiedConsumer; 118 119 @Autowired 120 private HapiTransactionService myTransactionService; 121 122 private int myMaxSubmitPerPass = DEFAULT_MAX_SUBMIT; 123 private ExecutorService myExecutorService; 124 125 @Autowired 126 private ISearchSvc mySearchService; 127 128 @Autowired 129 IRequestPartitionHelperSvc myRequestPartitionHelperSvc; 130 131 @Autowired 132 private SearchBuilderFactory mySearchBuilderFactory; 133 134 @Autowired 135 private SubscriptionCanonicalizer mySubscriptionCanonicalizer; 136 137 @Override 138 public IBaseParameters triggerSubscription( 139 @Nullable List<IPrimitiveType<String>> theResourceIds, 140 @Nullable List<IPrimitiveType<String>> theSearchUrls, 141 @Nullable IIdType theSubscriptionId, 142 RequestDetails theRequestDetails) { 143 144 if (mySubscriptionSettings.getSupportedSubscriptionTypes().isEmpty()) { 145 throw new PreconditionFailedException(Msg.code(22) + "Subscription processing not active on this server"); 146 } 147 148 RequestPartitionId requestPartitionId; 149 150 // Throw a 404 if the subscription doesn't exist, otherwise determine its partition. 151 if (theSubscriptionId != null) { 152 IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getSubscriptionDao(); 153 IBaseResource subscription = subscriptionDao.read(theSubscriptionId, theRequestDetails); 154 if (mySubscriptionCanonicalizer.canonicalize(subscription).isCrossPartitionEnabled()) { 155 requestPartitionId = RequestPartitionId.allPartitions(); 156 } else { 157 // Otherwise, trust the partition passed in via tenant/interceptor. 158 requestPartitionId = myRequestPartitionHelperSvc.determineGenericPartitionForRequest(theRequestDetails); 159 } 160 } else { 161 // If we have no specific subscription, allow standard partition selection 162 requestPartitionId = myRequestPartitionHelperSvc.determineGenericPartitionForRequest(theRequestDetails); 163 } 164 165 List<IPrimitiveType<String>> resourceIds = defaultIfNull(theResourceIds, Collections.emptyList()); 166 List<IPrimitiveType<String>> searchUrls = defaultIfNull(theSearchUrls, Collections.emptyList()); 167 // Make sure we have at least one resource ID or search URL 168 if (resourceIds.isEmpty() && searchUrls.isEmpty()) { 169 throw new InvalidRequestException(Msg.code(23) + "No resource IDs or search URLs specified for triggering"); 170 } 171 172 // Resource URLs must be compete 173 for (IPrimitiveType<String> next : resourceIds) { 174 IdType resourceId = new IdType(next.getValue()); 175 ValidateUtil.isTrueOrThrowInvalidRequest( 176 resourceId.hasResourceType(), 177 SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID + " parameter must have resource type"); 178 ValidateUtil.isTrueOrThrowInvalidRequest( 179 resourceId.hasIdPart(), 180 SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID + " parameter must have resource ID part"); 181 } 182 183 // Search URLs must be valid 184 for (IPrimitiveType<String> next : searchUrls) { 185 if (!next.getValue().contains("?")) { 186 throw new InvalidRequestException(Msg.code(24) 187 + "Search URL is not valid (must be in the form \"[resource type]?[optional params]\")"); 188 } 189 } 190 191 SubscriptionTriggeringJobDetails jobDetails = new SubscriptionTriggeringJobDetails(); 192 jobDetails.setJobId(UUID.randomUUID().toString()); 193 jobDetails.setRequestPartitionId( 194 requestPartitionId == null ? RequestPartitionId.allPartitions() : requestPartitionId); 195 jobDetails.setRemainingResourceIds( 196 resourceIds.stream().map(IPrimitiveType::getValue).collect(Collectors.toList())); 197 jobDetails.setRemainingSearchUrls( 198 searchUrls.stream().map(IPrimitiveType::getValue).collect(Collectors.toList())); 199 if (theSubscriptionId != null) { 200 jobDetails.setSubscriptionId(theSubscriptionId.getIdPart()); 201 } 202 203 // Submit job for processing 204 synchronized (myActiveJobs) { 205 myActiveJobs.add(jobDetails); 206 ourLog.info( 207 "Subscription triggering requested for {} resource and {} search - Gave job ID: {} and have {} jobs", 208 resourceIds.size(), 209 searchUrls.size(), 210 jobDetails.getJobId(), 211 myActiveJobs.size()); 212 } 213 214 // Create a parameters response 215 IBaseParameters retVal = ParametersUtil.newInstance(myFhirContext); 216 IPrimitiveType<?> value = 217 (IPrimitiveType<?>) myFhirContext.getElementDefinition("string").newInstance(); 218 value.setValueAsString("Subscription triggering job submitted as JOB ID: " + jobDetails.myJobId); 219 ParametersUtil.addParameterToParameters(myFhirContext, retVal, "information", value); 220 return retVal; 221 } 222 223 @Override 224 public IBaseParameters triggerSubscription( 225 @Nullable List<IPrimitiveType<String>> theResourceIds, 226 @Nullable List<IPrimitiveType<String>> theSearchUrls, 227 @Nullable IIdType theSubscriptionId) { 228 return triggerSubscription( 229 theResourceIds, theSearchUrls, theSubscriptionId, SystemRequestDetails.newSystemRequestAllPartitions()); 230 } 231 232 @Override 233 public void runDeliveryPass() { 234 235 synchronized (myActiveJobs) { 236 if (myActiveJobs.isEmpty()) { 237 return; 238 } 239 240 String activeJobIds = myActiveJobs.stream() 241 .map(SubscriptionTriggeringJobDetails::getJobId) 242 .collect(Collectors.joining(", ")); 243 ourLog.info("Starting pass: currently have {} active job IDs: {}", myActiveJobs.size(), activeJobIds); 244 245 SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0); 246 247 runJob(activeJob); 248 249 // If the job is complete, remove it from the queue 250 if (activeJob.getRemainingResourceIds().isEmpty()) { 251 if (activeJob.getRemainingSearchUrls().isEmpty()) { 252 if (jobHasCompleted(activeJob)) { 253 myActiveJobs.remove(0); 254 String remainingJobsMsg = ""; 255 if (myActiveJobs.size() > 0) { 256 remainingJobsMsg = "(" + myActiveJobs.size() + " jobs remaining)"; 257 } 258 ourLog.info( 259 "Subscription triggering job {} is complete{}", activeJob.getJobId(), remainingJobsMsg); 260 } 261 } 262 } 263 } 264 } 265 266 private void runJob(SubscriptionTriggeringJobDetails theJobDetails) { 267 StopWatch sw = new StopWatch(); 268 ourLog.info("Starting pass of subscription triggering job {}", theJobDetails.getJobId()); 269 270 // Submit individual resources 271 AtomicInteger totalSubmitted = new AtomicInteger(0); 272 List<Future<?>> futures = new ArrayList<>(); 273 while (!theJobDetails.getRemainingResourceIds().isEmpty() && totalSubmitted.get() < myMaxSubmitPerPass) { 274 totalSubmitted.incrementAndGet(); 275 String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0); 276 submitResource(theJobDetails.getSubscriptionId(), theJobDetails.getRequestPartitionId(), nextResourceId); 277 } 278 279 // Make sure these all succeeded in submitting 280 if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) { 281 return; 282 } 283 284 IBundleProvider search = null; 285 286 // This is the job initial step where we set ourselves up to do the actual re-submitting of resources 287 // to the broker. Note that querying of resource can be done synchronously or asynchronously 288 if (isInitialStep(theJobDetails) 289 && isNotEmpty(theJobDetails.getRemainingSearchUrls()) 290 && totalSubmitted.get() < myMaxSubmitPerPass) { 291 292 String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0); 293 RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, nextSearchUrl); 294 String queryPart = nextSearchUrl.substring(nextSearchUrl.indexOf('?')); 295 SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef); 296 297 String resourceType = resourceDef.getName(); 298 IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceType); 299 300 ourLog.info("Triggering job[{}] is starting a search for {}", theJobDetails.getJobId(), nextSearchUrl); 301 302 SystemRequestDetails systemRequestDetails = new SystemRequestDetails(); 303 systemRequestDetails.setRequestPartitionId(theJobDetails.getRequestPartitionId()); 304 305 search = mySearchCoordinatorSvc.registerSearch( 306 callingDao, params, resourceType, new CacheControlDirective(), systemRequestDetails); 307 308 if (isNull(search.getUuid())) { 309 // we don't have a search uuid i.e. we're setting up for synchronous processing 310 theJobDetails.setCurrentSearchUrl(nextSearchUrl); 311 theJobDetails.setCurrentOffset(params.getOffset()); 312 } else { 313 // populate properties for asynchronous path 314 theJobDetails.setCurrentSearchUuid(search.getUuid()); 315 } 316 317 theJobDetails.setCurrentSearchResourceType(resourceType); 318 theJobDetails.setCurrentSearchCount(params.getCount()); 319 theJobDetails.setCurrentSearchLastUploadedIndex(-1); 320 } 321 322 /* 323 * Processing step for synchronous processing mode - This is only called if the 324 * server is configured to force offset searches, ie using ForceSynchronousSearchInterceptor. 325 * Otherwise, we'll always do async mode. 326 */ 327 if (isNotBlank(theJobDetails.getCurrentSearchUrl()) && totalSubmitted.get() < myMaxSubmitPerPass) { 328 processSynchronous(theJobDetails, totalSubmitted, futures, search); 329 } 330 331 // processing step for asynchronous processing mode 332 if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted.get() < myMaxSubmitPerPass) { 333 processAsynchronous(theJobDetails, totalSubmitted, futures); 334 } 335 336 ourLog.info( 337 "Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", 338 theJobDetails.getJobId(), 339 totalSubmitted, 340 sw.getMillis(), 341 sw.getThroughput(totalSubmitted.get(), TimeUnit.SECONDS)); 342 } 343 344 private void processAsynchronous( 345 SubscriptionTriggeringJobDetails theJobDetails, AtomicInteger totalSubmitted, List<Future<?>> futures) { 346 int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1; 347 348 IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType()); 349 350 int maxQuerySize = myMaxSubmitPerPass - totalSubmitted.get(); 351 int toIndex; 352 if (theJobDetails.getCurrentSearchCount() != null) { 353 toIndex = Math.min(fromIndex + maxQuerySize, theJobDetails.getCurrentSearchCount()); 354 } else { 355 toIndex = fromIndex + maxQuerySize; 356 } 357 358 ourLog.info( 359 "Triggering job[{}] search {} requesting resources {} - {} from partition {}", 360 theJobDetails.getJobId(), 361 theJobDetails.getCurrentSearchUuid(), 362 fromIndex, 363 toIndex, 364 theJobDetails.getRequestPartitionId()); 365 366 List<? extends IResourcePersistentId<?>> allResourceIds; 367 RequestPartitionId requestPartitionId = theJobDetails.getRequestPartitionId(); 368 try { 369 allResourceIds = mySearchCoordinatorSvc.getResources( 370 theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, null, requestPartitionId); 371 } catch (ResourceGoneException e) { 372 ourLog.debug("Search has expired, submission is done with error: {}", e.getMessage()); 373 allResourceIds = new ArrayList<>(); 374 } 375 376 ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), allResourceIds.size()); 377 AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex()); 378 379 List<? extends List<? extends IResourcePersistentId<?>>> partitions = Lists.partition(allResourceIds, 100); 380 for (List<? extends IResourcePersistentId<?>> resourceIds : partitions) { 381 Runnable job = () -> { 382 String resourceType = myFhirContext.getResourceType(theJobDetails.getCurrentSearchResourceType()); 383 RuntimeResourceDefinition resourceDef = 384 myFhirContext.getResourceDefinition(theJobDetails.getCurrentSearchResourceType()); 385 ISearchBuilder searchBuilder = 386 mySearchBuilderFactory.newSearchBuilder(resourceType, resourceDef.getImplementingClass()); 387 List<IBaseResource> listToPopulate = new ArrayList<>(); 388 389 myTransactionService.withRequest(null).execute(() -> { 390 searchBuilder.loadResourcesByPid( 391 resourceIds, Collections.emptyList(), listToPopulate, false, new SystemRequestDetails()); 392 }); 393 394 for (IBaseResource nextResource : listToPopulate) { 395 submitResource( 396 theJobDetails.getSubscriptionId(), theJobDetails.getRequestPartitionId(), nextResource); 397 totalSubmitted.incrementAndGet(); 398 highestIndexSubmitted.incrementAndGet(); 399 } 400 }; 401 402 Future<?> future = myExecutorService.submit(job); 403 futures.add(future); 404 } 405 406 if (!validateFuturesAndReturnTrueIfWeShouldAbort(futures)) { 407 408 theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted.get()); 409 410 if (allResourceIds.isEmpty() 411 || (theJobDetails.getCurrentSearchCount() != null 412 && toIndex >= theJobDetails.getCurrentSearchCount())) { 413 ourLog.info( 414 "Triggering job[{}] search {} has completed ", 415 theJobDetails.getJobId(), 416 theJobDetails.getCurrentSearchUuid()); 417 theJobDetails.setCurrentSearchResourceType(null); 418 theJobDetails.setCurrentSearchUuid(null); 419 theJobDetails.setCurrentSearchLastUploadedIndex(-1); 420 theJobDetails.setCurrentSearchCount(null); 421 } 422 } 423 } 424 425 private void processSynchronous( 426 SubscriptionTriggeringJobDetails theJobDetails, 427 AtomicInteger totalSubmitted, 428 List<Future<?>> futures, 429 IBundleProvider search) { 430 List<IBaseResource> allCurrentResources; 431 432 int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1; 433 434 String searchUrl = theJobDetails.getCurrentSearchUrl(); 435 436 ourLog.info( 437 "Triggered job [{}] - Starting synchronous processing at offset {} and index {} on partition {}", 438 theJobDetails.getJobId(), 439 theJobDetails.getCurrentOffset(), 440 fromIndex, 441 theJobDetails.getRequestPartitionId()); 442 443 int submittableCount = myMaxSubmitPerPass - totalSubmitted.get(); 444 int toIndex = fromIndex + submittableCount; 445 446 if (nonNull(search) && !search.isEmpty()) { 447 448 if (search.getCurrentPageSize() != null) { 449 toIndex = search.getCurrentPageSize(); 450 } 451 452 // we already have data from the initial step so process as much as we can. 453 ourLog.info( 454 "Triggered job[{}] will process up to {} resources from partition {}", 455 theJobDetails.getJobId(), 456 toIndex, 457 theJobDetails.getRequestPartitionId()); 458 allCurrentResources = search.getResources(0, toIndex); 459 460 } else { 461 if (theJobDetails.getCurrentSearchCount() != null) { 462 toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount()); 463 } 464 465 RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, searchUrl); 466 String queryPart = searchUrl.substring(searchUrl.indexOf('?')); 467 SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef); 468 int offset = theJobDetails.getCurrentOffset() + fromIndex; 469 params.setOffset(offset); 470 params.setCount(toIndex); 471 472 ourLog.info( 473 "Triggered job[{}] requesting {} resources from offset {}", 474 theJobDetails.getJobId(), 475 toIndex, 476 offset); 477 478 search = mySearchService.executeQuery(resourceDef.getName(), params, theJobDetails.getRequestPartitionId()); 479 allCurrentResources = search.getResources(0, submittableCount); 480 } 481 482 ourLog.info("Triggered job[{}] delivering {} resources", theJobDetails.getJobId(), allCurrentResources.size()); 483 AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex()); 484 485 for (IBaseResource nextResource : allCurrentResources) { 486 Future<?> future = myExecutorService.submit(() -> submitResource( 487 theJobDetails.getSubscriptionId(), theJobDetails.getRequestPartitionId(), nextResource)); 488 futures.add(future); 489 totalSubmitted.incrementAndGet(); 490 highestIndexSubmitted.incrementAndGet(); 491 } 492 493 if (!validateFuturesAndReturnTrueIfWeShouldAbort(futures)) { 494 495 theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted.get()); 496 497 ourLog.info( 498 "Triggered job[{}] lastUploadedIndex is {}", 499 theJobDetails.getJobId(), 500 theJobDetails.getCurrentSearchLastUploadedIndex()); 501 502 if (allCurrentResources.isEmpty() 503 || nonNull(theJobDetails.getCurrentSearchCount()) 504 && toIndex > theJobDetails.getCurrentSearchCount()) { 505 ourLog.info( 506 "Triggered job[{}] for search URL {} has completed ", 507 theJobDetails.getJobId(), 508 theJobDetails.getCurrentSearchUrl()); 509 theJobDetails.setCurrentSearchResourceType(null); 510 theJobDetails.clearCurrentSearchUrl(); 511 theJobDetails.setCurrentSearchLastUploadedIndex(-1); 512 theJobDetails.setCurrentSearchCount(null); 513 } 514 } 515 } 516 517 private boolean isInitialStep(SubscriptionTriggeringJobDetails theJobDetails) { 518 return isBlank(theJobDetails.myCurrentSearchUuid) && isBlank(theJobDetails.myCurrentSearchUrl); 519 } 520 521 private boolean jobHasCompleted(SubscriptionTriggeringJobDetails theJobDetails) { 522 return isInitialStep(theJobDetails); 523 } 524 525 private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Future<?>> theFutures) { 526 527 for (Future<?> nextFuture : theFutures) { 528 try { 529 nextFuture.get(); 530 } catch (Exception e) { 531 ourLog.error("Failure triggering resource", e); 532 return true; 533 } 534 } 535 536 // Clear the list since it will potentially get reused 537 theFutures.clear(); 538 return false; 539 } 540 541 private void submitResource( 542 String theSubscriptionId, RequestPartitionId theRequestPartitionId, String theResourceIdToTrigger) { 543 org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger); 544 IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceId.getResourceType()); 545 IBaseResource resourceToTrigger = dao.read(resourceId, SystemRequestDetails.forAllPartitions()); 546 547 submitResource(theSubscriptionId, theRequestPartitionId, resourceToTrigger); 548 } 549 550 private void submitResource( 551 String theSubscriptionId, RequestPartitionId theRequestPartitionId, IBaseResource theResourceToTrigger) { 552 553 ourLog.info( 554 "Submitting resource {} to subscription {}", 555 theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), 556 theSubscriptionId); 557 558 ResourceModifiedMessage msg = new ResourceModifiedMessage( 559 myFhirContext, 560 theResourceToTrigger, 561 ResourceModifiedMessage.OperationTypeEnum.MANUALLY_TRIGGERED, 562 theRequestPartitionId); 563 msg.setSubscriptionId(theSubscriptionId); 564 565 for (int i = 0; ; i++) { 566 try { 567 myResourceModifiedConsumer.submitResourceModified(msg); 568 break; 569 } catch (Exception e) { 570 if (i >= 3) { 571 throw new InternalErrorException(Msg.code(25) + e); 572 } 573 574 ourLog.warn("Exception while retriggering subscriptions (going to sleep and retry): {}", e.toString()); 575 try { 576 Thread.sleep(1000); 577 } catch (InterruptedException ex) { 578 Thread.currentThread().interrupt(); 579 } 580 } 581 } 582 } 583 584 public void cancelAll() { 585 synchronized (myActiveJobs) { 586 myActiveJobs.clear(); 587 } 588 } 589 590 /** 591 * Sets the maximum number of resources that will be submitted in a single pass 592 */ 593 public void setMaxSubmitPerPass(Integer theMaxSubmitPerPass) { 594 Integer maxSubmitPerPass = theMaxSubmitPerPass; 595 if (maxSubmitPerPass == null) { 596 maxSubmitPerPass = DEFAULT_MAX_SUBMIT; 597 } 598 Validate.isTrue(maxSubmitPerPass > 0, "theMaxSubmitPerPass must be > 0"); 599 myMaxSubmitPerPass = maxSubmitPerPass; 600 } 601 602 @PostConstruct 603 public void start() { 604 createExecutorService(); 605 } 606 607 private void createExecutorService() { 608 LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000); 609 BasicThreadFactory threadFactory = new BasicThreadFactory.Builder() 610 .namingPattern("SubscriptionTriggering-%d") 611 .daemon(false) 612 .priority(Thread.NORM_PRIORITY) 613 .build(); 614 RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { 615 @Override 616 public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) { 617 ourLog.info( 618 "Note: Subscription triggering queue is full ({} elements), waiting for a slot to become available!", 619 executorQueue.size()); 620 StopWatch sw = new StopWatch(); 621 try { 622 executorQueue.put(theRunnable); 623 } catch (InterruptedException theE) { 624 // Restore interrupted state... 625 Thread.currentThread().interrupt(); 626 throw new RejectedExecutionException( 627 Msg.code(26) + "Task " + theRunnable.toString() + " rejected from " + theE.toString()); 628 } 629 ourLog.info("Slot become available after {}ms", sw.getMillis()); 630 } 631 }; 632 myExecutorService = new ThreadPoolExecutor( 633 10, 10, 0L, TimeUnit.MILLISECONDS, executorQueue, threadFactory, rejectedExecutionHandler); 634 } 635 636 @Override 637 public void scheduleJobs(ISchedulerService theSchedulerService) { 638 ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); 639 jobDetail.setId(getClass().getName()); 640 jobDetail.setJobClass(Job.class); 641 // Currently jobs ae kept in a local ArrayList so this should be a local job, and 642 // it can fire frequently without adding load 643 theSchedulerService.scheduleLocalJob(5 * DateUtils.MILLIS_PER_SECOND, jobDetail); 644 } 645 646 public int getActiveJobCount() { 647 return myActiveJobs.size(); 648 } 649 650 public static class Job implements HapiJob { 651 @Autowired 652 private ISubscriptionTriggeringSvc myTarget; 653 654 @Override 655 public void execute(JobExecutionContext theContext) { 656 myTarget.runDeliveryPass(); 657 } 658 } 659 660 private static class SubscriptionTriggeringJobDetails { 661 662 private String myJobId; 663 private String mySubscriptionId; 664 private List<String> myRemainingResourceIds; 665 private List<String> myRemainingSearchUrls; 666 private String myCurrentSearchUuid; 667 private String myCurrentSearchUrl; 668 private Integer myCurrentSearchCount; 669 private String myCurrentSearchResourceType; 670 private int myCurrentSearchLastUploadedIndex; 671 private int myCurrentOffset; 672 673 private RequestPartitionId myRequestPartitionId; 674 675 Integer getCurrentSearchCount() { 676 return myCurrentSearchCount; 677 } 678 679 void setCurrentSearchCount(Integer theCurrentSearchCount) { 680 myCurrentSearchCount = theCurrentSearchCount; 681 } 682 683 String getCurrentSearchResourceType() { 684 return myCurrentSearchResourceType; 685 } 686 687 void setCurrentSearchResourceType(String theCurrentSearchResourceType) { 688 myCurrentSearchResourceType = theCurrentSearchResourceType; 689 } 690 691 String getJobId() { 692 return myJobId; 693 } 694 695 void setJobId(String theJobId) { 696 myJobId = theJobId; 697 } 698 699 String getSubscriptionId() { 700 return mySubscriptionId; 701 } 702 703 void setSubscriptionId(String theSubscriptionId) { 704 mySubscriptionId = theSubscriptionId; 705 } 706 707 List<String> getRemainingResourceIds() { 708 return myRemainingResourceIds; 709 } 710 711 void setRemainingResourceIds(List<String> theRemainingResourceIds) { 712 myRemainingResourceIds = theRemainingResourceIds; 713 } 714 715 List<String> getRemainingSearchUrls() { 716 return myRemainingSearchUrls; 717 } 718 719 void setRemainingSearchUrls(List<String> theRemainingSearchUrls) { 720 myRemainingSearchUrls = theRemainingSearchUrls; 721 } 722 723 String getCurrentSearchUuid() { 724 return myCurrentSearchUuid; 725 } 726 727 void setCurrentSearchUuid(String theCurrentSearchUuid) { 728 myCurrentSearchUuid = theCurrentSearchUuid; 729 } 730 731 public String getCurrentSearchUrl() { 732 return myCurrentSearchUrl; 733 } 734 735 public void setCurrentSearchUrl(String theCurrentSearchUrl) { 736 this.myCurrentSearchUrl = theCurrentSearchUrl; 737 } 738 739 int getCurrentSearchLastUploadedIndex() { 740 return myCurrentSearchLastUploadedIndex; 741 } 742 743 void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) { 744 myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex; 745 } 746 747 public void clearCurrentSearchUrl() { 748 myCurrentSearchUrl = null; 749 } 750 751 public int getCurrentOffset() { 752 return myCurrentOffset; 753 } 754 755 public void setCurrentOffset(Integer theCurrentOffset) { 756 myCurrentOffset = ObjectUtils.defaultIfNull(theCurrentOffset, 0); 757 } 758 759 public void setRequestPartitionId(RequestPartitionId theRequestPartitionId) { 760 myRequestPartitionId = theRequestPartitionId; 761 } 762 763 public RequestPartitionId getRequestPartitionId() { 764 return myRequestPartitionId; 765 } 766 } 767}