
001/*- 002 * #%L 003 * HAPI FHIR Storage api 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.cache; 021 022import ca.uhn.fhir.IHapiBootOrder; 023import ca.uhn.fhir.context.ConfigurationException; 024import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 025import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 026import ca.uhn.fhir.jpa.cache.IResourceChangeEvent; 027import ca.uhn.fhir.jpa.cache.IResourceChangeListener; 028import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache; 029import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry; 030import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 031import ca.uhn.fhir.jpa.searchparam.retry.Retrier; 032import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 033import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 034import ca.uhn.fhir.rest.server.util.IResourceRepositoryCache; 035import com.google.common.annotations.VisibleForTesting; 036import jakarta.annotation.Nonnull; 037import jakarta.annotation.PreDestroy; 038import org.apache.commons.lang3.time.DateUtils; 039import org.hl7.fhir.instance.model.api.IBaseResource; 040import org.hl7.fhir.instance.model.api.IIdType; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043import org.springframework.beans.factory.annotation.Autowired; 044import org.springframework.context.event.ContextClosedEvent; 045import org.springframework.context.event.ContextRefreshedEvent; 046import org.springframework.context.event.ContextStartedEvent; 047import org.springframework.context.event.EventListener; 048import org.springframework.core.annotation.Order; 049 050import java.util.Collection; 051import java.util.List; 052import java.util.concurrent.Semaphore; 053import java.util.concurrent.TimeUnit; 054import java.util.stream.Collectors; 055 056public abstract class BaseResourceCacheSynchronizer implements IResourceChangeListener, IResourceRepositoryCache { 057 private static final Logger ourLog = LoggerFactory.getLogger(BaseResourceCacheSynchronizer.class); 058 public static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes 059 public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE; 060 private static final long FORCE_REFRESH_TIMEOUT_SECONDS = 300; 061 private final String myResourceName; 062 063 @Autowired 064 private IResourceChangeListenerRegistry myResourceChangeListenerRegistry; 065 066 @Autowired 067 private DaoRegistry myDaoRegistry; 068 069 private SearchParameterMap mySearchParameterMap; 070 private SystemRequestDetails mySystemRequestDetails; 071 private boolean myStopping; 072 private final Semaphore mySyncResourcesSemaphore = new Semaphore(1); 073 private final Object mySyncResourcesLock = new Object(); 074 075 private Integer myMaxRetryCount = null; 076 077 protected BaseResourceCacheSynchronizer(String theResourceName) { 078 myResourceName = theResourceName; 079 } 080 081 protected BaseResourceCacheSynchronizer( 082 String theResourceName, 083 IResourceChangeListenerRegistry theResourceChangeListenerRegistry, 084 DaoRegistry theDaoRegistry) { 085 myResourceName = theResourceName; 086 myDaoRegistry = theDaoRegistry; 087 myResourceChangeListenerRegistry = theResourceChangeListenerRegistry; 088 } 089 090 /** 091 * This method performs a search in the DB, so use the {@link ContextStartedEvent} 092 * to ensure that it runs after the database initializer 093 */ 094 @EventListener(classes = ContextRefreshedEvent.class) 095 @Order(IHapiBootOrder.AFTER_SUBSCRIPTION_INITIALIZED) 096 public void registerListener() { 097 if (myDaoRegistry.getResourceDaoOrNull(myResourceName) == null) { 098 ourLog.info("No resource DAO found for resource type {}, not registering listener", myResourceName); 099 return; 100 } 101 mySystemRequestDetails = SystemRequestDetails.forAllPartitions(); 102 103 IResourceChangeListenerCache resourceCache = 104 myResourceChangeListenerRegistry.registerResourceResourceChangeListener( 105 myResourceName, provideSearchParameterMap(), this, REFRESH_INTERVAL); 106 resourceCache.forceRefresh(); 107 } 108 109 private SearchParameterMap provideSearchParameterMap() { 110 SearchParameterMap searchParameterMap = mySearchParameterMap; 111 if (searchParameterMap == null) { 112 searchParameterMap = getSearchParameterMap(); 113 mySearchParameterMap = searchParameterMap; 114 } 115 return searchParameterMap; 116 } 117 118 @PreDestroy 119 public void unregisterListener() { 120 myResourceChangeListenerRegistry.unregisterResourceResourceChangeListener(this); 121 } 122 123 private boolean daoNotAvailable() { 124 return myDaoRegistry == null || !myDaoRegistry.isResourceTypeSupported(myResourceName); 125 } 126 127 @Override 128 public void requestRefresh() { 129 if (daoNotAvailable()) { 130 return; 131 } 132 if (!mySyncResourcesSemaphore.tryAcquire()) { 133 return; 134 } 135 try { 136 doSyncResourcesWithRetry(); 137 } finally { 138 mySyncResourcesSemaphore.release(); 139 } 140 } 141 142 @VisibleForTesting 143 @Override 144 public void forceRefresh() { 145 if (daoNotAvailable()) { 146 throw new ConfigurationException("Attempt to force refresh without a dao"); 147 } 148 try { 149 if (mySyncResourcesSemaphore.tryAcquire(FORCE_REFRESH_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { 150 doSyncResourcesWithRetry(); 151 } else { 152 String errorMessage = String.format( 153 "Timed out waiting %s %s to refresh %s cache", 154 FORCE_REFRESH_TIMEOUT_SECONDS, "seconds", myResourceName); 155 ourLog.error(errorMessage); 156 throw new ConfigurationException(errorMessage); 157 } 158 } catch (InterruptedException e) { 159 Thread.currentThread().interrupt(); 160 throw new InternalErrorException(e); 161 } finally { 162 mySyncResourcesSemaphore.release(); 163 } 164 } 165 166 @VisibleForTesting 167 public void acquireSemaphoreForUnitTest() throws InterruptedException { 168 mySyncResourcesSemaphore.acquire(); 169 } 170 171 @VisibleForTesting 172 public int doSyncResourcesForUnitTest() { 173 // Two passes for delete flag to take effect 174 int first = doSyncResourcesWithRetry(); 175 int second = doSyncResourcesWithRetry(); 176 return first + second; 177 } 178 179 synchronized int doSyncResourcesWithRetry() { 180 // retry runs MAX_RETRIES times 181 // and if errors result every time, it will fail 182 Retrier<Integer> syncResourceRetrier = new Retrier<>(this::doSyncResources, getMaxRetries()); 183 return syncResourceRetrier.runWithRetry(); 184 } 185 186 private int getMaxRetries() { 187 if (myMaxRetryCount != null) { 188 return myMaxRetryCount; 189 } 190 return MAX_RETRIES; 191 } 192 193 @VisibleForTesting 194 public void setMaxRetries(Integer theMaxRetries) { 195 myMaxRetryCount = theMaxRetries; 196 } 197 198 @SuppressWarnings("unchecked") 199 private int doSyncResources() { 200 if (isStopping()) { 201 return 0; 202 } 203 204 synchronized (mySyncResourcesLock) { 205 ourLog.debug("Starting sync {}s", myResourceName); 206 207 List<IBaseResource> resourceList = (List<IBaseResource>) 208 getResourceDao().searchForResources(provideSearchParameterMap(), mySystemRequestDetails); 209 return syncResourcesIntoCache(resourceList); 210 } 211 } 212 213 protected abstract int syncResourcesIntoCache(@Nonnull List<IBaseResource> resourceList); 214 215 @EventListener(ContextRefreshedEvent.class) 216 public void start() { 217 myStopping = false; 218 } 219 220 @EventListener(ContextClosedEvent.class) 221 public void shutdown() { 222 myStopping = true; 223 } 224 225 private boolean isStopping() { 226 return myStopping; 227 } 228 229 private IFhirResourceDao<?> getResourceDao() { 230 return myDaoRegistry.getResourceDao(myResourceName); 231 } 232 233 @Override 234 public void handleInit(@Nonnull Collection<IIdType> theResourceIds) { 235 if (daoNotAvailable()) { 236 ourLog.warn( 237 "The resource type {} is enabled on this server, but there is no {} DAO configured.", 238 myResourceName, 239 myResourceName); 240 return; 241 } 242 IFhirResourceDao<?> resourceDao = getResourceDao(); 243 SystemRequestDetails systemRequestDetails = SystemRequestDetails.forAllPartitions(); 244 List<IBaseResource> resourceList = theResourceIds.stream() 245 .map(n -> resourceDao.read(n, systemRequestDetails)) 246 .collect(Collectors.toList()); 247 handleInit(resourceList); 248 } 249 250 protected abstract void handleInit(@Nonnull List<IBaseResource> resourceList); 251 252 @Override 253 public void handleChange(@Nonnull IResourceChangeEvent theResourceChangeEvent) { 254 // For now ignore the contents of theResourceChangeEvent. In the future, consider updating the registry based 255 // on 256 // known resources that have been created, updated & deleted 257 requestRefresh(); 258 } 259 260 @Nonnull 261 protected abstract SearchParameterMap getSearchParameterMap(); 262}