001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.triggering;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.context.RuntimeResourceDefinition;
024import ca.uhn.fhir.i18n.Msg;
025import ca.uhn.fhir.interceptor.model.RequestPartitionId;
026import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
027import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
028import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
029import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
030import ca.uhn.fhir.jpa.api.svc.ISearchSvc;
031import ca.uhn.fhir.jpa.dao.ISearchBuilder;
032import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
033import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
034import ca.uhn.fhir.jpa.model.sched.HapiJob;
035import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
036import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
037import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
038import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
039import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
040import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
041import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
042import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
043import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
044import ca.uhn.fhir.rest.api.CacheControlDirective;
045import ca.uhn.fhir.rest.api.server.IBundleProvider;
046import ca.uhn.fhir.rest.api.server.RequestDetails;
047import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
048import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
049import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
050import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
051import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
052import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
053import ca.uhn.fhir.util.ParametersUtil;
054import ca.uhn.fhir.util.StopWatch;
055import ca.uhn.fhir.util.UrlUtil;
056import ca.uhn.fhir.util.ValidateUtil;
057import com.google.common.collect.Lists;
058import jakarta.annotation.Nullable;
059import jakarta.annotation.PostConstruct;
060import org.apache.commons.lang3.ObjectUtils;
061import org.apache.commons.lang3.Validate;
062import org.apache.commons.lang3.concurrent.BasicThreadFactory;
063import org.apache.commons.lang3.time.DateUtils;
064import org.hl7.fhir.dstu2.model.IdType;
065import org.hl7.fhir.instance.model.api.IBaseParameters;
066import org.hl7.fhir.instance.model.api.IBaseResource;
067import org.hl7.fhir.instance.model.api.IIdType;
068import org.hl7.fhir.instance.model.api.IPrimitiveType;
069import org.quartz.JobExecutionContext;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072import org.springframework.beans.factory.annotation.Autowired;
073
074import java.util.ArrayList;
075import java.util.Collections;
076import java.util.List;
077import java.util.UUID;
078import java.util.concurrent.ExecutorService;
079import java.util.concurrent.Future;
080import java.util.concurrent.LinkedBlockingQueue;
081import java.util.concurrent.RejectedExecutionException;
082import java.util.concurrent.RejectedExecutionHandler;
083import java.util.concurrent.ThreadPoolExecutor;
084import java.util.concurrent.TimeUnit;
085import java.util.concurrent.atomic.AtomicInteger;
086import java.util.stream.Collectors;
087
088import static ca.uhn.fhir.rest.server.provider.ProviderConstants.SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID;
089import static java.util.Objects.isNull;
090import static java.util.Objects.nonNull;
091import static org.apache.commons.collections4.CollectionUtils.isNotEmpty;
092import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
093import static org.apache.commons.lang3.StringUtils.isBlank;
094import static org.apache.commons.lang3.StringUtils.isNotBlank;
095
096public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc, IHasScheduledJobs {
        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringSvcImpl.class);
        // Per-pass submission cap applied until setMaxSubmitPerPass() supplies another value
        private static final int DEFAULT_MAX_SUBMIT = 10000;
        // Queue of pending triggering jobs; the methods visible in this class only touch it
        // inside synchronized (myActiveJobs) blocks, and the head of the list is processed first
        private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<>();

        @Autowired
        private FhirContext myFhirContext;

        @Autowired
        private DaoRegistry myDaoRegistry;

        @Autowired
        private JpaStorageSettings myStorageSettings;

        @Autowired
        private ISearchCoordinatorSvc<? extends IResourcePersistentId<?>> mySearchCoordinatorSvc;

        @Autowired
        private MatchUrlService myMatchUrlService;

        @Autowired
        private IResourceModifiedConsumer myResourceModifiedConsumer;

        @Autowired
        private HapiTransactionService myTransactionService;

        // Upper bound on the number of resources submitted during a single pass of a job
        private int myMaxSubmitPerPass = DEFAULT_MAX_SUBMIT;
        // Runs the resource submission tasks; not assigned in the declarations shown here
        // NOTE(review): presumably created in start() - confirm
        private ExecutorService myExecutorService;

        @Autowired
        private ISearchSvc mySearchService;

        // Declared without an access modifier, i.e. package-private
        @Autowired
        IRequestPartitionHelperSvc myRequestPartitionHelperSvc;

        @Autowired
        private SearchBuilderFactory mySearchBuilderFactory;

        @Autowired
        private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
136
137        @Override
138        public IBaseParameters triggerSubscription(
139                        @Nullable List<IPrimitiveType<String>> theResourceIds,
140                        @Nullable List<IPrimitiveType<String>> theSearchUrls,
141                        @Nullable IIdType theSubscriptionId,
142                        RequestDetails theRequestDetails) {
143
144                if (myStorageSettings.getSupportedSubscriptionTypes().isEmpty()) {
145                        throw new PreconditionFailedException(Msg.code(22) + "Subscription processing not active on this server");
146                }
147
148                RequestPartitionId requestPartitionId;
149
150                // Throw a 404 if the subscription doesn't exist, otherwise determine its partition.
151                if (theSubscriptionId != null) {
152                        IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getSubscriptionDao();
153                        IBaseResource subscription = subscriptionDao.read(theSubscriptionId, theRequestDetails);
154                        if (mySubscriptionCanonicalizer.canonicalize(subscription).getCrossPartitionEnabled()) {
155                                requestPartitionId = RequestPartitionId.allPartitions();
156                        } else {
157                                // Otherwise, trust the partition passed in via tenant/interceptor.
158                                requestPartitionId = myRequestPartitionHelperSvc.determineGenericPartitionForRequest(theRequestDetails);
159                        }
160                } else {
161                        // If we have no specific subscription, allow standard partition selection
162                        requestPartitionId = myRequestPartitionHelperSvc.determineGenericPartitionForRequest(theRequestDetails);
163                }
164
165                List<IPrimitiveType<String>> resourceIds = defaultIfNull(theResourceIds, Collections.emptyList());
166                List<IPrimitiveType<String>> searchUrls = defaultIfNull(theSearchUrls, Collections.emptyList());
167                // Make sure we have at least one resource ID or search URL
168                if (resourceIds.isEmpty() && searchUrls.isEmpty()) {
169                        throw new InvalidRequestException(Msg.code(23) + "No resource IDs or search URLs specified for triggering");
170                }
171
172                // Resource URLs must be compete
173                for (IPrimitiveType<String> next : resourceIds) {
174                        IdType resourceId = new IdType(next.getValue());
175                        ValidateUtil.isTrueOrThrowInvalidRequest(
176                                        resourceId.hasResourceType(),
177                                        SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID + " parameter must have resource type");
178                        ValidateUtil.isTrueOrThrowInvalidRequest(
179                                        resourceId.hasIdPart(),
180                                        SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID + " parameter must have resource ID part");
181                }
182
183                // Search URLs must be valid
184                for (IPrimitiveType<String> next : searchUrls) {
185                        if (!next.getValue().contains("?")) {
186                                throw new InvalidRequestException(Msg.code(24)
187                                                + "Search URL is not valid (must be in the form \"[resource type]?[optional params]\")");
188                        }
189                }
190
191                SubscriptionTriggeringJobDetails jobDetails = new SubscriptionTriggeringJobDetails();
192                jobDetails.setJobId(UUID.randomUUID().toString());
193                jobDetails.setRequestPartitionId(
194                                requestPartitionId == null ? RequestPartitionId.allPartitions() : requestPartitionId);
195                jobDetails.setRemainingResourceIds(
196                                resourceIds.stream().map(IPrimitiveType::getValue).collect(Collectors.toList()));
197                jobDetails.setRemainingSearchUrls(
198                                searchUrls.stream().map(IPrimitiveType::getValue).collect(Collectors.toList()));
199                if (theSubscriptionId != null) {
200                        jobDetails.setSubscriptionId(theSubscriptionId.getIdPart());
201                }
202
203                // Submit job for processing
204                synchronized (myActiveJobs) {
205                        myActiveJobs.add(jobDetails);
206                        ourLog.info(
207                                        "Subscription triggering requested for {} resource and {} search - Gave job ID: {} and have {} jobs",
208                                        resourceIds.size(),
209                                        searchUrls.size(),
210                                        jobDetails.getJobId(),
211                                        myActiveJobs.size());
212                }
213
214                // Create a parameters response
215                IBaseParameters retVal = ParametersUtil.newInstance(myFhirContext);
216                IPrimitiveType<?> value =
217                                (IPrimitiveType<?>) myFhirContext.getElementDefinition("string").newInstance();
218                value.setValueAsString("Subscription triggering job submitted as JOB ID: " + jobDetails.myJobId);
219                ParametersUtil.addParameterToParameters(myFhirContext, retVal, "information", value);
220                return retVal;
221        }
222
223        @Override
224        public IBaseParameters triggerSubscription(
225                        @Nullable List<IPrimitiveType<String>> theResourceIds,
226                        @Nullable List<IPrimitiveType<String>> theSearchUrls,
227                        @Nullable IIdType theSubscriptionId) {
228                return triggerSubscription(
229                                theResourceIds, theSearchUrls, theSubscriptionId, SystemRequestDetails.newSystemRequestAllPartitions());
230        }
231
232        @Override
233        public void runDeliveryPass() {
234
235                synchronized (myActiveJobs) {
236                        if (myActiveJobs.isEmpty()) {
237                                return;
238                        }
239
240                        String activeJobIds = myActiveJobs.stream()
241                                        .map(SubscriptionTriggeringJobDetails::getJobId)
242                                        .collect(Collectors.joining(", "));
243                        ourLog.info("Starting pass: currently have {} active job IDs: {}", myActiveJobs.size(), activeJobIds);
244
245                        SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0);
246
247                        runJob(activeJob);
248
249                        // If the job is complete, remove it from the queue
250                        if (activeJob.getRemainingResourceIds().isEmpty()) {
251                                if (activeJob.getRemainingSearchUrls().isEmpty()) {
252                                        if (jobHasCompleted(activeJob)) {
253                                                myActiveJobs.remove(0);
254                                                String remainingJobsMsg = "";
255                                                if (myActiveJobs.size() > 0) {
256                                                        remainingJobsMsg = "(" + myActiveJobs.size() + " jobs remaining)";
257                                                }
258                                                ourLog.info(
259                                                                "Subscription triggering job {} is complete{}", activeJob.getJobId(), remainingJobsMsg);
260                                        }
261                                }
262                        }
263                }
264        }
265
266        private void runJob(SubscriptionTriggeringJobDetails theJobDetails) {
267                StopWatch sw = new StopWatch();
268                ourLog.info("Starting pass of subscription triggering job {}", theJobDetails.getJobId());
269
270                // Submit individual resources
271                AtomicInteger totalSubmitted = new AtomicInteger(0);
272                List<Future<?>> futures = new ArrayList<>();
273                while (!theJobDetails.getRemainingResourceIds().isEmpty() && totalSubmitted.get() < myMaxSubmitPerPass) {
274                        totalSubmitted.incrementAndGet();
275                        String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0);
276                        submitResource(theJobDetails.getSubscriptionId(), theJobDetails.getRequestPartitionId(), nextResourceId);
277                }
278
279                // Make sure these all succeeded in submitting
280                if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
281                        return;
282                }
283
284                IBundleProvider search = null;
285
286                // This is the job initial step where we set ourselves up to do the actual re-submitting of resources
287                // to the broker.  Note that querying of resource can be done synchronously or asynchronously
288                if (isInitialStep(theJobDetails)
289                                && isNotEmpty(theJobDetails.getRemainingSearchUrls())
290                                && totalSubmitted.get() < myMaxSubmitPerPass) {
291
292                        String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0);
293                        RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, nextSearchUrl);
294                        String queryPart = nextSearchUrl.substring(nextSearchUrl.indexOf('?'));
295                        SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef);
296
297                        String resourceType = resourceDef.getName();
298                        IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceType);
299
300                        ourLog.info("Triggering job[{}] is starting a search for {}", theJobDetails.getJobId(), nextSearchUrl);
301                        search = mySearchCoordinatorSvc.registerSearch(
302                                        callingDao,
303                                        params,
304                                        resourceType,
305                                        new CacheControlDirective(),
306                                        null,
307                                        theJobDetails.getRequestPartitionId());
308
309                        if (isNull(search.getUuid())) {
310                                // we don't have a search uuid i.e. we're setting up for synchronous processing
311                                theJobDetails.setCurrentSearchUrl(nextSearchUrl);
312                                theJobDetails.setCurrentOffset(params.getOffset());
313                        } else {
314                                // populate properties for asynchronous path
315                                theJobDetails.setCurrentSearchUuid(search.getUuid());
316                        }
317
318                        theJobDetails.setCurrentSearchResourceType(resourceType);
319                        theJobDetails.setCurrentSearchCount(params.getCount());
320                        theJobDetails.setCurrentSearchLastUploadedIndex(-1);
321                }
322
323                /*
324                 * Processing step for synchronous processing mode - This is only called if the
325                 * server is configured to force offset searches, ie using ForceSynchronousSearchInterceptor.
326                 * Otherwise, we'll always do async mode.
327                 */
328                if (isNotBlank(theJobDetails.getCurrentSearchUrl()) && totalSubmitted.get() < myMaxSubmitPerPass) {
329                        processSynchronous(theJobDetails, totalSubmitted, futures, search);
330                }
331
332                // processing step for asynchronous processing mode
333                if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted.get() < myMaxSubmitPerPass) {
334                        processAsynchronous(theJobDetails, totalSubmitted, futures);
335                }
336
337                ourLog.info(
338                                "Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)",
339                                theJobDetails.getJobId(),
340                                totalSubmitted,
341                                sw.getMillis(),
342                                sw.getThroughput(totalSubmitted.get(), TimeUnit.SECONDS));
343        }
344
345        private void processAsynchronous(
346                        SubscriptionTriggeringJobDetails theJobDetails, AtomicInteger totalSubmitted, List<Future<?>> futures) {
347                int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
348
349                IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());
350
351                int maxQuerySize = myMaxSubmitPerPass - totalSubmitted.get();
352                int toIndex;
353                if (theJobDetails.getCurrentSearchCount() != null) {
354                        toIndex = Math.min(fromIndex + maxQuerySize, theJobDetails.getCurrentSearchCount());
355                } else {
356                        toIndex = fromIndex + maxQuerySize;
357                }
358
359                ourLog.info(
360                                "Triggering job[{}] search {} requesting resources {} - {} from partition {}",
361                                theJobDetails.getJobId(),
362                                theJobDetails.getCurrentSearchUuid(),
363                                fromIndex,
364                                toIndex,
365                                theJobDetails.getRequestPartitionId());
366
367                List<? extends IResourcePersistentId<?>> allResourceIds;
368                RequestPartitionId requestPartitionId = theJobDetails.getRequestPartitionId();
369                try {
370                        allResourceIds = mySearchCoordinatorSvc.getResources(
371                                        theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, null, requestPartitionId);
372                } catch (ResourceGoneException e) {
373                        ourLog.trace("Search has expired, submission is done.");
374                        allResourceIds = new ArrayList<>();
375                }
376
377                ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), allResourceIds.size());
378                AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex());
379
380                List<? extends List<? extends IResourcePersistentId<?>>> partitions = Lists.partition(allResourceIds, 100);
381                for (List<? extends IResourcePersistentId<?>> resourceIds : partitions) {
382                        Runnable job = () -> {
383                                String resourceType = myFhirContext.getResourceType(theJobDetails.getCurrentSearchResourceType());
384                                RuntimeResourceDefinition resourceDef =
385                                                myFhirContext.getResourceDefinition(theJobDetails.getCurrentSearchResourceType());
386                                ISearchBuilder searchBuilder = mySearchBuilderFactory.newSearchBuilder(
387                                                resourceDao, resourceType, resourceDef.getImplementingClass());
388                                List<IBaseResource> listToPopulate = new ArrayList<>();
389
390                                myTransactionService.withRequest(null).execute(() -> {
391                                        searchBuilder.loadResourcesByPid(
392                                                        resourceIds, Collections.emptyList(), listToPopulate, false, new SystemRequestDetails());
393                                });
394
395                                for (IBaseResource nextResource : listToPopulate) {
396                                        submitResource(
397                                                        theJobDetails.getSubscriptionId(), theJobDetails.getRequestPartitionId(), nextResource);
398                                        totalSubmitted.incrementAndGet();
399                                        highestIndexSubmitted.incrementAndGet();
400                                }
401                        };
402
403                        Future<?> future = myExecutorService.submit(job);
404                        futures.add(future);
405                }
406
407                if (!validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
408
409                        theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted.get());
410
411                        if (allResourceIds.isEmpty()
412                                        || (theJobDetails.getCurrentSearchCount() != null
413                                                        && toIndex >= theJobDetails.getCurrentSearchCount())) {
414                                ourLog.info(
415                                                "Triggering job[{}] search {} has completed ",
416                                                theJobDetails.getJobId(),
417                                                theJobDetails.getCurrentSearchUuid());
418                                theJobDetails.setCurrentSearchResourceType(null);
419                                theJobDetails.setCurrentSearchUuid(null);
420                                theJobDetails.setCurrentSearchLastUploadedIndex(-1);
421                                theJobDetails.setCurrentSearchCount(null);
422                        }
423                }
424        }
425
426        private void processSynchronous(
427                        SubscriptionTriggeringJobDetails theJobDetails,
428                        AtomicInteger totalSubmitted,
429                        List<Future<?>> futures,
430                        IBundleProvider search) {
431                List<IBaseResource> allCurrentResources;
432
433                int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
434
435                String searchUrl = theJobDetails.getCurrentSearchUrl();
436
437                ourLog.info(
438                                "Triggered job [{}] - Starting synchronous processing at offset {} and index {} on partition {}",
439                                theJobDetails.getJobId(),
440                                theJobDetails.getCurrentOffset(),
441                                fromIndex,
442                                theJobDetails.getRequestPartitionId());
443
444                int submittableCount = myMaxSubmitPerPass - totalSubmitted.get();
445                int toIndex = fromIndex + submittableCount;
446
447                if (nonNull(search) && !search.isEmpty()) {
448
449                        if (search.getCurrentPageSize() != null) {
450                                toIndex = search.getCurrentPageSize();
451                        }
452
453                        // we already have data from the initial step so process as much as we can.
454                        ourLog.info(
455                                        "Triggered job[{}] will process up to {} resources from partition {}",
456                                        theJobDetails.getJobId(),
457                                        toIndex,
458                                        theJobDetails.getRequestPartitionId());
459                        allCurrentResources = search.getResources(0, toIndex);
460
461                } else {
462                        if (theJobDetails.getCurrentSearchCount() != null) {
463                                toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
464                        }
465
466                        RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, searchUrl);
467                        String queryPart = searchUrl.substring(searchUrl.indexOf('?'));
468                        SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef);
469                        int offset = theJobDetails.getCurrentOffset() + fromIndex;
470                        params.setOffset(offset);
471                        params.setCount(toIndex);
472
473                        ourLog.info(
474                                        "Triggered job[{}] requesting {} resources from offset {}",
475                                        theJobDetails.getJobId(),
476                                        toIndex,
477                                        offset);
478
479                        search = mySearchService.executeQuery(resourceDef.getName(), params, theJobDetails.getRequestPartitionId());
480                        allCurrentResources = search.getResources(0, submittableCount);
481                }
482
483                ourLog.info("Triggered job[{}] delivering {} resources", theJobDetails.getJobId(), allCurrentResources.size());
484                AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex());
485
486                for (IBaseResource nextResource : allCurrentResources) {
487                        Future<?> future = myExecutorService.submit(() -> submitResource(
488                                        theJobDetails.getSubscriptionId(), theJobDetails.getRequestPartitionId(), nextResource));
489                        futures.add(future);
490                        totalSubmitted.incrementAndGet();
491                        highestIndexSubmitted.incrementAndGet();
492                }
493
494                if (!validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
495
496                        theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted.get());
497
498                        ourLog.info(
499                                        "Triggered job[{}] lastUploadedIndex is {}",
500                                        theJobDetails.getJobId(),
501                                        theJobDetails.getCurrentSearchLastUploadedIndex());
502
503                        if (allCurrentResources.isEmpty()
504                                        || nonNull(theJobDetails.getCurrentSearchCount())
505                                                        && toIndex > theJobDetails.getCurrentSearchCount()) {
506                                ourLog.info(
507                                                "Triggered job[{}] for search URL {} has completed ",
508                                                theJobDetails.getJobId(),
509                                                theJobDetails.getCurrentSearchUrl());
510                                theJobDetails.setCurrentSearchResourceType(null);
511                                theJobDetails.clearCurrentSearchUrl();
512                                theJobDetails.setCurrentSearchLastUploadedIndex(-1);
513                                theJobDetails.setCurrentSearchCount(null);
514                        }
515                }
516        }
517
518        private boolean isInitialStep(SubscriptionTriggeringJobDetails theJobDetails) {
519                return isBlank(theJobDetails.myCurrentSearchUuid) && isBlank(theJobDetails.myCurrentSearchUrl);
520        }
521
522        private boolean jobHasCompleted(SubscriptionTriggeringJobDetails theJobDetails) {
523                return isInitialStep(theJobDetails);
524        }
525
526        private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Future<?>> theFutures) {
527
528                for (Future<?> nextFuture : theFutures) {
529                        try {
530                                nextFuture.get();
531                        } catch (Exception e) {
532                                ourLog.error("Failure triggering resource", e);
533                                return true;
534                        }
535                }
536
537                // Clear the list since it will potentially get reused
538                theFutures.clear();
539                return false;
540        }
541
542        private void submitResource(
543                        String theSubscriptionId, RequestPartitionId theRequestPartitionId, String theResourceIdToTrigger) {
544                org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
545                IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
546                IBaseResource resourceToTrigger = dao.read(resourceId, SystemRequestDetails.forAllPartitions());
547
548                submitResource(theSubscriptionId, theRequestPartitionId, resourceToTrigger);
549        }
550
551        private void submitResource(
552                        String theSubscriptionId, RequestPartitionId theRequestPartitionId, IBaseResource theResourceToTrigger) {
553
554                ourLog.info(
555                                "Submitting resource {} to subscription {}",
556                                theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(),
557                                theSubscriptionId);
558
559                ResourceModifiedMessage msg = new ResourceModifiedMessage(
560                                myFhirContext,
561                                theResourceToTrigger,
562                                ResourceModifiedMessage.OperationTypeEnum.MANUALLY_TRIGGERED,
563                                theRequestPartitionId);
564                msg.setSubscriptionId(theSubscriptionId);
565
566                for (int i = 0; ; i++) {
567                        try {
568                                myResourceModifiedConsumer.submitResourceModified(msg);
569                                break;
570                        } catch (Exception e) {
571                                if (i >= 3) {
572                                        throw new InternalErrorException(Msg.code(25) + e);
573                                }
574
575                                ourLog.warn("Exception while retriggering subscriptions (going to sleep and retry): {}", e.toString());
576                                try {
577                                        Thread.sleep(1000);
578                                } catch (InterruptedException ex) {
579                                        Thread.currentThread().interrupt();
580                                }
581                        }
582                }
583        }
584
585        public void cancelAll() {
586                synchronized (myActiveJobs) {
587                        myActiveJobs.clear();
588                }
589        }
590
591        /**
592         * Sets the maximum number of resources that will be submitted in a single pass
593         */
594        public void setMaxSubmitPerPass(Integer theMaxSubmitPerPass) {
595                Integer maxSubmitPerPass = theMaxSubmitPerPass;
596                if (maxSubmitPerPass == null) {
597                        maxSubmitPerPass = DEFAULT_MAX_SUBMIT;
598                }
599                Validate.isTrue(maxSubmitPerPass > 0, "theMaxSubmitPerPass must be > 0");
600                myMaxSubmitPerPass = maxSubmitPerPass;
601        }
602
603        @PostConstruct
604        public void start() {
605                createExecutorService();
606        }
607
608        private void createExecutorService() {
609                LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000);
610                BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
611                                .namingPattern("SubscriptionTriggering-%d")
612                                .daemon(false)
613                                .priority(Thread.NORM_PRIORITY)
614                                .build();
615                RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
616                        @Override
617                        public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
618                                ourLog.info(
619                                                "Note: Subscription triggering queue is full ({} elements), waiting for a slot to become available!",
620                                                executorQueue.size());
621                                StopWatch sw = new StopWatch();
622                                try {
623                                        executorQueue.put(theRunnable);
624                                } catch (InterruptedException theE) {
625                                        // Restore interrupted state...
626                                        Thread.currentThread().interrupt();
627                                        throw new RejectedExecutionException(
628                                                        Msg.code(26) + "Task " + theRunnable.toString() + " rejected from " + theE.toString());
629                                }
630                                ourLog.info("Slot become available after {}ms", sw.getMillis());
631                        }
632                };
633                myExecutorService = new ThreadPoolExecutor(
634                                10, 10, 0L, TimeUnit.MILLISECONDS, executorQueue, threadFactory, rejectedExecutionHandler);
635        }
636
637        @Override
638        public void scheduleJobs(ISchedulerService theSchedulerService) {
639                ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
640                jobDetail.setId(getClass().getName());
641                jobDetail.setJobClass(Job.class);
642                // Currently jobs ae kept in a local ArrayList so this should be a local job, and
643                // it can fire frequently without adding load
644                theSchedulerService.scheduleLocalJob(5 * DateUtils.MILLIS_PER_SECOND, jobDetail);
645        }
646
647        public int getActiveJobCount() {
648                return myActiveJobs.size();
649        }
650
651        public static class Job implements HapiJob {
652                @Autowired
653                private ISubscriptionTriggeringSvc myTarget;
654
655                @Override
656                public void execute(JobExecutionContext theContext) {
657                        myTarget.runDeliveryPass();
658                }
659        }
660
661        private static class SubscriptionTriggeringJobDetails {
662
663                private String myJobId;
664                private String mySubscriptionId;
665                private List<String> myRemainingResourceIds;
666                private List<String> myRemainingSearchUrls;
667                private String myCurrentSearchUuid;
668                private String myCurrentSearchUrl;
669                private Integer myCurrentSearchCount;
670                private String myCurrentSearchResourceType;
671                private int myCurrentSearchLastUploadedIndex;
672                private int myCurrentOffset;
673
674                private RequestPartitionId myRequestPartitionId;
675
676                Integer getCurrentSearchCount() {
677                        return myCurrentSearchCount;
678                }
679
680                void setCurrentSearchCount(Integer theCurrentSearchCount) {
681                        myCurrentSearchCount = theCurrentSearchCount;
682                }
683
684                String getCurrentSearchResourceType() {
685                        return myCurrentSearchResourceType;
686                }
687
688                void setCurrentSearchResourceType(String theCurrentSearchResourceType) {
689                        myCurrentSearchResourceType = theCurrentSearchResourceType;
690                }
691
692                String getJobId() {
693                        return myJobId;
694                }
695
696                void setJobId(String theJobId) {
697                        myJobId = theJobId;
698                }
699
700                String getSubscriptionId() {
701                        return mySubscriptionId;
702                }
703
704                void setSubscriptionId(String theSubscriptionId) {
705                        mySubscriptionId = theSubscriptionId;
706                }
707
708                List<String> getRemainingResourceIds() {
709                        return myRemainingResourceIds;
710                }
711
712                void setRemainingResourceIds(List<String> theRemainingResourceIds) {
713                        myRemainingResourceIds = theRemainingResourceIds;
714                }
715
716                List<String> getRemainingSearchUrls() {
717                        return myRemainingSearchUrls;
718                }
719
720                void setRemainingSearchUrls(List<String> theRemainingSearchUrls) {
721                        myRemainingSearchUrls = theRemainingSearchUrls;
722                }
723
724                String getCurrentSearchUuid() {
725                        return myCurrentSearchUuid;
726                }
727
728                void setCurrentSearchUuid(String theCurrentSearchUuid) {
729                        myCurrentSearchUuid = theCurrentSearchUuid;
730                }
731
732                public String getCurrentSearchUrl() {
733                        return myCurrentSearchUrl;
734                }
735
736                public void setCurrentSearchUrl(String theCurrentSearchUrl) {
737                        this.myCurrentSearchUrl = theCurrentSearchUrl;
738                }
739
740                int getCurrentSearchLastUploadedIndex() {
741                        return myCurrentSearchLastUploadedIndex;
742                }
743
744                void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) {
745                        myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex;
746                }
747
748                public void clearCurrentSearchUrl() {
749                        myCurrentSearchUrl = null;
750                }
751
752                public int getCurrentOffset() {
753                        return myCurrentOffset;
754                }
755
756                public void setCurrentOffset(Integer theCurrentOffset) {
757                        myCurrentOffset = ObjectUtils.defaultIfNull(theCurrentOffset, 0);
758                }
759
760                public void setRequestPartitionId(RequestPartitionId theRequestPartitionId) {
761                        myRequestPartitionId = theRequestPartitionId;
762                }
763
764                public RequestPartitionId getRequestPartitionId() {
765                        return myRequestPartitionId;
766                }
767        }
768}