001package ca.uhn.fhir.jpa.subscription.triggering;
002
003/*-
004 * #%L
005 * HAPI FHIR Subscription Server
006 * %%
007 * Copyright (C) 2014 - 2021 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.context.FhirContext;
024import ca.uhn.fhir.context.RuntimeResourceDefinition;
025import ca.uhn.fhir.interceptor.model.RequestPartitionId;
026import ca.uhn.fhir.jpa.api.config.DaoConfig;
027import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
028import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
029import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
030import ca.uhn.fhir.jpa.model.sched.HapiJob;
031import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
032import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
033import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
034import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
035import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
036import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
037import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
038import ca.uhn.fhir.rest.annotation.IdParam;
039import ca.uhn.fhir.rest.api.CacheControlDirective;
040import ca.uhn.fhir.rest.api.server.IBundleProvider;
041import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
042import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
043import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
044import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
045import ca.uhn.fhir.util.ParametersUtil;
046import ca.uhn.fhir.util.StopWatch;
047import ca.uhn.fhir.util.UrlUtil;
048import ca.uhn.fhir.util.ValidateUtil;
049import org.apache.commons.lang3.ObjectUtils;
050import org.apache.commons.lang3.Validate;
051import org.apache.commons.lang3.concurrent.BasicThreadFactory;
052import org.apache.commons.lang3.time.DateUtils;
053import org.apache.commons.lang3.tuple.Pair;
054import org.hl7.fhir.dstu2.model.IdType;
055import org.hl7.fhir.instance.model.api.IBaseParameters;
056import org.hl7.fhir.instance.model.api.IBaseResource;
057import org.hl7.fhir.instance.model.api.IIdType;
058import org.hl7.fhir.instance.model.api.IPrimitiveType;
059import org.quartz.JobExecutionContext;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062import org.springframework.beans.factory.annotation.Autowired;
063
064import javax.annotation.PostConstruct;
065import java.util.ArrayList;
066import java.util.Collections;
067import java.util.List;
068import java.util.UUID;
069import java.util.concurrent.ExecutorService;
070import java.util.concurrent.Future;
071import java.util.concurrent.LinkedBlockingQueue;
072import java.util.concurrent.RejectedExecutionException;
073import java.util.concurrent.RejectedExecutionHandler;
074import java.util.concurrent.ThreadPoolExecutor;
075import java.util.concurrent.TimeUnit;
076import java.util.stream.Collectors;
077
078import static ca.uhn.fhir.rest.server.provider.ProviderConstants.SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID;
079import static org.apache.commons.lang3.StringUtils.isBlank;
080import static org.apache.commons.lang3.StringUtils.isNotBlank;
081
082public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc {
083        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringSvcImpl.class);
084        private static final int DEFAULT_MAX_SUBMIT = 10000;
085        private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<>();
086        @Autowired
087        private FhirContext myFhirContext;
088        @Autowired
089        private DaoRegistry myDaoRegistry;
090        @Autowired
091        private DaoConfig myDaoConfig;
092        @Autowired
093        private ISearchCoordinatorSvc mySearchCoordinatorSvc;
094        @Autowired
095        private MatchUrlService myMatchUrlService;
096        @Autowired
097        private IResourceModifiedConsumer myResourceModifiedConsumer;
098        private int myMaxSubmitPerPass = DEFAULT_MAX_SUBMIT;
099        private ExecutorService myExecutorService;
100        @Autowired
101        private ISchedulerService mySchedulerService;
102
103        @Override
104        public IBaseParameters triggerSubscription(List<IPrimitiveType<String>> theResourceIds, List<IPrimitiveType<String>> theSearchUrls, @IdParam IIdType theSubscriptionId) {
105
106                if (myDaoConfig.getSupportedSubscriptionTypes().isEmpty()) {
107                        throw new PreconditionFailedException("Subscription processing not active on this server");
108                }
109
110                // Throw a 404 if the subscription doesn't exist
111                if (theSubscriptionId != null) {
112                        IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getSubscriptionDao();
113                        IIdType subscriptionId = theSubscriptionId;
114                        if (!subscriptionId.hasResourceType()) {
115                                subscriptionId = subscriptionId.withResourceType(ResourceTypeEnum.SUBSCRIPTION.getCode());
116                        }
117                        subscriptionDao.read(subscriptionId);
118                }
119
120                List<IPrimitiveType<String>> resourceIds = ObjectUtils.defaultIfNull(theResourceIds, Collections.emptyList());
121                List<IPrimitiveType<String>> searchUrls = ObjectUtils.defaultIfNull(theSearchUrls, Collections.emptyList());
122
123                // Make sure we have at least one resource ID or search URL
124                if (resourceIds.size() == 0 && searchUrls.size() == 0) {
125                        throw new InvalidRequestException("No resource IDs or search URLs specified for triggering");
126                }
127
128                // Resource URLs must be compete
129                for (IPrimitiveType<String> next : resourceIds) {
130                        IdType resourceId = new IdType(next.getValue());
131                        ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasResourceType(), SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID + " parameter must have resource type");
132                        ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasIdPart(), SUBSCRIPTION_TRIGGERING_PARAM_RESOURCE_ID + " parameter must have resource ID part");
133                }
134
135                // Search URLs must be valid
136                for (IPrimitiveType<String> next : searchUrls) {
137                        if (!next.getValue().contains("?")) {
138                                throw new InvalidRequestException("Search URL is not valid (must be in the form \"[resource type]?[optional params]\")");
139                        }
140                }
141
142                SubscriptionTriggeringJobDetails jobDetails = new SubscriptionTriggeringJobDetails();
143                jobDetails.setJobId(UUID.randomUUID().toString());
144                jobDetails.setRemainingResourceIds(resourceIds.stream().map(t -> t.getValue()).collect(Collectors.toList()));
145                jobDetails.setRemainingSearchUrls(searchUrls.stream().map(t -> t.getValue()).collect(Collectors.toList()));
146                if (theSubscriptionId != null) {
147                        jobDetails.setSubscriptionId(theSubscriptionId.getIdPart());
148                }
149
150                // Submit job for processing
151                synchronized (myActiveJobs) {
152                        myActiveJobs.add(jobDetails);
153                        ourLog.info("Subscription triggering requested for {} resource and {} search - Gave job ID: {} and have {} jobs", resourceIds.size(), searchUrls.size(), jobDetails.getJobId(), myActiveJobs.size());
154                }
155
156                // Create a parameters response
157                IBaseParameters retVal = ParametersUtil.newInstance(myFhirContext);
158                IPrimitiveType<?> value = (IPrimitiveType<?>) myFhirContext.getElementDefinition("string").newInstance();
159                value.setValueAsString("Subscription triggering job submitted as JOB ID: " + jobDetails.myJobId);
160                ParametersUtil.addParameterToParameters(myFhirContext, retVal, "information", value);
161                return retVal;
162        }
163
164        @Override
165        public void runDeliveryPass() {
166
167                synchronized (myActiveJobs) {
168
169                        if (myActiveJobs.isEmpty()) {
170                                return;
171                        }
172
173                        String activeJobIds = myActiveJobs.stream().map(SubscriptionTriggeringJobDetails::getJobId).collect(Collectors.joining(", "));
174                        ourLog.info("Starting pass: currently have {} active job IDs: {}", myActiveJobs.size(), activeJobIds);
175
176                        SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0);
177
178                        runJob(activeJob);
179
180                        // If the job is complete, remove it from the queue
181                        if (activeJob.getRemainingResourceIds().isEmpty()) {
182                                if (activeJob.getRemainingSearchUrls().isEmpty()) {
183                                        if (isBlank(activeJob.myCurrentSearchUuid)) {
184                                                myActiveJobs.remove(0);
185                                                String remainingJobsMsg = "";
186                                                if (myActiveJobs.size() > 0) {
187                                                        remainingJobsMsg = "(" + myActiveJobs.size() + " jobs remaining)";
188                                                }
189                                                ourLog.info("Subscription triggering job {} is complete{}", activeJob.getJobId(), remainingJobsMsg);
190                                        }
191                                }
192                        }
193
194                }
195
196        }
197
198        private void runJob(SubscriptionTriggeringJobDetails theJobDetails) {
199                StopWatch sw = new StopWatch();
200                ourLog.info("Starting pass of subscription triggering job {}", theJobDetails.getJobId());
201
202                // Submit individual resources
203                int totalSubmitted = 0;
204                List<Pair<String, Future<Void>>> futures = new ArrayList<>();
205                while (theJobDetails.getRemainingResourceIds().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
206                        totalSubmitted++;
207                        String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0);
208                        Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResourceId);
209                        futures.add(Pair.of(nextResourceId, future));
210                }
211
212                // Make sure these all succeeded in submitting
213                if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
214                        return;
215                }
216
217                // If we don't have an active search started, and one needs to be.. start it
218                if (isBlank(theJobDetails.getCurrentSearchUuid()) && theJobDetails.getRemainingSearchUrls().size() > 0 && totalSubmitted < myMaxSubmitPerPass) {
219                        String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0);
220                        RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, nextSearchUrl);
221                        String queryPart = nextSearchUrl.substring(nextSearchUrl.indexOf('?'));
222                        String resourceType = resourceDef.getName();
223
224                        IFhirResourceDao<?> callingDao = myDaoRegistry.getResourceDao(resourceType);
225                        SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef);
226
227                        ourLog.info("Triggering job[{}] is starting a search for {}", theJobDetails.getJobId(), nextSearchUrl);
228
229                        IBundleProvider search = mySearchCoordinatorSvc.registerSearch(callingDao, params, resourceType, new CacheControlDirective(), null, RequestPartitionId.allPartitions());
230                        theJobDetails.setCurrentSearchUuid(search.getUuid());
231                        theJobDetails.setCurrentSearchResourceType(resourceType);
232                        theJobDetails.setCurrentSearchCount(params.getCount());
233                        theJobDetails.setCurrentSearchLastUploadedIndex(-1);
234                }
235
236                // If we have an active search going, submit resources from it
237                if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) {
238                        int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
239
240                        IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());
241
242                        int maxQuerySize = myMaxSubmitPerPass - totalSubmitted;
243                        int toIndex = fromIndex + maxQuerySize;
244                        if (theJobDetails.getCurrentSearchCount() != null) {
245                                toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
246                        }
247                        ourLog.info("Triggering job[{}] search {} requesting resources {} - {}", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
248                        List<ResourcePersistentId> resourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, null);
249
250                        ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), resourceIds.size());
251                        int highestIndexSubmitted = theJobDetails.getCurrentSearchLastUploadedIndex();
252
253                        for (ResourcePersistentId next : resourceIds) {
254                                IBaseResource nextResource = resourceDao.readByPid(next);
255                                Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResource);
256                                futures.add(Pair.of(nextResource.getIdElement().getIdPart(), future));
257                                totalSubmitted++;
258                                highestIndexSubmitted++;
259                        }
260
261                        if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
262                                return;
263                        }
264
265                        theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted);
266
267                        if (resourceIds.size() == 0 || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) {
268                                ourLog.info("Triggering job[{}] search {} has completed ", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid());
269                                theJobDetails.setCurrentSearchResourceType(null);
270                                theJobDetails.setCurrentSearchUuid(null);
271                                theJobDetails.setCurrentSearchLastUploadedIndex(-1);
272                                theJobDetails.setCurrentSearchCount(null);
273                        }
274                }
275
276                ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS));
277        }
278
279        private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Pair<String, Future<Void>>> theIdToFutures) {
280
281                for (Pair<String, Future<Void>> next : theIdToFutures) {
282                        String nextDeliveredId = next.getKey();
283                        try {
284                                Future<Void> nextFuture = next.getValue();
285                                nextFuture.get();
286                                ourLog.info("Finished redelivering {}", nextDeliveredId);
287                        } catch (Exception e) {
288                                ourLog.error("Failure triggering resource " + nextDeliveredId, e);
289                                return true;
290                        }
291                }
292
293                // Clear the list since it will potentially get reused
294                theIdToFutures.clear();
295                return false;
296        }
297
298        private Future<Void> submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
299                org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
300                IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
301                IBaseResource resourceToTrigger = dao.read(resourceId);
302
303                return submitResource(theSubscriptionId, resourceToTrigger);
304        }
305
306        private Future<Void> submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) {
307
308                ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId);
309
310                ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theResourceToTrigger, ResourceModifiedMessage.OperationTypeEnum.UPDATE);
311                msg.setSubscriptionId(theSubscriptionId);
312
313                return myExecutorService.submit(() -> {
314                        for (int i = 0; ; i++) {
315                                try {
316                                        myResourceModifiedConsumer.submitResourceModified(msg);
317                                        break;
318                                } catch (Exception e) {
319                                        if (i >= 3) {
320                                                throw new InternalErrorException(e);
321                                        }
322
323                                        ourLog.warn("Exception while retriggering subscriptions (going to sleep and retry): {}", e.toString());
324                                        Thread.sleep(1000);
325                                }
326                        }
327
328                        return null;
329                });
330
331        }
332
333        public void cancelAll() {
334                synchronized (myActiveJobs) {
335                        myActiveJobs.clear();
336                }
337        }
338
339        /**
340         * Sets the maximum number of resources that will be submitted in a single pass
341         */
342        public void setMaxSubmitPerPass(Integer theMaxSubmitPerPass) {
343                Integer maxSubmitPerPass = theMaxSubmitPerPass;
344                if (maxSubmitPerPass == null) {
345                        maxSubmitPerPass = DEFAULT_MAX_SUBMIT;
346                }
347                Validate.isTrue(maxSubmitPerPass > 0, "theMaxSubmitPerPass must be > 0");
348                myMaxSubmitPerPass = maxSubmitPerPass;
349        }
350
351        @PostConstruct
352        public void start() {
353                createExecutorService();
354                scheduleJob();
355        }
356
357        private void createExecutorService() {
358                LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(1000);
359                BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
360                        .namingPattern("SubscriptionTriggering-%d")
361                        .daemon(false)
362                        .priority(Thread.NORM_PRIORITY)
363                        .build();
364                RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
365                        @Override
366                        public void rejectedExecution(Runnable theRunnable, ThreadPoolExecutor theExecutor) {
367                                ourLog.info("Note: Subscription triggering queue is full ({} elements), waiting for a slot to become available!", executorQueue.size());
368                                StopWatch sw = new StopWatch();
369                                try {
370                                        executorQueue.put(theRunnable);
371                                } catch (InterruptedException theE) {
372                                        // Restore interrupted state...
373                                        Thread.currentThread().interrupt();
374                                        throw new RejectedExecutionException("Task " + theRunnable.toString() +
375                                                " rejected from " + theE.toString());
376                                }
377                                ourLog.info("Slot become available after {}ms", sw.getMillis());
378                        }
379                };
380                myExecutorService = new ThreadPoolExecutor(
381                        0,
382                        10,
383                        0L,
384                        TimeUnit.MILLISECONDS,
385                        executorQueue,
386                        threadFactory,
387                        rejectedExecutionHandler);
388        }
389
390        private void scheduleJob() {
391                ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
392                jobDetail.setId(getClass().getName());
393                jobDetail.setJobClass(Job.class);
394                // Currently jobs ae kept in a local ArrayList so this should be a local job, and
395                // it can fire frequently without adding load
396                mySchedulerService.scheduleLocalJob(5 * DateUtils.MILLIS_PER_SECOND, jobDetail);
397        }
398
399        public int getActiveJobCount() {
400                return myActiveJobs.size();
401        }
402
403        public static class Job implements HapiJob {
404                @Autowired
405                private ISubscriptionTriggeringSvc myTarget;
406
407                @Override
408                public void execute(JobExecutionContext theContext) {
409                        myTarget.runDeliveryPass();
410                }
411        }
412
413        private static class SubscriptionTriggeringJobDetails {
414
415                private String myJobId;
416                private String mySubscriptionId;
417                private List<String> myRemainingResourceIds;
418                private List<String> myRemainingSearchUrls;
419                private String myCurrentSearchUuid;
420                private Integer myCurrentSearchCount;
421                private String myCurrentSearchResourceType;
422                private int myCurrentSearchLastUploadedIndex;
423
424                Integer getCurrentSearchCount() {
425                        return myCurrentSearchCount;
426                }
427
428                void setCurrentSearchCount(Integer theCurrentSearchCount) {
429                        myCurrentSearchCount = theCurrentSearchCount;
430                }
431
432                String getCurrentSearchResourceType() {
433                        return myCurrentSearchResourceType;
434                }
435
436                void setCurrentSearchResourceType(String theCurrentSearchResourceType) {
437                        myCurrentSearchResourceType = theCurrentSearchResourceType;
438                }
439
440                String getJobId() {
441                        return myJobId;
442                }
443
444                void setJobId(String theJobId) {
445                        myJobId = theJobId;
446                }
447
448                String getSubscriptionId() {
449                        return mySubscriptionId;
450                }
451
452                void setSubscriptionId(String theSubscriptionId) {
453                        mySubscriptionId = theSubscriptionId;
454                }
455
456                List<String> getRemainingResourceIds() {
457                        return myRemainingResourceIds;
458                }
459
460                void setRemainingResourceIds(List<String> theRemainingResourceIds) {
461                        myRemainingResourceIds = theRemainingResourceIds;
462                }
463
464                List<String> getRemainingSearchUrls() {
465                        return myRemainingSearchUrls;
466                }
467
468                void setRemainingSearchUrls(List<String> theRemainingSearchUrls) {
469                        myRemainingSearchUrls = theRemainingSearchUrls;
470                }
471
472                String getCurrentSearchUuid() {
473                        return myCurrentSearchUuid;
474                }
475
476                void setCurrentSearchUuid(String theCurrentSearchUuid) {
477                        myCurrentSearchUuid = theCurrentSearchUuid;
478                }
479
480                int getCurrentSearchLastUploadedIndex() {
481                        return myCurrentSearchLastUploadedIndex;
482                }
483
484                void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) {
485                        myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex;
486                }
487        }
488
489}