001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.topic;
021
022import ca.uhn.fhir.broker.api.ISendResult;
023import ca.uhn.fhir.context.FhirContext;
024import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
025import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionDeliveryRequest;
026import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchDeliverer;
027import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
028import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
029import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
030import ca.uhn.fhir.jpa.subscription.model.CanonicalTopicSubscription;
031import ca.uhn.fhir.jpa.topic.filter.ISubscriptionTopicFilterMatcher;
032import ca.uhn.fhir.jpa.topic.filter.SubscriptionTopicFilterUtil;
033import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
034import ca.uhn.fhir.util.Logs;
035import org.hl7.fhir.instance.model.api.IBaseBundle;
036import org.hl7.fhir.instance.model.api.IBaseResource;
037import org.slf4j.Logger;
038
039import java.util.List;
040import java.util.UUID;
041
042/**
043 * Subscription topic notifications are natively supported in R5, R4B.  They are also partially supported and in R4
044 * via the subscription backport spec <a href="http://build.fhir.org/ig/HL7/fhir-subscription-backport-ig/components.html">Subscription Backport</a>.
045 * In all versions, it is possible for a FHIR Repository to submit topic subscription notifications triggered by some
046 * arbitrary "business event".  In R5 and R4B most subscription topic notifications will be triggered by a SubscriptionTopic
047 * match.  However, in the R4 backport, the SubscriptionTopic is not supported and the SubscriptionTopicDispatcher service
048 * is provided to generate those notifications instead.  Any custom java extension to the FHIR repository can @Autowire this service to
049 * send topic notifications to all Subscription resources subscribed to that topic.
050 */
051public class SubscriptionTopicDispatcher {
052        private static final Logger ourLog = Logs.getSubscriptionTopicLog();
053        private final FhirContext myFhirContext;
054        private final SubscriptionRegistry mySubscriptionRegistry;
055        private final SubscriptionMatchDeliverer mySubscriptionMatchDeliverer;
056        private final SubscriptionTopicPayloadBuilder mySubscriptionTopicPayloadBuilder;
057
058        public SubscriptionTopicDispatcher(
059                        FhirContext theFhirContext,
060                        SubscriptionRegistry theSubscriptionRegistry,
061                        SubscriptionMatchDeliverer theSubscriptionMatchDeliverer,
062                        SubscriptionTopicPayloadBuilder theSubscriptionTopicPayloadBuilder) {
063                myFhirContext = theFhirContext;
064                mySubscriptionRegistry = theSubscriptionRegistry;
065                mySubscriptionMatchDeliverer = theSubscriptionMatchDeliverer;
066                mySubscriptionTopicPayloadBuilder = theSubscriptionTopicPayloadBuilder;
067        }
068
069        /**
070         * Deliver a Subscription topic notification to all subscriptions for the given topic.
071         *
072         * @param theTopicUrl    Deliver to subscriptions for this topic
073         * @param theResources   The list of resources to deliver.  The first resource will be the primary "focus" resource per the Subscription documentation.
074         *                       This list should _not_ include the SubscriptionStatus.  The SubscriptionStatus will be added as the first element to
075         *                       the delivered bundle.  The reason for this is that the SubscriptionStatus needs to reference the subscription ID, which is
076         *                       not known until the bundle is delivered.
077         * @param theRequestType The type of request that led to this dispatch.  This determines the request type of the bundle entries
078         * @return The number of subscription notifications that were successfully queued for delivery
079         */
080        public int dispatch(String theTopicUrl, List<IBaseResource> theResources, RestOperationTypeEnum theRequestType) {
081                SubscriptionTopicDispatchRequest subscriptionTopicDispatchRequest = new SubscriptionTopicDispatchRequest(
082                                theTopicUrl,
083                                theResources,
084                                (f, r) -> InMemoryMatchResult.successfulMatch(),
085                                theRequestType,
086                                null,
087                                null,
088                                null);
089                return dispatch(subscriptionTopicDispatchRequest);
090        }
091
092        /**
093         * Deliver a Subscription topic notification to all subscriptions for the given topic.
094         *
095         * @param theSubscriptionTopicDispatchRequest contains the topic URL, the list of resources to deliver, and the request type
096         * @return The number of subscription notifications that were successfully queued for delivery
097         */
098        public int dispatch(SubscriptionTopicDispatchRequest theSubscriptionTopicDispatchRequest) {
099                int count = 0;
100
101                List<ActiveSubscription> topicSubscriptions =
102                                mySubscriptionRegistry.getTopicSubscriptionsByTopic(theSubscriptionTopicDispatchRequest.getTopicUrl());
103                if (!topicSubscriptions.isEmpty()) {
104                        for (ActiveSubscription activeSubscription : topicSubscriptions) {
105                                ISendResult result = matchFiltersAndDeliver(theSubscriptionTopicDispatchRequest, activeSubscription);
106                                if (result.isSuccessful()) {
107                                        count++;
108                                }
109                        }
110                }
111                return count;
112        }
113
114        private ISendResult matchFiltersAndDeliver(
115                        SubscriptionTopicDispatchRequest theSubscriptionTopicDispatchRequest,
116                        ActiveSubscription theActiveSubscription) {
117
118                String topicUrl = theSubscriptionTopicDispatchRequest.getTopicUrl();
119                List<IBaseResource> resources = theSubscriptionTopicDispatchRequest.getResources();
120                ISubscriptionTopicFilterMatcher subscriptionTopicFilterMatcher =
121                                theSubscriptionTopicDispatchRequest.getSubscriptionTopicFilterMatcher();
122
123                if (resources.size() > 0) {
124                        IBaseResource firstResource = resources.get(0);
125                        String resourceType = myFhirContext.getResourceType(firstResource);
126                        CanonicalSubscription subscription = theActiveSubscription.getSubscription();
127                        CanonicalTopicSubscription topicSubscription = subscription.getTopicSubscription();
128                        if (topicSubscription.hasFilters()) {
129                                ourLog.debug(
130                                                "Checking if resource {} matches {} subscription filters on {}",
131                                                firstResource.getIdElement().toUnqualifiedVersionless().getValue(),
132                                                topicSubscription.getFilters().size(),
133                                                subscription
134                                                                .getIdElement(myFhirContext)
135                                                                .toUnqualifiedVersionless()
136                                                                .getValue());
137
138                                if (!SubscriptionTopicFilterUtil.matchFilters(
139                                                firstResource, resourceType, subscriptionTopicFilterMatcher, topicSubscription)) {
140                                        return ISendResult.FAILURE;
141                                }
142                        }
143                }
144                theActiveSubscription.incrementDeliveriesCount();
145                IBaseBundle bundlePayload = mySubscriptionTopicPayloadBuilder.buildPayload(
146                                resources, theActiveSubscription, topicUrl, theSubscriptionTopicDispatchRequest.getRequestType());
147                bundlePayload.setId(UUID.randomUUID().toString());
148                SubscriptionDeliveryRequest subscriptionDeliveryRequest = new SubscriptionDeliveryRequest(
149                                bundlePayload, theActiveSubscription, theSubscriptionTopicDispatchRequest);
150                return mySubscriptionMatchDeliverer.deliverPayload(
151                                subscriptionDeliveryRequest, theSubscriptionTopicDispatchRequest.getInMemoryMatchResult());
152        }
153}