
001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.topic; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.context.FhirVersionEnum; 024import ca.uhn.fhir.i18n.Msg; 025import ca.uhn.fhir.interceptor.model.RequestPartitionId; 026import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 027import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 028import ca.uhn.fhir.jpa.searchparam.MatchUrlService; 029import ca.uhn.fhir.jpa.searchparam.ResourceSearch; 030import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 031import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription; 032import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; 033import ca.uhn.fhir.jpa.topic.status.INotificationStatusBuilder; 034import ca.uhn.fhir.jpa.topic.status.R4BNotificationStatusBuilder; 035import ca.uhn.fhir.jpa.topic.status.R4NotificationStatusBuilder; 036import ca.uhn.fhir.jpa.topic.status.R5NotificationStatusBuilder; 037import ca.uhn.fhir.rest.api.Constants; 038import ca.uhn.fhir.rest.api.RestOperationTypeEnum; 039import ca.uhn.fhir.rest.api.server.IBundleProvider; 040import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 041import ca.uhn.fhir.rest.param.ParameterUtil; 042import ca.uhn.fhir.util.BundleBuilder; 043import jakarta.annotation.Nonnull; 044import org.apache.commons.lang3.ObjectUtils; 045import org.hl7.fhir.instance.model.api.IBaseBundle; 046import org.hl7.fhir.instance.model.api.IBaseResource; 047import org.hl7.fhir.r5.model.Bundle; 048import org.hl7.fhir.r5.model.StringType; 049import org.hl7.fhir.r5.model.SubscriptionTopic; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import java.util.HashSet; 054import java.util.List; 055import java.util.Optional; 056import java.util.Set; 057import java.util.stream.Collectors; 058 059import static ca.uhn.fhir.rest.api.Constants.PARAM_ID; 060import static ca.uhn.fhir.rest.api.Constants.PARAM_INCLUDE; 061import static ca.uhn.fhir.rest.api.Constants.PARAM_REVINCLUDE; 062import static org.hl7.fhir.r5.model.Subscription.SubscriptionPayloadContent.FULLRESOURCE; 063 064public class SubscriptionTopicPayloadBuilder { 065 private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTopicPayloadBuilder.class); 066 private static final int MAX_INCLUDES = 1000; 067 private final FhirContext myFhirContext; 068 private final FhirVersionEnum myFhirVersion; 069 private final INotificationStatusBuilder<? extends IBaseResource> myNotificationStatusBuilder; 070 071 private final DaoRegistry myDaoRegistry; 072 073 private final SubscriptionTopicRegistry mySubscriptionTopicRegistry; 074 075 private final MatchUrlService myMatchUrlService; 076 077 public SubscriptionTopicPayloadBuilder( 078 FhirContext theFhirContext, 079 DaoRegistry theDaoRegistry, 080 SubscriptionTopicRegistry theSubscriptionTopicRegistry, 081 MatchUrlService theMatchUrlService) { 082 myFhirContext = theFhirContext; 083 myDaoRegistry = theDaoRegistry; 084 mySubscriptionTopicRegistry = theSubscriptionTopicRegistry; 085 myMatchUrlService = theMatchUrlService; 086 myFhirVersion = myFhirContext.getVersion().getVersion(); 087 088 switch (myFhirVersion) { 089 case R4: 090 myNotificationStatusBuilder = new R4NotificationStatusBuilder(myFhirContext); 091 break; 092 case R4B: 093 myNotificationStatusBuilder = new R4BNotificationStatusBuilder(myFhirContext); 094 break; 095 case R5: 096 myNotificationStatusBuilder = new R5NotificationStatusBuilder(myFhirContext); 097 break; 098 default: 099 throw unsupportedFhirVersionException(); 100 } 101 } 102 103 public IBaseBundle buildPayload( 104 List<IBaseResource> theResources, 105 ActiveSubscription theActiveSubscription, 106 String theTopicUrl, 107 RestOperationTypeEnum theRestOperationType) { 108 BundleBuilder bundleBuilder = new BundleBuilder(myFhirContext); 109 110 IBaseResource notificationStatus = 111 myNotificationStatusBuilder.buildNotificationStatus(theResources, theActiveSubscription, theTopicUrl); 112 bundleBuilder.addCollectionEntry(notificationStatus); 113 114 addResources(theResources, theActiveSubscription.getSubscription(), theRestOperationType, bundleBuilder); 115 116 // Handle notification shape includes and revIncludes 117 Set<IBaseResource> notificationShapeResources = getNotificationShapeResources(theResources, theTopicUrl); 118 for (IBaseResource resource : notificationShapeResources) { 119 bundleBuilder.addCollectionEntry(resource); 120 } 121 122 // Note we need to set the bundle type after we add the resources since adding the resources automatically sets 123 // the bundle type 124 setBundleType(bundleBuilder); 125 IBaseBundle retval = bundleBuilder.getBundle(); 126 if (ourLog.isDebugEnabled()) { 127 String bundle = myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(retval); 128 ourLog.debug("Bundle: {}", bundle); 129 } 130 return retval; 131 } 132 133 private Set<IBaseResource> getNotificationShapeResources(List<IBaseResource> theResources, String theTopicUrl) { 134 135 Set<IBaseResource> resultResources = new HashSet<>(); 136 137 if (theResources.isEmpty() || mySubscriptionTopicRegistry == null) { 138 return Set.of(); 139 } 140 141 // Get the topic from the registry 142 Optional<SubscriptionTopic> oTopic = mySubscriptionTopicRegistry.findSubscriptionTopicByUrl(theTopicUrl); 143 if (oTopic.isEmpty()) { 144 ourLog.warn("No subscription topic found for URL: {}", theTopicUrl); 145 return Set.of(); 146 } 147 SubscriptionTopic topic = oTopic.get(); 148 149 // Process each notification shape 150 for (SubscriptionTopic.SubscriptionTopicNotificationShapeComponent shape : topic.getNotificationShape()) { 151 String resourceType = shape.getResource(); 152 153 if (resourceType == null) { 154 ourLog.warn( 155 "Notification Shape on SubscriptionTopic/{} (with url {}) is missing mandatory resource.", 156 topic.getId(), 157 topic.getUrl()); 158 continue; 159 } 160 161 // If the resourceType doesn't match any of our resources, skip it 162 List<IBaseResource> resourcesOfThisType = theResources.stream() 163 .filter(r -> myFhirContext.getResourceType(r).equals(resourceType)) 164 .collect(Collectors.toList()); 165 166 if (resourcesOfThisType.isEmpty()) { 167 continue; 168 } 169 170 List<StringType> include = shape.getInclude(); 171 List<StringType> revInclude = shape.getRevInclude(); 172 173 Set<IBaseResource> includedResources = 174 getIncludedResources(resourceType, resourcesOfThisType, include, revInclude); 175 resultResources.addAll(includedResources); 176 } 177 178 return resultResources; 179 } 180 181 private Set<IBaseResource> getIncludedResources( 182 String theResourceType, 183 List<IBaseResource> theResourcesOfThisType, 184 List<StringType> theIncludes, 185 List<StringType> theRevIncludes) { 186 Set<IBaseResource> resultResources = new HashSet<>(); 187 IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(theResourceType); 188 189 for (IBaseResource resource : theResourcesOfThisType) { 190 StringBuilder query = new StringBuilder(theResourceType + "?" + PARAM_ID + "=" 191 + resource.getIdElement().getIdPart()); 192 for (StringType include : theIncludes) { 193 query.append("&" + PARAM_INCLUDE + "=").append(ParameterUtil.escapeAndUrlEncode(include.getValue())); 194 } 195 for (StringType revInclude : theRevIncludes) { 196 query.append("&" + PARAM_REVINCLUDE + "=") 197 .append(ParameterUtil.escapeAndUrlEncode(revInclude.getValue())); 198 } 199 ResourceSearch resourceSearch = 200 myMatchUrlService.getResourceSearchWithIncludesAndRevIncludes(query.toString()); 201 SearchParameterMap map = resourceSearch.getSearchParameterMap(); 202 map.setLoadSynchronousUpTo(MAX_INCLUDES); 203 SystemRequestDetails systemRequestDetails = buildSystemRequestDetails(resource); 204 IBundleProvider result = dao.search(map, systemRequestDetails); 205 206 Integer size = result.size(); 207 if (size != null && size >= MAX_INCLUDES) { 208 ourLog.warn( 209 "More than {} include/revincluded resources found. Number of include/revincluded resources limited to {}", 210 MAX_INCLUDES, 211 MAX_INCLUDES); 212 } 213 result.getAllResources().stream() 214 .filter(r -> !r.getIdElement() 215 .toUnqualifiedVersionless() 216 .equals(resource.getIdElement().toUnqualifiedVersionless())) 217 .forEach(resultResources::add); 218 } 219 220 return resultResources; 221 } 222 223 @Nonnull 224 private static SystemRequestDetails buildSystemRequestDetails(IBaseResource resource) { 225 SystemRequestDetails systemRequestDetails = new SystemRequestDetails(); 226 // TODO KHS might need to check cross-partition subscription settings here 227 RequestPartitionId requestPartitionId = 228 (RequestPartitionId) resource.getUserData(Constants.RESOURCE_PARTITION_ID); 229 if (requestPartitionId == null) { 230 systemRequestDetails.setRequestPartitionId(RequestPartitionId.allPartitions()); 231 } else { 232 systemRequestDetails.setRequestPartitionId(requestPartitionId); 233 } 234 return systemRequestDetails; 235 } 236 237 private void addResources( 238 List<IBaseResource> theResources, 239 CanonicalSubscription theCanonicalSubscription, 240 RestOperationTypeEnum theRestOperationType, 241 BundleBuilder theBundleBuilder) { 242 243 org.hl7.fhir.r5.model.Subscription.SubscriptionPayloadContent content = 244 ObjectUtils.defaultIfNull(theCanonicalSubscription.getContent(), FULLRESOURCE); 245 246 switch (content) { 247 case IDONLY: 248 addIdOnly(theBundleBuilder, theResources, theRestOperationType); 249 break; 250 case FULLRESOURCE: 251 addFullResources(theBundleBuilder, theResources, theRestOperationType); 252 break; 253 case EMPTY: 254 default: 255 // skip adding resource to the Bundle 256 break; 257 } 258 } 259 260 private void addIdOnly( 261 BundleBuilder bundleBuilder, List<IBaseResource> theResources, RestOperationTypeEnum theRestOperationType) { 262 for (IBaseResource resource : theResources) { 263 switch (theRestOperationType) { 264 case CREATE: 265 bundleBuilder.addTransactionCreateEntryIdOnly(resource); 266 break; 267 case UPDATE: 268 bundleBuilder.addTransactionUpdateIdOnlyEntry(resource); 269 break; 270 case DELETE: 271 bundleBuilder.addTransactionDeleteEntry(resource); 272 break; 273 default: 274 break; 275 } 276 } 277 } 278 279 private void addFullResources( 280 BundleBuilder bundleBuilder, List<IBaseResource> theResources, RestOperationTypeEnum theRestOperationType) { 281 for (IBaseResource resource : theResources) { 282 switch (theRestOperationType) { 283 case CREATE: 284 bundleBuilder.addTransactionCreateEntry(resource); 285 break; 286 case UPDATE: 287 bundleBuilder.addTransactionUpdateEntry(resource); 288 break; 289 case DELETE: 290 bundleBuilder.addTransactionDeleteEntry(resource); 291 break; 292 default: 293 break; 294 } 295 } 296 } 297 298 private void setBundleType(BundleBuilder bundleBuilder) { 299 switch (myFhirVersion) { 300 case R4: 301 case R4B: 302 bundleBuilder.setType(Bundle.BundleType.HISTORY.toCode()); 303 break; 304 case R5: 305 bundleBuilder.setType(Bundle.BundleType.SUBSCRIPTIONNOTIFICATION.toCode()); 306 break; 307 default: 308 throw unsupportedFhirVersionException(); 309 } 310 } 311 312 private IllegalStateException unsupportedFhirVersionException() { 313 return new IllegalStateException( 314 Msg.code(2331) + "SubscriptionTopic subscriptions are not supported on FHIR version: " + myFhirVersion); 315 } 316}