001/*- 002 * #%L 003 * HAPI FHIR Storage api 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.cache; 021 022import ca.uhn.fhir.IHapiBootOrder; 023import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 024import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 025import ca.uhn.fhir.jpa.cache.IResourceChangeEvent; 026import ca.uhn.fhir.jpa.cache.IResourceChangeListener; 027import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache; 028import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry; 029import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 030import ca.uhn.fhir.jpa.searchparam.retry.Retrier; 031import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 032import com.google.common.annotations.VisibleForTesting; 033import jakarta.annotation.Nonnull; 034import jakarta.annotation.PreDestroy; 035import org.apache.commons.lang3.time.DateUtils; 036import org.hl7.fhir.instance.model.api.IBaseResource; 037import org.hl7.fhir.instance.model.api.IIdType; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040import org.springframework.beans.factory.annotation.Autowired; 041import org.springframework.context.event.ContextClosedEvent; 042import org.springframework.context.event.ContextRefreshedEvent; 043import org.springframework.context.event.ContextStartedEvent; 044import org.springframework.context.event.EventListener; 045import org.springframework.core.annotation.Order; 046 047import java.util.Collection; 048import java.util.List; 049import java.util.concurrent.Semaphore; 050import java.util.stream.Collectors; 051 052public abstract class BaseResourceCacheSynchronizer implements IResourceChangeListener { 053 private static final Logger ourLog = LoggerFactory.getLogger(BaseResourceCacheSynchronizer.class); 054 public static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes 055 public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE; 056 private final String myResourceName; 057 058 @Autowired 059 private IResourceChangeListenerRegistry myResourceChangeListenerRegistry; 060 061 @Autowired 062 DaoRegistry myDaoRegistry; 063 064 private SearchParameterMap mySearchParameterMap; 065 private SystemRequestDetails mySystemRequestDetails; 066 private boolean myStopping; 067 private final Semaphore mySyncResourcesSemaphore = new Semaphore(1); 068 private final Object mySyncResourcesLock = new Object(); 069 070 protected BaseResourceCacheSynchronizer(String theResourceName) { 071 myResourceName = theResourceName; 072 } 073 074 protected BaseResourceCacheSynchronizer( 075 String theResourceName, 076 IResourceChangeListenerRegistry theResourceChangeListenerRegistry, 077 DaoRegistry theDaoRegistry) { 078 myResourceName = theResourceName; 079 myDaoRegistry = theDaoRegistry; 080 myResourceChangeListenerRegistry = theResourceChangeListenerRegistry; 081 } 082 083 /** 084 * This method performs a search in the DB, so use the {@link ContextStartedEvent} 085 * to ensure that it runs after the database initializer 086 */ 087 @EventListener(classes = ContextRefreshedEvent.class) 088 @Order(IHapiBootOrder.AFTER_SUBSCRIPTION_INITIALIZED) 089 public void registerListener() { 090 if (myDaoRegistry.getResourceDaoOrNull(myResourceName) == null) { 091 ourLog.info("No resource DAO found for resource type {}, not registering listener", myResourceName); 092 return; 093 } 094 mySystemRequestDetails = SystemRequestDetails.forAllPartitions(); 095 096 IResourceChangeListenerCache resourceCache = 097 myResourceChangeListenerRegistry.registerResourceResourceChangeListener( 098 myResourceName, provideSearchParameterMap(), this, REFRESH_INTERVAL); 099 resourceCache.forceRefresh(); 100 } 101 102 private SearchParameterMap provideSearchParameterMap() { 103 SearchParameterMap searchParameterMap = mySearchParameterMap; 104 if (searchParameterMap == null) { 105 searchParameterMap = getSearchParameterMap(); 106 mySearchParameterMap = searchParameterMap; 107 } 108 return searchParameterMap; 109 } 110 111 @PreDestroy 112 public void unregisterListener() { 113 myResourceChangeListenerRegistry.unregisterResourceResourceChangeListener(this); 114 } 115 116 private boolean resourceDaoExists() { 117 return myDaoRegistry != null && myDaoRegistry.isResourceTypeSupported(myResourceName); 118 } 119 120 /** 121 * Read the existing resources from the database 122 */ 123 public void syncDatabaseToCache() { 124 if (!resourceDaoExists()) { 125 return; 126 } 127 if (!mySyncResourcesSemaphore.tryAcquire()) { 128 return; 129 } 130 try { 131 doSyncResourcesWithRetry(); 132 } finally { 133 mySyncResourcesSemaphore.release(); 134 } 135 } 136 137 @VisibleForTesting 138 public void acquireSemaphoreForUnitTest() throws InterruptedException { 139 mySyncResourcesSemaphore.acquire(); 140 } 141 142 @VisibleForTesting 143 public int doSyncResourcesForUnitTest() { 144 // Two passes for delete flag to take effect 145 int first = doSyncResourcesWithRetry(); 146 int second = doSyncResourcesWithRetry(); 147 return first + second; 148 } 149 150 synchronized int doSyncResourcesWithRetry() { 151 // retry runs MAX_RETRIES times 152 // and if errors result every time, it will fail 153 Retrier<Integer> syncResourceRetrier = new Retrier<>(this::doSyncResources, MAX_RETRIES); 154 return syncResourceRetrier.runWithRetry(); 155 } 156 157 @SuppressWarnings("unchecked") 158 private int doSyncResources() { 159 if (isStopping()) { 160 return 0; 161 } 162 163 synchronized (mySyncResourcesLock) { 164 ourLog.debug("Starting sync {}s", myResourceName); 165 166 List<IBaseResource> resourceList = (List<IBaseResource>) 167 getResourceDao().searchForResources(provideSearchParameterMap(), mySystemRequestDetails); 168 return syncResourcesIntoCache(resourceList); 169 } 170 } 171 172 protected abstract int syncResourcesIntoCache(List<IBaseResource> resourceList); 173 174 @EventListener(ContextRefreshedEvent.class) 175 public void start() { 176 myStopping = false; 177 } 178 179 @EventListener(ContextClosedEvent.class) 180 public void shutdown() { 181 myStopping = true; 182 } 183 184 private boolean isStopping() { 185 return myStopping; 186 } 187 188 private IFhirResourceDao<?> getResourceDao() { 189 return myDaoRegistry.getResourceDao(myResourceName); 190 } 191 192 @Override 193 public void handleInit(Collection<IIdType> theResourceIds) { 194 if (!resourceDaoExists()) { 195 ourLog.warn( 196 "The resource type {} is enabled on this server, but there is no {} DAO configured.", 197 myResourceName, 198 myResourceName); 199 return; 200 } 201 IFhirResourceDao<?> resourceDao = getResourceDao(); 202 SystemRequestDetails systemRequestDetails = SystemRequestDetails.forAllPartitions(); 203 List<IBaseResource> resourceList = theResourceIds.stream() 204 .map(n -> resourceDao.read(n, systemRequestDetails)) 205 .collect(Collectors.toList()); 206 handleInit(resourceList); 207 } 208 209 protected abstract void handleInit(List<IBaseResource> resourceList); 210 211 @Override 212 public void handleChange(IResourceChangeEvent theResourceChangeEvent) { 213 // For now ignore the contents of theResourceChangeEvent. In the future, consider updating the registry based 214 // on 215 // known resources that have been created, updated & deleted 216 syncDatabaseToCache(); 217 } 218 219 @Nonnull 220 protected abstract SearchParameterMap getSearchParameterMap(); 221}