001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.interceptor.model.RequestPartitionId; 024import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 025import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 026import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer; 027import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; 028import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; 029import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; 030import ca.uhn.fhir.rest.api.server.RequestDetails; 031import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 032import jakarta.annotation.Nonnull; 033import org.hl7.fhir.instance.model.api.IBaseResource; 034import org.hl7.fhir.instance.model.api.IIdType; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037import org.springframework.beans.factory.annotation.Autowired; 038import org.springframework.messaging.Message; 039import org.springframework.messaging.MessageHandler; 040import org.springframework.messaging.MessagingException; 041 042/** 043 * Responsible for transitioning subscription resources from REQUESTED to ACTIVE 044 * Once activated, the subscription is added to the SubscriptionRegistry. 045 * <p> 046 * Also validates criteria. If invalid, rejects the subscription without persisting the subscription. 047 */ 048public class SubscriptionRegisteringSubscriber implements MessageHandler { 049 private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegisteringSubscriber.class); 050 051 @Autowired 052 private FhirContext myFhirContext; 053 054 @Autowired 055 private SubscriptionRegistry mySubscriptionRegistry; 056 057 @Autowired 058 private SubscriptionCanonicalizer mySubscriptionCanonicalizer; 059 060 @Autowired 061 private DaoRegistry myDaoRegistry; 062 063 /** 064 * Constructor 065 */ 066 public SubscriptionRegisteringSubscriber() { 067 super(); 068 } 069 070 @Override 071 public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException { 072 if (!(theMessage instanceof ResourceModifiedJsonMessage)) { 073 ourLog.warn("Received message of unexpected type on matching channel: {}", theMessage); 074 return; 075 } 076 077 ResourceModifiedMessage payload = ((ResourceModifiedJsonMessage) theMessage).getPayload(); 078 079 if (!payload.hasPayloadType(this.myFhirContext, "Subscription")) { 080 return; 081 } 082 083 switch (payload.getOperationType()) { 084 case MANUALLY_TRIGGERED: 085 case TRANSACTION: 086 return; 087 case CREATE: 088 case UPDATE: 089 case DELETE: 090 break; 091 } 092 093 // We read the resource back from the DB instead of using the supplied copy for 094 // two reasons: 095 // - in order to store partition id in the userdata of the resource for partitioned subscriptions 096 // - in case we're processing out of order and a create-then-delete has been processed backwards (or vice versa) 097 098 IIdType payloadId = payload.getPayloadId(myFhirContext).toUnqualifiedVersionless(); 099 IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getResourceDao("Subscription"); 100 RequestDetails systemRequestDetails = getPartitionAwareRequestDetails(payload); 101 IBaseResource payloadResource = subscriptionDao.read(payloadId, systemRequestDetails, true); 102 if (payloadResource == null) { 103 // Only for unit test 104 payloadResource = payload.getPayload(myFhirContext); 105 } 106 if (payloadResource.isDeleted()) { 107 mySubscriptionRegistry.unregisterSubscriptionIfRegistered(payloadId.getIdPart()); 108 return; 109 } 110 111 String statusString = mySubscriptionCanonicalizer.getSubscriptionStatus(payloadResource); 112 if ("active".equals(statusString)) { 113 mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(payloadResource); 114 } else { 115 mySubscriptionRegistry.unregisterSubscriptionIfRegistered(payloadId.getIdPart()); 116 } 117 } 118 119 /** 120 * There were some situations where the RequestDetails attempted to use the default partition 121 * and the partition name was a list containing null values (i.e. using the package installer to STORE_AND_INSTALL 122 * Subscriptions while partitioning was enabled). If any partition matches these criteria, 123 * {@link RequestPartitionId#defaultPartition()} is used to obtain the default partition. 124 */ 125 private RequestDetails getPartitionAwareRequestDetails(ResourceModifiedMessage payload) { 126 RequestPartitionId payloadPartitionId = payload.getPartitionId(); 127 if (payloadPartitionId == null || payloadPartitionId.isDefaultPartition()) { 128 // This may look redundant but the package installer STORE_AND_INSTALL Subscriptions when partitioning is 129 // enabled 130 // creates a corrupt default partition. This resets it to a clean one. 131 payloadPartitionId = RequestPartitionId.defaultPartition(); 132 } 133 return new SystemRequestDetails().setRequestPartitionId(payloadPartitionId); 134 } 135}