001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2023 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.triggering;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.context.RuntimeResourceDefinition;
024import ca.uhn.fhir.i18n.Msg;
025import ca.uhn.fhir.interceptor.model.RequestPartitionId;
026import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
027import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
028import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
029import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
030import ca.uhn.fhir.jpa.api.svc.ISearchSvc;
031import ca.uhn.fhir.jpa.dao.ISearchBuilder;
032import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
033import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
034import ca.uhn.fhir.jpa.model.sched.HapiJob;
035import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
036import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
037import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
038import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
039import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
040import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
041import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
042import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
043import ca.uhn.fhir.rest.api.CacheControlDirective;
044import ca.uhn.fhir.rest.api.server.IBundleProvider;
045import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
046import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
047import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
048import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
049import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
050import ca.uhn.fhir.util.ParametersUtil;
051import ca.uhn.fhir.util.StopWatch;
052import ca.uhn.fhir.util.UrlUtil;
053import ca.uhn.fhir.util.ValidateUtil;
054import org.apache.commons.lang3.ObjectUtils;
055import org.apache.commons.lang3.Validate;
056import org.apache.commons.lang3.concurrent.BasicThreadFactory;
057import org.apache.commons.lang3.time.DateUtils;
058import org.apache.commons.lang3.tuple.Pair;
059import org.hl7.fhir.dstu2.model.IdType;
060import org.hl7.fhir.instance.model.api.IBaseParameters;
061import org.hl7.fhir.instance.model.api.IBaseResource;
062import org.hl7.fhir.instance.model.api.IIdType;
063import org.hl7.fhir.instance.model.api.IPrimitiveType;
064import org.quartz.JobExecutionContext;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067import org.springframework.beans.factory.annotation.Autowired;
068
069import java.util.ArrayList;
070import java.util.Collections;
071import java.util.List;
072import java.util.UUID;
073import java.util.concurrent.ExecutorService;
074import java.util.concurrent.Future;
075import java.util.concurrent.LinkedBlockingQueue;
076import java.util.concurrent.RejectedExecutionException;
077import java.util.concurrent.RejectedExecutionHandler;
078import java.util.concurrent.ThreadPoolExecutor;
079import java.util.concurrent.TimeUnit;
080import java.util.stream.Collectors;
081import javax.annotation.Nullable;
082import javax.annotation.PostConstruct;
083
084import static ca.uhn.fhir.rest.server.provider.ProviderConstants.SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID;
085import static java.util.Objects.isNull;
086import static java.util.Objects.nonNull;
087import static org.apache.commons.collections4.CollectionUtils.isNotEmpty;
088import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
089import static org.apache.commons.lang3.StringUtils.isBlank;
090import static org.apache.commons.lang3.StringUtils.isNotBlank;
091
092public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc, IHasScheduledJobs {
        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringSvcImpl.class);

        // Value used for myMaxSubmitPerPass until (or unless) setMaxSubmitPerPass supplies another one
        private static final int DEFAULT_MAX_SUBMIT = 10000;

        // Triggering jobs waiting to be processed. triggerSubscription appends to it, runDeliveryPass works on
        // the element at index 0, and cancelAll empties it; those accesses synchronize on the list itself.
        private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<>();

        // Used to create the Parameters response and to resolve resource definitions for search URLs
        @Autowired
        private FhirContext myFhirContext;

        // Source of the Subscription DAO and of the per-resource-type DAOs
        @Autowired
        private DaoRegistry myDaoRegistry;

        // Consulted for the supported subscription types before a triggering request is accepted
        @Autowired
        private JpaStorageSettings myStorageSettings;

        // Registers searches for search URLs and pages through their results by search UUID
        @Autowired
        private ISearchCoordinatorSvc mySearchCoordinatorSvc;

        // Translates the query part of a search URL into a SearchParameterMap
        @Autowired
        private MatchUrlService myMatchUrlService;

        // Receives the ResourceModifiedMessage built for each triggered resource
        @Autowired
        private IResourceModifiedConsumer myResourceModifiedConsumer;

        // Wraps the loading of resources by persistent ID in runJob
        @Autowired
        private HapiTransactionService myTransactionService;

        // Limit that runJob compares its running submission count against during a single pass
        private int myMaxSubmitPerPass = DEFAULT_MAX_SUBMIT;

        // Executor that runs the submission tasks; assigned in createExecutorService()
        private ExecutorService myExecutorService;

        // Executes queries directly when a search URL is processed without a search UUID
        @Autowired
        private ISearchSvc mySearchService;

        // Creates the ISearchBuilder used to load resources by persistent ID
        @Autowired
        private SearchBuilderFactory mySearchBuilderFactory;
126
127        @Override
128        public IBaseParameters triggerSubscription(
129                        @Nullable List<IPrimitiveType<String>> theResourceIds,
130                        @Nullable List<IPrimitiveType<String>> theSearchUrls,
131                        @Nullable IIdType theSubscriptionId) {
132
133                if (myStorageSettings.getSupportedSubscriptionTypes().isEmpty()) {
134                        throw new PreconditionFailedException(Msg.code(22) + "Subscription processing not active on this server");
135                }
136
137                // Throw a 404 if the subscription doesn't exist
138                if (theSubscriptionId != null) {
139                        IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getSubscriptionDao();
140                        IIdType subscriptionId = theSubscriptionId;
141                        if (!subscriptionId.hasResourceType()) {
142                                subscriptionId = subscriptionId.withResourceType(ResourceTypeEnum.SUBSCRIPTION.getCode());
143                        }
144                        subscriptionDao.read(subscriptionId, SystemRequestDetails.forAllPartitions());
145                }
146
147                List<IPrimitiveType<String>> resourceIds = defaultIfNull(theResourceIds, Collections.emptyList());
148                List<IPrimitiveType<String>> searchUrls = defaultIfNull(theSearchUrls, Collections.emptyList());
149
150                // Make sure we have at least one resource ID or search URL
151                if (resourceIds.size() == 0 && searchUrls.size() == 0) {
152                        throw new InvalidRequestException(Msg.code(23) + "No resource IDs or search URLs specified for triggering");
153                }
154
155                // Resource URLs must be compete
156                for (IPrimitiveType<String> next : resourceIds) {
157                        IdType resourceId = new IdType(next.getValue());
158                        ValidateUtil.isTrueOrThrowInvalidRequest(
159                                        resourceId.hasResourceType(),
160                                        SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID + " parameter must have resource type");
161                        ValidateUtil.isTrueOrThrowInvalidRequest(
162                                        resourceId.hasIdPart(),
163                                        SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID + " parameter must have resource ID part");
164                }
165
166                // Search URLs must be valid
167                for (IPrimitiveType<String> next : searchUrls) {
168                        if (!next.getValue().contains("?")) {
169                                throw new InvalidRequestException(Msg.code(24)
170                                                + "Search URL is not valid (must be in the form \"[resource type]?[optional params]\")");
171                        }
172                }
173
174                SubscriptionTriggeringJobDetails jobDetails = new SubscriptionTriggeringJobDetails();
175                jobDetails.setJobId(UUID.randomUUID().toString());
176                jobDetails.setRemainingResourceIds(
177                                resourceIds.stream().map(IPrimitiveType::getValue).collect(Collectors.toList()));
178                jobDetails.setRemainingSearchUrls(
179                                searchUrls.stream().map(IPrimitiveType::getValue).collect(Collectors.toList()));
180                if (theSubscriptionId != null) {
181                        jobDetails.setSubscriptionId(theSubscriptionId.getIdPart());
182                }
183
184                // Submit job for processing
185                synchronized (myActiveJobs) {
186                        myActiveJobs.add(jobDetails);
187                        ourLog.info(
188                                        "Subscription triggering requested for {} resource and {} search - Gave job ID: {} and have {} jobs",
189                                        resourceIds.size(),
190                                        searchUrls.size(),
191                                        jobDetails.getJobId(),
192                                        myActiveJobs.size());
193                }
194
195                // Create a parameters response
196                IBaseParameters retVal = ParametersUtil.newInstance(myFhirContext);
197                IPrimitiveType<?> value =
198                                (IPrimitiveType<?>) myFhirContext.getElementDefinition("string").newInstance();
199                value.setValueAsString("Subscription triggering job submitted as JOB ID: " + jobDetails.myJobId);
200                ParametersUtil.addParameterToParameters(myFhirContext, retVal, "information", value);
201                return retVal;
202        }
203
204        @Override
205        public void runDeliveryPass() {
206
207                synchronized (myActiveJobs) {
208                        if (myActiveJobs.isEmpty()) {
209                                return;
210                        }
211
212                        String activeJobIds = myActiveJobs.stream()
213                                        .map(SubscriptionTriggeringJobDetails::getJobId)
214                                        .collect(Collectors.joining(", "));
215                        ourLog.info("Starting pass: currently have {} active job IDs: {}", myActiveJobs.size(), activeJobIds);
216
217                        SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0);
218
219                        runJob(activeJob);
220
221                        // If the job is complete, remove it from the queue
222                        if (activeJob.getRemainingResourceIds().isEmpty()) {
223                                if (activeJob.getRemainingSearchUrls().isEmpty()) {
224                                        if (jobHasCompleted(activeJob)) {
225                                                myActiveJobs.remove(0);
226                                                String remainingJobsMsg = "";
227                                                if (myActiveJobs.size() > 0) {
228                                                        remainingJobsMsg = "(" + myActiveJobs.size() + " jobs remaining)";
229                                                }
230                                                ourLog.info(
231                                                                "Subscription triggering job {} is complete{}", activeJob.getJobId(), remainingJobsMsg);
232                                        }
233                                }
234                        }
235                }
236        }
237
        /**
         * Performs one pass of work for the given job, using {@code myMaxSubmitPerPass} as the budget that
         * the running submission count is compared against. The pass goes through these stages in order:
         * <ol>
         * <li>submit explicitly listed resource IDs;</li>
         * <li>if no search is in progress and search URLs remain, start the next one and record on the job
         * whether it is continued by URL (no search UUID) or by search UUID;</li>
         * <li>continue a URL-based (synchronous) search;</li>
         * <li>continue a UUID-based (asynchronous) search.</li>
         * </ol>
         * After each batch of submissions the futures are awaited; if any of them failed the method returns
         * early without recording further progress for the current search.
         *
         * @param theJobDetails the job to advance; its remaining-ID list, remaining-URL list and current
         *                      search fields are mutated as work progresses
         */
        private void runJob(SubscriptionTriggeringJobDetails theJobDetails) {
                StopWatch sw = new StopWatch();
                ourLog.info("Starting pass of subscription triggering job {}", theJobDetails.getJobId());

                // Submit individual resources
                int totalSubmitted = 0;
                List<Pair<String, Future<Void>>> futures = new ArrayList<>();
                while (theJobDetails.getRemainingResourceIds().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
                        totalSubmitted++;
                        // The ID is removed from the job before it is submitted
                        String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0);
                        Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResourceId);
                        futures.add(Pair.of(nextResourceId, future));
                }

                // Make sure these all succeeded in submitting
                // (on success the helper also empties the list, so it can be reused below)
                if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
                        return;
                }

                // Only assigned by the initial step below, i.e. non-null only when a search was started in
                // this very pass
                IBundleProvider search = null;

                // This is the job initial step where we set ourselves up to do the actual re-submitting of resources
                // to the broker.  Note that querying of resource can be done synchronously or asynchronously
                if (isInitialStep(theJobDetails)
                                && isNotEmpty(theJobDetails.getRemainingSearchUrls())
                                && totalSubmitted < myMaxSubmitPerPass) {

                        String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0);
                        RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, nextSearchUrl);
                        // Everything from the '?' onwards is handed to the match URL translation
                        String queryPart = nextSearchUrl.substring(nextSearchUrl.indexOf('?'));
                        SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef);

                        String resourceType = resourceDef.getName();
                        IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceType);

                        ourLog.info("Triggering job[{}] is starting a search for {}", theJobDetails.getJobId(), nextSearchUrl);

                        search = mySearchCoordinatorSvc.registerSearch(
                                        callingDao,
                                        params,
                                        resourceType,
                                        new CacheControlDirective(),
                                        null,
                                        RequestPartitionId.allPartitions());

                        if (isNull(search.getUuid())) {
                                // we don't have a search uuid i.e. we're setting up for synchronous processing
                                theJobDetails.setCurrentSearchUrl(nextSearchUrl);
                                theJobDetails.setCurrentOffset(params.getOffset());

                        } else {
                                // populate properties for asynchronous path
                                theJobDetails.setCurrentSearchUuid(search.getUuid());
                        }

                        theJobDetails.setCurrentSearchResourceType(resourceType);
                        theJobDetails.setCurrentSearchCount(params.getCount());
                        // -1 means nothing has been uploaded yet, so the next fromIndex computed below is 0
                        theJobDetails.setCurrentSearchLastUploadedIndex(-1);
                }

                // processing step for synchronous processing mode
                if (isNotBlank(theJobDetails.getCurrentSearchUrl()) && totalSubmitted < myMaxSubmitPerPass) {
                        List<IBaseResource> allCurrentResources;

                        int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;

                        String searchUrl = theJobDetails.getCurrentSearchUrl();

                        ourLog.info(
                                        "Triggered job [{}] - Starting synchronous processing at offset {} and index {}",
                                        theJobDetails.getJobId(),
                                        theJobDetails.getCurrentOffset(),
                                        fromIndex);

                        // Remaining budget for this pass
                        int submittableCount = myMaxSubmitPerPass - totalSubmitted;
                        int toIndex = fromIndex + submittableCount;

                        if (nonNull(search) && !search.isEmpty()) {

                                // we already have data from the initial step so process as much as we can.
                                ourLog.info("Triggered job[{}] will process up to {} resources", theJobDetails.getJobId(), toIndex);
                                allCurrentResources = search.getResources(0, toIndex);

                        } else {
                                if (theJobDetails.getCurrentSearchCount() != null) {
                                        toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
                                }

                                // Re-parse the stored search URL and re-run it at the position reached so far
                                RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, searchUrl);
                                String queryPart = searchUrl.substring(searchUrl.indexOf('?'));
                                SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef);
                                int offset = theJobDetails.getCurrentOffset() + fromIndex;
                                params.setOffset(offset);
                                // NOTE(review): the count passed here is toIndex, not (toIndex - fromIndex); when
                                // fromIndex > 0 this appears to request more rows than the remaining budget.
                                // Confirm whether that is intended.
                                params.setCount(toIndex);

                                ourLog.info(
                                                "Triggered job[{}] requesting {} resources from offset {}",
                                                theJobDetails.getJobId(),
                                                toIndex,
                                                offset);

                                search =
                                                mySearchService.executeQuery(resourceDef.getName(), params, RequestPartitionId.allPartitions());
                                allCurrentResources = search.getAllResources();
                        }

                        ourLog.info(
                                        "Triggered job[{}] delivering {} resources", theJobDetails.getJobId(), allCurrentResources.size());
                        int highestIndexSubmitted = theJobDetails.getCurrentSearchLastUploadedIndex();

                        for (IBaseResource nextResource : allCurrentResources) {
                                Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResource);
                                futures.add(Pair.of(nextResource.getIdElement().getIdPart(), future));
                                totalSubmitted++;
                                highestIndexSubmitted++;
                        }

                        // On failure, return before the last-uploaded index is advanced
                        if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
                                return;
                        }

                        theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted);

                        ourLog.info(
                                        "Triggered job[{}] lastUploadedIndex is {}",
                                        theJobDetails.getJobId(),
                                        theJobDetails.getCurrentSearchLastUploadedIndex());

                        // The search is finished when nothing came back, or when the requested count
                        // (if one was given) has been reached; reset the per-search state in that case
                        if (allCurrentResources.isEmpty()
                                        || nonNull(theJobDetails.getCurrentSearchCount())
                                                        && toIndex >= theJobDetails.getCurrentSearchCount()) {
                                ourLog.info(
                                                "Triggered job[{}] for search URL {} has completed ",
                                                theJobDetails.getJobId(),
                                                theJobDetails.getCurrentSearchUrl());
                                theJobDetails.setCurrentSearchResourceType(null);
                                theJobDetails.clearCurrentSearchUrl();
                                theJobDetails.setCurrentSearchLastUploadedIndex(-1);
                                theJobDetails.setCurrentSearchCount(null);
                        }
                }

                // processing step for asynchronous processing mode
                if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) {
                        int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;

                        IFhirResourceDao<?> resourceDao =
                                        myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());

                        // Remaining budget for this pass, additionally capped by the search count if one was given
                        int maxQuerySize = myMaxSubmitPerPass - totalSubmitted;
                        int toIndex;
                        if (theJobDetails.getCurrentSearchCount() != null) {
                                toIndex = Math.min(fromIndex + maxQuerySize, theJobDetails.getCurrentSearchCount());
                        } else {
                                toIndex = fromIndex + maxQuerySize;
                        }

                        ourLog.info(
                                        "Triggering job[{}] search {} requesting resources {} - {}",
                                        theJobDetails.getJobId(),
                                        theJobDetails.getCurrentSearchUuid(),
                                        fromIndex,
                                        toIndex);

                        // Fetch the persistent IDs for this window of the registered search
                        List<IResourcePersistentId<?>> resourceIds;
                        RequestPartitionId requestPartitionId = RequestPartitionId.allPartitions();
                        resourceIds = mySearchCoordinatorSvc.getResources(
                                        theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, null, requestPartitionId);

                        ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), resourceIds.size());
                        int highestIndexSubmitted = theJobDetails.getCurrentSearchLastUploadedIndex();

                        String resourceType = myFhirContext.getResourceType(theJobDetails.getCurrentSearchResourceType());
                        RuntimeResourceDefinition resourceDef =
                                        myFhirContext.getResourceDefinition(theJobDetails.getCurrentSearchResourceType());
                        ISearchBuilder searchBuilder = mySearchBuilderFactory.newSearchBuilder(
                                        resourceDao, resourceType, resourceDef.getImplementingClass());
                        List<IBaseResource> listToPopulate = new ArrayList<>();

                        // Load the resources for those IDs through the transaction service
                        myTransactionService.withSystemRequest().execute(() -> {
                                searchBuilder.loadResourcesByPid(
                                                resourceIds, Collections.emptyList(), listToPopulate, false, new SystemRequestDetails());
                        });

                        for (IBaseResource nextResource : listToPopulate) {
                                Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResource);
                                futures.add(Pair.of(nextResource.getIdElement().getIdPart(), future));
                                totalSubmitted++;
                                highestIndexSubmitted++;
                        }

                        // On failure, return before the last-uploaded index is advanced
                        if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
                                return;
                        }

                        theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted);

                        // The search is finished when no IDs came back, or when the requested count
                        // (if one was given) has been reached; reset the per-search state in that case
                        if (resourceIds.size() == 0
                                        || (theJobDetails.getCurrentSearchCount() != null
                                                        && toIndex >= theJobDetails.getCurrentSearchCount())) {
                                ourLog.info(
                                                "Triggering job[{}] search {} has completed ",
                                                theJobDetails.getJobId(),
                                                theJobDetails.getCurrentSearchUuid());
                                theJobDetails.setCurrentSearchResourceType(null);
                                theJobDetails.setCurrentSearchUuid(null);
                                theJobDetails.setCurrentSearchLastUploadedIndex(-1);
                                theJobDetails.setCurrentSearchCount(null);
                        }
                }

                ourLog.info(
                                "Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)",
                                theJobDetails.getJobId(),
                                totalSubmitted,
                                sw.getMillis(),
                                sw.getThroughput(totalSubmitted, TimeUnit.SECONDS));
        }
456
457        private boolean isInitialStep(SubscriptionTriggeringJobDetails theJobDetails) {
458                return isBlank(theJobDetails.myCurrentSearchUuid) && isBlank(theJobDetails.myCurrentSearchUrl);
459        }
460
461        private boolean jobHasCompleted(SubscriptionTriggeringJobDetails theJobDetails) {
462                return isInitialStep(theJobDetails);
463        }
464
465        private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Pair<String, Future<Void>>> theIdToFutures) {
466
467                for (Pair<String, Future<Void>> next : theIdToFutures) {
468                        String nextDeliveredId = next.getKey();
469                        try {
470                                Future<Void> nextFuture = next.getValue();
471                                nextFuture.get();
472                                ourLog.info("Finished redelivering {}", nextDeliveredId);
473                        } catch (Exception e) {
474                                ourLog.error("Failure triggering resource " + nextDeliveredId, e);
475                                return true;
476                        }
477                }
478
479                // Clear the list since it will potentially get reused
480                theIdToFutures.clear();
481                return false;
482        }
483
484        private Future<Void> submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
485                org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
486                IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
487                IBaseResource resourceToTrigger = dao.read(resourceId, SystemRequestDetails.forAllPartitions());
488
489                return submitResource(theSubscriptionId, resourceToTrigger);
490        }
491
492        private Future<Void> submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) {
493
494                ourLog.info(
495                                "Submitting resource {} to subscription {}",
496                                theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(),
497                                theSubscriptionId);
498
499                ResourceModifiedMessage msg = new ResourceModifiedMessage(
500                                myFhirContext, theResourceToTrigger, ResourceModifiedMessage.OperationTypeEnum.UPDATE);
501                msg.setSubscriptionId(theSubscriptionId);
502
503                return myExecutorService.submit(() -> {
504                        for (int i = 0; ; i++) {
505                                try {
506                                        myResourceModifiedConsumer.submitResourceModified(msg);
507                                        break;
508                                } catch (Exception e) {
509                                        if (i >= 3) {
510                                                throw new InternalErrorException(Msg.code(25) + e);
511                                        }
512
513                                        ourLog.warn(
514                                                        "Exception while retriggering subscriptions (going to sleep and retry): {}", e.toString());
515                                        Thread.sleep(1000);
516                                }
517                        }
518
519                        return null;
520                });
521        }
522
        /**
         * Removes every job from the list of active jobs. The list is cleared while holding its own
         * monitor; this method does nothing else (in particular it does not touch the executor).
         */
        public void cancelAll() {
                synchronized (myActiveJobs) {
                        myActiveJobs.clear();
                }
        }
528
529        /**
530         * Sets the maximum number of resources that will be submitted in a single pass
531         */
532        public void setMaxSubmitPerPass(Integer theMaxSubmitPerPass) {
533                Integer maxSubmitPerPass = theMaxSubmitPerPass;
534                if (maxSubmitPerPass == null) {
535                        maxSubmitPerPass = DEFAULT_MAX_SUBMIT;
536                }
537                Validate.isTrue(maxSubmitPerPass > 0, "theMaxSubmitPerPass must be > 0");
538                myMaxSubmitPerPass = maxSubmitPerPass;
539        }
540
        /**
         * {@code @PostConstruct} hook that creates the executor service used for submitting resources.
         */
        @PostConstruct
        public void start() {
                createExecutorService();
        }
545
546        private void createExecutorService() {
547                LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000);
548                BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
549                                .namingPattern("SubscriptionTriggering-%d")
550                                .daemon(false)
551                                .priority(Thread.NORM_PRIORITY)
552                                .build();
553                RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
554                        @Override
555                        public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
556                                ourLog.info(
557                                                "Note: Subscription triggering queue is full ({} elements), waiting for a slot to become available!",
558                                                executorQueue.size());
559                                StopWatch sw = new StopWatch();
560                                try {
561                                        executorQueue.put(theRunnable);
562                                } catch (InterruptedException theE) {
563                                        // Restore interrupted state...
564                                        Thread.currentThread().interrupt();
565                                        throw new RejectedExecutionException(
566                                                        Msg.code(26) + "Task " + theRunnable.toString() + " rejected from " + theE.toString());
567                                }
568                                ourLog.info("Slot become available after {}ms", sw.getMillis());
569                        }
570                };
571                myExecutorService = new ThreadPoolExecutor(
572                                10, 10, 0L, TimeUnit.MILLISECONDS, executorQueue, threadFactory, rejectedExecutionHandler);
573        }
574
575        @Override
576        public void scheduleJobs(ISchedulerService theSchedulerService) {
577                ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
578                jobDetail.setId(getClass().getName());
579                jobDetail.setJobClass(Job.class);
580                // Currently jobs ae kept in a local ArrayList so this should be a local job, and
581                // it can fire frequently without adding load
582                theSchedulerService.scheduleLocalJob(5 * DateUtils.MILLIS_PER_SECOND, jobDetail);
583        }
584
585        public int getActiveJobCount() {
586                return myActiveJobs.size();
587        }
588
589        public static class Job implements HapiJob {
590                @Autowired
591                private ISubscriptionTriggeringSvc myTarget;
592
593                @Override
594                public void execute(JobExecutionContext theContext) {
595                        myTarget.runDeliveryPass();
596                }
597        }
598
599        private static class SubscriptionTriggeringJobDetails {
600
601                private String myJobId;
602                private String mySubscriptionId;
603                private List<String> myRemainingResourceIds;
604                private List<String> myRemainingSearchUrls;
605                private String myCurrentSearchUuid;
606                private String myCurrentSearchUrl;
607                private Integer myCurrentSearchCount;
608                private String myCurrentSearchResourceType;
609                private int myCurrentSearchLastUploadedIndex;
610                private int myCurrentOffset;
611
612                Integer getCurrentSearchCount() {
613                        return myCurrentSearchCount;
614                }
615
616                void setCurrentSearchCount(Integer theCurrentSearchCount) {
617                        myCurrentSearchCount = theCurrentSearchCount;
618                }
619
620                String getCurrentSearchResourceType() {
621                        return myCurrentSearchResourceType;
622                }
623
624                void setCurrentSearchResourceType(String theCurrentSearchResourceType) {
625                        myCurrentSearchResourceType = theCurrentSearchResourceType;
626                }
627
628                String getJobId() {
629                        return myJobId;
630                }
631
632                void setJobId(String theJobId) {
633                        myJobId = theJobId;
634                }
635
636                String getSubscriptionId() {
637                        return mySubscriptionId;
638                }
639
640                void setSubscriptionId(String theSubscriptionId) {
641                        mySubscriptionId = theSubscriptionId;
642                }
643
644                List<String> getRemainingResourceIds() {
645                        return myRemainingResourceIds;
646                }
647
648                void setRemainingResourceIds(List<String> theRemainingResourceIds) {
649                        myRemainingResourceIds = theRemainingResourceIds;
650                }
651
652                List<String> getRemainingSearchUrls() {
653                        return myRemainingSearchUrls;
654                }
655
656                void setRemainingSearchUrls(List<String> theRemainingSearchUrls) {
657                        myRemainingSearchUrls = theRemainingSearchUrls;
658                }
659
660                String getCurrentSearchUuid() {
661                        return myCurrentSearchUuid;
662                }
663
664                void setCurrentSearchUuid(String theCurrentSearchUuid) {
665                        myCurrentSearchUuid = theCurrentSearchUuid;
666                }
667
668                public String getCurrentSearchUrl() {
669                        return myCurrentSearchUrl;
670                }
671
672                public void setCurrentSearchUrl(String theCurrentSearchUrl) {
673                        this.myCurrentSearchUrl = theCurrentSearchUrl;
674                }
675
676                int getCurrentSearchLastUploadedIndex() {
677                        return myCurrentSearchLastUploadedIndex;
678                }
679
680                void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) {
681                        myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex;
682                }
683
684                public void clearCurrentSearchUrl() {
685                        myCurrentSearchUrl = null;
686                }
687
688                public int getCurrentOffset() {
689                        return myCurrentOffset;
690                }
691
692                public void setCurrentOffset(Integer theCurrentOffset) {
693                        myCurrentOffset = ObjectUtils.defaultIfNull(theCurrentOffset, 0);
694                }
695        }
696}