001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.topic; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.interceptor.model.RequestPartitionId; 024import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 025import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 026import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; 027import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; 028import ca.uhn.fhir.rest.api.server.RequestDetails; 029import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 030import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; 031import ca.uhn.fhir.util.Logs; 032import jakarta.annotation.Nonnull; 033import org.hl7.fhir.instance.model.api.IBaseResource; 034import org.hl7.fhir.instance.model.api.IIdType; 035import org.hl7.fhir.r5.model.Enumerations; 036import org.hl7.fhir.r5.model.SubscriptionTopic; 037import org.slf4j.Logger; 038import org.springframework.beans.factory.annotation.Autowired; 039import org.springframework.messaging.Message; 040import org.springframework.messaging.MessageHandler; 041import org.springframework.messaging.MessagingException; 042 043/** 044 * Responsible for transitioning subscription resources from REQUESTED to ACTIVE 045 * Once activated, the subscription is added to the SubscriptionRegistry. 046 * <p> 047 * Also validates criteria. If invalid, rejects the subscription without persisting the subscription. 048 */ 049public class SubscriptionTopicRegisteringSubscriber implements MessageHandler { 050 private static final Logger ourLog = Logs.getSubscriptionTopicLog(); 051 052 @Autowired 053 private FhirContext myFhirContext; 054 055 @Autowired 056 private SubscriptionTopicRegistry mySubscriptionTopicRegistry; 057 058 @Autowired 059 private DaoRegistry myDaoRegistry; 060 061 /** 062 * Constructor 063 */ 064 public SubscriptionTopicRegisteringSubscriber() { 065 super(); 066 } 067 068 @Override 069 public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException { 070 if (!(theMessage instanceof ResourceModifiedJsonMessage)) { 071 ourLog.warn("Received message of unexpected type on matching channel: {}", theMessage); 072 return; 073 } 074 075 ResourceModifiedMessage payload = ((ResourceModifiedJsonMessage) theMessage).getPayload(); 076 077 if (!payload.hasPayloadType(myFhirContext, "SubscriptionTopic")) { 078 return; 079 } 080 081 switch (payload.getOperationType()) { 082 case MANUALLY_TRIGGERED: 083 case TRANSACTION: 084 return; 085 case CREATE: 086 case UPDATE: 087 case DELETE: 088 break; 089 } 090 091 // We read the resource back from the DB instead of using the supplied copy for 092 // two reasons: 093 // - in order to store partition id in the userdata of the resource for partitioned subscriptions 094 // - in case we're processing out of order and a create-then-delete has been processed backwards (or vice versa) 095 096 IBaseResource payloadResource; 097 IIdType payloadId = payload.getPayloadId(myFhirContext).toUnqualifiedVersionless(); 098 try { 099 IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getResourceDao("SubscriptionTopic"); 100 RequestDetails systemRequestDetails = getPartitionAwareRequestDetails(payload); 101 payloadResource = subscriptionDao.read(payloadId, systemRequestDetails); 102 if (payloadResource == null) { 103 // Only for unit test 104 payloadResource = payload.getPayload(myFhirContext); 105 } 106 } catch (ResourceGoneException e) { 107 mySubscriptionTopicRegistry.unregister(payloadId.getIdPart()); 108 return; 109 } 110 111 SubscriptionTopic subscriptionTopic = 112 SubscriptionTopicCanonicalizer.canonicalizeTopic(myFhirContext, payloadResource); 113 if (subscriptionTopic.getStatus() == Enumerations.PublicationStatus.ACTIVE) { 114 mySubscriptionTopicRegistry.register(subscriptionTopic); 115 } else { 116 mySubscriptionTopicRegistry.unregister(payloadId.getIdPart()); 117 } 118 } 119 120 /** 121 * There were some situations where the RequestDetails attempted to use the default partition 122 * and the partition name was a list containing null values (i.e. using the package installer to STORE_AND_INSTALL 123 * Subscriptions while partitioning was enabled). If any partition matches these criteria, 124 * {@link RequestPartitionId#defaultPartition()} is used to obtain the default partition. 125 */ 126 private RequestDetails getPartitionAwareRequestDetails(ResourceModifiedMessage payload) { 127 RequestPartitionId payloadPartitionId = payload.getPartitionId(); 128 if (payloadPartitionId == null || payloadPartitionId.isDefaultPartition()) { 129 // This may look redundant but the package installer STORE_AND_INSTALL Subscriptions when partitioning is 130 // enabled 131 // creates a corrupt default partition. This resets it to a clean one. 132 payloadPartitionId = RequestPartitionId.defaultPartition(); 133 } 134 return new SystemRequestDetails().setRequestPartitionId(payloadPartitionId); 135 } 136}