
001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2023 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.match.registry; 021 022import ca.uhn.fhir.cache.BaseResourceCacheSynchronizer; 023import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 024import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber; 025import ca.uhn.fhir.rest.param.TokenOrListParam; 026import ca.uhn.fhir.rest.param.TokenParam; 027import ca.uhn.fhir.subscription.SubscriptionConstants; 028import org.apache.commons.lang3.StringUtils; 029import org.hl7.fhir.instance.model.api.IBaseResource; 030import org.hl7.fhir.r4.model.Subscription; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033import org.springframework.beans.factory.annotation.Autowired; 034 035import javax.annotation.Nonnull; 036import java.util.HashSet; 037import java.util.List; 038import java.util.Set; 039 040 041public class SubscriptionLoader extends BaseResourceCacheSynchronizer { 042 private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionLoader.class); 043 044 @Autowired 045 private SubscriptionRegistry mySubscriptionRegistry; 046 047 @Autowired 048 private SubscriptionActivatingSubscriber mySubscriptionActivatingInterceptor; 049 050 @Autowired 051 private SubscriptionCanonicalizer mySubscriptionCanonicalizer; 052 053 /** 054 * Constructor 055 */ 056 public SubscriptionLoader() { 057 super("Subscription"); 058 } 059 060 public int doSyncSubscriptionsForUnitTest() { 061 return super.doSyncResourcessForUnitTest(); 062 } 063 064 @Override 065 @Nonnull 066 protected SearchParameterMap getSearchParameterMap() { 067 SearchParameterMap map = new SearchParameterMap(); 068 069 if (mySearchParamRegistry.getActiveSearchParam("Subscription", "status") != null) { 070 map.add(Subscription.SP_STATUS, new TokenOrListParam() 071 .addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode())) 072 .addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode()))); 073 } 074 map.setLoadSynchronousUpTo(SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS); 075 return map; 076 } 077 078 @Override 079 protected void handleInit(List<IBaseResource> resourceList) { 080 updateSubscriptionRegistry(resourceList); 081 } 082 083 @Override 084 protected int syncResourcesIntoCache(List<IBaseResource> resourceList) { 085 return updateSubscriptionRegistry(resourceList); 086 } 087 088 private int updateSubscriptionRegistry(List<IBaseResource> theResourceList) { 089 Set<String> allIds = new HashSet<>(); 090 int activatedCount = 0; 091 int registeredCount = 0; 092 093 for (IBaseResource resource : theResourceList) { 094 String nextId = resource.getIdElement().getIdPart(); 095 allIds.add(nextId); 096 097 boolean activated = activateSubscriptionIfRequested(resource); 098 if (activated) { 099 ++activatedCount; 100 } 101 102 boolean registered = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource); 103 if (registered) { 104 registeredCount++; 105 } 106 } 107 108 mySubscriptionRegistry.unregisterAllSubscriptionsNotInCollection(allIds); 109 ourLog.debug("Finished sync subscriptions - activated {} and registered {}", theResourceList.size(), registeredCount); 110 return activatedCount; 111 } 112 113 /** 114 * @param theSubscription 115 * @return true if activated 116 */ 117 private boolean activateSubscriptionIfRequested(IBaseResource theSubscription) { 118 boolean successfullyActivated = false; 119 120 if (SubscriptionConstants.REQUESTED_STATUS.equals(mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription))) { 121 if (mySubscriptionActivatingInterceptor.isChannelTypeSupported(theSubscription)) { 122 // internally, subscriptions that cannot activate will be set to error 123 if (mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(theSubscription)) { 124 successfullyActivated = true; 125 } else { 126 logSubscriptionNotActivatedPlusErrorIfPossible(theSubscription); 127 } 128 } else { 129 ourLog.debug("Could not activate subscription {} because channel type {} is not supported.", 130 theSubscription.getIdElement(), 131 mySubscriptionCanonicalizer.getChannelType(theSubscription)); 132 } 133 } 134 135 return successfullyActivated; 136 } 137 138 /** 139 * Logs 140 * 141 * @param theSubscription 142 */ 143 private void logSubscriptionNotActivatedPlusErrorIfPossible(IBaseResource theSubscription) { 144 String error; 145 if (theSubscription instanceof Subscription) { 146 error = ((Subscription) theSubscription).getError(); 147 } else if (theSubscription instanceof org.hl7.fhir.dstu3.model.Subscription) { 148 error = ((org.hl7.fhir.dstu3.model.Subscription) theSubscription).getError(); 149 } else if (theSubscription instanceof org.hl7.fhir.dstu2.model.Subscription) { 150 error = ((org.hl7.fhir.dstu2.model.Subscription) theSubscription).getError(); 151 } else { 152 error = ""; 153 } 154 ourLog.error("Subscription " 155 + theSubscription.getIdElement().getIdPart() 156 + " could not be activated." 157 + " This will not prevent startup, but it could lead to undesirable outcomes! " 158 + (StringUtils.isBlank(error) ? "" : "Error: " + error) 159 ); 160 } 161 162 public void syncSubscriptions() { 163 super.syncDatabaseToCache(); 164 } 165} 166