001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.match.deliver.resthook;
021
022import ca.uhn.fhir.context.RuntimeResourceDefinition;
023import ca.uhn.fhir.i18n.Msg;
024import ca.uhn.fhir.interceptor.api.HookParams;
025import ca.uhn.fhir.interceptor.api.Pointcut;
026import ca.uhn.fhir.interceptor.model.RequestPartitionId;
027import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
028import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
029import ca.uhn.fhir.jpa.subscription.match.deliver.BaseSubscriptionDeliverySubscriber;
030import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
031import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
032import ca.uhn.fhir.rest.api.EncodingEnum;
033import ca.uhn.fhir.rest.api.RequestTypeEnum;
034import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
035import ca.uhn.fhir.rest.client.api.Header;
036import ca.uhn.fhir.rest.client.api.IGenericClient;
037import ca.uhn.fhir.rest.client.api.IHttpClient;
038import ca.uhn.fhir.rest.client.api.IHttpRequest;
039import ca.uhn.fhir.rest.client.api.IHttpResponse;
040import ca.uhn.fhir.rest.client.api.ServerValidationModeEnum;
041import ca.uhn.fhir.rest.client.interceptor.SimpleRequestHeaderInterceptor;
042import ca.uhn.fhir.rest.gclient.IClientExecutable;
043import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
044import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
045import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage;
046import ca.uhn.fhir.util.BundleUtil;
047import ca.uhn.fhir.util.Logs;
048import ca.uhn.fhir.util.StopWatch;
049import jakarta.annotation.Nullable;
050import org.hl7.fhir.instance.model.api.IBaseBundle;
051import org.hl7.fhir.instance.model.api.IBaseResource;
052import org.hl7.fhir.instance.model.api.IIdType;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055import org.springframework.beans.factory.annotation.Autowired;
056import org.springframework.context.annotation.Scope;
057import org.springframework.messaging.MessagingException;
058
059import java.io.IOException;
060import java.util.ArrayList;
061import java.util.Collections;
062import java.util.HashMap;
063import java.util.List;
064import java.util.Map;
065
066import static org.apache.commons.lang3.StringUtils.isNotBlank;
067
068@Scope("prototype")
069public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDeliverySubscriber {
070        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionDeliveringRestHookSubscriber.class);
071
072        @Autowired
073        private DaoRegistry myDaoRegistry;
074
075        /**
076         * Constructor
077         */
078        public SubscriptionDeliveringRestHookSubscriber() {
079                super();
080        }
081
082        protected void deliverPayload(
083                        ResourceDeliveryMessage theMsg,
084                        CanonicalSubscription theSubscription,
085                        EncodingEnum thePayloadType,
086                        IGenericClient theClient) {
087                IBaseResource payloadResource = getAndMassagePayload(theMsg, theSubscription);
088
089                // Regardless of whether we have a payload, the rest-hook should be sent.
090                doDelivery(theMsg, theSubscription, thePayloadType, theClient, payloadResource);
091        }
092
093        protected void doDelivery(
094                        ResourceDeliveryMessage theMsg,
095                        CanonicalSubscription theSubscription,
096                        EncodingEnum thePayloadType,
097                        IGenericClient theClient,
098                        IBaseResource thePayloadResource) {
099                IClientExecutable<?, ?> operation;
100
101                if (theSubscription.isTopicSubscription()) {
102                        operation = createDeliveryRequestTopic((IBaseBundle) thePayloadResource, theClient);
103                } else if (isNotBlank(theSubscription.getPayloadSearchCriteria())) {
104                        operation = createDeliveryRequestTransaction(theSubscription, theClient, thePayloadResource);
105                } else if (thePayloadType != null) {
106                        operation = createDeliveryRequestNormal(theMsg, theClient, thePayloadResource);
107                } else {
108                        sendNotification(theMsg);
109                        operation = null;
110                }
111
112                if (operation != null) {
113
114                        if (thePayloadType != null) {
115                                operation.encoded(thePayloadType);
116                        }
117
118                        String payloadId = thePayloadResource.getIdElement().toUnqualified().getValue();
119                        StopWatch sw = new StopWatch();
120
121                        try {
122                                operation.execute();
123                        } catch (ResourceNotFoundException e) {
124                                ourLog.error("Cannot reach {} ", theMsg.getSubscription().getEndpointUrl());
125                                ourLog.error("Exception: ", e);
126                                throw e;
127                        }
128
129                        Logs.getSubscriptionTroubleshootingLog()
130                                        .debug(
131                                                        "Delivered {} rest-hook payload {} for {} in {}",
132                                                        theMsg.getOperationType(),
133                                                        payloadId,
134                                                        theSubscription
135                                                                        .getIdElement(myFhirContext)
136                                                                        .toUnqualifiedVersionless()
137                                                                        .getValue(),
138                                                        sw);
139                }
140        }
141
142        @Nullable
143        private IClientExecutable<?, ?> createDeliveryRequestNormal(
144                        ResourceDeliveryMessage theMsg, IGenericClient theClient, IBaseResource thePayloadResource) {
145                IClientExecutable<?, ?> operation;
146                switch (theMsg.getOperationType()) {
147                        case CREATE:
148                        case UPDATE:
149                                operation = theClient.update().resource(thePayloadResource);
150                                break;
151                        case DELETE:
152                                operation = theClient.delete().resourceById(theMsg.getPayloadId(myFhirContext));
153                                break;
154                        default:
155                                ourLog.warn("Ignoring delivery message of type: {}", theMsg.getOperationType());
156                                operation = null;
157                                break;
158                }
159                return operation;
160        }
161
162        private IClientExecutable<?, ?> createDeliveryRequestTransaction(
163                        CanonicalSubscription theSubscription, IGenericClient theClient, IBaseResource thePayloadResource) {
164                IBaseBundle bundle = createDeliveryBundleForPayloadSearchCriteria(theSubscription, thePayloadResource);
165                return theClient.transaction().withBundle(bundle);
166        }
167
168        private IClientExecutable<?, ?> createDeliveryRequestTopic(IBaseBundle theBundle, IGenericClient theClient) {
169                return theClient.transaction().withBundle(theBundle);
170        }
171
172        public IBaseResource getResource(IIdType thePayloadId, RequestPartitionId thePartitionId, boolean theDeletedOK)
173                        throws ResourceGoneException {
174                RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(thePayloadId.getResourceType());
175                SystemRequestDetails systemRequestDetails = new SystemRequestDetails().setRequestPartitionId(thePartitionId);
176                IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceDef.getImplementingClass());
177                return dao.read(thePayloadId.toVersionless(), systemRequestDetails, theDeletedOK);
178        }
179
180        /**
181         * Perform operations on the payload based on various subscription extension settings such as deliver latest version,
182         * delete and/or strip version id.
183         * @param theMsg
184         * @param theSubscription
185         * @return
186         */
187        protected IBaseResource getAndMassagePayload(
188                        ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription) {
189                IBaseResource payloadResource = theMsg.getPayload(myFhirContext);
190
191                if (payloadResource instanceof IBaseBundle) {
192                        return getAndMassageBundle(theMsg, (IBaseBundle) payloadResource, theSubscription);
193                } else {
194                        return getAndMassageResource(theMsg, payloadResource, theSubscription);
195                }
196        }
197
198        private IBaseResource getAndMassageBundle(
199                        ResourceDeliveryMessage theMsg, IBaseBundle theBundle, CanonicalSubscription theSubscription) {
200                BundleUtil.processEntries(myFhirContext, theBundle, entry -> {
201                        IBaseResource entryResource = entry.getResource();
202                        if (entryResource != null) {
203                                // SubscriptionStatus is a "virtual" resource type that is not stored in the repository
204                                if (!"SubscriptionStatus".equals(myFhirContext.getResourceType(entryResource))) {
205                                        IBaseResource updatedResource = getAndMassageResource(theMsg, entryResource, theSubscription);
206                                        entry.setFullUrl(updatedResource.getIdElement().getValue());
207                                        entry.setResource(updatedResource);
208                                }
209                        }
210                });
211                return theBundle;
212        }
213
214        private IBaseResource getAndMassageResource(
215                        ResourceDeliveryMessage theMsg, IBaseResource thePayloadResource, CanonicalSubscription theSubscription) {
216                if (thePayloadResource == null || theSubscription.getRestHookDetails().isDeliverLatestVersion()) {
217
218                        IIdType payloadId = theMsg.getPayloadId(myFhirContext).toVersionless();
219                        if (theSubscription.isTopicSubscription()) {
220                                payloadId = thePayloadResource.getIdElement().toVersionless();
221                        }
222                        try {
223                                if (payloadId != null) {
224                                        boolean deletedOK =
225                                                        theMsg.getOperationType() == BaseResourceModifiedMessage.OperationTypeEnum.DELETE;
226                                        thePayloadResource = getResource(payloadId, theMsg.getRequestPartitionId(), deletedOK);
227                                } else {
228                                        return null;
229                                }
230                        } catch (ResourceGoneException e) {
231                                ourLog.warn(
232                                                "Resource {} is deleted, not going to deliver for subscription {}",
233                                                payloadId,
234                                                theSubscription.getIdElement(myFhirContext));
235                                return null;
236                        }
237                }
238
239                IIdType resourceId = thePayloadResource.getIdElement();
240                if (theSubscription.getRestHookDetails().isStripVersionId()) {
241                        resourceId = resourceId.toVersionless();
242                        thePayloadResource.setId(resourceId);
243                        thePayloadResource.getMeta().setVersionId(null);
244                }
245                return thePayloadResource;
246        }
247
248        @Override
249        public void handleMessage(ResourceDeliveryMessage theMessage) throws MessagingException {
250                CanonicalSubscription subscription = theMessage.getSubscription();
251
252                // Interceptor call: SUBSCRIPTION_BEFORE_REST_HOOK_DELIVERY
253                HookParams params = new HookParams()
254                                .add(CanonicalSubscription.class, subscription)
255                                .add(ResourceDeliveryMessage.class, theMessage);
256                if (!getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_BEFORE_REST_HOOK_DELIVERY, params)) {
257                        return;
258                }
259
260                // Grab the endpoint from the subscription
261                String endpointUrl = subscription.getEndpointUrl();
262
263                // Grab the payload type (encoding mimetype) from the subscription
264                String payloadString = subscription.getPayloadString();
265                EncodingEnum payloadType = null;
266                if (payloadString != null) {
267                        payloadType = EncodingEnum.forContentType(payloadString);
268                }
269
270                // Create the client request
271                myFhirContext.getRestfulClientFactory().setServerValidationMode(ServerValidationModeEnum.NEVER);
272                IGenericClient client = null;
273                if (isNotBlank(endpointUrl)) {
274                        client = myFhirContext.newRestfulGenericClient(endpointUrl);
275
276                        // Additional headers specified in the subscription
277                        List<String> headers = subscription.getHeaders();
278                        for (String next : headers) {
279                                if (isNotBlank(next)) {
280                                        client.registerInterceptor(new SimpleRequestHeaderInterceptor(next));
281                                }
282                        }
283                }
284
285                deliverPayload(theMessage, subscription, payloadType, client);
286
287                // Interceptor call: SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY
288                params = new HookParams()
289                                .add(CanonicalSubscription.class, subscription)
290                                .add(ResourceDeliveryMessage.class, theMessage);
291                if (!getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY, params)) {
292                        //noinspection UnnecessaryReturnStatement
293                        return;
294                }
295        }
296
297        /**
298         * Sends a POST notification without a payload
299         */
300        protected void sendNotification(ResourceDeliveryMessage theMsg) {
301                Map<String, List<String>> params = new HashMap<>();
302                CanonicalSubscription subscription = theMsg.getSubscription();
303                List<Header> headers = parseHeadersFromSubscription(subscription);
304
305                StringBuilder url = new StringBuilder(subscription.getEndpointUrl());
306                IHttpClient client =
307                                myFhirContext.getRestfulClientFactory().getHttpClient(url, params, "", RequestTypeEnum.POST, headers);
308                IHttpRequest request = client.createParamRequest(myFhirContext, params, null);
309                try {
310                        IHttpResponse response = request.execute();
311                        // close connection in order to return a possible cached connection to the connection pool
312                        response.close();
313                } catch (IOException e) {
314                        ourLog.error(
315                                        "Error trying to reach {}: {}", theMsg.getSubscription().getEndpointUrl(), e.toString());
316                        throw new ResourceNotFoundException(Msg.code(5) + e.getMessage());
317                }
318        }
319
320        public static List<Header> parseHeadersFromSubscription(CanonicalSubscription subscription) {
321                List<Header> headers = null;
322                if (subscription != null) {
323                        for (String h : subscription.getHeaders()) {
324                                if (h != null) {
325                                        final int sep = h.indexOf(':');
326                                        if (sep > 0) {
327                                                final String name = h.substring(0, sep);
328                                                final String value = h.substring(sep + 1);
329                                                if (isNotBlank(name)) {
330                                                        if (headers == null) {
331                                                                headers = new ArrayList<>();
332                                                        }
333                                                        headers.add(new Header(name.trim(), value.trim()));
334                                                }
335                                        }
336                                }
337                        }
338                }
339                if (headers == null) {
340                        headers = Collections.emptyList();
341                } else {
342                        headers = Collections.unmodifiableList(headers);
343                }
344                return headers;
345        }
346}