001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.interceptor.model.RequestPartitionId; 024import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 025import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 026import ca.uhn.fhir.jpa.model.config.SubscriptionSettings; 027import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer; 028import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType; 029import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; 030import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; 031import ca.uhn.fhir.rest.api.Constants; 032import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 033import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; 034import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; 035import ca.uhn.fhir.subscription.SubscriptionConstants; 036import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; 037import ca.uhn.fhir.util.SubscriptionUtil; 038import jakarta.annotation.Nonnull; 039import org.hl7.fhir.dstu2.model.Subscription; 040import org.hl7.fhir.instance.model.api.IBaseResource; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043import org.springframework.beans.factory.annotation.Autowired; 044import org.springframework.messaging.Message; 045import org.springframework.messaging.MessageHandler; 046import org.springframework.messaging.MessagingException; 047 048import java.util.Optional; 049 050/** 051 * Responsible for transitioning subscription resources from REQUESTED to ACTIVE 052 * Once activated, the subscription is added to the SubscriptionRegistry. 053 * <p> 054 * Also validates criteria. If invalid, rejects the subscription without persisting the subscription. 055 */ 056public class SubscriptionActivatingSubscriber implements MessageHandler { 057 private final Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class); 058 059 @Autowired 060 private FhirContext myFhirContext; 061 062 @Autowired 063 private DaoRegistry myDaoRegistry; 064 065 @Autowired 066 private SubscriptionCanonicalizer mySubscriptionCanonicalizer; 067 068 @Autowired 069 private SubscriptionSettings mySubscriptionSettings; 070 071 @Autowired 072 private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc; 073 /** 074 * Constructor 075 */ 076 public SubscriptionActivatingSubscriber() { 077 super(); 078 } 079 080 @Override 081 public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException { 082 if (!(theMessage instanceof ResourceModifiedJsonMessage)) { 083 ourLog.warn("Received message of unexpected type on matching channel: {}", theMessage); 084 return; 085 } 086 087 ResourceModifiedMessage payload = ((ResourceModifiedJsonMessage) theMessage).getPayload(); 088 if (!payload.hasPayloadType(myFhirContext, "Subscription")) { 089 return; 090 } 091 092 switch (payload.getOperationType()) { 093 case CREATE: 094 case UPDATE: 095 if (payload.getPayload(myFhirContext) == null) { 096 Optional<ResourceModifiedMessage> inflatedMsg = 097 myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessageOrNull( 098 payload); 099 if (inflatedMsg.isEmpty()) { 100 return; 101 } 102 payload = inflatedMsg.get(); 103 } 104 105 activateSubscriptionIfRequired(payload.getNewPayload(myFhirContext)); 106 break; 107 case TRANSACTION: 108 case DELETE: 109 case MANUALLY_TRIGGERED: 110 default: 111 break; 112 } 113 } 114 115 /** 116 * Note: This is synchronized because this is called both by matching channel messages 117 * as well as from Subscription Loader (which periodically refreshes from the DB to make 118 * sure nothing got missed). If these two mechanisms try to activate the same subscription 119 * at the same time they can get a constraint error. 120 */ 121 public synchronized boolean activateSubscriptionIfRequired(final IBaseResource theSubscription) { 122 // Grab the value for "Subscription.channel.type" so we can see if this 123 // subscriber applies. 124 CanonicalSubscriptionChannelType subscriptionChannelType = 125 mySubscriptionCanonicalizer.getChannelType(theSubscription); 126 127 // Only activate supported subscriptions 128 if (subscriptionChannelType == null 129 || !mySubscriptionSettings 130 .getSupportedSubscriptionTypes() 131 .contains(subscriptionChannelType.toCanonical())) { 132 return false; 133 } 134 135 String statusString = mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription); 136 137 if (SubscriptionConstants.REQUESTED_STATUS.equals(statusString)) { 138 return activateSubscription(theSubscription); 139 } 140 141 return false; 142 } 143 144 @SuppressWarnings("unchecked") 145 private boolean activateSubscription(final IBaseResource theSubscription) { 146 IFhirResourceDao subscriptionDao = myDaoRegistry.getSubscriptionDao(); 147 SystemRequestDetails srd = SystemRequestDetails.forAllPartitions(); 148 149 IBaseResource subscription = null; 150 try { 151 // read can throw ResourceGoneException 152 // if this happens, we will treat this as a failure to activate 153 subscription = 154 subscriptionDao.read(theSubscription.getIdElement(), SystemRequestDetails.forAllPartitions()); 155 subscription.setId(subscription.getIdElement().toVersionless()); 156 157 ourLog.info( 158 "Activating subscription {} from status {} to {}", 159 subscription.getIdElement().toUnqualified().getValue(), 160 SubscriptionConstants.REQUESTED_STATUS, 161 SubscriptionConstants.ACTIVE_STATUS); 162 SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ACTIVE_STATUS); 163 164 RequestPartitionId partitionId = 165 (RequestPartitionId) subscription.getUserData(Constants.RESOURCE_PARTITION_ID); 166 subscriptionDao.update(subscription, new SystemRequestDetails().setRequestPartitionId(partitionId)); 167 return true; 168 } catch (final UnprocessableEntityException | ResourceGoneException e) { 169 subscription = subscription != null ? subscription : theSubscription; 170 ourLog.error("Failed to activate subscription " + subscription.getIdElement() + " : " + e.getMessage()); 171 ourLog.info("Changing status of {} to ERROR", subscription.getIdElement()); 172 SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ERROR_STATUS); 173 SubscriptionUtil.setReason(myFhirContext, subscription, e.getMessage()); 174 subscriptionDao.update(subscription, srd); 175 return false; 176 } 177 } 178 179 public boolean isChannelTypeSupported(IBaseResource theSubscription) { 180 Subscription.SubscriptionChannelType channelType = 181 mySubscriptionCanonicalizer.getChannelType(theSubscription).toCanonical(); 182 return mySubscriptionSettings.getSupportedSubscriptionTypes().contains(channelType); 183 } 184}