
001/*- 002 * #%L 003 * HAPI FHIR Storage api 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.cache; 021 022import ca.uhn.fhir.IHapiBootOrder; 023import ca.uhn.fhir.context.ConfigurationException; 024import ca.uhn.fhir.i18n.Msg; 025import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 026import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 027import ca.uhn.fhir.jpa.cache.IResourceChangeEvent; 028import ca.uhn.fhir.jpa.cache.IResourceChangeListener; 029import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache; 030import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry; 031import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 032import ca.uhn.fhir.jpa.searchparam.retry.Retrier; 033import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 034import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; 035import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 036import ca.uhn.fhir.rest.server.util.IResourceRepositoryCache; 037import com.google.common.annotations.VisibleForTesting; 038import jakarta.annotation.Nonnull; 039import jakarta.annotation.PreDestroy; 040import org.apache.commons.lang3.time.DateUtils; 041import org.hl7.fhir.instance.model.api.IBaseResource; 042import org.hl7.fhir.instance.model.api.IIdType; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045import org.springframework.beans.factory.annotation.Autowired; 046import org.springframework.context.event.ContextClosedEvent; 047import org.springframework.context.event.ContextRefreshedEvent; 048import org.springframework.context.event.EventListener; 049import org.springframework.core.annotation.Order; 050 051import java.util.ArrayList; 052import java.util.Collection; 053import java.util.List; 054import java.util.concurrent.Semaphore; 055import java.util.concurrent.TimeUnit; 056 057public abstract class BaseResourceCacheSynchronizer implements IResourceChangeListener, IResourceRepositoryCache { 058 private static final Logger ourLog = LoggerFactory.getLogger(BaseResourceCacheSynchronizer.class); 059 public static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes 060 public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE; 061 private static final long FORCE_REFRESH_TIMEOUT_SECONDS = 300; 062 private final String myResourceName; 063 064 @Autowired 065 private IResourceChangeListenerRegistry myResourceChangeListenerRegistry; 066 067 @Autowired 068 private DaoRegistry myDaoRegistry; 069 070 private SearchParameterMap mySearchParameterMap; 071 private final SystemRequestDetails mySystemRequestDetails = SystemRequestDetails.forAllPartitions(); 072 private boolean myStopping; 073 private final Semaphore mySyncResourcesSemaphore = new Semaphore(1); 074 private final Object mySyncResourcesLock = new Object(); 075 076 private Integer myMaxRetryCount = null; 077 private boolean myInitialized = false; 078 079 protected BaseResourceCacheSynchronizer(String theResourceName) { 080 myResourceName = theResourceName; 081 } 082 083 protected BaseResourceCacheSynchronizer( 084 String theResourceName, 085 IResourceChangeListenerRegistry theResourceChangeListenerRegistry, 086 DaoRegistry theDaoRegistry) { 087 myResourceName = theResourceName; 088 myDaoRegistry = theDaoRegistry; 089 myResourceChangeListenerRegistry = theResourceChangeListenerRegistry; 090 } 091 092 /** 093 * This method performs a search in the DB, so use the {@link ContextRefreshedEvent} 094 * to ensure that it runs after the database initializer 095 */ 096 @EventListener(classes = ContextRefreshedEvent.class) 097 @Order(IHapiBootOrder.AFTER_SUBSCRIPTION_INITIALIZED) 098 public void registerListener() { 099 if (myInitialized) { 100 return; 101 } 102 if (myDaoRegistry.getResourceDaoOrNull(myResourceName) == null) { 103 ourLog.info("No resource DAO found for resource type {}, not registering listener", myResourceName); 104 return; 105 } 106 107 IResourceChangeListenerCache resourceCache = 108 myResourceChangeListenerRegistry.registerResourceResourceChangeListener( 109 myResourceName, provideSearchParameterMap(), this, REFRESH_INTERVAL); 110 resourceCache.forceRefresh(); 111 myInitialized = true; 112 } 113 114 private SearchParameterMap provideSearchParameterMap() { 115 SearchParameterMap searchParameterMap = mySearchParameterMap; 116 if (searchParameterMap == null) { 117 searchParameterMap = getSearchParameterMap(); 118 mySearchParameterMap = searchParameterMap; 119 } 120 return searchParameterMap; 121 } 122 123 @PreDestroy 124 public void unregisterListener() { 125 myResourceChangeListenerRegistry.unregisterResourceResourceChangeListener(this); 126 } 127 128 private boolean daoNotAvailable() { 129 return myDaoRegistry == null || !myDaoRegistry.isResourceTypeSupported(myResourceName); 130 } 131 132 @Override 133 public void requestRefresh() { 134 if (daoNotAvailable()) { 135 return; 136 } 137 if (!mySyncResourcesSemaphore.tryAcquire()) { 138 return; 139 } 140 try { 141 doSyncResourcesWithRetry(); 142 } finally { 143 mySyncResourcesSemaphore.release(); 144 } 145 } 146 147 @VisibleForTesting 148 @Override 149 public void forceRefresh() { 150 if (daoNotAvailable()) { 151 throw new ConfigurationException(Msg.code(2652) + "Attempt to force refresh without a dao"); 152 } 153 try { 154 if (mySyncResourcesSemaphore.tryAcquire(FORCE_REFRESH_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { 155 doSyncResourcesWithRetry(); 156 } else { 157 String errorMessage = String.format( 158 Msg.code(2653) + "Timed out waiting %s %s to refresh %s cache", 159 FORCE_REFRESH_TIMEOUT_SECONDS, 160 "seconds", 161 myResourceName); 162 ourLog.error(errorMessage); 163 throw new ConfigurationException(Msg.code(2663) + errorMessage); 164 } 165 } catch (InterruptedException e) { 166 Thread.currentThread().interrupt(); 167 throw new InternalErrorException(Msg.code(2654) + e); 168 } finally { 169 mySyncResourcesSemaphore.release(); 170 } 171 } 172 173 @VisibleForTesting 174 public void acquireSemaphoreForUnitTest() throws InterruptedException { 175 mySyncResourcesSemaphore.acquire(); 176 } 177 178 @VisibleForTesting 179 public int doSyncResourcesForUnitTest() { 180 // Two passes for delete flag to take effect 181 int first = doSyncResourcesWithRetry(); 182 int second = doSyncResourcesWithRetry(); 183 return first + second; 184 } 185 186 synchronized int doSyncResourcesWithRetry() { 187 // retry runs MAX_RETRIES times 188 // and if errors result every time, it will fail 189 Retrier<Integer> syncResourceRetrier = new Retrier<>(this::doSyncResources, getMaxRetries()); 190 return syncResourceRetrier.runWithRetry(); 191 } 192 193 private int getMaxRetries() { 194 if (myMaxRetryCount != null) { 195 return myMaxRetryCount; 196 } 197 return MAX_RETRIES; 198 } 199 200 @VisibleForTesting 201 public void setMaxRetries(Integer theMaxRetries) { 202 myMaxRetryCount = theMaxRetries; 203 } 204 205 @SuppressWarnings("unchecked") 206 private int doSyncResources() { 207 if (isStopping()) { 208 return 0; 209 } 210 211 synchronized (mySyncResourcesLock) { 212 ourLog.debug("Starting sync {}s", myResourceName); 213 214 List<IBaseResource> resourceList = (List<IBaseResource>) 215 getResourceDao().searchForResources(provideSearchParameterMap(), mySystemRequestDetails); 216 return syncResourcesIntoCache(resourceList); 217 } 218 } 219 220 protected abstract int syncResourcesIntoCache(@Nonnull List<IBaseResource> resourceList); 221 222 @EventListener(ContextRefreshedEvent.class) 223 public void start() { 224 myStopping = false; 225 } 226 227 @EventListener(ContextClosedEvent.class) 228 public void shutdown() { 229 myStopping = true; 230 } 231 232 private boolean isStopping() { 233 return myStopping; 234 } 235 236 private IFhirResourceDao<?> getResourceDao() { 237 return myDaoRegistry.getResourceDao(myResourceName); 238 } 239 240 @Override 241 public void handleInit(@Nonnull Collection<IIdType> theResourceIds) { 242 if (daoNotAvailable()) { 243 ourLog.warn( 244 "The resource type {} is enabled on this server, but there is no {} DAO configured.", 245 myResourceName, 246 myResourceName); 247 return; 248 } 249 IFhirResourceDao<?> resourceDao = getResourceDao(); 250 SystemRequestDetails systemRequestDetails = SystemRequestDetails.forAllPartitions(); 251 List<IBaseResource> resourceList = new ArrayList<>(); 252 253 /* 254 * We are pretty lenient here, because any failure will block the server 255 * from starting up. We should generally never fail to load here, but it 256 * can potentially happen if a resource is manually deleted in the database 257 * (ie. marked with a deletion time manually) but the indexes aren't cleaned 258 * up. 259 */ 260 for (IIdType id : theResourceIds) { 261 IBaseResource read; 262 try { 263 read = resourceDao.read(id, systemRequestDetails); 264 } catch (BaseServerResponseException e) { 265 ourLog.warn("Unable to fetch resource {}", id, e); 266 continue; 267 } 268 resourceList.add(read); 269 } 270 handleInit(resourceList); 271 } 272 273 protected abstract void handleInit(@Nonnull List<IBaseResource> resourceList); 274 275 @Override 276 public void handleChange(@Nonnull IResourceChangeEvent theResourceChangeEvent) { 277 // For now ignore the contents of theResourceChangeEvent. In the future, consider updating the registry based 278 // on 279 // known resources that have been created, updated & deleted 280 requestRefresh(); 281 } 282 283 @Nonnull 284 protected abstract SearchParameterMap getSearchParameterMap(); 285}