001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.triggering; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.context.RuntimeResourceDefinition; 024import ca.uhn.fhir.i18n.Msg; 025import ca.uhn.fhir.interceptor.model.RequestPartitionId; 026import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 027import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 028import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc; 029import ca.uhn.fhir.jpa.api.svc.ISearchSvc; 030import ca.uhn.fhir.jpa.dao.ISearchBuilder; 031import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 032import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 033import ca.uhn.fhir.jpa.model.config.SubscriptionSettings; 034import ca.uhn.fhir.jpa.model.sched.HapiJob; 035import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs; 036import ca.uhn.fhir.jpa.model.sched.ISchedulerService; 037import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; 038import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; 039import ca.uhn.fhir.jpa.searchparam.MatchUrlService; 040import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 041import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer; 042import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer; 043import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; 044import ca.uhn.fhir.rest.api.CacheControlDirective; 045import ca.uhn.fhir.rest.api.server.IBundleProvider; 046import ca.uhn.fhir.rest.api.server.RequestDetails; 047import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 048import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId; 049import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 050import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 051import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException; 052import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; 053import ca.uhn.fhir.util.ParametersUtil; 054import ca.uhn.fhir.util.StopWatch; 055import ca.uhn.fhir.util.UrlUtil; 056import ca.uhn.fhir.util.ValidateUtil; 057import com.google.common.collect.Lists; 058import jakarta.annotation.Nullable; 059import jakarta.annotation.PostConstruct; 060import org.apache.commons.lang3.ObjectUtils; 061import org.apache.commons.lang3.Validate; 062import org.apache.commons.lang3.concurrent.BasicThreadFactory; 063import org.apache.commons.lang3.time.DateUtils; 064import org.hl7.fhir.dstu2.model.IdType; 065import org.hl7.fhir.instance.model.api.IBaseParameters; 066import org.hl7.fhir.instance.model.api.IBaseResource; 067import org.hl7.fhir.instance.model.api.IIdType; 068import org.hl7.fhir.instance.model.api.IPrimitiveType; 069import org.quartz.JobExecutionContext; 070import org.slf4j.Logger; 071import org.slf4j.LoggerFactory; 072import org.springframework.beans.factory.annotation.Autowired; 073 074import java.util.ArrayList; 075import java.util.Collections; 076import java.util.List; 077import java.util.UUID; 078import java.util.concurrent.ExecutorService; 079import java.util.concurrent.Future; 080import java.util.concurrent.LinkedBlockingQueue; 081import java.util.concurrent.RejectedExecutionException; 082import java.util.concurrent.RejectedExecutionHandler; 083import java.util.concurrent.ThreadPoolExecutor; 084import java.util.concurrent.TimeUnit; 085import java.util.concurrent.atomic.AtomicInteger; 086import java.util.stream.Collectors; 087 088import static ca.uhn.fhir.rest.server.provider.ProviderConstants.SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID; 089import static java.util.Objects.isNull; 090import static java.util.Objects.nonNull; 091import static org.apache.commons.collections4.CollectionUtils.isNotEmpty; 092import static org.apache.commons.lang3.ObjectUtils.defaultIfNull; 093import static org.apache.commons.lang3.StringUtils.isBlank; 094import static org.apache.commons.lang3.StringUtils.isNotBlank; 095 096public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc, IHasScheduledJobs { 097 private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringSvcImpl.class); 098 private static final int DEFAULT_MAX_SUBMIT = 10000; 099 private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<>(); 100 101 @Autowired 102 private FhirContext myFhirContext; 103 104 @Autowired 105 private DaoRegistry myDaoRegistry; 106 107 @Autowired 108 private SubscriptionSettings mySubscriptionSettings; 109 110 @Autowired 111 private ISearchCoordinatorSvc<? extends IResourcePersistentId<?>> mySearchCoordinatorSvc; 112 113 @Autowired 114 private MatchUrlService myMatchUrlService; 115 116 @Autowired 117 private IResourceModifiedConsumer myResourceModifiedConsumer; 118 119 @Autowired 120 private HapiTransactionService myTransactionService; 121 122 private int myMaxSubmitPerPass = DEFAULT_MAX_SUBMIT; 123 private ExecutorService myExecutorService; 124 125 @Autowired 126 private ISearchSvc mySearchService; 127 128 @Autowired 129 IRequestPartitionHelperSvc myRequestPartitionHelperSvc; 130 131 @Autowired 132 private SearchBuilderFactory mySearchBuilderFactory; 133 134 @Autowired 135 private SubscriptionCanonicalizer mySubscriptionCanonicalizer; 136 137 @Override 138 public IBaseParameters triggerSubscription( 139 @Nullable List<IPrimitiveType<String>> theResourceIds, 140 @Nullable List<IPrimitiveType<String>> theSearchUrls, 141 @Nullable IIdType theSubscriptionId, 142 RequestDetails theRequestDetails) { 143 144 if (mySubscriptionSettings.getSupportedSubscriptionTypes().isEmpty()) { 145 throw new PreconditionFailedException(Msg.code(22) + "Subscription processing not active on this server"); 146 } 147 148 RequestPartitionId requestPartitionId; 149 150 // Throw a 404 if the subscription doesn't exist, otherwise determine its partition. 151 if (theSubscriptionId != null) { 152 IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getSubscriptionDao(); 153 IBaseResource subscription = subscriptionDao.read(theSubscriptionId, theRequestDetails); 154 if (mySubscriptionCanonicalizer.canonicalize(subscription).isCrossPartitionEnabled()) { 155 requestPartitionId = RequestPartitionId.allPartitions(); 156 } else { 157 // Otherwise, trust the partition passed in via tenant/interceptor. 158 requestPartitionId = myRequestPartitionHelperSvc.determineGenericPartitionForRequest(theRequestDetails); 159 } 160 } else { 161 // If we have no specific subscription, allow standard partition selection 162 requestPartitionId = myRequestPartitionHelperSvc.determineGenericPartitionForRequest(theRequestDetails); 163 } 164 165 List<IPrimitiveType<String>> resourceIds = defaultIfNull(theResourceIds, Collections.emptyList()); 166 List<IPrimitiveType<String>> searchUrls = defaultIfNull(theSearchUrls, Collections.emptyList()); 167 // Make sure we have at least one resource ID or search URL 168 if (resourceIds.isEmpty() && searchUrls.isEmpty()) { 169 throw new InvalidRequestException(Msg.code(23) + "No resource IDs or search URLs specified for triggering"); 170 } 171 172 // Resource URLs must be compete 173 for (IPrimitiveType<String> next : resourceIds) { 174 IdType resourceId = new IdType(next.getValue()); 175 ValidateUtil.isTrueOrThrowInvalidRequest( 176 resourceId.hasResourceType(), 177 SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID + " parameter must have resource type"); 178 ValidateUtil.isTrueOrThrowInvalidRequest( 179 resourceId.hasIdPart(), 180 SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID + " parameter must have resource ID part"); 181 } 182 183 // Search URLs must be valid 184 for (IPrimitiveType<String> next : searchUrls) { 185 if (!next.getValue().contains("?")) { 186 throw new InvalidRequestException(Msg.code(24) 187 + "Search URL is not valid (must be in the form \"[resource type]?[optional params]\")"); 188 } 189 } 190 191 SubscriptionTriggeringJobDetails jobDetails = new SubscriptionTriggeringJobDetails(); 192 jobDetails.setJobId(UUID.randomUUID().toString()); 193 jobDetails.setRequestPartitionId( 194 requestPartitionId == null ? RequestPartitionId.allPartitions() : requestPartitionId); 195 jobDetails.setRemainingResourceIds( 196 resourceIds.stream().map(IPrimitiveType::getValue).collect(Collectors.toList())); 197 jobDetails.setRemainingSearchUrls( 198 searchUrls.stream().map(IPrimitiveType::getValue).collect(Collectors.toList())); 199 if (theSubscriptionId != null) { 200 jobDetails.setSubscriptionId(theSubscriptionId.getIdPart()); 201 } 202 203 // Submit job for processing 204 synchronized (myActiveJobs) { 205 myActiveJobs.add(jobDetails); 206 ourLog.info( 207 "Subscription triggering requested for {} resource and {} search - Gave job ID: {} and have {} jobs", 208 resourceIds.size(), 209 searchUrls.size(), 210 jobDetails.getJobId(), 211 myActiveJobs.size()); 212 } 213 214 // Create a parameters response 215 IBaseParameters retVal = ParametersUtil.newInstance(myFhirContext); 216 IPrimitiveType<?> value = 217 (IPrimitiveType<?>) myFhirContext.getElementDefinition("string").newInstance(); 218 value.setValueAsString("Subscription triggering job submitted as JOB ID: " + jobDetails.myJobId); 219 ParametersUtil.addParameterToParameters(myFhirContext, retVal, "information", value); 220 return retVal; 221 } 222 223 @Override 224 public IBaseParameters triggerSubscription( 225 @Nullable List<IPrimitiveType<String>> theResourceIds, 226 @Nullable List<IPrimitiveType<String>> theSearchUrls, 227 @Nullable IIdType theSubscriptionId) { 228 return triggerSubscription( 229 theResourceIds, theSearchUrls, theSubscriptionId, SystemRequestDetails.newSystemRequestAllPartitions()); 230 } 231 232 @Override 233 public void runDeliveryPass() { 234 235 synchronized (myActiveJobs) { 236 if (myActiveJobs.isEmpty()) { 237 return; 238 } 239 240 String activeJobIds = myActiveJobs.stream() 241 .map(SubscriptionTriggeringJobDetails::getJobId) 242 .collect(Collectors.joining(", ")); 243 ourLog.info("Starting pass: currently have {} active job IDs: {}", myActiveJobs.size(), activeJobIds); 244 245 SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0); 246 247 runJob(activeJob); 248 249 // If the job is complete, remove it from the queue 250 if (activeJob.getRemainingResourceIds().isEmpty()) { 251 if (activeJob.getRemainingSearchUrls().isEmpty()) { 252 if (jobHasCompleted(activeJob)) { 253 myActiveJobs.remove(0); 254 String remainingJobsMsg = ""; 255 if (myActiveJobs.size() > 0) { 256 remainingJobsMsg = "(" + myActiveJobs.size() + " jobs remaining)"; 257 } 258 ourLog.info( 259 "Subscription triggering job {} is complete{}", activeJob.getJobId(), remainingJobsMsg); 260 } 261 } 262 } 263 } 264 } 265 266 private void runJob(SubscriptionTriggeringJobDetails theJobDetails) { 267 StopWatch sw = new StopWatch(); 268 ourLog.info("Starting pass of subscription triggering job {}", theJobDetails.getJobId()); 269 270 // Submit individual resources 271 AtomicInteger totalSubmitted = new AtomicInteger(0); 272 List<Future<?>> futures = new ArrayList<>(); 273 while (!theJobDetails.getRemainingResourceIds().isEmpty() && totalSubmitted.get() < myMaxSubmitPerPass) { 274 totalSubmitted.incrementAndGet(); 275 String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0); 276 submitResource(theJobDetails.getSubscriptionId(), theJobDetails.getRequestPartitionId(), nextResourceId); 277 } 278 279 // Make sure these all succeeded in submitting 280 if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) { 281 return; 282 } 283 284 IBundleProvider search = null; 285 286 // This is the job initial step where we set ourselves up to do the actual re-submitting of resources 287 // to the broker. Note that querying of resource can be done synchronously or asynchronously 288 if (isInitialStep(theJobDetails) 289 && isNotEmpty(theJobDetails.getRemainingSearchUrls()) 290 && totalSubmitted.get() < myMaxSubmitPerPass) { 291 292 String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0); 293 RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, nextSearchUrl); 294 String queryPart = nextSearchUrl.substring(nextSearchUrl.indexOf('?')); 295 SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef); 296 297 String resourceType = resourceDef.getName(); 298 IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceType); 299 300 ourLog.info("Triggering job[{}] is starting a search for {}", theJobDetails.getJobId(), nextSearchUrl); 301 search = mySearchCoordinatorSvc.registerSearch( 302 callingDao, 303 params, 304 resourceType, 305 new CacheControlDirective(), 306 null, 307 theJobDetails.getRequestPartitionId()); 308 309 if (isNull(search.getUuid())) { 310 // we don't have a search uuid i.e. we're setting up for synchronous processing 311 theJobDetails.setCurrentSearchUrl(nextSearchUrl); 312 theJobDetails.setCurrentOffset(params.getOffset()); 313 } else { 314 // populate properties for asynchronous path 315 theJobDetails.setCurrentSearchUuid(search.getUuid()); 316 } 317 318 theJobDetails.setCurrentSearchResourceType(resourceType); 319 theJobDetails.setCurrentSearchCount(params.getCount()); 320 theJobDetails.setCurrentSearchLastUploadedIndex(-1); 321 } 322 323 /* 324 * Processing step for synchronous processing mode - This is only called if the 325 * server is configured to force offset searches, ie using ForceSynchronousSearchInterceptor. 326 * Otherwise, we'll always do async mode. 327 */ 328 if (isNotBlank(theJobDetails.getCurrentSearchUrl()) && totalSubmitted.get() < myMaxSubmitPerPass) { 329 processSynchronous(theJobDetails, totalSubmitted, futures, search); 330 } 331 332 // processing step for asynchronous processing mode 333 if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted.get() < myMaxSubmitPerPass) { 334 processAsynchronous(theJobDetails, totalSubmitted, futures); 335 } 336 337 ourLog.info( 338 "Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", 339 theJobDetails.getJobId(), 340 totalSubmitted, 341 sw.getMillis(), 342 sw.getThroughput(totalSubmitted.get(), TimeUnit.SECONDS)); 343 } 344 345 private void processAsynchronous( 346 SubscriptionTriggeringJobDetails theJobDetails, AtomicInteger totalSubmitted, List<Future<?>> futures) { 347 int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1; 348 349 IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType()); 350 351 int maxQuerySize = myMaxSubmitPerPass - totalSubmitted.get(); 352 int toIndex; 353 if (theJobDetails.getCurrentSearchCount() != null) { 354 toIndex = Math.min(fromIndex + maxQuerySize, theJobDetails.getCurrentSearchCount()); 355 } else { 356 toIndex = fromIndex + maxQuerySize; 357 } 358 359 ourLog.info( 360 "Triggering job[{}] search {} requesting resources {} - {} from partition {}", 361 theJobDetails.getJobId(), 362 theJobDetails.getCurrentSearchUuid(), 363 fromIndex, 364 toIndex, 365 theJobDetails.getRequestPartitionId()); 366 367 List<? extends IResourcePersistentId<?>> allResourceIds; 368 RequestPartitionId requestPartitionId = theJobDetails.getRequestPartitionId(); 369 try { 370 allResourceIds = mySearchCoordinatorSvc.getResources( 371 theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, null, requestPartitionId); 372 } catch (ResourceGoneException e) { 373 ourLog.trace("Search has expired, submission is done."); 374 allResourceIds = new ArrayList<>(); 375 } 376 377 ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), allResourceIds.size()); 378 AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex()); 379 380 List<? extends List<? extends IResourcePersistentId<?>>> partitions = Lists.partition(allResourceIds, 100); 381 for (List<? extends IResourcePersistentId<?>> resourceIds : partitions) { 382 Runnable job = () -> { 383 String resourceType = myFhirContext.getResourceType(theJobDetails.getCurrentSearchResourceType()); 384 RuntimeResourceDefinition resourceDef = 385 myFhirContext.getResourceDefinition(theJobDetails.getCurrentSearchResourceType()); 386 ISearchBuilder searchBuilder = 387 mySearchBuilderFactory.newSearchBuilder(resourceType, resourceDef.getImplementingClass()); 388 List<IBaseResource> listToPopulate = new ArrayList<>(); 389 390 myTransactionService.withRequest(null).execute(() -> { 391 searchBuilder.loadResourcesByPid( 392 resourceIds, Collections.emptyList(), listToPopulate, false, new SystemRequestDetails()); 393 }); 394 395 for (IBaseResource nextResource : listToPopulate) { 396 submitResource( 397 theJobDetails.getSubscriptionId(), theJobDetails.getRequestPartitionId(), nextResource); 398 totalSubmitted.incrementAndGet(); 399 highestIndexSubmitted.incrementAndGet(); 400 } 401 }; 402 403 Future<?> future = myExecutorService.submit(job); 404 futures.add(future); 405 } 406 407 if (!validateFuturesAndReturnTrueIfWeShouldAbort(futures)) { 408 409 theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted.get()); 410 411 if (allResourceIds.isEmpty() 412 || (theJobDetails.getCurrentSearchCount() != null 413 && toIndex >= theJobDetails.getCurrentSearchCount())) { 414 ourLog.info( 415 "Triggering job[{}] search {} has completed ", 416 theJobDetails.getJobId(), 417 theJobDetails.getCurrentSearchUuid()); 418 theJobDetails.setCurrentSearchResourceType(null); 419 theJobDetails.setCurrentSearchUuid(null); 420 theJobDetails.setCurrentSearchLastUploadedIndex(-1); 421 theJobDetails.setCurrentSearchCount(null); 422 } 423 } 424 } 425 426 private void processSynchronous( 427 SubscriptionTriggeringJobDetails theJobDetails, 428 AtomicInteger totalSubmitted, 429 List<Future<?>> futures, 430 IBundleProvider search) { 431 List<IBaseResource> allCurrentResources; 432 433 int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1; 434 435 String searchUrl = theJobDetails.getCurrentSearchUrl(); 436 437 ourLog.info( 438 "Triggered job [{}] - Starting synchronous processing at offset {} and index {} on partition {}", 439 theJobDetails.getJobId(), 440 theJobDetails.getCurrentOffset(), 441 fromIndex, 442 theJobDetails.getRequestPartitionId()); 443 444 int submittableCount = myMaxSubmitPerPass - totalSubmitted.get(); 445 int toIndex = fromIndex + submittableCount; 446 447 if (nonNull(search) && !search.isEmpty()) { 448 449 if (search.getCurrentPageSize() != null) { 450 toIndex = search.getCurrentPageSize(); 451 } 452 453 // we already have data from the initial step so process as much as we can. 454 ourLog.info( 455 "Triggered job[{}] will process up to {} resources from partition {}", 456 theJobDetails.getJobId(), 457 toIndex, 458 theJobDetails.getRequestPartitionId()); 459 allCurrentResources = search.getResources(0, toIndex); 460 461 } else { 462 if (theJobDetails.getCurrentSearchCount() != null) { 463 toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount()); 464 } 465 466 RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, searchUrl); 467 String queryPart = searchUrl.substring(searchUrl.indexOf('?')); 468 SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef); 469 int offset = theJobDetails.getCurrentOffset() + fromIndex; 470 params.setOffset(offset); 471 params.setCount(toIndex); 472 473 ourLog.info( 474 "Triggered job[{}] requesting {} resources from offset {}", 475 theJobDetails.getJobId(), 476 toIndex, 477 offset); 478 479 search = mySearchService.executeQuery(resourceDef.getName(), params, theJobDetails.getRequestPartitionId()); 480 allCurrentResources = search.getResources(0, submittableCount); 481 } 482 483 ourLog.info("Triggered job[{}] delivering {} resources", theJobDetails.getJobId(), allCurrentResources.size()); 484 AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex()); 485 486 for (IBaseResource nextResource : allCurrentResources) { 487 Future<?> future = myExecutorService.submit(() -> submitResource( 488 theJobDetails.getSubscriptionId(), theJobDetails.getRequestPartitionId(), nextResource)); 489 futures.add(future); 490 totalSubmitted.incrementAndGet(); 491 highestIndexSubmitted.incrementAndGet(); 492 } 493 494 if (!validateFuturesAndReturnTrueIfWeShouldAbort(futures)) { 495 496 theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted.get()); 497 498 ourLog.info( 499 "Triggered job[{}] lastUploadedIndex is {}", 500 theJobDetails.getJobId(), 501 theJobDetails.getCurrentSearchLastUploadedIndex()); 502 503 if (allCurrentResources.isEmpty() 504 || nonNull(theJobDetails.getCurrentSearchCount()) 505 && toIndex > theJobDetails.getCurrentSearchCount()) { 506 ourLog.info( 507 "Triggered job[{}] for search URL {} has completed ", 508 theJobDetails.getJobId(), 509 theJobDetails.getCurrentSearchUrl()); 510 theJobDetails.setCurrentSearchResourceType(null); 511 theJobDetails.clearCurrentSearchUrl(); 512 theJobDetails.setCurrentSearchLastUploadedIndex(-1); 513 theJobDetails.setCurrentSearchCount(null); 514 } 515 } 516 } 517 518 private boolean isInitialStep(SubscriptionTriggeringJobDetails theJobDetails) { 519 return isBlank(theJobDetails.myCurrentSearchUuid) && isBlank(theJobDetails.myCurrentSearchUrl); 520 } 521 522 private boolean jobHasCompleted(SubscriptionTriggeringJobDetails theJobDetails) { 523 return isInitialStep(theJobDetails); 524 } 525 526 private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Future<?>> theFutures) { 527 528 for (Future<?> nextFuture : theFutures) { 529 try { 530 nextFuture.get(); 531 } catch (Exception e) { 532 ourLog.error("Failure triggering resource", e); 533 return true; 534 } 535 } 536 537 // Clear the list since it will potentially get reused 538 theFutures.clear(); 539 return false; 540 } 541 542 private void submitResource( 543 String theSubscriptionId, RequestPartitionId theRequestPartitionId, String theResourceIdToTrigger) { 544 org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger); 545 IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceId.getResourceType()); 546 IBaseResource resourceToTrigger = dao.read(resourceId, SystemRequestDetails.forAllPartitions()); 547 548 submitResource(theSubscriptionId, theRequestPartitionId, resourceToTrigger); 549 } 550 551 private void submitResource( 552 String theSubscriptionId, RequestPartitionId theRequestPartitionId, IBaseResource theResourceToTrigger) { 553 554 ourLog.info( 555 "Submitting resource {} to subscription {}", 556 theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), 557 theSubscriptionId); 558 559 ResourceModifiedMessage msg = new ResourceModifiedMessage( 560 myFhirContext, 561 theResourceToTrigger, 562 ResourceModifiedMessage.OperationTypeEnum.MANUALLY_TRIGGERED, 563 theRequestPartitionId); 564 msg.setSubscriptionId(theSubscriptionId); 565 566 for (int i = 0; ; i++) { 567 try { 568 myResourceModifiedConsumer.submitResourceModified(msg); 569 break; 570 } catch (Exception e) { 571 if (i >= 3) { 572 throw new InternalErrorException(Msg.code(25) + e); 573 } 574 575 ourLog.warn("Exception while retriggering subscriptions (going to sleep and retry): {}", e.toString()); 576 try { 577 Thread.sleep(1000); 578 } catch (InterruptedException ex) { 579 Thread.currentThread().interrupt(); 580 } 581 } 582 } 583 } 584 585 public void cancelAll() { 586 synchronized (myActiveJobs) { 587 myActiveJobs.clear(); 588 } 589 } 590 591 /** 592 * Sets the maximum number of resources that will be submitted in a single pass 593 */ 594 public void setMaxSubmitPerPass(Integer theMaxSubmitPerPass) { 595 Integer maxSubmitPerPass = theMaxSubmitPerPass; 596 if (maxSubmitPerPass == null) { 597 maxSubmitPerPass = DEFAULT_MAX_SUBMIT; 598 } 599 Validate.isTrue(maxSubmitPerPass > 0, "theMaxSubmitPerPass must be > 0"); 600 myMaxSubmitPerPass = maxSubmitPerPass; 601 } 602 603 @PostConstruct 604 public void start() { 605 createExecutorService(); 606 } 607 608 private void createExecutorService() { 609 LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000); 610 BasicThreadFactory threadFactory = new BasicThreadFactory.Builder() 611 .namingPattern("SubscriptionTriggering-%d") 612 .daemon(false) 613 .priority(Thread.NORM_PRIORITY) 614 .build(); 615 RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { 616 @Override 617 public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) { 618 ourLog.info( 619 "Note: Subscription triggering queue is full ({} elements), waiting for a slot to become available!", 620 executorQueue.size()); 621 StopWatch sw = new StopWatch(); 622 try { 623 executorQueue.put(theRunnable); 624 } catch (InterruptedException theE) { 625 // Restore interrupted state... 626 Thread.currentThread().interrupt(); 627 throw new RejectedExecutionException( 628 Msg.code(26) + "Task " + theRunnable.toString() + " rejected from " + theE.toString()); 629 } 630 ourLog.info("Slot become available after {}ms", sw.getMillis()); 631 } 632 }; 633 myExecutorService = new ThreadPoolExecutor( 634 10, 10, 0L, TimeUnit.MILLISECONDS, executorQueue, threadFactory, rejectedExecutionHandler); 635 } 636 637 @Override 638 public void scheduleJobs(ISchedulerService theSchedulerService) { 639 ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); 640 jobDetail.setId(getClass().getName()); 641 jobDetail.setJobClass(Job.class); 642 // Currently jobs ae kept in a local ArrayList so this should be a local job, and 643 // it can fire frequently without adding load 644 theSchedulerService.scheduleLocalJob(5 * DateUtils.MILLIS_PER_SECOND, jobDetail); 645 } 646 647 public int getActiveJobCount() { 648 return myActiveJobs.size(); 649 } 650 651 public static class Job implements HapiJob { 652 @Autowired 653 private ISubscriptionTriggeringSvc myTarget; 654 655 @Override 656 public void execute(JobExecutionContext theContext) { 657 myTarget.runDeliveryPass(); 658 } 659 } 660 661 private static class SubscriptionTriggeringJobDetails { 662 663 private String myJobId; 664 private String mySubscriptionId; 665 private List<String> myRemainingResourceIds; 666 private List<String> myRemainingSearchUrls; 667 private String myCurrentSearchUuid; 668 private String myCurrentSearchUrl; 669 private Integer myCurrentSearchCount; 670 private String myCurrentSearchResourceType; 671 private int myCurrentSearchLastUploadedIndex; 672 private int myCurrentOffset; 673 674 private RequestPartitionId myRequestPartitionId; 675 676 Integer getCurrentSearchCount() { 677 return myCurrentSearchCount; 678 } 679 680 void setCurrentSearchCount(Integer theCurrentSearchCount) { 681 myCurrentSearchCount = theCurrentSearchCount; 682 } 683 684 String getCurrentSearchResourceType() { 685 return myCurrentSearchResourceType; 686 } 687 688 void setCurrentSearchResourceType(String theCurrentSearchResourceType) { 689 myCurrentSearchResourceType = theCurrentSearchResourceType; 690 } 691 692 String getJobId() { 693 return myJobId; 694 } 695 696 void setJobId(String theJobId) { 697 myJobId = theJobId; 698 } 699 700 String getSubscriptionId() { 701 return mySubscriptionId; 702 } 703 704 void setSubscriptionId(String theSubscriptionId) { 705 mySubscriptionId = theSubscriptionId; 706 } 707 708 List<String> getRemainingResourceIds() { 709 return myRemainingResourceIds; 710 } 711 712 void setRemainingResourceIds(List<String> theRemainingResourceIds) { 713 myRemainingResourceIds = theRemainingResourceIds; 714 } 715 716 List<String> getRemainingSearchUrls() { 717 return myRemainingSearchUrls; 718 } 719 720 void setRemainingSearchUrls(List<String> theRemainingSearchUrls) { 721 myRemainingSearchUrls = theRemainingSearchUrls; 722 } 723 724 String getCurrentSearchUuid() { 725 return myCurrentSearchUuid; 726 } 727 728 void setCurrentSearchUuid(String theCurrentSearchUuid) { 729 myCurrentSearchUuid = theCurrentSearchUuid; 730 } 731 732 public String getCurrentSearchUrl() { 733 return myCurrentSearchUrl; 734 } 735 736 public void setCurrentSearchUrl(String theCurrentSearchUrl) { 737 this.myCurrentSearchUrl = theCurrentSearchUrl; 738 } 739 740 int getCurrentSearchLastUploadedIndex() { 741 return myCurrentSearchLastUploadedIndex; 742 } 743 744 void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) { 745 myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex; 746 } 747 748 public void clearCurrentSearchUrl() { 749 myCurrentSearchUrl = null; 750 } 751 752 public int getCurrentOffset() { 753 return myCurrentOffset; 754 } 755 756 public void setCurrentOffset(Integer theCurrentOffset) { 757 myCurrentOffset = ObjectUtils.defaultIfNull(theCurrentOffset, 0); 758 } 759 760 public void setRequestPartitionId(RequestPartitionId theRequestPartitionId) { 761 myRequestPartitionId = theRequestPartitionId; 762 } 763 764 public RequestPartitionId getRequestPartitionId() { 765 return myRequestPartitionId; 766 } 767 } 768}