001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.topic;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.context.FhirVersionEnum;
024import ca.uhn.fhir.i18n.Msg;
025import ca.uhn.fhir.interceptor.model.RequestPartitionId;
026import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
027import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
028import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
029import ca.uhn.fhir.jpa.searchparam.ResourceSearch;
030import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
031import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
032import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
033import ca.uhn.fhir.jpa.topic.status.INotificationStatusBuilder;
034import ca.uhn.fhir.jpa.topic.status.R4BNotificationStatusBuilder;
035import ca.uhn.fhir.jpa.topic.status.R4NotificationStatusBuilder;
036import ca.uhn.fhir.jpa.topic.status.R5NotificationStatusBuilder;
037import ca.uhn.fhir.rest.api.Constants;
038import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
039import ca.uhn.fhir.rest.api.server.IBundleProvider;
040import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
041import ca.uhn.fhir.rest.param.ParameterUtil;
042import ca.uhn.fhir.util.BundleBuilder;
043import jakarta.annotation.Nonnull;
044import org.apache.commons.lang3.ObjectUtils;
045import org.hl7.fhir.instance.model.api.IBaseBundle;
046import org.hl7.fhir.instance.model.api.IBaseResource;
047import org.hl7.fhir.r5.model.Bundle;
048import org.hl7.fhir.r5.model.StringType;
049import org.hl7.fhir.r5.model.SubscriptionTopic;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import java.util.HashSet;
054import java.util.List;
055import java.util.Optional;
056import java.util.Set;
057import java.util.stream.Collectors;
058
059import static ca.uhn.fhir.rest.api.Constants.PARAM_ID;
060import static ca.uhn.fhir.rest.api.Constants.PARAM_INCLUDE;
061import static ca.uhn.fhir.rest.api.Constants.PARAM_REVINCLUDE;
062import static org.hl7.fhir.r5.model.Subscription.SubscriptionPayloadContent.FULLRESOURCE;
063
064public class SubscriptionTopicPayloadBuilder {
065        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTopicPayloadBuilder.class);
066        private static final int MAX_INCLUDES = 1000;
067        private final FhirContext myFhirContext;
068        private final FhirVersionEnum myFhirVersion;
069        private final INotificationStatusBuilder<? extends IBaseResource> myNotificationStatusBuilder;
070
071        private final DaoRegistry myDaoRegistry;
072
073        private final SubscriptionTopicRegistry mySubscriptionTopicRegistry;
074
075        private final MatchUrlService myMatchUrlService;
076
077        public SubscriptionTopicPayloadBuilder(
078                        FhirContext theFhirContext,
079                        DaoRegistry theDaoRegistry,
080                        SubscriptionTopicRegistry theSubscriptionTopicRegistry,
081                        MatchUrlService theMatchUrlService) {
082                myFhirContext = theFhirContext;
083                myDaoRegistry = theDaoRegistry;
084                mySubscriptionTopicRegistry = theSubscriptionTopicRegistry;
085                myMatchUrlService = theMatchUrlService;
086                myFhirVersion = myFhirContext.getVersion().getVersion();
087
088                switch (myFhirVersion) {
089                        case R4:
090                                myNotificationStatusBuilder = new R4NotificationStatusBuilder(myFhirContext);
091                                break;
092                        case R4B:
093                                myNotificationStatusBuilder = new R4BNotificationStatusBuilder(myFhirContext);
094                                break;
095                        case R5:
096                                myNotificationStatusBuilder = new R5NotificationStatusBuilder(myFhirContext);
097                                break;
098                        default:
099                                throw unsupportedFhirVersionException();
100                }
101        }
102
103        public IBaseBundle buildPayload(
104                        List<IBaseResource> theResources,
105                        ActiveSubscription theActiveSubscription,
106                        String theTopicUrl,
107                        RestOperationTypeEnum theRestOperationType) {
108                BundleBuilder bundleBuilder = new BundleBuilder(myFhirContext);
109
110                IBaseResource notificationStatus =
111                                myNotificationStatusBuilder.buildNotificationStatus(theResources, theActiveSubscription, theTopicUrl);
112                bundleBuilder.addCollectionEntry(notificationStatus);
113
114                addResources(theResources, theActiveSubscription.getSubscription(), theRestOperationType, bundleBuilder);
115
116                // Handle notification shape includes and revIncludes
117                Set<IBaseResource> notificationShapeResources = getNotificationShapeResources(theResources, theTopicUrl);
118                for (IBaseResource resource : notificationShapeResources) {
119                        bundleBuilder.addCollectionEntry(resource);
120                }
121
122                // Note we need to set the bundle type after we add the resources since adding the resources automatically sets
123                // the bundle type
124                setBundleType(bundleBuilder);
125                IBaseBundle retval = bundleBuilder.getBundle();
126                if (ourLog.isDebugEnabled()) {
127                        String bundle = myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(retval);
128                        ourLog.debug("Bundle: {}", bundle);
129                }
130                return retval;
131        }
132
133        private Set<IBaseResource> getNotificationShapeResources(List<IBaseResource> theResources, String theTopicUrl) {
134
135                Set<IBaseResource> resultResources = new HashSet<>();
136
137                if (theResources.isEmpty() || mySubscriptionTopicRegistry == null) {
138                        return Set.of();
139                }
140
141                // Get the topic from the registry
142                Optional<SubscriptionTopic> oTopic = mySubscriptionTopicRegistry.findSubscriptionTopicByUrl(theTopicUrl);
143                if (oTopic.isEmpty()) {
144                        ourLog.warn("No subscription topic found for URL: {}", theTopicUrl);
145                        return Set.of();
146                }
147                SubscriptionTopic topic = oTopic.get();
148
149                // Process each notification shape
150                for (SubscriptionTopic.SubscriptionTopicNotificationShapeComponent shape : topic.getNotificationShape()) {
151                        String resourceType = shape.getResource();
152
153                        if (resourceType == null) {
154                                ourLog.warn(
155                                                "Notification Shape on SubscriptionTopic/{} (with url {}) is missing mandatory resource.",
156                                                topic.getId(),
157                                                topic.getUrl());
158                                continue;
159                        }
160
161                        // If the resourceType doesn't match any of our resources, skip it
162                        List<IBaseResource> resourcesOfThisType = theResources.stream()
163                                        .filter(r -> myFhirContext.getResourceType(r).equals(resourceType))
164                                        .collect(Collectors.toList());
165
166                        if (resourcesOfThisType.isEmpty()) {
167                                continue;
168                        }
169
170                        List<StringType> include = shape.getInclude();
171                        List<StringType> revInclude = shape.getRevInclude();
172
173                        Set<IBaseResource> includedResources =
174                                        getIncludedResources(resourceType, resourcesOfThisType, include, revInclude);
175                        resultResources.addAll(includedResources);
176                }
177
178                return resultResources;
179        }
180
181        private Set<IBaseResource> getIncludedResources(
182                        String theResourceType,
183                        List<IBaseResource> theResourcesOfThisType,
184                        List<StringType> theIncludes,
185                        List<StringType> theRevIncludes) {
186                Set<IBaseResource> resultResources = new HashSet<>();
187                IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(theResourceType);
188
189                for (IBaseResource resource : theResourcesOfThisType) {
190                        StringBuilder query = new StringBuilder(theResourceType + "?" + PARAM_ID + "="
191                                        + resource.getIdElement().getIdPart());
192                        for (StringType include : theIncludes) {
193                                query.append("&" + PARAM_INCLUDE + "=").append(ParameterUtil.escapeAndUrlEncode(include.getValue()));
194                        }
195                        for (StringType revInclude : theRevIncludes) {
196                                query.append("&" + PARAM_REVINCLUDE + "=")
197                                                .append(ParameterUtil.escapeAndUrlEncode(revInclude.getValue()));
198                        }
199                        ResourceSearch resourceSearch =
200                                        myMatchUrlService.getResourceSearchWithIncludesAndRevIncludes(query.toString());
201                        SearchParameterMap map = resourceSearch.getSearchParameterMap();
202                        map.setLoadSynchronousUpTo(MAX_INCLUDES);
203                        SystemRequestDetails systemRequestDetails = buildSystemRequestDetails(resource);
204                        IBundleProvider result = dao.search(map, systemRequestDetails);
205
206                        Integer size = result.size();
207                        if (size != null && size >= MAX_INCLUDES) {
208                                ourLog.warn(
209                                                "More than {} include/revincluded resources found. Number of include/revincluded resources limited to {}",
210                                                MAX_INCLUDES,
211                                                MAX_INCLUDES);
212                        }
213                        result.getAllResources().stream()
214                                        .filter(r -> !r.getIdElement()
215                                                        .toUnqualifiedVersionless()
216                                                        .equals(resource.getIdElement().toUnqualifiedVersionless()))
217                                        .forEach(resultResources::add);
218                }
219
220                return resultResources;
221        }
222
223        @Nonnull
224        private static SystemRequestDetails buildSystemRequestDetails(IBaseResource resource) {
225                SystemRequestDetails systemRequestDetails = new SystemRequestDetails();
226                // TODO KHS might need to check cross-partition subscription settings here
227                RequestPartitionId requestPartitionId =
228                                (RequestPartitionId) resource.getUserData(Constants.RESOURCE_PARTITION_ID);
229                if (requestPartitionId == null) {
230                        systemRequestDetails.setRequestPartitionId(RequestPartitionId.allPartitions());
231                } else {
232                        systemRequestDetails.setRequestPartitionId(requestPartitionId);
233                }
234                return systemRequestDetails;
235        }
236
237        private void addResources(
238                        List<IBaseResource> theResources,
239                        CanonicalSubscription theCanonicalSubscription,
240                        RestOperationTypeEnum theRestOperationType,
241                        BundleBuilder theBundleBuilder) {
242
243                org.hl7.fhir.r5.model.Subscription.SubscriptionPayloadContent content =
244                                ObjectUtils.defaultIfNull(theCanonicalSubscription.getContent(), FULLRESOURCE);
245
246                switch (content) {
247                        case IDONLY:
248                                addIdOnly(theBundleBuilder, theResources, theRestOperationType);
249                                break;
250                        case FULLRESOURCE:
251                                addFullResources(theBundleBuilder, theResources, theRestOperationType);
252                                break;
253                        case EMPTY:
254                        default:
255                                // skip adding resource to the Bundle
256                                break;
257                }
258        }
259
260        private void addIdOnly(
261                        BundleBuilder bundleBuilder, List<IBaseResource> theResources, RestOperationTypeEnum theRestOperationType) {
262                for (IBaseResource resource : theResources) {
263                        switch (theRestOperationType) {
264                                case CREATE:
265                                        bundleBuilder.addTransactionCreateEntryIdOnly(resource);
266                                        break;
267                                case UPDATE:
268                                        bundleBuilder.addTransactionUpdateIdOnlyEntry(resource);
269                                        break;
270                                case DELETE:
271                                        bundleBuilder.addTransactionDeleteEntry(resource);
272                                        break;
273                                default:
274                                        break;
275                        }
276                }
277        }
278
279        private void addFullResources(
280                        BundleBuilder bundleBuilder, List<IBaseResource> theResources, RestOperationTypeEnum theRestOperationType) {
281                for (IBaseResource resource : theResources) {
282                        switch (theRestOperationType) {
283                                case CREATE:
284                                        bundleBuilder.addTransactionCreateEntry(resource);
285                                        break;
286                                case UPDATE:
287                                        bundleBuilder.addTransactionUpdateEntry(resource);
288                                        break;
289                                case DELETE:
290                                        bundleBuilder.addTransactionDeleteEntry(resource);
291                                        break;
292                                default:
293                                        break;
294                        }
295                }
296        }
297
298        private void setBundleType(BundleBuilder bundleBuilder) {
299                switch (myFhirVersion) {
300                        case R4:
301                        case R4B:
302                                bundleBuilder.setType(Bundle.BundleType.HISTORY.toCode());
303                                break;
304                        case R5:
305                                bundleBuilder.setType(Bundle.BundleType.SUBSCRIPTIONNOTIFICATION.toCode());
306                                break;
307                        default:
308                                throw unsupportedFhirVersionException();
309                }
310        }
311
312        private IllegalStateException unsupportedFhirVersionException() {
313                return new IllegalStateException(
314                                Msg.code(2331) + "SubscriptionTopic subscriptions are not supported on FHIR version: " + myFhirVersion);
315        }
316}