
001/*- 002 * #%L 003 * HAPI FHIR Storage api 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.cache; 021 022import ca.uhn.fhir.IHapiBootOrder; 023import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 024import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 025import ca.uhn.fhir.jpa.cache.IResourceChangeEvent; 026import ca.uhn.fhir.jpa.cache.IResourceChangeListener; 027import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache; 028import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry; 029import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 030import ca.uhn.fhir.jpa.searchparam.retry.Retrier; 031import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 032import com.google.common.annotations.VisibleForTesting; 033import jakarta.annotation.Nonnull; 034import jakarta.annotation.PreDestroy; 035import org.apache.commons.lang3.time.DateUtils; 036import org.hl7.fhir.instance.model.api.IBaseResource; 037import org.hl7.fhir.instance.model.api.IIdType; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040import org.springframework.beans.factory.annotation.Autowired; 041import org.springframework.context.event.ContextClosedEvent; 042import org.springframework.context.event.ContextRefreshedEvent; 043import org.springframework.context.event.ContextStartedEvent; 044import org.springframework.context.event.EventListener; 045import org.springframework.core.annotation.Order; 046 047import java.util.Collection; 048import java.util.List; 049import java.util.concurrent.Semaphore; 050import java.util.stream.Collectors; 051 052public abstract class BaseResourceCacheSynchronizer implements IResourceChangeListener { 053 private static final Logger ourLog = LoggerFactory.getLogger(BaseResourceCacheSynchronizer.class); 054 public static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes 055 public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE; 056 private final String myResourceName; 057 058 @Autowired 059 private IResourceChangeListenerRegistry myResourceChangeListenerRegistry; 060 061 @Autowired 062 private DaoRegistry myDaoRegistry; 063 064 private SearchParameterMap mySearchParameterMap; 065 private SystemRequestDetails mySystemRequestDetails; 066 private boolean myStopping; 067 private final Semaphore mySyncResourcesSemaphore = new Semaphore(1); 068 private final Object mySyncResourcesLock = new Object(); 069 070 private Integer myMaxRetryCount = null; 071 072 protected BaseResourceCacheSynchronizer(String theResourceName) { 073 myResourceName = theResourceName; 074 } 075 076 protected BaseResourceCacheSynchronizer( 077 String theResourceName, 078 IResourceChangeListenerRegistry theResourceChangeListenerRegistry, 079 DaoRegistry theDaoRegistry) { 080 myResourceName = theResourceName; 081 myDaoRegistry = theDaoRegistry; 082 myResourceChangeListenerRegistry = theResourceChangeListenerRegistry; 083 } 084 085 /** 086 * This method performs a search in the DB, so use the {@link ContextStartedEvent} 087 * to ensure that it runs after the database initializer 088 */ 089 @EventListener(classes = ContextRefreshedEvent.class) 090 @Order(IHapiBootOrder.AFTER_SUBSCRIPTION_INITIALIZED) 091 public void registerListener() { 092 if (myDaoRegistry.getResourceDaoOrNull(myResourceName) == null) { 093 ourLog.info("No resource DAO found for resource type {}, not registering listener", myResourceName); 094 return; 095 } 096 mySystemRequestDetails = SystemRequestDetails.forAllPartitions(); 097 098 IResourceChangeListenerCache resourceCache = 099 myResourceChangeListenerRegistry.registerResourceResourceChangeListener( 100 myResourceName, provideSearchParameterMap(), this, REFRESH_INTERVAL); 101 resourceCache.forceRefresh(); 102 } 103 104 private SearchParameterMap provideSearchParameterMap() { 105 SearchParameterMap searchParameterMap = mySearchParameterMap; 106 if (searchParameterMap == null) { 107 searchParameterMap = getSearchParameterMap(); 108 mySearchParameterMap = searchParameterMap; 109 } 110 return searchParameterMap; 111 } 112 113 @PreDestroy 114 public void unregisterListener() { 115 myResourceChangeListenerRegistry.unregisterResourceResourceChangeListener(this); 116 } 117 118 private boolean resourceDaoExists() { 119 return myDaoRegistry != null && myDaoRegistry.isResourceTypeSupported(myResourceName); 120 } 121 122 /** 123 * Read the existing resources from the database 124 */ 125 public void syncDatabaseToCache() { 126 if (!resourceDaoExists()) { 127 return; 128 } 129 if (!mySyncResourcesSemaphore.tryAcquire()) { 130 return; 131 } 132 try { 133 doSyncResourcesWithRetry(); 134 } finally { 135 mySyncResourcesSemaphore.release(); 136 } 137 } 138 139 @VisibleForTesting 140 public void acquireSemaphoreForUnitTest() throws InterruptedException { 141 mySyncResourcesSemaphore.acquire(); 142 } 143 144 @VisibleForTesting 145 public int doSyncResourcesForUnitTest() { 146 // Two passes for delete flag to take effect 147 int first = doSyncResourcesWithRetry(); 148 int second = doSyncResourcesWithRetry(); 149 return first + second; 150 } 151 152 synchronized int doSyncResourcesWithRetry() { 153 // retry runs MAX_RETRIES times 154 // and if errors result every time, it will fail 155 Retrier<Integer> syncResourceRetrier = new Retrier<>(this::doSyncResources, getMaxRetries()); 156 return syncResourceRetrier.runWithRetry(); 157 } 158 159 private int getMaxRetries() { 160 if (myMaxRetryCount != null) { 161 return myMaxRetryCount; 162 } 163 return MAX_RETRIES; 164 } 165 166 @VisibleForTesting 167 public void setMaxRetries(Integer theMaxRetries) { 168 myMaxRetryCount = theMaxRetries; 169 } 170 171 @SuppressWarnings("unchecked") 172 private int doSyncResources() { 173 if (isStopping()) { 174 return 0; 175 } 176 177 synchronized (mySyncResourcesLock) { 178 ourLog.debug("Starting sync {}s", myResourceName); 179 180 List<IBaseResource> resourceList = (List<IBaseResource>) 181 getResourceDao().searchForResources(provideSearchParameterMap(), mySystemRequestDetails); 182 return syncResourcesIntoCache(resourceList); 183 } 184 } 185 186 protected abstract int syncResourcesIntoCache(List<IBaseResource> resourceList); 187 188 @EventListener(ContextRefreshedEvent.class) 189 public void start() { 190 myStopping = false; 191 } 192 193 @EventListener(ContextClosedEvent.class) 194 public void shutdown() { 195 myStopping = true; 196 } 197 198 private boolean isStopping() { 199 return myStopping; 200 } 201 202 private IFhirResourceDao<?> getResourceDao() { 203 return myDaoRegistry.getResourceDao(myResourceName); 204 } 205 206 @Override 207 public void handleInit(Collection<IIdType> theResourceIds) { 208 if (!resourceDaoExists()) { 209 ourLog.warn( 210 "The resource type {} is enabled on this server, but there is no {} DAO configured.", 211 myResourceName, 212 myResourceName); 213 return; 214 } 215 IFhirResourceDao<?> resourceDao = getResourceDao(); 216 SystemRequestDetails systemRequestDetails = SystemRequestDetails.forAllPartitions(); 217 List<IBaseResource> resourceList = theResourceIds.stream() 218 .map(n -> resourceDao.read(n, systemRequestDetails)) 219 .collect(Collectors.toList()); 220 handleInit(resourceList); 221 } 222 223 protected abstract void handleInit(List<IBaseResource> resourceList); 224 225 @Override 226 public void handleChange(IResourceChangeEvent theResourceChangeEvent) { 227 // For now ignore the contents of theResourceChangeEvent. In the future, consider updating the registry based 228 // on 229 // known resources that have been created, updated & deleted 230 syncDatabaseToCache(); 231 } 232 233 @Nonnull 234 protected abstract SearchParameterMap getSearchParameterMap(); 235}