001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.submit.interceptor;
021
022import ca.uhn.fhir.context.FhirContext;
023import ca.uhn.fhir.context.FhirVersionEnum;
024import ca.uhn.fhir.i18n.Msg;
025import ca.uhn.fhir.interceptor.api.Hook;
026import ca.uhn.fhir.interceptor.api.Interceptor;
027import ca.uhn.fhir.interceptor.api.Pointcut;
028import ca.uhn.fhir.interceptor.model.RequestPartitionId;
029import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
030import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
031import ca.uhn.fhir.jpa.model.config.SubscriptionSettings;
032import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
033import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
034import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
035import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
036import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
037import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
038import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
039import ca.uhn.fhir.parser.DataFormatException;
040import ca.uhn.fhir.rest.api.EncodingEnum;
041import ca.uhn.fhir.rest.api.server.IBundleProvider;
042import ca.uhn.fhir.rest.api.server.RequestDetails;
043import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
044import ca.uhn.fhir.rest.param.UriParam;
045import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
046import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
047import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
048import ca.uhn.fhir.subscription.SubscriptionConstants;
049import ca.uhn.fhir.util.HapiExtensions;
050import ca.uhn.fhir.util.SubscriptionUtil;
051import com.google.common.annotations.VisibleForTesting;
052import org.hl7.fhir.instance.model.api.IBaseResource;
053import org.hl7.fhir.r4.model.Extension;
054import org.hl7.fhir.r4.model.StringType;
055import org.hl7.fhir.r4.model.Subscription;
056import org.hl7.fhir.r5.model.SubscriptionTopic;
057import org.springframework.beans.factory.annotation.Autowired;
058
059import java.net.URI;
060import java.net.URISyntaxException;
061import java.util.ArrayList;
062import java.util.List;
063import java.util.Optional;
064
065import static org.apache.commons.lang3.StringUtils.isBlank;
066
067@Interceptor
068public class SubscriptionValidatingInterceptor {
069
070        @Autowired
071        private DaoRegistry myDaoRegistry;
072
073        @Autowired
074        private SubscriptionSettings mySubscriptionSettings;
075
076        @Autowired
077        private SubscriptionStrategyEvaluator mySubscriptionStrategyEvaluator;
078
079        @Autowired
080        private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
081
082        private FhirContext myFhirContext;
083
084        @Autowired
085        private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
086
087        @Autowired
088        private SubscriptionQueryValidator mySubscriptionQueryValidator;
089
090        @Hook(Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED)
091        public void resourcePreCreate(
092                        IBaseResource theResource, RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) {
093                validateSubmittedSubscription(
094                                theResource, theRequestDetails, theRequestPartitionId, Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED);
095        }
096
097        @Hook(Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED)
098        public void resourceUpdated(
099                        IBaseResource theOldResource,
100                        IBaseResource theResource,
101                        RequestDetails theRequestDetails,
102                        RequestPartitionId theRequestPartitionId) {
103                validateSubmittedSubscription(
104                                theResource, theRequestDetails, theRequestPartitionId, Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED);
105        }
106
107        @Autowired
108        public void setFhirContext(FhirContext theFhirContext) {
109                myFhirContext = theFhirContext;
110        }
111
112        @VisibleForTesting
113        void validateSubmittedSubscription(
114                        IBaseResource theSubscription,
115                        RequestDetails theRequestDetails,
116                        RequestPartitionId theRequestPartitionId,
117                        Pointcut thePointcut) {
118                if (Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED != thePointcut
119                                && Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED != thePointcut) {
120                        throw new UnprocessableEntityException(Msg.code(2267)
121                                        + "Expected Pointcut to be either STORAGE_PRESTORAGE_RESOURCE_CREATED or STORAGE_PRESTORAGE_RESOURCE_UPDATED but was: "
122                                        + thePointcut);
123                }
124
125                if (!"Subscription".equals(myFhirContext.getResourceType(theSubscription))) {
126                        return;
127                }
128
129                CanonicalSubscription subscription;
130                try {
131                        subscription = mySubscriptionCanonicalizer.canonicalize(theSubscription);
132                } catch (InternalErrorException e) {
133                        throw new UnprocessableEntityException(Msg.code(955) + e.getMessage());
134                }
135                boolean finished = false;
136                if (subscription.getStatus() == null) {
137                        throw new UnprocessableEntityException(Msg.code(8)
138                                        + "Can not process submitted Subscription - Subscription.status must be populated on this server");
139                }
140
141                switch (subscription.getStatus()) {
142                        case REQUESTED:
143                        case ACTIVE:
144                                break;
145                        case ERROR:
146                        case OFF:
147                        case NULL:
148                                finished = true;
149                                break;
150                }
151
152                validatePermissions(theSubscription, subscription, theRequestDetails, theRequestPartitionId, thePointcut);
153
154                mySubscriptionCanonicalizer.setMatchingStrategyTag(theSubscription, null);
155
156                if (!finished) {
157
158                        if (subscription.getPayloadSearchCriteria() != null) {
159                                validateQuery(
160                                                subscription.getPayloadSearchCriteria(),
161                                                "Subscription.extension(url='" + HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA
162                                                                + "')");
163                        }
164                        validateCriteria(theSubscription, subscription);
165
166                        validateChannelType(subscription);
167
168                        try {
169                                SubscriptionMatchingStrategy strategy = mySubscriptionStrategyEvaluator.determineStrategy(subscription);
170                                if (!(SubscriptionMatchingStrategy.IN_MEMORY == strategy)
171                                                && mySubscriptionSettings.isOnlyAllowInMemorySubscriptions()) {
172                                        throw new InvalidRequestException(
173                                                        Msg.code(2367)
174                                                                        + "This server is configured to only allow in-memory subscriptions. This subscription's criteria cannot be evaluated in-memory.");
175                                }
176                                mySubscriptionCanonicalizer.setMatchingStrategyTag(theSubscription, strategy);
177                        } catch (InvalidRequestException | DataFormatException e) {
178                                throw new UnprocessableEntityException(Msg.code(9) + "Invalid subscription criteria submitted: "
179                                                + subscription.getCriteriaString() + " " + e.getMessage());
180                        }
181
182                        if (subscription.getChannelType() == null) {
183                                throw new UnprocessableEntityException(
184                                                Msg.code(10) + "Subscription.channel.type must be populated on this server");
185                        } else if (subscription.getChannelType() == CanonicalSubscriptionChannelType.MESSAGE) {
186                                validateMessageSubscriptionEndpoint(subscription.getEndpointUrl());
187                        }
188                }
189        }
190
191        private void validateCriteria(IBaseResource theSubscription, CanonicalSubscription theCanonicalSubscription) {
192                if (theCanonicalSubscription.isTopicSubscription()) {
193                        if (myFhirContext.getVersion().getVersion() == FhirVersionEnum.R4) {
194                                validateR4BackportSubscription((Subscription) theSubscription);
195                        } else {
196                                validateR5PlusTopicSubscription(theCanonicalSubscription);
197                        }
198                } else {
199                        validateQuery(theCanonicalSubscription.getCriteriaString(), "Subscription.criteria");
200                }
201        }
202
203        private void validateR5PlusTopicSubscription(CanonicalSubscription theCanonicalSubscription) {
204                Optional<IBaseResource> oTopic = findSubscriptionTopicByUrl(theCanonicalSubscription.getTopic());
205                if (!oTopic.isPresent()) {
206                        throw new UnprocessableEntityException(
207                                        Msg.code(2322) + "No SubscriptionTopic exists with topic: " + theCanonicalSubscription.getTopic());
208                }
209        }
210
211        private void validateR4BackportSubscription(Subscription theSubscription) {
212                // This is an R4 backport topic subscription
213                // In R4, topic subscriptions exist without a corresponding SubscriptionTopic
214                Subscription r4Subscription = theSubscription;
215                List<String> filterUrls = new ArrayList<>();
216                List<Extension> filterUrlExtensions = r4Subscription
217                                .getCriteriaElement()
218                                .getExtensionsByUrl(SubscriptionConstants.SUBSCRIPTION_TOPIC_FILTER_URL);
219                filterUrlExtensions.forEach(filterUrlExtension -> {
220                        StringType filterUrlElement = (StringType) filterUrlExtension.getValue();
221                        if (filterUrlElement != null) {
222                                filterUrls.add(filterUrlElement.getValue());
223                        }
224                });
225                if (filterUrls.isEmpty()) {
226                        // Trigger a "no criteria" validation exception
227                        validateQuery(
228                                        null,
229                                        "Subscription.criteria.extension with url " + SubscriptionConstants.SUBSCRIPTION_TOPIC_FILTER_URL);
230                } else {
231                        filterUrls.forEach(filterUrl -> validateQuery(
232                                        filterUrl,
233                                        "Subscription.criteria.extension with url " + SubscriptionConstants.SUBSCRIPTION_TOPIC_FILTER_URL));
234                }
235        }
236
237        protected void validatePermissions(
238                        IBaseResource theSubscription,
239                        CanonicalSubscription theCanonicalSubscription,
240                        RequestDetails theRequestDetails,
241                        RequestPartitionId theRequestPartitionId,
242                        Pointcut thePointcut) {
243                // If the subscription has the cross partition tag
244                if (SubscriptionUtil.isCrossPartition(theSubscription)
245                                && !(theRequestDetails instanceof SystemRequestDetails)) {
246                        if (!mySubscriptionSettings.isCrossPartitionSubscriptionEnabled()) {
247                                throw new UnprocessableEntityException(
248                                                Msg.code(2009) + "Cross partition subscription is not enabled on this server");
249                        }
250
251                        if (theRequestPartitionId == null && Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED == thePointcut) {
252                                return;
253                        }
254
255                        // if we have a partition id already, we'll use that
256                        // otherwise we might end up with READ and CREATE pointcuts
257                        // returning conflicting partitions (say, all vs default)
258                        RequestPartitionId toCheckPartitionId = theRequestPartitionId != null
259                                        ? theRequestPartitionId
260                                        : determinePartition(theRequestDetails, theSubscription);
261
262                        if (!toCheckPartitionId.isDefaultPartition()) {
263                                throw new UnprocessableEntityException(
264                                                Msg.code(2010) + "Cross partition subscription must be created on the default partition");
265                        }
266                }
267        }
268
269        private RequestPartitionId determinePartition(RequestDetails theRequestDetails, IBaseResource theResource) {
270                switch (theRequestDetails.getRestOperationType()) {
271                        case CREATE:
272                                return myRequestPartitionHelperSvc.determineCreatePartitionForRequest(
273                                                theRequestDetails, theResource, "Subscription");
274                        case UPDATE:
275                                return myRequestPartitionHelperSvc.determineReadPartitionForRequestForRead(
276                                                theRequestDetails, "Subscription", theResource.getIdElement());
277                        default:
278                                return null;
279                }
280        }
281
282        public void validateQuery(String theQuery, String theFieldName) {
283                mySubscriptionQueryValidator.validateCriteria(theQuery, theFieldName);
284        }
285
286        private Optional<IBaseResource> findSubscriptionTopicByUrl(String theCriteria) {
287                myDaoRegistry.getResourceDao("SubscriptionTopic");
288                SearchParameterMap map = SearchParameterMap.newSynchronous();
289                map.add(SubscriptionTopic.SP_URL, new UriParam(theCriteria));
290                IFhirResourceDao subscriptionTopicDao = myDaoRegistry.getResourceDao("SubscriptionTopic");
291                IBundleProvider search = subscriptionTopicDao.search(map, new SystemRequestDetails());
292                return search.getResources(0, 1).stream().findFirst();
293        }
294
295        public void validateMessageSubscriptionEndpoint(String theEndpointUrl) {
296                if (theEndpointUrl == null) {
297                        throw new UnprocessableEntityException(Msg.code(16) + "No endpoint defined for message subscription");
298                }
299
300                try {
301                        URI uri = new URI(theEndpointUrl);
302
303                        if (!"channel".equals(uri.getScheme())) {
304                                throw new UnprocessableEntityException(Msg.code(17)
305                                                + "Only 'channel' protocol is supported for Subscriptions with channel type 'message'");
306                        }
307                        String channelName = uri.getSchemeSpecificPart();
308                        if (isBlank(channelName)) {
309                                throw new UnprocessableEntityException(
310                                                Msg.code(18) + "A channel name must appear after channel: in a message Subscription endpoint");
311                        }
312                } catch (URISyntaxException e) {
313                        throw new UnprocessableEntityException(
314                                        Msg.code(19) + "Invalid subscription endpoint uri " + theEndpointUrl, e);
315                }
316        }
317
318        @SuppressWarnings("WeakerAccess")
319        protected void validateChannelType(CanonicalSubscription theSubscription) {
320                if (theSubscription.getChannelType() == null) {
321                        throw new UnprocessableEntityException(Msg.code(20) + "Subscription.channel.type must be populated");
322                } else if (theSubscription.getChannelType() == CanonicalSubscriptionChannelType.RESTHOOK) {
323                        validateChannelPayload(theSubscription);
324                        validateChannelEndpoint(theSubscription);
325                }
326        }
327
328        @SuppressWarnings("WeakerAccess")
329        protected void validateChannelEndpoint(CanonicalSubscription theResource) {
330                if (isBlank(theResource.getEndpointUrl())) {
331                        throw new UnprocessableEntityException(
332                                        Msg.code(21) + "Rest-hook subscriptions must have Subscription.channel.endpoint defined");
333                }
334        }
335
336        @SuppressWarnings("WeakerAccess")
337        protected void validateChannelPayload(CanonicalSubscription theResource) {
338                if (!isBlank(theResource.getPayloadString())
339                                && EncodingEnum.forContentType(theResource.getPayloadString()) == null) {
340                        throw new UnprocessableEntityException(Msg.code(1985) + "Invalid value for Subscription.channel.payload: "
341                                        + theResource.getPayloadString());
342                }
343        }
344
345        @SuppressWarnings("WeakerAccess")
346        @VisibleForTesting
347        public void setSubscriptionCanonicalizerForUnitTest(SubscriptionCanonicalizer theSubscriptionCanonicalizer) {
348                mySubscriptionCanonicalizer = theSubscriptionCanonicalizer;
349        }
350
351        @SuppressWarnings("WeakerAccess")
352        @VisibleForTesting
353        public void setDaoRegistryForUnitTest(DaoRegistry theDaoRegistry) {
354                myDaoRegistry = theDaoRegistry;
355        }
356
357        @VisibleForTesting
358        public void setSubscriptionSettingsForUnitTest(SubscriptionSettings theSubscriptionSettings) {
359                mySubscriptionSettings = theSubscriptionSettings;
360        }
361
362        @VisibleForTesting
363        public void setRequestPartitionHelperSvcForUnitTest(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) {
364                myRequestPartitionHelperSvc = theRequestPartitionHelperSvc;
365        }
366
367        @VisibleForTesting
368        @SuppressWarnings("WeakerAccess")
369        public void setSubscriptionStrategyEvaluatorForUnitTest(
370                        SubscriptionStrategyEvaluator theSubscriptionStrategyEvaluator) {
371                mySubscriptionStrategyEvaluator = theSubscriptionStrategyEvaluator;
372                mySubscriptionQueryValidator = new SubscriptionQueryValidator(myDaoRegistry, theSubscriptionStrategyEvaluator);
373        }
374}