
/*-
 * #%L
 * HAPI FHIR Subscription Server
 * %%
 * Copyright (C) 2014 - 2023 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.subscription.submit.interceptor;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.param.UriParam;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.HapiExtensions;
import ca.uhn.fhir.util.SubscriptionUtil;
import com.google.common.annotations.VisibleForTesting;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r5.model.SubscriptionTopic;
import org.springframework.beans.factory.annotation.Autowired;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;

import static org.apache.commons.lang3.StringUtils.isBlank;

/**
 * Interceptor that validates Subscription resources before they are stored.
 * <p>
 * It is hooked on the pre-storage "created" and "updated" pointcuts. Resources whose type is not
 * {@code Subscription} are ignored. Every validation failure is reported as an
 * {@link UnprocessableEntityException}.
 */
@Interceptor
public class SubscriptionValidatingInterceptor {

	@Autowired
	private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
	@Autowired
	private DaoRegistry myDaoRegistry;
	@Autowired
	private StorageSettings myStorageSettings;
	@Autowired
	private SubscriptionStrategyEvaluator mySubscriptionStrategyEvaluator;

	// Injected through setFhirContext(FhirContext) rather than by field injection
	private FhirContext myFhirContext;
	@Autowired
	private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
	@Autowired
	private SubscriptionQueryValidator mySubscriptionQueryValidator;

	/**
	 * Hook invoked before a resource is created; delegates to
	 * {@link #validateSubmittedSubscription(IBaseResource, RequestDetails, RequestPartitionId, Pointcut)}.
	 *
	 * @param theResource           the resource about to be stored
	 * @param theRequestDetails     the request that triggered the storage operation
	 * @param theRequestPartitionId the partition supplied with the hook, may be {@code null}
	 */
	@Hook(Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED)
	public void resourcePreCreate(IBaseResource theResource, RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) {
		validateSubmittedSubscription(theResource, theRequestDetails, theRequestPartitionId, Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED);
	}

	/**
	 * Hook invoked before a resource is updated; delegates to
	 * {@link #validateSubmittedSubscription(IBaseResource, RequestDetails, RequestPartitionId, Pointcut)}.
	 * Only the new version of the resource is validated.
	 *
	 * @param theOldResource        the previous version of the resource (not used)
	 * @param theResource           the new version about to be stored
	 * @param theRequestDetails     the request that triggered the storage operation
	 * @param theRequestPartitionId the partition supplied with the hook, may be {@code null}
	 */
	@Hook(Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED)
	public void resourceUpdated(IBaseResource theOldResource, IBaseResource theResource, RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) {
		validateSubmittedSubscription(theResource, theRequestDetails, theRequestPartitionId, Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED);
	}

	/**
	 * Sets the FHIR context used to determine the resource type and the FHIR version.
	 *
	 * @param theFhirContext the context to use
	 */
	@Autowired
	public void setFhirContext(FhirContext theFhirContext) {
		myFhirContext = theFhirContext;
	}

	/**
	 * Validates a submitted resource if it is a Subscription.
	 * <p>
	 * Permissions are always checked. The remaining checks (topic or criteria, channel, matching
	 * strategy, message endpoint) are only applied when the status is not ERROR, OFF or NULL.
	 *
	 * @param theSubscription       the submitted resource; ignored unless its type is {@code Subscription}
	 * @param theRequestDetails     the request that triggered the storage operation
	 * @param theRequestPartitionId the partition supplied with the hook, may be {@code null}
	 * @param thePointcut           must be STORAGE_PRESTORAGE_RESOURCE_CREATED or STORAGE_PRESTORAGE_RESOURCE_UPDATED
	 * @throws UnprocessableEntityException if the pointcut is unexpected or any validation fails
	 */
	@VisibleForTesting
	void validateSubmittedSubscription(IBaseResource theSubscription,
												  RequestDetails theRequestDetails,
												  RequestPartitionId theRequestPartitionId,
												  Pointcut thePointcut) {
		if (Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED != thePointcut && Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED != thePointcut) {
			throw new UnprocessableEntityException(Msg.code(2267) + "Expected Pointcut to be either STORAGE_PRESTORAGE_RESOURCE_CREATED or STORAGE_PRESTORAGE_RESOURCE_UPDATED but was: " + thePointcut);
		}

		if (!"Subscription".equals(myFhirContext.getResourceType(theSubscription))) {
			return;
		}

		CanonicalSubscription subscription;
		try {
			subscription = mySubscriptionCanonicalizer.canonicalize(theSubscription);
		} catch (InternalErrorException e) {
			// Keep the original exception as the cause so the stack trace is not lost
			throw new UnprocessableEntityException(Msg.code(955) + e.getMessage(), e);
		}

		if (subscription.getStatus() == null) {
			throw new UnprocessableEntityException(Msg.code(8) + "Can not process submitted Subscription - Subscription.status must be populated on this server");
		}

		// Subscriptions in ERROR, OFF or NULL status skip the content checks below
		boolean finished = false;
		switch (subscription.getStatus()) {
			case REQUESTED:
			case ACTIVE:
				break;
			case ERROR:
			case OFF:
			case NULL:
				finished = true;
				break;
		}

		validatePermissions(theSubscription, subscription, theRequestDetails, theRequestPartitionId, thePointcut);

		// The matching strategy tag is set to null here and only set to a real strategy further
		// down, once the subscription has passed validation
		mySubscriptionCanonicalizer.setMatchingStrategyTag(theSubscription, null);

		if (finished) {
			return;
		}

		if (subscription.isTopicSubscription()) {
			// In R4 topic subscriptions exist without a corresponding SubscriptionTopic resource
			if (myFhirContext.getVersion().getVersion() != FhirVersionEnum.R4) {
				Optional<IBaseResource> oTopic = findSubscriptionTopicByUrl(subscription.getTopic());
				if (!oTopic.isPresent()) {
					throw new UnprocessableEntityException(Msg.code(2322) + "No SubscriptionTopic exists with topic: " + subscription.getTopic());
				}
			}
		} else {
			validateQuery(subscription.getCriteriaString(), "Subscription.criteria");

			if (subscription.getPayloadSearchCriteria() != null) {
				validateQuery(subscription.getPayloadSearchCriteria(), "Subscription.extension(url='" + HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA + "')");
			}
		}

		validateChannelType(subscription);

		try {
			SubscriptionMatchingStrategy strategy = mySubscriptionStrategyEvaluator.determineStrategy(subscription);
			mySubscriptionCanonicalizer.setMatchingStrategyTag(theSubscription, strategy);
		} catch (InvalidRequestException | DataFormatException e) {
			// Keep the original exception as the cause so the stack trace is not lost
			throw new UnprocessableEntityException(Msg.code(9) + "Invalid subscription criteria submitted: " + subscription.getCriteriaString() + " " + e.getMessage(), e);
		}

		// validateChannelType() above already rejects a null channel type; this null check is only
		// reachable when a subclass overrides that method
		if (subscription.getChannelType() == null) {
			throw new UnprocessableEntityException(Msg.code(10) + "Subscription.channel.type must be populated on this server");
		} else if (subscription.getChannelType() == CanonicalSubscriptionChannelType.MESSAGE) {
			validateMessageSubscriptionEndpoint(subscription.getEndpointUrl());
		}
	}

	/**
	 * Checks that a cross partition subscription is allowed on this server and targets the default
	 * partition. Nothing is checked when the subscription is not cross partition or when the request
	 * is a {@link SystemRequestDetails}.
	 *
	 * @param theSubscription          the submitted Subscription resource
	 * @param theCanonicalSubscription the canonical form of the subscription (not used by this implementation)
	 * @param theRequestDetails        the request that triggered the storage operation
	 * @param theRequestPartitionId    the partition supplied with the hook, may be {@code null}
	 * @param thePointcut              the pointcut being processed
	 * @throws UnprocessableEntityException if cross partition subscriptions are disabled, or the
	 *                                      partition is not the default partition
	 */
	protected void validatePermissions(IBaseResource theSubscription,
												  CanonicalSubscription theCanonicalSubscription,
												  RequestDetails theRequestDetails,
												  RequestPartitionId theRequestPartitionId,
												  Pointcut thePointcut) {
		// If the subscription has the cross partition tag
		if (SubscriptionUtil.isCrossPartition(theSubscription) && !(theRequestDetails instanceof SystemRequestDetails)) {
			if (!myStorageSettings.isCrossPartitionSubscriptionEnabled()) {
				throw new UnprocessableEntityException(Msg.code(2009) + "Cross partition subscription is not enabled on this server");
			}

			if (theRequestPartitionId == null && Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED == thePointcut) {
				return;
			}

			// if we have a partition id already, we'll use that
			// otherwise we might end up with READ and CREATE pointcuts
			// returning conflicting partitions (say, all vs default)
			RequestPartitionId toCheckPartitionId = theRequestPartitionId != null ?
				theRequestPartitionId :
				determinePartition(theRequestDetails, theSubscription);

			// NOTE(review): determinePartition() returns null for operations other than CREATE and
			// UPDATE, which would cause a NullPointerException here - confirm the intended handling
			if (!toCheckPartitionId.isDefaultPartition()) {
				throw new UnprocessableEntityException(Msg.code(2010) + "Cross partition subscription must be created on the default partition");
			}
		}
	}

	/**
	 * Asks the partition helper service which partition applies to the request.
	 *
	 * @param theRequestDetails the request being processed
	 * @param theResource       the submitted Subscription resource
	 * @return the partition for CREATE and UPDATE operations, or {@code null} for any other operation type
	 */
	private RequestPartitionId determinePartition(RequestDetails theRequestDetails, IBaseResource theResource) {
		switch (theRequestDetails.getRestOperationType()) {
			case CREATE:
				return myRequestPartitionHelperSvc.determineCreatePartitionForRequest(theRequestDetails, theResource, "Subscription");
			case UPDATE:
				return myRequestPartitionHelperSvc.determineReadPartitionForRequestForRead(theRequestDetails, "Subscription", theResource.getIdElement());
			default:
				return null;
		}
	}

	/**
	 * Validates a criteria string by delegating to the {@link SubscriptionQueryValidator}.
	 *
	 * @param theQuery     the criteria string to validate
	 * @param theFieldName the name of the field holding the criteria, passed on to the validator
	 */
	public void validateQuery(String theQuery, String theFieldName) {
		mySubscriptionQueryValidator.validateCriteria(theQuery, theFieldName);
	}

	/**
	 * Searches for a SubscriptionTopic whose {@code url} search parameter matches the given value.
	 *
	 * @param theTopicUrl the topic URL to search for
	 * @return the first matching resource, or an empty Optional if the search returns none
	 */
	private Optional<IBaseResource> findSubscriptionTopicByUrl(String theTopicUrl) {
		SearchParameterMap map = SearchParameterMap.newSynchronous();
		map.add(SubscriptionTopic.SP_URL, new UriParam(theTopicUrl));
		IFhirResourceDao<?> subscriptionTopicDao = myDaoRegistry.getResourceDao("SubscriptionTopic");
		IBundleProvider search = subscriptionTopicDao.search(map, new SystemRequestDetails());
		// Only the first result is needed, so fetch a single resource
		return search.getResources(0, 1).stream().findFirst();
	}

	/**
	 * Validates the endpoint of a subscription with channel type 'message'. The endpoint must be a
	 * syntactically valid URI using the {@code channel} scheme followed by a non-blank channel name.
	 *
	 * @param theEndpointUrl the endpoint URL of the subscription
	 * @throws UnprocessableEntityException if the endpoint is missing, is not a valid URI, does not
	 *                                      use the {@code channel} scheme, or has no channel name
	 */
	public void validateMessageSubscriptionEndpoint(String theEndpointUrl) {
		if (theEndpointUrl == null) {
			throw new UnprocessableEntityException(Msg.code(16) + "No endpoint defined for message subscription");
		}

		try {
			URI uri = new URI(theEndpointUrl);

			if (!"channel".equals(uri.getScheme())) {
				throw new UnprocessableEntityException(Msg.code(17) + "Only 'channel' protocol is supported for Subscriptions with channel type 'message'");
			}
			String channelName = uri.getSchemeSpecificPart();
			if (isBlank(channelName)) {
				throw new UnprocessableEntityException(Msg.code(18) + "A channel name must appear after channel: in a message Subscription endpoint");
			}
		} catch (URISyntaxException e) {
			throw new UnprocessableEntityException(Msg.code(19) + "Invalid subscription endpoint uri " + theEndpointUrl, e);
		}
	}

	/**
	 * Validates the channel of a subscription. The channel type must be present; rest-hook
	 * subscriptions additionally get their payload and endpoint checked.
	 *
	 * @param theSubscription the canonical subscription to validate
	 * @throws UnprocessableEntityException if the channel type is missing or a rest-hook check fails
	 */
	@SuppressWarnings("WeakerAccess")
	protected void validateChannelType(CanonicalSubscription theSubscription) {
		if (theSubscription.getChannelType() == null) {
			throw new UnprocessableEntityException(Msg.code(20) + "Subscription.channel.type must be populated");
		} else if (theSubscription.getChannelType() == CanonicalSubscriptionChannelType.RESTHOOK) {
			validateChannelPayload(theSubscription);
			validateChannelEndpoint(theSubscription);
		}
	}

	/**
	 * Ensures that the subscription has a non-blank endpoint URL.
	 *
	 * @param theResource the canonical subscription to validate
	 * @throws UnprocessableEntityException if the endpoint URL is blank
	 */
	@SuppressWarnings("WeakerAccess")
	protected void validateChannelEndpoint(CanonicalSubscription theResource) {
		if (isBlank(theResource.getEndpointUrl())) {
			throw new UnprocessableEntityException(Msg.code(21) + "Rest-hook subscriptions must have Subscription.channel.endpoint defined");
		}
	}

	/**
	 * Ensures that the payload, when one is specified, is a content type that
	 * {@link EncodingEnum#forContentType(String)} recognizes. A blank payload is accepted.
	 *
	 * @param theResource the canonical subscription to validate
	 * @throws UnprocessableEntityException if the payload is not blank and is not recognized
	 */
	@SuppressWarnings("WeakerAccess")
	protected void validateChannelPayload(CanonicalSubscription theResource) {
		if (!isBlank(theResource.getPayloadString()) && EncodingEnum.forContentType(theResource.getPayloadString()) == null) {
			throw new UnprocessableEntityException(Msg.code(1985) + "Invalid value for Subscription.channel.payload: " + theResource.getPayloadString());
		}
	}

	/**
	 * Replaces the subscription canonicalizer; intended for unit tests.
	 *
	 * @param theSubscriptionCanonicalizer the canonicalizer to use
	 */
	@SuppressWarnings("WeakerAccess")
	@VisibleForTesting
	public void setSubscriptionCanonicalizerForUnitTest(SubscriptionCanonicalizer theSubscriptionCanonicalizer) {
		mySubscriptionCanonicalizer = theSubscriptionCanonicalizer;
	}

	/**
	 * Replaces the DAO registry; intended for unit tests.
	 *
	 * @param theDaoRegistry the registry to use
	 */
	@SuppressWarnings("WeakerAccess")
	@VisibleForTesting
	public void setDaoRegistryForUnitTest(DaoRegistry theDaoRegistry) {
		myDaoRegistry = theDaoRegistry;
	}

	/**
	 * Replaces the storage settings; intended for unit tests.
	 *
	 * @param theStorageSettings the settings to use
	 */
	@VisibleForTesting
	public void setStorageSettingsForUnitTest(JpaStorageSettings theStorageSettings) {
		myStorageSettings = theStorageSettings;
	}

	/**
	 * Replaces the request partition helper service; intended for unit tests.
	 *
	 * @param theRequestPartitionHelperSvc the helper service to use
	 */
	@VisibleForTesting
	public void setRequestPartitionHelperSvcForUnitTest(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) {
		myRequestPartitionHelperSvc = theRequestPartitionHelperSvc;
	}

	/**
	 * Replaces the subscription strategy evaluator; intended for unit tests. This also builds a new
	 * {@link SubscriptionQueryValidator} from the DAO registry held at the time of the call, so the
	 * DAO registry should be set before calling this method.
	 *
	 * @param theSubscriptionStrategyEvaluator the evaluator to use
	 */
	@VisibleForTesting
	@SuppressWarnings("WeakerAccess")
	public void setSubscriptionStrategyEvaluatorForUnitTest(SubscriptionStrategyEvaluator theSubscriptionStrategyEvaluator) {
		mySubscriptionStrategyEvaluator = theSubscriptionStrategyEvaluator;
		mySubscriptionQueryValidator = new SubscriptionQueryValidator(myDaoRegistry, theSubscriptionStrategyEvaluator);
	}

}