001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.submit.interceptor; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.context.FhirVersionEnum; 024import ca.uhn.fhir.i18n.Msg; 025import ca.uhn.fhir.interceptor.api.Hook; 026import ca.uhn.fhir.interceptor.api.Interceptor; 027import ca.uhn.fhir.interceptor.api.Pointcut; 028import ca.uhn.fhir.interceptor.model.RequestPartitionId; 029import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 030import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 031import ca.uhn.fhir.jpa.model.config.SubscriptionSettings; 032import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; 033import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 034import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy; 035import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator; 036import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer; 037import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; 038import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType; 039import ca.uhn.fhir.jpa.subscription.submit.interceptor.validator.IChannelTypeValidator; 040import ca.uhn.fhir.jpa.subscription.submit.interceptor.validator.SubscriptionChannelTypeValidatorFactory; 041import ca.uhn.fhir.jpa.subscription.submit.interceptor.validator.SubscriptionQueryValidator; 042import ca.uhn.fhir.parser.DataFormatException; 043import ca.uhn.fhir.rest.api.server.IBundleProvider; 044import ca.uhn.fhir.rest.api.server.RequestDetails; 045import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 046import ca.uhn.fhir.rest.param.UriParam; 047import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 048import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 049import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; 050import ca.uhn.fhir.subscription.SubscriptionConstants; 051import ca.uhn.fhir.util.HapiExtensions; 052import ca.uhn.fhir.util.SubscriptionUtil; 053import com.google.common.annotations.VisibleForTesting; 054import org.hl7.fhir.instance.model.api.IBaseResource; 055import org.hl7.fhir.r4.model.Extension; 056import org.hl7.fhir.r4.model.StringType; 057import org.hl7.fhir.r4.model.Subscription; 058import org.hl7.fhir.r5.model.SubscriptionTopic; 059import org.springframework.beans.factory.annotation.Autowired; 060 061import java.net.URI; 062import java.net.URISyntaxException; 063import java.util.ArrayList; 064import java.util.List; 065import java.util.Optional; 066 067import static ca.uhn.fhir.subscription.SubscriptionConstants.ORDER_SUBSCRIPTION_VALIDATING; 068import static org.apache.commons.lang3.StringUtils.isBlank; 069 070@Interceptor 071public class SubscriptionValidatingInterceptor { 072 073 @Autowired 074 private DaoRegistry myDaoRegistry; 075 076 @Autowired 077 private SubscriptionSettings mySubscriptionSettings; 078 079 @Autowired 080 private SubscriptionStrategyEvaluator mySubscriptionStrategyEvaluator; 081 082 @Autowired 083 private SubscriptionCanonicalizer mySubscriptionCanonicalizer; 084 085 private FhirContext myFhirContext; 086 087 @Autowired 088 private IRequestPartitionHelperSvc myRequestPartitionHelperSvc; 089 090 @Autowired 091 private SubscriptionQueryValidator mySubscriptionQueryValidator; 092 093 @Autowired 094 private SubscriptionChannelTypeValidatorFactory mySubscriptionChannelTypeValidatorFactory; 095 096 @Hook(value = Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED, order = ORDER_SUBSCRIPTION_VALIDATING) 097 public void resourcePreCreate( 098 IBaseResource theResource, RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) { 099 validateSubmittedSubscription( 100 theResource, theRequestDetails, theRequestPartitionId, Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED); 101 } 102 103 @Hook(value = Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED, order = ORDER_SUBSCRIPTION_VALIDATING) 104 public void resourceUpdated( 105 IBaseResource theOldResource, 106 IBaseResource theResource, 107 RequestDetails theRequestDetails, 108 RequestPartitionId theRequestPartitionId) { 109 validateSubmittedSubscription( 110 theResource, theRequestDetails, theRequestPartitionId, Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED); 111 } 112 113 @Autowired 114 public void setFhirContext(FhirContext theFhirContext) { 115 myFhirContext = theFhirContext; 116 } 117 118 @VisibleForTesting 119 void validateSubmittedSubscription( 120 IBaseResource theSubscription, 121 RequestDetails theRequestDetails, 122 RequestPartitionId theRequestPartitionId, 123 Pointcut thePointcut) { 124 if (Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED != thePointcut 125 && Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED != thePointcut) { 126 throw new UnprocessableEntityException(Msg.code(2267) 127 + "Expected Pointcut to be either STORAGE_PRESTORAGE_RESOURCE_CREATED or STORAGE_PRESTORAGE_RESOURCE_UPDATED but was: " 128 + thePointcut); 129 } 130 131 if (!"Subscription".equals(myFhirContext.getResourceType(theSubscription))) { 132 return; 133 } 134 135 CanonicalSubscription subscription; 136 try { 137 subscription = mySubscriptionCanonicalizer.canonicalize(theSubscription); 138 } catch (InternalErrorException e) { 139 throw new UnprocessableEntityException(Msg.code(955) + e.getMessage()); 140 } 141 boolean finished = false; 142 if (subscription.getStatus() == null) { 143 throw new UnprocessableEntityException(Msg.code(8) 144 + "Can not process submitted Subscription - Subscription.status must be populated on this server"); 145 } 146 147 switch (subscription.getStatus()) { 148 case REQUESTED: 149 case ACTIVE: 150 break; 151 case ERROR: 152 case OFF: 153 case NULL: 154 finished = true; 155 break; 156 } 157 158 validatePermissions(theSubscription, theRequestDetails, theRequestPartitionId, thePointcut); 159 160 mySubscriptionCanonicalizer.setMatchingStrategyTag(theSubscription, null); 161 162 if (!finished) { 163 164 if (subscription.getPayloadSearchCriteria() != null) { 165 validateQuery( 166 subscription.getPayloadSearchCriteria(), 167 "Subscription.extension(url='" + HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA 168 + "')"); 169 } 170 validateCriteria(theSubscription, subscription); 171 172 validateChannelType(subscription); 173 174 try { 175 SubscriptionMatchingStrategy strategy = mySubscriptionStrategyEvaluator.determineStrategy(subscription); 176 if (SubscriptionMatchingStrategy.IN_MEMORY != strategy 177 && mySubscriptionSettings.isOnlyAllowInMemorySubscriptions()) { 178 throw new InvalidRequestException( 179 Msg.code(2367) 180 + "This server is configured to only allow in-memory subscriptions. This subscription's criteria cannot be evaluated in-memory."); 181 } 182 mySubscriptionCanonicalizer.setMatchingStrategyTag(theSubscription, strategy); 183 } catch (InvalidRequestException | DataFormatException e) { 184 throw new UnprocessableEntityException(Msg.code(9) + "Invalid subscription criteria submitted: " 185 + subscription.getCriteriaString() + " " + e.getMessage()); 186 } 187 188 if (subscription.getChannelType() == null) { 189 throw new UnprocessableEntityException( 190 Msg.code(10) + "Subscription.channel.type must be populated on this server"); 191 } else if (subscription.getChannelType() == CanonicalSubscriptionChannelType.MESSAGE) { 192 validateMessageSubscriptionEndpoint(subscription.getEndpointUrl()); 193 } 194 } 195 } 196 197 private void validateCriteria(IBaseResource theSubscription, CanonicalSubscription theCanonicalSubscription) { 198 if (theCanonicalSubscription.isTopicSubscription()) { 199 if (myFhirContext.getVersion().getVersion() == FhirVersionEnum.R4) { 200 validateR4BackportSubscription((Subscription) theSubscription); 201 } else { 202 validateR5PlusTopicSubscription(theCanonicalSubscription); 203 } 204 } else { 205 validateQuery(theCanonicalSubscription.getCriteriaString(), "Subscription.criteria"); 206 } 207 } 208 209 private void validateR5PlusTopicSubscription(CanonicalSubscription theCanonicalSubscription) { 210 Optional<IBaseResource> oTopic = findSubscriptionTopicByUrl(theCanonicalSubscription.getTopic()); 211 if (!oTopic.isPresent()) { 212 throw new UnprocessableEntityException( 213 Msg.code(2322) + "No SubscriptionTopic exists with topic: " + theCanonicalSubscription.getTopic()); 214 } 215 } 216 217 private void validateR4BackportSubscription(Subscription theSubscription) { 218 // This is an R4 backport topic subscription 219 // In R4, topic subscriptions exist without a corresponding SubscriptionTopic 220 Subscription r4Subscription = theSubscription; 221 List<String> filterUrls = new ArrayList<>(); 222 List<Extension> filterUrlExtensions = r4Subscription 223 .getCriteriaElement() 224 .getExtensionsByUrl(SubscriptionConstants.SUBSCRIPTION_TOPIC_FILTER_URL); 225 filterUrlExtensions.forEach(filterUrlExtension -> { 226 StringType filterUrlElement = (StringType) filterUrlExtension.getValue(); 227 if (filterUrlElement != null) { 228 filterUrls.add(filterUrlElement.getValue()); 229 } 230 }); 231 if (filterUrls.isEmpty()) { 232 // Trigger a "no criteria" validation exception 233 validateQuery( 234 null, 235 "Subscription.criteria.extension with url " + SubscriptionConstants.SUBSCRIPTION_TOPIC_FILTER_URL); 236 } else { 237 filterUrls.forEach(filterUrl -> validateQuery( 238 filterUrl, 239 "Subscription.criteria.extension with url " + SubscriptionConstants.SUBSCRIPTION_TOPIC_FILTER_URL)); 240 } 241 } 242 243 protected void validatePermissions( 244 IBaseResource theSubscription, 245 RequestDetails theRequestDetails, 246 RequestPartitionId theRequestPartitionId, 247 Pointcut thePointcut) { 248 // If the subscription has the cross partition tag 249 if (SubscriptionUtil.isDefinedAsCrossPartitionSubcription(theSubscription) 250 && !(theRequestDetails instanceof SystemRequestDetails)) { 251 if (!mySubscriptionSettings.isCrossPartitionSubscriptionEnabled()) { 252 throw new UnprocessableEntityException( 253 Msg.code(2009) + "Cross partition subscription is not enabled on this server"); 254 } 255 256 if (theRequestPartitionId == null && Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED == thePointcut) { 257 return; 258 } 259 260 // if we have a partition id already, we'll use that 261 // otherwise we might end up with READ and CREATE pointcuts 262 // returning conflicting partitions (say, all vs default) 263 RequestPartitionId toCheckPartitionId = theRequestPartitionId != null 264 ? theRequestPartitionId 265 : determinePartition(theRequestDetails, theSubscription); 266 267 if (!toCheckPartitionId.isDefaultPartition()) { 268 throw new UnprocessableEntityException( 269 Msg.code(2010) + "Cross partition subscription must be created on the default partition"); 270 } 271 } 272 } 273 274 private RequestPartitionId determinePartition(RequestDetails theRequestDetails, IBaseResource theResource) { 275 switch (theRequestDetails.getRestOperationType()) { 276 case CREATE: 277 return myRequestPartitionHelperSvc.determineCreatePartitionForRequest( 278 theRequestDetails, theResource, "Subscription"); 279 case UPDATE: 280 return myRequestPartitionHelperSvc.determineReadPartitionForRequestForRead( 281 theRequestDetails, "Subscription", theResource.getIdElement()); 282 default: 283 return null; 284 } 285 } 286 287 public void validateQuery(String theQuery, String theFieldName) { 288 mySubscriptionQueryValidator.validateCriteria(theQuery, theFieldName); 289 } 290 291 private Optional<IBaseResource> findSubscriptionTopicByUrl(String theCriteria) { 292 myDaoRegistry.getResourceDao("SubscriptionTopic"); 293 SearchParameterMap map = SearchParameterMap.newSynchronous(); 294 map.add(SubscriptionTopic.SP_URL, new UriParam(theCriteria)); 295 IFhirResourceDao subscriptionTopicDao = myDaoRegistry.getResourceDao("SubscriptionTopic"); 296 IBundleProvider search = subscriptionTopicDao.search(map, new SystemRequestDetails()); 297 return search.getResources(0, 1).stream().findFirst(); 298 } 299 300 public void validateMessageSubscriptionEndpoint(String theEndpointUrl) { 301 if (theEndpointUrl == null) { 302 throw new UnprocessableEntityException(Msg.code(16) + "No endpoint defined for message subscription"); 303 } 304 305 try { 306 URI uri = new URI(theEndpointUrl); 307 308 if (!"channel".equals(uri.getScheme())) { 309 throw new UnprocessableEntityException(Msg.code(17) 310 + "Only 'channel' protocol is supported for Subscriptions with channel type 'message'"); 311 } 312 String channelName = uri.getSchemeSpecificPart(); 313 if (isBlank(channelName)) { 314 throw new UnprocessableEntityException( 315 Msg.code(18) + "A channel name must appear after channel: in a message Subscription endpoint"); 316 } 317 } catch (URISyntaxException e) { 318 throw new UnprocessableEntityException( 319 Msg.code(19) + "Invalid subscription endpoint uri " + theEndpointUrl, e); 320 } 321 } 322 323 @SuppressWarnings("WeakerAccess") 324 protected void validateChannelType(CanonicalSubscription theSubscription) { 325 if (theSubscription.getChannelType() == null) { 326 throw new UnprocessableEntityException(Msg.code(20) + "Subscription.channel.type must be populated"); 327 } 328 329 IChannelTypeValidator iChannelTypeValidator = 330 mySubscriptionChannelTypeValidatorFactory.getValidatorForChannelType(theSubscription.getChannelType()); 331 iChannelTypeValidator.validateChannelType(theSubscription); 332 } 333 334 @SuppressWarnings("WeakerAccess") 335 @VisibleForTesting 336 public void setSubscriptionCanonicalizerForUnitTest(SubscriptionCanonicalizer theSubscriptionCanonicalizer) { 337 mySubscriptionCanonicalizer = theSubscriptionCanonicalizer; 338 } 339 340 @SuppressWarnings("WeakerAccess") 341 @VisibleForTesting 342 public void setDaoRegistryForUnitTest(DaoRegistry theDaoRegistry) { 343 myDaoRegistry = theDaoRegistry; 344 } 345 346 @VisibleForTesting 347 public void setSubscriptionSettingsForUnitTest(SubscriptionSettings theSubscriptionSettings) { 348 mySubscriptionSettings = theSubscriptionSettings; 349 } 350 351 @VisibleForTesting 352 public void setRequestPartitionHelperSvcForUnitTest(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) { 353 myRequestPartitionHelperSvc = theRequestPartitionHelperSvc; 354 } 355 356 @VisibleForTesting 357 @SuppressWarnings("WeakerAccess") 358 public void setSubscriptionStrategyEvaluatorForUnitTest( 359 SubscriptionStrategyEvaluator theSubscriptionStrategyEvaluator) { 360 mySubscriptionStrategyEvaluator = theSubscriptionStrategyEvaluator; 361 mySubscriptionQueryValidator = new SubscriptionQueryValidator(myDaoRegistry, theSubscriptionStrategyEvaluator); 362 } 363 364 @VisibleForTesting 365 public void setSubscriptionChannelTypeValidatorFactoryForUnitTest( 366 SubscriptionChannelTypeValidatorFactory theSubscriptionChannelTypeValidatorFactory) { 367 mySubscriptionChannelTypeValidatorFactory = theSubscriptionChannelTypeValidatorFactory; 368 } 369}