/*-
 * #%L
 * HAPI FHIR Subscription Server
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.subscription.match.deliver.resthook;

import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.subscription.match.deliver.BaseSubscriptionDeliverySubscriber;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.api.RequestTypeEnum;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.client.api.Header;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.api.IHttpClient;
import ca.uhn.fhir.rest.client.api.IHttpRequest;
import ca.uhn.fhir.rest.client.api.IHttpResponse;
import ca.uhn.fhir.rest.client.api.ServerValidationModeEnum;
import ca.uhn.fhir.rest.client.interceptor.SimpleRequestHeaderInterceptor;
import ca.uhn.fhir.rest.gclient.IClientExecutable;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage;
import ca.uhn.fhir.util.BundleUtil;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
import jakarta.annotation.Nullable;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.messaging.MessagingException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.commons.lang3.StringUtils.isNotBlank;

/**
 * Delivers subscription messages to a rest-hook endpoint.
 * <p>
 * Depending on the subscription, a delivery is sent as a transaction bundle (topic subscriptions and
 * subscriptions with payload search criteria), as an update/delete of the payload resource (when a
 * payload encoding is configured), or as a payload-less POST notification.
 */
@Scope("prototype")
public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDeliverySubscriber {
	private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionDeliveringRestHookSubscriber.class);

	@Autowired
	private DaoRegistry myDaoRegistry;

	/**
	 * Constructor
	 */
	public SubscriptionDeliveringRestHookSubscriber() {
		super();
	}

	/**
	 * Prepares the payload for the message and hands it to {@link #doDelivery}.
	 *
	 * @param theMsg          the delivery message being processed
	 * @param theSubscription the subscription the message is delivered for
	 * @param thePayloadType  the encoding to use for the payload, or {@code null} if none is configured
	 * @param theClient       the client used to reach the endpoint
	 */
	protected void deliverPayload(
			ResourceDeliveryMessage theMsg,
			CanonicalSubscription theSubscription,
			EncodingEnum thePayloadType,
			IGenericClient theClient) {
		IBaseResource payloadResource = getAndMassagePayload(theMsg, theSubscription);

		// Regardless of whether we have a payload, the rest-hook should be sent.
		doDelivery(theMsg, theSubscription, thePayloadType, theClient, payloadResource);
	}

	/**
	 * Chooses the delivery style for the subscription and executes it.
	 *
	 * @param theMsg             the delivery message being processed
	 * @param theSubscription    the subscription the message is delivered for
	 * @param thePayloadType     the encoding to use for the payload, or {@code null} to send a
	 *                           payload-less notification (unless the subscription is a topic
	 *                           subscription or has payload search criteria)
	 * @param theClient          the client used to reach the endpoint
	 * @param thePayloadResource the payload to deliver; may be {@code null}
	 * @throws ResourceNotFoundException if executing the request fails with that exception (it is logged
	 *                                   and rethrown)
	 */
	protected void doDelivery(
			ResourceDeliveryMessage theMsg,
			CanonicalSubscription theSubscription,
			EncodingEnum thePayloadType,
			IGenericClient theClient,
			IBaseResource thePayloadResource) {
		IClientExecutable<?, ?> operation;

		if (theSubscription.isTopicSubscription()) {
			operation = createDeliveryRequestTopic((IBaseBundle) thePayloadResource, theClient);
		} else if (isNotBlank(theSubscription.getPayloadSearchCriteria())) {
			operation = createDeliveryRequestTransaction(theSubscription, theClient, thePayloadResource);
		} else if (thePayloadType != null) {
			operation = createDeliveryRequestNormal(theMsg, theClient, thePayloadResource);
		} else {
			sendNotification(theMsg);
			operation = null;
		}

		if (operation == null) {
			return;
		}

		if (thePayloadType != null) {
			operation.encoded(thePayloadType);
		}

		// The id is only used for the troubleshooting log below, so a missing payload must not
		// prevent the request from being executed.
		String payloadId = resolvePayloadIdForLogging(theMsg, thePayloadResource);
		StopWatch sw = new StopWatch();

		try {
			operation.execute();
		} catch (ResourceNotFoundException e) {
			ourLog.error("Cannot reach {} ", theMsg.getSubscription().getEndpointUrl());
			ourLog.error("Exception: ", e);
			throw e;
		}

		Logs.getSubscriptionTroubleshootingLog()
				.debug(
						"Delivered {} rest-hook payload {} for {} in {}",
						theMsg.getOperationType(),
						payloadId,
						theSubscription
								.getIdElement(myFhirContext)
								.toUnqualifiedVersionless()
								.getValue(),
						sw);
	}

	/**
	 * Returns the unqualified id used when logging a delivery: the payload resource's id when a payload
	 * is present, otherwise the payload id recorded on the message, or {@code null} if neither exists.
	 */
	@Nullable
	private String resolvePayloadIdForLogging(ResourceDeliveryMessage theMsg, IBaseResource thePayloadResource) {
		IIdType id =
				thePayloadResource != null ? thePayloadResource.getIdElement() : theMsg.getPayloadId(myFhirContext);
		return id != null ? id.toUnqualified().getValue() : null;
	}

	/**
	 * Builds the request for a plain (non-bundle) delivery.
	 *
	 * @return an update request for CREATE/UPDATE messages, a delete-by-id request for DELETE messages,
	 *         or {@code null} when nothing should be sent (unsupported operation type, or a
	 *         CREATE/UPDATE message without a payload resource)
	 */
	@Nullable
	private IClientExecutable<?, ?> createDeliveryRequestNormal(
			ResourceDeliveryMessage theMsg, IGenericClient theClient, IBaseResource thePayloadResource) {
		IClientExecutable<?, ?> operation;
		switch (theMsg.getOperationType()) {
			case CREATE:
			case UPDATE:
				if (thePayloadResource == null) {
					// Without a resource there is nothing to update on the remote endpoint.
					ourLog.warn(
							"No payload resource available for {} delivery of {}, skipping",
							theMsg.getOperationType(),
							theMsg.getPayloadId(myFhirContext));
					operation = null;
				} else {
					operation = theClient.update().resource(thePayloadResource);
				}
				break;
			case DELETE:
				operation = theClient.delete().resourceById(theMsg.getPayloadId(myFhirContext));
				break;
			default:
				ourLog.warn("Ignoring delivery message of type: {}", theMsg.getOperationType());
				operation = null;
				break;
		}
		return operation;
	}

	/**
	 * Builds a transaction request carrying the bundle created for the subscription's payload search
	 * criteria.
	 */
	private IClientExecutable<?, ?> createDeliveryRequestTransaction(
			CanonicalSubscription theSubscription, IGenericClient theClient, IBaseResource thePayloadResource) {
		IBaseBundle bundle = createDeliveryBundleForPayloadSearchCriteria(theSubscription, thePayloadResource);
		return theClient.transaction().withBundle(bundle);
	}

	/**
	 * Builds a transaction request carrying the given topic subscription bundle.
	 */
	private IClientExecutable<?, ?> createDeliveryRequestTopic(IBaseBundle theBundle, IGenericClient theClient) {
		return theClient.transaction().withBundle(theBundle);
	}

	/**
	 * Reads the versionless form of the given id through the DAO registered for its resource type.
	 *
	 * @param thePayloadId   the id of the resource to read (any version part is dropped)
	 * @param thePartitionId the partition to set on the system request used for the read
	 * @param theDeletedOK   passed through to the DAO read
	 * @return the resource returned by the DAO
	 * @throws ResourceGoneException declared for callers that need to handle a deleted resource
	 */
	public IBaseResource getResource(IIdType thePayloadId, RequestPartitionId thePartitionId, boolean theDeletedOK)
			throws ResourceGoneException {
		RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(thePayloadId.getResourceType());
		SystemRequestDetails systemRequestDetails = new SystemRequestDetails().setRequestPartitionId(thePartitionId);
		IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceDef.getImplementingClass());
		return dao.read(thePayloadId.toVersionless(), systemRequestDetails, theDeletedOK);
	}

	/**
	 * Perform operations on the payload based on various subscription extension settings such as deliver latest version,
	 * delete and/or strip version id.
	 *
	 * @param theMsg          the delivery message whose payload is processed
	 * @param theSubscription the subscription whose rest-hook settings are applied
	 * @return the processed payload (the same bundle instance when the payload is a bundle), or
	 *         {@code null} when no payload could be resolved
	 */
	protected IBaseResource getAndMassagePayload(
			ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription) {
		IBaseResource payloadResource = theMsg.getPayload(myFhirContext);

		if (payloadResource instanceof IBaseBundle) {
			return getAndMassageBundle(theMsg, (IBaseBundle) payloadResource, theSubscription);
		} else {
			return getAndMassageResource(theMsg, payloadResource, theSubscription);
		}
	}

	/**
	 * Applies {@link #getAndMassageResource} to every bundle entry that carries a resource, except
	 * SubscriptionStatus entries, and writes the result back into the entry.
	 */
	private IBaseResource getAndMassageBundle(
			ResourceDeliveryMessage theMsg, IBaseBundle theBundle, CanonicalSubscription theSubscription) {
		BundleUtil.processEntries(myFhirContext, theBundle, entry -> {
			IBaseResource entryResource = entry.getResource();
			if (entryResource == null) {
				return;
			}
			// SubscriptionStatus is a "virtual" resource type that is not stored in the repository
			if ("SubscriptionStatus".equals(myFhirContext.getResourceType(entryResource))) {
				return;
			}
			IBaseResource updatedResource = getAndMassageResource(theMsg, entryResource, theSubscription);
			if (updatedResource == null) {
				// The resource could not be resolved (e.g. it is gone); leave the entry untouched
				// rather than failing the whole bundle.
				return;
			}
			entry.setFullUrl(updatedResource.getIdElement().getValue());
			entry.setResource(updatedResource);
		});
		return theBundle;
	}

	/**
	 * Re-reads the resource when no payload is present or the subscription asks for the latest version,
	 * then strips the version id if the subscription asks for that.
	 *
	 * @return the processed resource, or {@code null} when no id is available to read the resource or
	 *         the read reports the resource as gone
	 */
	private IBaseResource getAndMassageResource(
			ResourceDeliveryMessage theMsg, IBaseResource thePayloadResource, CanonicalSubscription theSubscription) {
		if (thePayloadResource == null || theSubscription.getRestHookDetails().isDeliverLatestVersion()) {

			IIdType payloadId = resolveVersionlessPayloadId(theMsg, thePayloadResource, theSubscription);
			if (payloadId == null) {
				return null;
			}
			try {
				boolean deletedOK =
						theMsg.getOperationType() == BaseResourceModifiedMessage.OperationTypeEnum.DELETE;
				thePayloadResource = getResource(payloadId, theMsg.getRequestPartitionId(), deletedOK);
			} catch (ResourceGoneException e) {
				ourLog.warn(
						"Resource {} is deleted, not going to deliver for subscription {}",
						payloadId,
						theSubscription.getIdElement(myFhirContext));
				return null;
			}
		}

		IIdType resourceId = thePayloadResource.getIdElement();
		if (theSubscription.getRestHookDetails().isStripVersionId()) {
			resourceId = resourceId.toVersionless();
			thePayloadResource.setId(resourceId);
			thePayloadResource.getMeta().setVersionId(null);
		}
		return thePayloadResource;
	}

	/**
	 * Determines the versionless id to re-read: for topic subscriptions with a resource at hand, the
	 * resource's own id; otherwise the payload id recorded on the message.
	 *
	 * @return the versionless id, or {@code null} when no id is available
	 */
	@Nullable
	private IIdType resolveVersionlessPayloadId(
			ResourceDeliveryMessage theMsg, IBaseResource thePayloadResource, CanonicalSubscription theSubscription) {
		IIdType id;
		if (theSubscription.isTopicSubscription() && thePayloadResource != null) {
			id = thePayloadResource.getIdElement();
		} else {
			id = theMsg.getPayloadId(myFhirContext);
		}
		return id != null ? id.toVersionless() : null;
	}

	/**
	 * Delivers the message to the subscription's endpoint, invoking the
	 * {@code SUBSCRIPTION_BEFORE_REST_HOOK_DELIVERY} hooks first (delivery is skipped if they return
	 * {@code false}) and the {@code SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY} hooks afterwards.
	 */
	@Override
	public void handleMessage(ResourceDeliveryMessage theMessage) throws MessagingException {
		CanonicalSubscription subscription = theMessage.getSubscription();

		// Interceptor call: SUBSCRIPTION_BEFORE_REST_HOOK_DELIVERY
		HookParams params = new HookParams()
				.add(CanonicalSubscription.class, subscription)
				.add(ResourceDeliveryMessage.class, theMessage);
		if (!getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_BEFORE_REST_HOOK_DELIVERY, params)) {
			return;
		}

		// Grab the endpoint from the subscription
		String endpointUrl = subscription.getEndpointUrl();

		// Grab the payload type (encoding mimetype) from the subscription
		String payloadString = subscription.getPayloadString();
		EncodingEnum payloadType = null;
		if (payloadString != null) {
			payloadType = EncodingEnum.forContentType(payloadString);
		}

		// Create the client request
		myFhirContext.getRestfulClientFactory().setServerValidationMode(ServerValidationModeEnum.NEVER);
		IGenericClient client = null;
		if (isNotBlank(endpointUrl)) {
			client = myFhirContext.newRestfulGenericClient(endpointUrl);

			// Additional headers specified in the subscription
			List<String> headers = subscription.getHeaders();
			for (String next : headers) {
				if (isNotBlank(next)) {
					client.registerInterceptor(new SimpleRequestHeaderInterceptor(next));
				}
			}
		}

		deliverPayload(theMessage, subscription, payloadType, client);

		// Interceptor call: SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY
		params = new HookParams()
				.add(CanonicalSubscription.class, subscription)
				.add(ResourceDeliveryMessage.class, theMessage);
		if (!getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY, params)) {
			//noinspection UnnecessaryReturnStatement
			return;
		}
	}

	/**
	 * Sends a POST notification without a payload
	 *
	 * @param theMsg the delivery message whose subscription supplies the endpoint URL and headers
	 * @throws ResourceNotFoundException if executing the request or closing the response fails with
	 *                                   an {@link IOException}
	 */
	protected void sendNotification(ResourceDeliveryMessage theMsg) {
		Map<String, List<String>> params = new HashMap<>();
		CanonicalSubscription subscription = theMsg.getSubscription();
		List<Header> headers = parseHeadersFromSubscription(subscription);

		StringBuilder url = new StringBuilder(subscription.getEndpointUrl());
		IHttpClient client =
				myFhirContext.getRestfulClientFactory().getHttpClient(url, params, "", RequestTypeEnum.POST, headers);
		IHttpRequest request = client.createParamRequest(myFhirContext, params, null);
		try {
			IHttpResponse response = request.execute();
			// close connection in order to return a possible cached connection to the connection pool
			response.close();
		} catch (IOException e) {
			// The trailing throwable makes the logger record the stack trace, which would otherwise
			// be lost because the exception thrown below only carries the message.
			ourLog.error(
					"Error trying to reach {}: {}", theMsg.getSubscription().getEndpointUrl(), e.toString(), e);
			throw new ResourceNotFoundException(Msg.code(5) + e.getMessage());
		}
	}

	/**
	 * Parses the subscription's raw {@code name: value} header strings into {@link Header} objects.
	 * Entries that are {@code null}, have no {@code ':'} after the first character, or have a blank
	 * name are skipped; names and values are trimmed.
	 *
	 * @param subscription the subscription to read headers from; may be {@code null}
	 * @return an unmodifiable list of parsed headers, empty if there are none
	 */
	public static List<Header> parseHeadersFromSubscription(CanonicalSubscription subscription) {
		if (subscription == null) {
			return Collections.emptyList();
		}

		List<Header> parsed = new ArrayList<>();
		for (String raw : subscription.getHeaders()) {
			if (raw == null) {
				continue;
			}
			final int sep = raw.indexOf(':');
			if (sep <= 0) {
				continue;
			}
			final String name = raw.substring(0, sep);
			if (isNotBlank(name)) {
				parsed.add(new Header(name.trim(), raw.substring(sep + 1).trim()));
			}
		}

		if (parsed.isEmpty()) {
			return Collections.emptyList();
		}
		return Collections.unmodifiableList(parsed);
	}
}