
001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2023 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 024import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 025import ca.uhn.fhir.jpa.model.entity.StorageSettings; 026import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer; 027import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType; 028import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; 029import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; 030import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 031import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; 032import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; 033import ca.uhn.fhir.subscription.SubscriptionConstants; 034import ca.uhn.fhir.util.SubscriptionUtil; 035import org.hl7.fhir.dstu2.model.Subscription; 036import org.hl7.fhir.instance.model.api.IBaseResource; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039import org.springframework.beans.factory.annotation.Autowired; 040import org.springframework.messaging.Message; 041import org.springframework.messaging.MessageHandler; 042import org.springframework.messaging.MessagingException; 043 044import javax.annotation.Nonnull; 045 046/** 047 * Responsible for transitioning subscription resources from REQUESTED to ACTIVE 048 * Once activated, the subscription is added to the SubscriptionRegistry. 049 * <p> 050 * Also validates criteria. If invalid, rejects the subscription without persisting the subscription. 051 */ 052public class SubscriptionActivatingSubscriber implements MessageHandler { 053 private final Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class); 054 055 @Autowired 056 private FhirContext myFhirContext; 057 058 @Autowired 059 private DaoRegistry myDaoRegistry; 060 061 @Autowired 062 private SubscriptionCanonicalizer mySubscriptionCanonicalizer; 063 064 @Autowired 065 private StorageSettings myStorageSettings; 066 067 /** 068 * Constructor 069 */ 070 public SubscriptionActivatingSubscriber() { 071 super(); 072 } 073 074 @Override 075 public void handleMessage(@Nonnull Message<?> theMessage) throws MessagingException { 076 if (!(theMessage instanceof ResourceModifiedJsonMessage)) { 077 ourLog.warn("Received message of unexpected type on matching channel: {}", theMessage); 078 return; 079 } 080 081 ResourceModifiedMessage payload = ((ResourceModifiedJsonMessage) theMessage).getPayload(); 082 if (!payload.hasPayloadType(myFhirContext, "Subscription")) { 083 return; 084 } 085 086 switch (payload.getOperationType()) { 087 case CREATE: 088 case UPDATE: 089 activateSubscriptionIfRequired(payload.getNewPayload(myFhirContext)); 090 break; 091 case TRANSACTION: 092 case DELETE: 093 case MANUALLY_TRIGGERED: 094 default: 095 break; 096 } 097 } 098 099 /** 100 * Note: This is synchronized because this is called both by matching channel messages 101 * as well as from Subscription Loader (which periodically refreshes from the DB to make 102 * sure nothing got missed). If these two mechanisms try to activate the same subscription 103 * at the same time they can get a constraint error. 104 */ 105 public synchronized boolean activateSubscriptionIfRequired(final IBaseResource theSubscription) { 106 // Grab the value for "Subscription.channel.type" so we can see if this 107 // subscriber applies.. 108 CanonicalSubscriptionChannelType subscriptionChannelType = 109 mySubscriptionCanonicalizer.getChannelType(theSubscription); 110 111 // Only activate supported subscriptions 112 if (subscriptionChannelType == null 113 || !myStorageSettings.getSupportedSubscriptionTypes().contains(subscriptionChannelType.toCanonical())) { 114 return false; 115 } 116 117 String statusString = mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription); 118 119 if (SubscriptionConstants.REQUESTED_STATUS.equals(statusString)) { 120 return activateSubscription(theSubscription); 121 } 122 123 return false; 124 } 125 126 @SuppressWarnings("unchecked") 127 private boolean activateSubscription(final IBaseResource theSubscription) { 128 IFhirResourceDao subscriptionDao = myDaoRegistry.getSubscriptionDao(); 129 SystemRequestDetails srd = SystemRequestDetails.forAllPartitions(); 130 131 IBaseResource subscription = null; 132 try { 133 // read can throw ResourceGoneException 134 // if this happens, we will treat this as a failure to activate 135 subscription = 136 subscriptionDao.read(theSubscription.getIdElement(), SystemRequestDetails.forAllPartitions()); 137 subscription.setId(subscription.getIdElement().toVersionless()); 138 139 ourLog.info( 140 "Activating subscription {} from status {} to {}", 141 subscription.getIdElement().toUnqualified().getValue(), 142 SubscriptionConstants.REQUESTED_STATUS, 143 SubscriptionConstants.ACTIVE_STATUS); 144 SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ACTIVE_STATUS); 145 subscriptionDao.update(subscription, srd); 146 return true; 147 } catch (final UnprocessableEntityException | ResourceGoneException e) { 148 subscription = subscription != null ? subscription : theSubscription; 149 ourLog.error("Failed to activate subscription " + subscription.getIdElement() + " : " + e.getMessage()); 150 ourLog.info("Changing status of {} to ERROR", subscription.getIdElement()); 151 SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ERROR_STATUS); 152 SubscriptionUtil.setReason(myFhirContext, subscription, e.getMessage()); 153 subscriptionDao.update(subscription, srd); 154 return false; 155 } 156 } 157 158 public boolean isChannelTypeSupported(IBaseResource theSubscription) { 159 Subscription.SubscriptionChannelType channelType = 160 mySubscriptionCanonicalizer.getChannelType(theSubscription).toCanonical(); 161 return myStorageSettings.getSupportedSubscriptionTypes().contains(channelType); 162 } 163}