
001/*- 002 * #%L 003 * HAPI FHIR Storage api 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.cache; 021 022import ca.uhn.fhir.IHapiBootOrder; 023import ca.uhn.fhir.context.ConfigurationException; 024import ca.uhn.fhir.i18n.Msg; 025import ca.uhn.fhir.interceptor.model.RequestPartitionId; 026import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 027import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 028import ca.uhn.fhir.jpa.cache.IResourceChangeEvent; 029import ca.uhn.fhir.jpa.cache.IResourceChangeListener; 030import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache; 031import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry; 032import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 033import ca.uhn.fhir.jpa.searchparam.retry.Retrier; 034import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 035import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException; 036import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 037import ca.uhn.fhir.rest.server.util.IResourceRepositoryCache; 038import com.google.common.annotations.VisibleForTesting; 039import jakarta.annotation.Nonnull; 040import jakarta.annotation.PreDestroy; 041import org.apache.commons.lang3.time.DateUtils; 042import org.hl7.fhir.instance.model.api.IBaseResource; 043import org.hl7.fhir.instance.model.api.IIdType; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046import org.springframework.beans.factory.annotation.Autowired; 047import org.springframework.context.event.ContextClosedEvent; 048import org.springframework.context.event.ContextRefreshedEvent; 049import org.springframework.context.event.EventListener; 050import org.springframework.core.annotation.Order; 051 052import java.util.ArrayList; 053import java.util.Collection; 054import java.util.List; 055import java.util.concurrent.Semaphore; 056import java.util.concurrent.TimeUnit; 057 058public abstract class BaseResourceCacheSynchronizer implements IResourceChangeListener, IResourceRepositoryCache { 059 private static final Logger ourLog = LoggerFactory.getLogger(BaseResourceCacheSynchronizer.class); 060 public static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes 061 public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE; 062 private static final long FORCE_REFRESH_TIMEOUT_SECONDS = 300; 063 private final String myResourceName; 064 065 @Autowired 066 private IResourceChangeListenerRegistry myResourceChangeListenerRegistry; 067 068 @Autowired 069 private DaoRegistry myDaoRegistry; 070 071 private SearchParameterMap mySearchParameterMap; 072 private final SystemRequestDetails mySystemRequestDetails = SystemRequestDetails.forAllPartitions(); 073 private boolean myStopping; 074 private final Semaphore mySyncResourcesSemaphore = new Semaphore(1); 075 private final Object mySyncResourcesLock = new Object(); 076 077 private Integer myMaxRetryCount = null; 078 private boolean myInitialized = false; 079 private final RequestPartitionId myRequestPartitionId; 080 081 protected BaseResourceCacheSynchronizer(String theResourceName, RequestPartitionId theRequestPartitionId) { 082 this(theResourceName, theRequestPartitionId, null, null); 083 } 084 085 protected BaseResourceCacheSynchronizer( 086 String theResourceName, 087 RequestPartitionId theRequestPartitionId, 088 IResourceChangeListenerRegistry theResourceChangeListenerRegistry, 089 DaoRegistry theDaoRegistry) { 090 myResourceName = theResourceName; 091 myRequestPartitionId = theRequestPartitionId; 092 myResourceChangeListenerRegistry = theResourceChangeListenerRegistry; 093 myDaoRegistry = theDaoRegistry; 094 } 095 096 /** 097 * This method performs a search in the DB, so use the {@link ContextRefreshedEvent} 098 * to ensure that it runs after the database initializer 099 */ 100 @EventListener(classes = ContextRefreshedEvent.class) 101 @Order(IHapiBootOrder.AFTER_SUBSCRIPTION_INITIALIZED) 102 public void registerListener() { 103 if (myInitialized) { 104 return; 105 } 106 if (myDaoRegistry.getResourceDaoOrNull(myResourceName) == null) { 107 ourLog.info("No resource DAO found for resource type {}, not registering listener", myResourceName); 108 return; 109 } 110 111 IResourceChangeListenerCache resourceCache = 112 myResourceChangeListenerRegistry.registerResourceResourceChangeListener( 113 myResourceName, myRequestPartitionId, provideSearchParameterMap(), this, REFRESH_INTERVAL); 114 resourceCache.forceRefresh(); 115 myInitialized = true; 116 } 117 118 private SearchParameterMap provideSearchParameterMap() { 119 SearchParameterMap searchParameterMap = mySearchParameterMap; 120 if (searchParameterMap == null) { 121 searchParameterMap = getSearchParameterMap(); 122 mySearchParameterMap = searchParameterMap; 123 } 124 return searchParameterMap; 125 } 126 127 @PreDestroy 128 public void unregisterListener() { 129 myResourceChangeListenerRegistry.unregisterResourceResourceChangeListener(this); 130 } 131 132 private boolean daoNotAvailable() { 133 return myDaoRegistry == null || !myDaoRegistry.isResourceTypeSupported(myResourceName); 134 } 135 136 @Override 137 public void requestRefresh() { 138 if (daoNotAvailable()) { 139 return; 140 } 141 if (!mySyncResourcesSemaphore.tryAcquire()) { 142 return; 143 } 144 try { 145 doSyncResourcesWithRetry(); 146 } finally { 147 mySyncResourcesSemaphore.release(); 148 } 149 } 150 151 @VisibleForTesting 152 @Override 153 public void forceRefresh() { 154 if (daoNotAvailable()) { 155 throw new ConfigurationException(Msg.code(2652) + "Attempt to force refresh without a dao"); 156 } 157 try { 158 if (mySyncResourcesSemaphore.tryAcquire(FORCE_REFRESH_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { 159 doSyncResourcesWithRetry(); 160 } else { 161 String errorMessage = String.format( 162 Msg.code(2653) + "Timed out waiting %s %s to refresh %s cache", 163 FORCE_REFRESH_TIMEOUT_SECONDS, 164 "seconds", 165 myResourceName); 166 ourLog.error(errorMessage); 167 throw new ConfigurationException(Msg.code(2663) + errorMessage); 168 } 169 } catch (InterruptedException e) { 170 Thread.currentThread().interrupt(); 171 throw new InternalErrorException(Msg.code(2654) + e); 172 } finally { 173 mySyncResourcesSemaphore.release(); 174 } 175 } 176 177 @VisibleForTesting 178 public void acquireSemaphoreForUnitTest() throws InterruptedException { 179 mySyncResourcesSemaphore.acquire(); 180 } 181 182 @VisibleForTesting 183 public int doSyncResourcesForUnitTest() { 184 // Two passes for delete flag to take effect 185 int first = doSyncResourcesWithRetry(); 186 int second = doSyncResourcesWithRetry(); 187 return first + second; 188 } 189 190 synchronized int doSyncResourcesWithRetry() { 191 // retry runs MAX_RETRIES times 192 // and if errors result every time, it will fail 193 Retrier<Integer> syncResourceRetrier = new Retrier<>(this::doSyncResources, getMaxRetries()); 194 return syncResourceRetrier.runWithRetry(); 195 } 196 197 private int getMaxRetries() { 198 if (myMaxRetryCount != null) { 199 return myMaxRetryCount; 200 } 201 return MAX_RETRIES; 202 } 203 204 @VisibleForTesting 205 public void setMaxRetries(Integer theMaxRetries) { 206 myMaxRetryCount = theMaxRetries; 207 } 208 209 @SuppressWarnings("unchecked") 210 private int doSyncResources() { 211 if (isStopping()) { 212 return 0; 213 } 214 215 synchronized (mySyncResourcesLock) { 216 ourLog.debug("Starting sync {}s", myResourceName); 217 218 List<IBaseResource> resourceList = (List<IBaseResource>) 219 getResourceDao().searchForResources(provideSearchParameterMap(), mySystemRequestDetails); 220 return syncResourcesIntoCache(resourceList); 221 } 222 } 223 224 protected abstract int syncResourcesIntoCache(@Nonnull List<IBaseResource> resourceList); 225 226 @EventListener(ContextRefreshedEvent.class) 227 public void start() { 228 myStopping = false; 229 } 230 231 @EventListener(ContextClosedEvent.class) 232 public void shutdown() { 233 myStopping = true; 234 } 235 236 private boolean isStopping() { 237 return myStopping; 238 } 239 240 private IFhirResourceDao<?> getResourceDao() { 241 return myDaoRegistry.getResourceDao(myResourceName); 242 } 243 244 @Override 245 public void handleInit(@Nonnull Collection<IIdType> theResourceIds) { 246 if (daoNotAvailable()) { 247 ourLog.warn( 248 "The resource type {} is enabled on this server, but there is no {} DAO configured.", 249 myResourceName, 250 myResourceName); 251 return; 252 } 253 IFhirResourceDao<?> resourceDao = getResourceDao(); 254 SystemRequestDetails systemRequestDetails = SystemRequestDetails.forAllPartitions(); 255 List<IBaseResource> resourceList = new ArrayList<>(); 256 257 /* 258 * We are pretty lenient here, because any failure will block the server 259 * from starting up. We should generally never fail to load here, but it 260 * can potentially happen if a resource is manually deleted in the database 261 * (ie. marked with a deletion time manually) but the indexes aren't cleaned 262 * up. 263 */ 264 for (IIdType id : theResourceIds) { 265 IBaseResource read; 266 try { 267 read = resourceDao.read(id, systemRequestDetails); 268 } catch (BaseServerResponseException e) { 269 ourLog.warn("Unable to fetch resource {}", id, e); 270 continue; 271 } 272 resourceList.add(read); 273 } 274 handleInit(resourceList); 275 } 276 277 protected abstract void handleInit(@Nonnull List<IBaseResource> resourceList); 278 279 @Override 280 public void handleChange(@Nonnull IResourceChangeEvent theResourceChangeEvent) { 281 // For now ignore the contents of theResourceChangeEvent. In the future, consider updating the registry based 282 // on 283 // known resources that have been created, updated & deleted 284 requestRefresh(); 285 } 286 287 @Nonnull 288 protected abstract SearchParameterMap getSearchParameterMap(); 289}