
001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2023 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.interceptor.model.RequestPartitionId; 024import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 025import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 026import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer; 027import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; 028import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; 029import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; 030import ca.uhn.fhir.rest.api.server.RequestDetails; 031import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 032import org.hl7.fhir.instance.model.api.IBaseResource; 033import org.hl7.fhir.instance.model.api.IIdType; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036import org.springframework.beans.factory.annotation.Autowired; 037import org.springframework.messaging.Message; 038import org.springframework.messaging.MessageHandler; 039import org.springframework.messaging.MessagingException; 040 041import javax.annotation.Nonnull; 042 043/** 044 * Responsible for transitioning subscription resources from REQUESTED to ACTIVE 045 * Once activated, the subscription is added to the SubscriptionRegistry. 046 * <p> 047 * Also validates criteria. If invalid, rejects the subscription without persisting the subscription. 048 */ 049public class SubscriptionRegisteringSubscriber implements MessageHandler { 050 private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegisteringSubscriber.class); 051 052 @Autowired 053 private FhirContext myFhirContext; 054 055 @Autowired 056 private SubscriptionRegistry mySubscriptionRegistry; 057 058 @Autowired 059 private SubscriptionCanonicalizer mySubscriptionCanonicalizer; 060 061 @Autowired 062 private DaoRegistry myDaoRegistry; 063 064 /** 065 * Constructor 066 */ 067 public SubscriptionRegisteringSubscriber() { 068 super(); 069 } 070 071 @Override 072 public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException { 073 if (!(theMessage instanceof ResourceModifiedJsonMessage)) { 074 ourLog.warn("Received message of unexpected type on matching channel: {}", theMessage); 075 return; 076 } 077 078 ResourceModifiedMessage payload = ((ResourceModifiedJsonMessage) theMessage).getPayload(); 079 080 if (!payload.hasPayloadType(this.myFhirContext, "Subscription")) { 081 return; 082 } 083 084 switch (payload.getOperationType()) { 085 case MANUALLY_TRIGGERED: 086 case TRANSACTION: 087 return; 088 case CREATE: 089 case UPDATE: 090 case DELETE: 091 break; 092 } 093 094 // We read the resource back from the DB instead of using the supplied copy for 095 // two reasons: 096 // - in order to store partition id in the userdata of the resource for partitioned subscriptions 097 // - in case we're processing out of order and a create-then-delete has been processed backwards (or vice versa) 098 099 IIdType payloadId = payload.getPayloadId(myFhirContext).toUnqualifiedVersionless(); 100 IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getResourceDao("Subscription"); 101 RequestDetails systemRequestDetails = getPartitionAwareRequestDetails(payload); 102 IBaseResource payloadResource = subscriptionDao.read(payloadId, systemRequestDetails, true); 103 if (payloadResource == null) { 104 // Only for unit test 105 payloadResource = payload.getPayload(myFhirContext); 106 } 107 if (payloadResource.isDeleted()) { 108 mySubscriptionRegistry.unregisterSubscriptionIfRegistered(payloadId.getIdPart()); 109 return; 110 } 111 112 String statusString = mySubscriptionCanonicalizer.getSubscriptionStatus(payloadResource); 113 if ("active".equals(statusString)) { 114 mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(payloadResource); 115 } else { 116 mySubscriptionRegistry.unregisterSubscriptionIfRegistered(payloadId.getIdPart()); 117 } 118 } 119 120 /** 121 * There were some situations where the RequestDetails attempted to use the default partition 122 * and the partition name was a list containing null values (i.e. using the package installer to STORE_AND_INSTALL 123 * Subscriptions while partitioning was enabled). If any partition matches these criteria, 124 * {@link RequestPartitionId#defaultPartition()} is used to obtain the default partition. 125 */ 126 private RequestDetails getPartitionAwareRequestDetails(ResourceModifiedMessage payload) { 127 RequestPartitionId payloadPartitionId = payload.getPartitionId(); 128 if (payloadPartitionId == null || payloadPartitionId.isDefaultPartition()) { 129 // This may look redundant but the package installer STORE_AND_INSTALL Subscriptions when partitioning is 130 // enabled 131 // creates a corrupt default partition. This resets it to a clean one. 132 payloadPartitionId = RequestPartitionId.defaultPartition(); 133 } 134 return new SystemRequestDetails().setRequestPartitionId(payloadPartitionId); 135 } 136}