
/*-
 * #%L
 * HAPI FHIR Subscription Server
 * %%
 * Copyright (C) 2014 - 2023 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.subscription.triggering;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.api.svc.ISearchSvc;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.util.ParametersUtil;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.UrlUtil;
import ca.uhn.fhir.util.ValidateUtil;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.hl7.fhir.dstu2.model.IdType;
import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;

import static ca.uhn.fhir.rest.server.provider.ProviderConstants.SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static org.apache.commons.collections4.CollectionUtils.isNotEmpty;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;

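/**
 * Default implementation of {@link ISubscriptionTriggeringSvc}, typically reached through the
 * {@code $trigger-subscription} operation. Requests (individual resource IDs and/or search URLs)
 * are validated, queued as in-memory jobs, and worked off a batch at a time by
 * {@link #runDeliveryPass()}, which re-submits each matching resource to the subscription
 * processing pipeline.
 *
 * <p>Minimal usage sketch (illustrative only; assumes an R4 context and example IDs):
 * <pre>{@code
 * ISubscriptionTriggeringSvc svc = ...; // obtained from the Spring context
 * svc.triggerSubscription(
 *       Collections.singletonList(new org.hl7.fhir.r4.model.StringType("Observation/123")),
 *       null,
 *       new org.hl7.fhir.r4.model.IdType("Subscription/example"));
 * }</pre>
 */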
public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc, IHasScheduledJobs {
    private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringSvcImpl.class);
    private static final int DEFAULT_MAX_SUBMIT = 10000;
    private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<>();

    @Autowired
    private FhirContext myFhirContext;

    @Autowired
    private DaoRegistry myDaoRegistry;

    @Autowired
    private JpaStorageSettings myStorageSettings;

    @Autowired
    private ISearchCoordinatorSvc mySearchCoordinatorSvc;

    @Autowired
    private MatchUrlService myMatchUrlService;

    @Autowired
    private IResourceModifiedConsumer myResourceModifiedConsumer;

    @Autowired
    private HapiTransactionService myTransactionService;

    private int myMaxSubmitPerPass = DEFAULT_MAX_SUBMIT;
    private ExecutorService myExecutorService;

    @Autowired
    private ISearchSvc mySearchService;

    @Autowired
    private SearchBuilderFactory mySearchBuilderFactory;

    @Override
    public IBaseParameters triggerSubscription(
            @Nullable List<IPrimitiveType<String>> theResourceIds,
            @Nullable List<IPrimitiveType<String>> theSearchUrls,
            @Nullable IIdType theSubscriptionId) {

        if (myStorageSettings.getSupportedSubscriptionTypes().isEmpty()) {
            throw new PreconditionFailedException(Msg.code(22) + "Subscription processing not active on this server");
        }

        // Throw a 404 if the subscription doesn't exist
        if (theSubscriptionId != null) {
            IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getSubscriptionDao();
            IIdType subscriptionId = theSubscriptionId;
            if (!subscriptionId.hasResourceType()) {
                subscriptionId = subscriptionId.withResourceType(ResourceTypeEnum.SUBSCRIPTION.getCode());
            }
            subscriptionDao.read(subscriptionId, SystemRequestDetails.forAllPartitions());
        }

        List<IPrimitiveType<String>> resourceIds = defaultIfNull(theResourceIds, Collections.emptyList());
        List<IPrimitiveType<String>> searchUrls = defaultIfNull(theSearchUrls, Collections.emptyList());

        // Make sure we have at least one resource ID or search URL
        if (resourceIds.isEmpty() && searchUrls.isEmpty()) {
            throw new InvalidRequestException(Msg.code(23) + "No resource IDs or search URLs specified for triggering");
        }

        // Resource IDs must be complete (i.e. include both a resource type and an ID part)
        for (IPrimitiveType<String> next : resourceIds) {
            IdType resourceId = new IdType(next.getValue());
            ValidateUtil.isTrueOrThrowInvalidRequest(
                    resourceId.hasResourceType(),
                    SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID + " parameter must have resource type");
            ValidateUtil.isTrueOrThrowInvalidRequest(
                    resourceId.hasIdPart(),
                    SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID + " parameter must have resource ID part");
        }

        // Search URLs must be valid
        for (IPrimitiveType<String> next : searchUrls) {
            if (!next.getValue().contains("?")) {
                throw new InvalidRequestException(Msg.code(24)
                        + "Search URL is not valid (must be in the form \"[resource type]?[optional params]\")");
            }
        }

        SubscriptionTriggeringJobDetails jobDetails = new SubscriptionTriggeringJobDetails();
        jobDetails.setJobId(UUID.randomUUID().toString());
        jobDetails.setRemainingResourceIds(
                resourceIds.stream().map(IPrimitiveType::getValue).collect(Collectors.toList()));
        jobDetails.setRemainingSearchUrls(
                searchUrls.stream().map(IPrimitiveType::getValue).collect(Collectors.toList()));
        if (theSubscriptionId != null) {
            jobDetails.setSubscriptionId(theSubscriptionId.getIdPart());
        }

        // Submit job for processing
        synchronized (myActiveJobs) {
            myActiveJobs.add(jobDetails);
            ourLog.info(
                    "Subscription triggering requested for {} resource(s) and {} search URL(s) - Gave job ID: {} and have {} jobs",
                    resourceIds.size(),
                    searchUrls.size(),
                    jobDetails.getJobId(),
                    myActiveJobs.size());
        }

        // Create a parameters response
        IBaseParameters retVal = ParametersUtil.newInstance(myFhirContext);
        IPrimitiveType<?> value =
                (IPrimitiveType<?>) myFhirContext.getElementDefinition("string").newInstance();
        value.setValueAsString("Subscription triggering job submitted as JOB ID: " + jobDetails.myJobId);
        ParametersUtil.addParameterToParameters(myFhirContext, retVal, "information", value);
        return retVal;
    }

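    /**
     * Invoked periodically by the scheduled {@link Job}. Processes at most one active job per
     * pass, submitting up to {@code myMaxSubmitPerPass} resources, and removes the job from the
     * queue once all of its resource IDs and search URLs have been worked off.
     */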
    @Override
    public void runDeliveryPass() {

        synchronized (myActiveJobs) {
            if (myActiveJobs.isEmpty()) {
                return;
            }

            String activeJobIds = myActiveJobs.stream()
                    .map(SubscriptionTriggeringJobDetails::getJobId)
                    .collect(Collectors.joining(", "));
            ourLog.info("Starting pass: currently have {} active job IDs: {}", myActiveJobs.size(), activeJobIds);

            SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0);

            runJob(activeJob);

            // If the job is complete, remove it from the queue
            if (activeJob.getRemainingResourceIds().isEmpty()) {
                if (activeJob.getRemainingSearchUrls().isEmpty()) {
                    if (jobHasCompleted(activeJob)) {
                        myActiveJobs.remove(0);
                        String remainingJobsMsg = "";
                        if (myActiveJobs.size() > 0) {
                            remainingJobsMsg = "(" + myActiveJobs.size() + " jobs remaining)";
                        }
                        ourLog.info(
                                "Subscription triggering job {} is complete{}", activeJob.getJobId(), remainingJobsMsg);
                    }
                }
            }
        }
    }

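    /**
     * Performs one pass over a single job. A pass submits resources from up to three sources, in
     * order, until {@code myMaxSubmitPerPass} is reached: explicitly listed resource IDs, then the
     * current search URL processed synchronously (offset-based paging), then the current search
     * processed asynchronously via a registered search UUID.
     */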
    private void runJob(SubscriptionTriggeringJobDetails theJobDetails) {
        StopWatch sw = new StopWatch();
        ourLog.info("Starting pass of subscription triggering job {}", theJobDetails.getJobId());

        // Submit individual resources
        int totalSubmitted = 0;
        List<Pair<String, Future<Void>>> futures = new ArrayList<>();
        while (theJobDetails.getRemainingResourceIds().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
            totalSubmitted++;
            String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0);
            Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResourceId);
            futures.add(Pair.of(nextResourceId, future));
        }

        // Make sure these all succeeded in submitting
        if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
            return;
        }

        IBundleProvider search = null;

        // This is the job's initial step, where we set ourselves up to do the actual re-submitting of resources
        // to the broker. Note that querying of resources can be done synchronously or asynchronously
        if (isInitialStep(theJobDetails)
                && isNotEmpty(theJobDetails.getRemainingSearchUrls())
                && totalSubmitted < myMaxSubmitPerPass) {

            String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0);
            RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, nextSearchUrl);
            String queryPart = nextSearchUrl.substring(nextSearchUrl.indexOf('?'));
            SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef);

            String resourceType = resourceDef.getName();
            IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceType);

            ourLog.info("Triggering job[{}] is starting a search for {}", theJobDetails.getJobId(), nextSearchUrl);

            search = mySearchCoordinatorSvc.registerSearch(
                    callingDao,
                    params,
                    resourceType,
                    new CacheControlDirective(),
                    null,
                    RequestPartitionId.allPartitions());

            if (isNull(search.getUuid())) {
                // we don't have a search uuid i.e. we're setting up for synchronous processing
                theJobDetails.setCurrentSearchUrl(nextSearchUrl);
                theJobDetails.setCurrentOffset(params.getOffset());

            } else {
                // populate properties for asynchronous path
                theJobDetails.setCurrentSearchUuid(search.getUuid());
            }

            theJobDetails.setCurrentSearchResourceType(resourceType);
            theJobDetails.setCurrentSearchCount(params.getCount());
            theJobDetails.setCurrentSearchLastUploadedIndex(-1);
        }

        // processing step for synchronous processing mode
        if (isNotBlank(theJobDetails.getCurrentSearchUrl()) && totalSubmitted < myMaxSubmitPerPass) {
            List<IBaseResource> allCurrentResources;

            int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;

            String searchUrl = theJobDetails.getCurrentSearchUrl();

            ourLog.info(
                    "Triggered job [{}] - Starting synchronous processing at offset {} and index {}",
                    theJobDetails.getJobId(),
                    theJobDetails.getCurrentOffset(),
                    fromIndex);

            int submittableCount = myMaxSubmitPerPass - totalSubmitted;
            int toIndex = fromIndex + submittableCount;

            if (nonNull(search) && !search.isEmpty()) {

                // we already have data from the initial step so process as much as we can.
                ourLog.info("Triggered job[{}] will process up to {} resources", theJobDetails.getJobId(), toIndex);
                allCurrentResources = search.getResources(0, toIndex);

            } else {
                if (theJobDetails.getCurrentSearchCount() != null) {
                    toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
                }

                RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, searchUrl);
                String queryPart = searchUrl.substring(searchUrl.indexOf('?'));
                SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef);
                int offset = theJobDetails.getCurrentOffset() + fromIndex;
                params.setOffset(offset);
                params.setCount(toIndex);

                ourLog.info(
                        "Triggered job[{}] requesting {} resources from offset {}",
                        theJobDetails.getJobId(),
                        toIndex,
                        offset);

                search =
                        mySearchService.executeQuery(resourceDef.getName(), params, RequestPartitionId.allPartitions());
                allCurrentResources = search.getAllResources();
            }

            ourLog.info(
                    "Triggered job[{}] delivering {} resources", theJobDetails.getJobId(), allCurrentResources.size());
            int highestIndexSubmitted = theJobDetails.getCurrentSearchLastUploadedIndex();

            for (IBaseResource nextResource : allCurrentResources) {
                Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResource);
                futures.add(Pair.of(nextResource.getIdElement().getIdPart(), future));
                totalSubmitted++;
                highestIndexSubmitted++;
            }

            if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
                return;
            }

            theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted);

            ourLog.info(
                    "Triggered job[{}] lastUploadedIndex is {}",
                    theJobDetails.getJobId(),
                    theJobDetails.getCurrentSearchLastUploadedIndex());

            if (allCurrentResources.isEmpty()
                    || (nonNull(theJobDetails.getCurrentSearchCount())
                            && toIndex >= theJobDetails.getCurrentSearchCount())) {
                ourLog.info(
                        "Triggered job[{}] for search URL {} has completed",
                        theJobDetails.getJobId(),
                        theJobDetails.getCurrentSearchUrl());
                theJobDetails.setCurrentSearchResourceType(null);
                theJobDetails.clearCurrentSearchUrl();
                theJobDetails.setCurrentSearchLastUploadedIndex(-1);
                theJobDetails.setCurrentSearchCount(null);
            }
        }

        // processing step for asynchronous processing mode
        if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) {
            int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;

            IFhirResourceDao<?> resourceDao =
                    myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());

            int maxQuerySize = myMaxSubmitPerPass - totalSubmitted;
            int toIndex;
            if (theJobDetails.getCurrentSearchCount() != null) {
                toIndex = Math.min(fromIndex + maxQuerySize, theJobDetails.getCurrentSearchCount());
            } else {
                toIndex = fromIndex + maxQuerySize;
            }

            ourLog.info(
                    "Triggering job[{}] search {} requesting resources {} - {}",
                    theJobDetails.getJobId(),
                    theJobDetails.getCurrentSearchUuid(),
                    fromIndex,
                    toIndex);

            List<IResourcePersistentId<?>> resourceIds;
            RequestPartitionId requestPartitionId = RequestPartitionId.allPartitions();
            resourceIds = mySearchCoordinatorSvc.getResources(
                    theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, null, requestPartitionId);

            ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), resourceIds.size());
            int highestIndexSubmitted = theJobDetails.getCurrentSearchLastUploadedIndex();

            String resourceType = myFhirContext.getResourceType(theJobDetails.getCurrentSearchResourceType());
            RuntimeResourceDefinition resourceDef =
                    myFhirContext.getResourceDefinition(theJobDetails.getCurrentSearchResourceType());
            ISearchBuilder searchBuilder = mySearchBuilderFactory.newSearchBuilder(
                    resourceDao, resourceType, resourceDef.getImplementingClass());
            List<IBaseResource> listToPopulate = new ArrayList<>();

            myTransactionService.withSystemRequest().execute(() -> {
                searchBuilder.loadResourcesByPid(
                        resourceIds, Collections.emptyList(), listToPopulate, false, new SystemRequestDetails());
            });

            for (IBaseResource nextResource : listToPopulate) {
                Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResource);
                futures.add(Pair.of(nextResource.getIdElement().getIdPart(), future));
                totalSubmitted++;
                highestIndexSubmitted++;
            }

            if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
                return;
            }

            theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted);

            if (resourceIds.size() == 0
                    || (theJobDetails.getCurrentSearchCount() != null
                            && toIndex >= theJobDetails.getCurrentSearchCount())) {
                ourLog.info(
                        "Triggering job[{}] search {} has completed",
                        theJobDetails.getJobId(),
                        theJobDetails.getCurrentSearchUuid());
                theJobDetails.setCurrentSearchResourceType(null);
                theJobDetails.setCurrentSearchUuid(null);
                theJobDetails.setCurrentSearchLastUploadedIndex(-1);
                theJobDetails.setCurrentSearchCount(null);
            }
        }

        ourLog.info(
                "Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)",
                theJobDetails.getJobId(),
                totalSubmitted,
                sw.getMillis(),
                sw.getThroughput(totalSubmitted, TimeUnit.SECONDS));
    }

    private boolean isInitialStep(SubscriptionTriggeringJobDetails theJobDetails) {
        return isBlank(theJobDetails.myCurrentSearchUuid) && isBlank(theJobDetails.myCurrentSearchUrl);
    }

    private boolean jobHasCompleted(SubscriptionTriggeringJobDetails theJobDetails) {
        return isInitialStep(theJobDetails);
    }

    private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Pair<String, Future<Void>>> theIdToFutures) {

        for (Pair<String, Future<Void>> next : theIdToFutures) {
            String nextDeliveredId = next.getKey();
            try {
                Future<Void> nextFuture = next.getValue();
                nextFuture.get();
                ourLog.info("Finished redelivering {}", nextDeliveredId);
            } catch (Exception e) {
                ourLog.error("Failure triggering resource " + nextDeliveredId, e);
                return true;
            }
        }

        // Clear the list since it will potentially get reused
        theIdToFutures.clear();
        return false;
    }

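    /**
     * Resolves a resource by ID and hands it off to {@link #submitResource(String, IBaseResource)},
     * which queues an asynchronous submission to the subscription pipeline. Each submission is
     * retried up to three times before failing with an {@link InternalErrorException}.
     */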
    private Future<Void> submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
        org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
        IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
        IBaseResource resourceToTrigger = dao.read(resourceId, SystemRequestDetails.forAllPartitions());

        return submitResource(theSubscriptionId, resourceToTrigger);
    }

    private Future<Void> submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) {

        ourLog.info(
                "Submitting resource {} to subscription {}",
                theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(),
                theSubscriptionId);

        ResourceModifiedMessage msg = new ResourceModifiedMessage(
                myFhirContext, theResourceToTrigger, ResourceModifiedMessage.OperationTypeEnum.UPDATE);
        msg.setSubscriptionId(theSubscriptionId);

        return myExecutorService.submit(() -> {
            for (int i = 0; ; i++) {
                try {
                    myResourceModifiedConsumer.submitResourceModified(msg);
                    break;
                } catch (Exception e) {
                    if (i >= 3) {
                        throw new InternalErrorException(Msg.code(25) + e);
                    }

                    ourLog.warn(
                            "Exception while retriggering subscriptions (going to sleep and retry): {}", e.toString());
                    Thread.sleep(1000);
                }
            }

            return null;
        });
    }

    public void cancelAll() {
        synchronized (myActiveJobs) {
            myActiveJobs.clear();
        }
    }

    /**
     * Sets the maximum number of resources that will be submitted in a single pass
     */
    public void setMaxSubmitPerPass(Integer theMaxSubmitPerPass) {
        Integer maxSubmitPerPass = theMaxSubmitPerPass;
        if (maxSubmitPerPass == null) {
            maxSubmitPerPass = DEFAULT_MAX_SUBMIT;
        }
        Validate.isTrue(maxSubmitPerPass > 0, "theMaxSubmitPerPass must be > 0");
        myMaxSubmitPerPass = maxSubmitPerPass;
    }

    @PostConstruct
    public void start() {
        createExecutorService();
    }

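    /**
     * Builds the submission executor: a fixed pool of 10 threads fed by a bounded queue of 1,000
     * tasks. The rejection handler blocks the submitting thread until a queue slot frees up, which
     * applies back-pressure instead of discarding work when triggering outpaces delivery.
     */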
    private void createExecutorService() {
        LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000);
        BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
                .namingPattern("SubscriptionTriggering-%d")
                .daemon(false)
                .priority(Thread.NORM_PRIORITY)
                .build();
        RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
                ourLog.info(
                        "Note: Subscription triggering queue is full ({} elements), waiting for a slot to become available!",
                        executorQueue.size());
                StopWatch sw = new StopWatch();
                try {
                    executorQueue.put(theRunnable);
                } catch (InterruptedException theE) {
                    // Restore interrupted state...
                    Thread.currentThread().interrupt();
                    throw new RejectedExecutionException(
                            Msg.code(26) + "Task " + theRunnable.toString() + " rejected from " + theE.toString());
                }
                ourLog.info("Slot became available after {}ms", sw.getMillis());
            }
        };
        myExecutorService = new ThreadPoolExecutor(
                10, 10, 0L, TimeUnit.MILLISECONDS, executorQueue, threadFactory, rejectedExecutionHandler);
    }

    @Override
    public void scheduleJobs(ISchedulerService theSchedulerService) {
        ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
        jobDetail.setId(getClass().getName());
        jobDetail.setJobClass(Job.class);
        // Currently jobs are kept in a local ArrayList so this should be a local job, and
        // it can fire frequently without adding load
        theSchedulerService.scheduleLocalJob(5 * DateUtils.MILLIS_PER_SECOND, jobDetail);
    }

    public int getActiveJobCount() {
        return myActiveJobs.size();
    }

    public static class Job implements HapiJob {
        @Autowired
        private ISubscriptionTriggeringSvc myTarget;

        @Override
        public void execute(JobExecutionContext theContext) {
            myTarget.runDeliveryPass();
        }
    }

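    /**
     * Mutable, in-memory state for a single triggering request: the resource IDs and search URLs
     * still to be processed, plus the paging position of whichever search is currently in flight
     * (synchronous URL/offset or asynchronous search UUID).
     */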
    private static class SubscriptionTriggeringJobDetails {

        private String myJobId;
        private String mySubscriptionId;
        private List<String> myRemainingResourceIds;
        private List<String> myRemainingSearchUrls;
        private String myCurrentSearchUuid;
        private String myCurrentSearchUrl;
        private Integer myCurrentSearchCount;
        private String myCurrentSearchResourceType;
        private int myCurrentSearchLastUploadedIndex;
        private int myCurrentOffset;

        Integer getCurrentSearchCount() {
            return myCurrentSearchCount;
        }

        void setCurrentSearchCount(Integer theCurrentSearchCount) {
            myCurrentSearchCount = theCurrentSearchCount;
        }

        String getCurrentSearchResourceType() {
            return myCurrentSearchResourceType;
        }

        void setCurrentSearchResourceType(String theCurrentSearchResourceType) {
            myCurrentSearchResourceType = theCurrentSearchResourceType;
        }

        String getJobId() {
            return myJobId;
        }

        void setJobId(String theJobId) {
            myJobId = theJobId;
        }

        String getSubscriptionId() {
            return mySubscriptionId;
        }

        void setSubscriptionId(String theSubscriptionId) {
            mySubscriptionId = theSubscriptionId;
        }

        List<String> getRemainingResourceIds() {
            return myRemainingResourceIds;
        }

        void setRemainingResourceIds(List<String> theRemainingResourceIds) {
            myRemainingResourceIds = theRemainingResourceIds;
        }

        List<String> getRemainingSearchUrls() {
            return myRemainingSearchUrls;
        }

        void setRemainingSearchUrls(List<String> theRemainingSearchUrls) {
            myRemainingSearchUrls = theRemainingSearchUrls;
        }

        String getCurrentSearchUuid() {
            return myCurrentSearchUuid;
        }

        void setCurrentSearchUuid(String theCurrentSearchUuid) {
            myCurrentSearchUuid = theCurrentSearchUuid;
        }

        public String getCurrentSearchUrl() {
            return myCurrentSearchUrl;
        }

        public void setCurrentSearchUrl(String theCurrentSearchUrl) {
            this.myCurrentSearchUrl = theCurrentSearchUrl;
        }

        int getCurrentSearchLastUploadedIndex() {
            return myCurrentSearchLastUploadedIndex;
        }

        void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) {
            myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex;
        }

        public void clearCurrentSearchUrl() {
            myCurrentSearchUrl = null;
        }

        public int getCurrentOffset() {
            return myCurrentOffset;
        }

        public void setCurrentOffset(Integer theCurrentOffset) {
            myCurrentOffset = ObjectUtils.defaultIfNull(theCurrentOffset, 0);
        }
    }
}