
001/*- 002 * #%L 003 * HAPI FHIR Storage api 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.cache; 021 022import ca.uhn.fhir.IHapiBootOrder; 023import ca.uhn.fhir.context.ConfigurationException; 024import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 025import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 026import ca.uhn.fhir.jpa.cache.IResourceChangeEvent; 027import ca.uhn.fhir.jpa.cache.IResourceChangeListener; 028import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache; 029import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry; 030import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 031import ca.uhn.fhir.jpa.searchparam.retry.Retrier; 032import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 033import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; 034import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 035import ca.uhn.fhir.rest.server.util.IResourceRepositoryCache; 036import com.google.common.annotations.VisibleForTesting; 037import jakarta.annotation.Nonnull; 038import jakarta.annotation.PreDestroy; 039import org.apache.commons.lang3.time.DateUtils; 040import org.hl7.fhir.instance.model.api.IBaseResource; 041import org.hl7.fhir.instance.model.api.IIdType; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044import org.springframework.beans.factory.annotation.Autowired; 045import org.springframework.context.event.ContextClosedEvent; 046import org.springframework.context.event.ContextRefreshedEvent; 047import org.springframework.context.event.ContextStartedEvent; 048import org.springframework.context.event.EventListener; 049import org.springframework.core.annotation.Order; 050 051import java.util.ArrayList; 052import java.util.Collection; 053import java.util.List; 054import java.util.concurrent.Semaphore; 055import java.util.concurrent.TimeUnit; 056 057public abstract class BaseResourceCacheSynchronizer implements IResourceChangeListener, IResourceRepositoryCache { 058 private static final Logger ourLog = LoggerFactory.getLogger(BaseResourceCacheSynchronizer.class); 059 public static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes 060 public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE; 061 private static final long FORCE_REFRESH_TIMEOUT_SECONDS = 300; 062 private final String myResourceName; 063 064 @Autowired 065 private IResourceChangeListenerRegistry myResourceChangeListenerRegistry; 066 067 @Autowired 068 private DaoRegistry myDaoRegistry; 069 070 private SearchParameterMap mySearchParameterMap; 071 private SystemRequestDetails mySystemRequestDetails; 072 private boolean myStopping; 073 private final Semaphore mySyncResourcesSemaphore = new Semaphore(1); 074 private final Object mySyncResourcesLock = new Object(); 075 076 private Integer myMaxRetryCount = null; 077 078 protected BaseResourceCacheSynchronizer(String theResourceName) { 079 myResourceName = theResourceName; 080 } 081 082 protected BaseResourceCacheSynchronizer( 083 String theResourceName, 084 IResourceChangeListenerRegistry theResourceChangeListenerRegistry, 085 DaoRegistry theDaoRegistry) { 086 myResourceName = theResourceName; 087 myDaoRegistry = theDaoRegistry; 088 myResourceChangeListenerRegistry = theResourceChangeListenerRegistry; 089 } 090 091 /** 092 * This method performs a search in the DB, so use the {@link ContextStartedEvent} 093 * to ensure that it runs after the database initializer 094 */ 095 @EventListener(classes = ContextRefreshedEvent.class) 096 @Order(IHapiBootOrder.AFTER_SUBSCRIPTION_INITIALIZED) 097 public void registerListener() { 098 if (myDaoRegistry.getResourceDaoOrNull(myResourceName) == null) { 099 ourLog.info("No resource DAO found for resource type {}, not registering listener", myResourceName); 100 return; 101 } 102 mySystemRequestDetails = SystemRequestDetails.forAllPartitions(); 103 104 IResourceChangeListenerCache resourceCache = 105 myResourceChangeListenerRegistry.registerResourceResourceChangeListener( 106 myResourceName, provideSearchParameterMap(), this, REFRESH_INTERVAL); 107 resourceCache.forceRefresh(); 108 } 109 110 private SearchParameterMap provideSearchParameterMap() { 111 SearchParameterMap searchParameterMap = mySearchParameterMap; 112 if (searchParameterMap == null) { 113 searchParameterMap = getSearchParameterMap(); 114 mySearchParameterMap = searchParameterMap; 115 } 116 return searchParameterMap; 117 } 118 119 @PreDestroy 120 public void unregisterListener() { 121 myResourceChangeListenerRegistry.unregisterResourceResourceChangeListener(this); 122 } 123 124 private boolean daoNotAvailable() { 125 return myDaoRegistry == null || !myDaoRegistry.isResourceTypeSupported(myResourceName); 126 } 127 128 @Override 129 public void requestRefresh() { 130 if (daoNotAvailable()) { 131 return; 132 } 133 if (!mySyncResourcesSemaphore.tryAcquire()) { 134 return; 135 } 136 try { 137 doSyncResourcesWithRetry(); 138 } finally { 139 mySyncResourcesSemaphore.release(); 140 } 141 } 142 143 @VisibleForTesting 144 @Override 145 public void forceRefresh() { 146 if (daoNotAvailable()) { 147 throw new ConfigurationException("Attempt to force refresh without a dao"); 148 } 149 try { 150 if (mySyncResourcesSemaphore.tryAcquire(FORCE_REFRESH_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { 151 doSyncResourcesWithRetry(); 152 } else { 153 String errorMessage = String.format( 154 "Timed out waiting %s %s to refresh %s cache", 155 FORCE_REFRESH_TIMEOUT_SECONDS, "seconds", myResourceName); 156 ourLog.error(errorMessage); 157 throw new ConfigurationException(errorMessage); 158 } 159 } catch (InterruptedException e) { 160 Thread.currentThread().interrupt(); 161 throw new InternalErrorException(e); 162 } finally { 163 mySyncResourcesSemaphore.release(); 164 } 165 } 166 167 @VisibleForTesting 168 public void acquireSemaphoreForUnitTest() throws InterruptedException { 169 mySyncResourcesSemaphore.acquire(); 170 } 171 172 @VisibleForTesting 173 public int doSyncResourcesForUnitTest() { 174 // Two passes for delete flag to take effect 175 int first = doSyncResourcesWithRetry(); 176 int second = doSyncResourcesWithRetry(); 177 return first + second; 178 } 179 180 synchronized int doSyncResourcesWithRetry() { 181 // retry runs MAX_RETRIES times 182 // and if errors result every time, it will fail 183 Retrier<Integer> syncResourceRetrier = new Retrier<>(this::doSyncResources, getMaxRetries()); 184 return syncResourceRetrier.runWithRetry(); 185 } 186 187 private int getMaxRetries() { 188 if (myMaxRetryCount != null) { 189 return myMaxRetryCount; 190 } 191 return MAX_RETRIES; 192 } 193 194 @VisibleForTesting 195 public void setMaxRetries(Integer theMaxRetries) { 196 myMaxRetryCount = theMaxRetries; 197 } 198 199 @SuppressWarnings("unchecked") 200 private int doSyncResources() { 201 if (isStopping()) { 202 return 0; 203 } 204 205 synchronized (mySyncResourcesLock) { 206 ourLog.debug("Starting sync {}s", myResourceName); 207 208 List<IBaseResource> resourceList = (List<IBaseResource>) 209 getResourceDao().searchForResources(provideSearchParameterMap(), mySystemRequestDetails); 210 return syncResourcesIntoCache(resourceList); 211 } 212 } 213 214 protected abstract int syncResourcesIntoCache(@Nonnull List<IBaseResource> resourceList); 215 216 @EventListener(ContextRefreshedEvent.class) 217 public void start() { 218 myStopping = false; 219 } 220 221 @EventListener(ContextClosedEvent.class) 222 public void shutdown() { 223 myStopping = true; 224 } 225 226 private boolean isStopping() { 227 return myStopping; 228 } 229 230 private IFhirResourceDao<?> getResourceDao() { 231 return myDaoRegistry.getResourceDao(myResourceName); 232 } 233 234 @Override 235 public void handleInit(@Nonnull Collection<IIdType> theResourceIds) { 236 if (daoNotAvailable()) { 237 ourLog.warn( 238 "The resource type {} is enabled on this server, but there is no {} DAO configured.", 239 myResourceName, 240 myResourceName); 241 return; 242 } 243 IFhirResourceDao<?> resourceDao = getResourceDao(); 244 SystemRequestDetails systemRequestDetails = SystemRequestDetails.forAllPartitions(); 245 List<IBaseResource> resourceList = new ArrayList<>(); 246 247 /* 248 * We are pretty lenient here, because any failure will block the server 249 * from starting up. We should generally never fail to load here, but it 250 * can potentially happen if a resource is manually deleted in the database 251 * (ie. marked with a deletion time manually) but the indexes aren't cleaned 252 * up. 253 */ 254 for (IIdType id : theResourceIds) { 255 IBaseResource read; 256 try { 257 read = resourceDao.read(id, systemRequestDetails); 258 } catch (BaseServerResponseException e) { 259 ourLog.warn("Unable to fetch resource {}", id, e); 260 continue; 261 } 262 resourceList.add(read); 263 } 264 handleInit(resourceList); 265 } 266 267 protected abstract void handleInit(@Nonnull List<IBaseResource> resourceList); 268 269 @Override 270 public void handleChange(@Nonnull IResourceChangeEvent theResourceChangeEvent) { 271 // For now ignore the contents of theResourceChangeEvent. In the future, consider updating the registry based 272 // on 273 // known resources that have been created, updated & deleted 274 requestRefresh(); 275 } 276 277 @Nonnull 278 protected abstract SearchParameterMap getSearchParameterMap(); 279}