![](/hapi-fhir/images/logos/raccoon-forwards.png)
001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.submit.interceptor; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.context.FhirVersionEnum; 024import ca.uhn.fhir.i18n.Msg; 025import ca.uhn.fhir.interceptor.api.Hook; 026import ca.uhn.fhir.interceptor.api.Interceptor; 027import ca.uhn.fhir.interceptor.api.Pointcut; 028import ca.uhn.fhir.interceptor.model.RequestPartitionId; 029import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 030import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 031import ca.uhn.fhir.jpa.model.config.SubscriptionSettings; 032import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; 033import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 034import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy; 035import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator; 036import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer; 037import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; 038import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType; 039import ca.uhn.fhir.parser.DataFormatException; 040import ca.uhn.fhir.rest.api.EncodingEnum; 041import ca.uhn.fhir.rest.api.server.IBundleProvider; 042import ca.uhn.fhir.rest.api.server.RequestDetails; 043import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 044import ca.uhn.fhir.rest.param.UriParam; 045import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 046import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 047import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; 048import ca.uhn.fhir.subscription.SubscriptionConstants; 049import ca.uhn.fhir.util.HapiExtensions; 050import ca.uhn.fhir.util.SubscriptionUtil; 051import com.google.common.annotations.VisibleForTesting; 052import org.hl7.fhir.instance.model.api.IBaseResource; 053import org.hl7.fhir.r4.model.Extension; 054import org.hl7.fhir.r4.model.StringType; 055import org.hl7.fhir.r4.model.Subscription; 056import org.hl7.fhir.r5.model.SubscriptionTopic; 057import org.springframework.beans.factory.annotation.Autowired; 058 059import java.net.URI; 060import java.net.URISyntaxException; 061import java.util.ArrayList; 062import java.util.List; 063import java.util.Optional; 064 065import static org.apache.commons.lang3.StringUtils.isBlank; 066 067@Interceptor 068public class SubscriptionValidatingInterceptor { 069 070 @Autowired 071 private DaoRegistry myDaoRegistry; 072 073 @Autowired 074 private SubscriptionSettings mySubscriptionSettings; 075 076 @Autowired 077 private SubscriptionStrategyEvaluator mySubscriptionStrategyEvaluator; 078 079 @Autowired 080 private SubscriptionCanonicalizer mySubscriptionCanonicalizer; 081 082 private FhirContext myFhirContext; 083 084 @Autowired 085 private IRequestPartitionHelperSvc myRequestPartitionHelperSvc; 086 087 @Autowired 088 private SubscriptionQueryValidator mySubscriptionQueryValidator; 089 090 @Hook(Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED) 091 public void resourcePreCreate( 092 IBaseResource theResource, RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) { 093 validateSubmittedSubscription( 094 theResource, theRequestDetails, theRequestPartitionId, Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED); 095 } 096 097 @Hook(Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED) 098 public void resourceUpdated( 099 IBaseResource theOldResource, 100 IBaseResource theResource, 101 RequestDetails theRequestDetails, 102 RequestPartitionId theRequestPartitionId) { 103 validateSubmittedSubscription( 104 theResource, theRequestDetails, theRequestPartitionId, Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED); 105 } 106 107 @Autowired 108 public void setFhirContext(FhirContext theFhirContext) { 109 myFhirContext = theFhirContext; 110 } 111 112 @VisibleForTesting 113 void validateSubmittedSubscription( 114 IBaseResource theSubscription, 115 RequestDetails theRequestDetails, 116 RequestPartitionId theRequestPartitionId, 117 Pointcut thePointcut) { 118 if (Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED != thePointcut 119 && Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED != thePointcut) { 120 throw new UnprocessableEntityException(Msg.code(2267) 121 + "Expected Pointcut to be either STORAGE_PRESTORAGE_RESOURCE_CREATED or STORAGE_PRESTORAGE_RESOURCE_UPDATED but was: " 122 + thePointcut); 123 } 124 125 if (!"Subscription".equals(myFhirContext.getResourceType(theSubscription))) { 126 return; 127 } 128 129 CanonicalSubscription subscription; 130 try { 131 subscription = mySubscriptionCanonicalizer.canonicalize(theSubscription); 132 } catch (InternalErrorException e) { 133 throw new UnprocessableEntityException(Msg.code(955) + e.getMessage()); 134 } 135 boolean finished = false; 136 if (subscription.getStatus() == null) { 137 throw new UnprocessableEntityException(Msg.code(8) 138 + "Can not process submitted Subscription - Subscription.status must be populated on this server"); 139 } 140 141 switch (subscription.getStatus()) { 142 case REQUESTED: 143 case ACTIVE: 144 break; 145 case ERROR: 146 case OFF: 147 case NULL: 148 finished = true; 149 break; 150 } 151 152 validatePermissions(theSubscription, subscription, theRequestDetails, theRequestPartitionId, thePointcut); 153 154 mySubscriptionCanonicalizer.setMatchingStrategyTag(theSubscription, null); 155 156 if (!finished) { 157 158 if (subscription.getPayloadSearchCriteria() != null) { 159 validateQuery( 160 subscription.getPayloadSearchCriteria(), 161 "Subscription.extension(url='" + HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA 162 + "')"); 163 } 164 validateCriteria(theSubscription, subscription); 165 166 validateChannelType(subscription); 167 168 try { 169 SubscriptionMatchingStrategy strategy = mySubscriptionStrategyEvaluator.determineStrategy(subscription); 170 if (!(SubscriptionMatchingStrategy.IN_MEMORY == strategy) 171 && mySubscriptionSettings.isOnlyAllowInMemorySubscriptions()) { 172 throw new InvalidRequestException( 173 Msg.code(2367) 174 + "This server is configured to only allow in-memory subscriptions. This subscription's criteria cannot be evaluated in-memory."); 175 } 176 mySubscriptionCanonicalizer.setMatchingStrategyTag(theSubscription, strategy); 177 } catch (InvalidRequestException | DataFormatException e) { 178 throw new UnprocessableEntityException(Msg.code(9) + "Invalid subscription criteria submitted: " 179 + subscription.getCriteriaString() + " " + e.getMessage()); 180 } 181 182 if (subscription.getChannelType() == null) { 183 throw new UnprocessableEntityException( 184 Msg.code(10) + "Subscription.channel.type must be populated on this server"); 185 } else if (subscription.getChannelType() == CanonicalSubscriptionChannelType.MESSAGE) { 186 validateMessageSubscriptionEndpoint(subscription.getEndpointUrl()); 187 } 188 } 189 } 190 191 private void validateCriteria(IBaseResource theSubscription, CanonicalSubscription theCanonicalSubscription) { 192 if (theCanonicalSubscription.isTopicSubscription()) { 193 if (myFhirContext.getVersion().getVersion() == FhirVersionEnum.R4) { 194 validateR4BackportSubscription((Subscription) theSubscription); 195 } else { 196 validateR5PlusTopicSubscription(theCanonicalSubscription); 197 } 198 } else { 199 validateQuery(theCanonicalSubscription.getCriteriaString(), "Subscription.criteria"); 200 } 201 } 202 203 private void validateR5PlusTopicSubscription(CanonicalSubscription theCanonicalSubscription) { 204 Optional<IBaseResource> oTopic = findSubscriptionTopicByUrl(theCanonicalSubscription.getTopic()); 205 if (!oTopic.isPresent()) { 206 throw new UnprocessableEntityException( 207 Msg.code(2322) + "No SubscriptionTopic exists with topic: " + theCanonicalSubscription.getTopic()); 208 } 209 } 210 211 private void validateR4BackportSubscription(Subscription theSubscription) { 212 // This is an R4 backport topic subscription 213 // In R4, topic subscriptions exist without a corresponding SubscriptionTopic 214 Subscription r4Subscription = theSubscription; 215 List<String> filterUrls = new ArrayList<>(); 216 List<Extension> filterUrlExtensions = r4Subscription 217 .getCriteriaElement() 218 .getExtensionsByUrl(SubscriptionConstants.SUBSCRIPTION_TOPIC_FILTER_URL); 219 filterUrlExtensions.forEach(filterUrlExtension -> { 220 StringType filterUrlElement = (StringType) filterUrlExtension.getValue(); 221 if (filterUrlElement != null) { 222 filterUrls.add(filterUrlElement.getValue()); 223 } 224 }); 225 if (filterUrls.isEmpty()) { 226 // Trigger a "no criteria" validation exception 227 validateQuery( 228 null, 229 "Subscription.criteria.extension with url " + SubscriptionConstants.SUBSCRIPTION_TOPIC_FILTER_URL); 230 } else { 231 filterUrls.forEach(filterUrl -> validateQuery( 232 filterUrl, 233 "Subscription.criteria.extension with url " + SubscriptionConstants.SUBSCRIPTION_TOPIC_FILTER_URL)); 234 } 235 } 236 237 protected void validatePermissions( 238 IBaseResource theSubscription, 239 CanonicalSubscription theCanonicalSubscription, 240 RequestDetails theRequestDetails, 241 RequestPartitionId theRequestPartitionId, 242 Pointcut thePointcut) { 243 // If the subscription has the cross partition tag 244 if (SubscriptionUtil.isCrossPartition(theSubscription) 245 && !(theRequestDetails instanceof SystemRequestDetails)) { 246 if (!mySubscriptionSettings.isCrossPartitionSubscriptionEnabled()) { 247 throw new UnprocessableEntityException( 248 Msg.code(2009) + "Cross partition subscription is not enabled on this server"); 249 } 250 251 if (theRequestPartitionId == null && Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED == thePointcut) { 252 return; 253 } 254 255 // if we have a partition id already, we'll use that 256 // otherwise we might end up with READ and CREATE pointcuts 257 // returning conflicting partitions (say, all vs default) 258 RequestPartitionId toCheckPartitionId = theRequestPartitionId != null 259 ? theRequestPartitionId 260 : determinePartition(theRequestDetails, theSubscription); 261 262 if (!toCheckPartitionId.isDefaultPartition()) { 263 throw new UnprocessableEntityException( 264 Msg.code(2010) + "Cross partition subscription must be created on the default partition"); 265 } 266 } 267 } 268 269 private RequestPartitionId determinePartition(RequestDetails theRequestDetails, IBaseResource theResource) { 270 switch (theRequestDetails.getRestOperationType()) { 271 case CREATE: 272 return myRequestPartitionHelperSvc.determineCreatePartitionForRequest( 273 theRequestDetails, theResource, "Subscription"); 274 case UPDATE: 275 return myRequestPartitionHelperSvc.determineReadPartitionForRequestForRead( 276 theRequestDetails, "Subscription", theResource.getIdElement()); 277 default: 278 return null; 279 } 280 } 281 282 public void validateQuery(String theQuery, String theFieldName) { 283 mySubscriptionQueryValidator.validateCriteria(theQuery, theFieldName); 284 } 285 286 private Optional<IBaseResource> findSubscriptionTopicByUrl(String theCriteria) { 287 myDaoRegistry.getResourceDao("SubscriptionTopic"); 288 SearchParameterMap map = SearchParameterMap.newSynchronous(); 289 map.add(SubscriptionTopic.SP_URL, new UriParam(theCriteria)); 290 IFhirResourceDao subscriptionTopicDao = myDaoRegistry.getResourceDao("SubscriptionTopic"); 291 IBundleProvider search = subscriptionTopicDao.search(map, new SystemRequestDetails()); 292 return search.getResources(0, 1).stream().findFirst(); 293 } 294 295 public void validateMessageSubscriptionEndpoint(String theEndpointUrl) { 296 if (theEndpointUrl == null) { 297 throw new UnprocessableEntityException(Msg.code(16) + "No endpoint defined for message subscription"); 298 } 299 300 try { 301 URI uri = new URI(theEndpointUrl); 302 303 if (!"channel".equals(uri.getScheme())) { 304 throw new UnprocessableEntityException(Msg.code(17) 305 + "Only 'channel' protocol is supported for Subscriptions with channel type 'message'"); 306 } 307 String channelName = uri.getSchemeSpecificPart(); 308 if (isBlank(channelName)) { 309 throw new UnprocessableEntityException( 310 Msg.code(18) + "A channel name must appear after channel: in a message Subscription endpoint"); 311 } 312 } catch (URISyntaxException e) { 313 throw new UnprocessableEntityException( 314 Msg.code(19) + "Invalid subscription endpoint uri " + theEndpointUrl, e); 315 } 316 } 317 318 @SuppressWarnings("WeakerAccess") 319 protected void validateChannelType(CanonicalSubscription theSubscription) { 320 if (theSubscription.getChannelType() == null) { 321 throw new UnprocessableEntityException(Msg.code(20) + "Subscription.channel.type must be populated"); 322 } else if (theSubscription.getChannelType() == CanonicalSubscriptionChannelType.RESTHOOK) { 323 validateChannelPayload(theSubscription); 324 validateChannelEndpoint(theSubscription); 325 } 326 } 327 328 @SuppressWarnings("WeakerAccess") 329 protected void validateChannelEndpoint(CanonicalSubscription theResource) { 330 if (isBlank(theResource.getEndpointUrl())) { 331 throw new UnprocessableEntityException( 332 Msg.code(21) + "Rest-hook subscriptions must have Subscription.channel.endpoint defined"); 333 } 334 } 335 336 @SuppressWarnings("WeakerAccess") 337 protected void validateChannelPayload(CanonicalSubscription theResource) { 338 if (!isBlank(theResource.getPayloadString()) 339 && EncodingEnum.forContentType(theResource.getPayloadString()) == null) { 340 throw new UnprocessableEntityException(Msg.code(1985) + "Invalid value for Subscription.channel.payload: " 341 + theResource.getPayloadString()); 342 } 343 } 344 345 @SuppressWarnings("WeakerAccess") 346 @VisibleForTesting 347 public void setSubscriptionCanonicalizerForUnitTest(SubscriptionCanonicalizer theSubscriptionCanonicalizer) { 348 mySubscriptionCanonicalizer = theSubscriptionCanonicalizer; 349 } 350 351 @SuppressWarnings("WeakerAccess") 352 @VisibleForTesting 353 public void setDaoRegistryForUnitTest(DaoRegistry theDaoRegistry) { 354 myDaoRegistry = theDaoRegistry; 355 } 356 357 @VisibleForTesting 358 public void setSubscriptionSettingsForUnitTest(SubscriptionSettings theSubscriptionSettings) { 359 mySubscriptionSettings = theSubscriptionSettings; 360 } 361 362 @VisibleForTesting 363 public void setRequestPartitionHelperSvcForUnitTest(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) { 364 myRequestPartitionHelperSvc = theRequestPartitionHelperSvc; 365 } 366 367 @VisibleForTesting 368 @SuppressWarnings("WeakerAccess") 369 public void setSubscriptionStrategyEvaluatorForUnitTest( 370 SubscriptionStrategyEvaluator theSubscriptionStrategyEvaluator) { 371 mySubscriptionStrategyEvaluator = theSubscriptionStrategyEvaluator; 372 mySubscriptionQueryValidator = new SubscriptionQueryValidator(myDaoRegistry, theSubscriptionStrategyEvaluator); 373 } 374}