
/*-
 * #%L
 * HAPI FHIR JPA - Search Parameters
 * %%
 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.cache;

import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher;
import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Holds the state needed to keep one {@link IResourceChangeListener} up to date: the resource name and
 * {@link SearchParameterMap} it watches, the {@link ResourceVersionCache} of versions seen so far, and the
 * time at which the next refresh is due.
 * <p>
 * Declared as a prototype-scoped component, so each request to the Spring context yields a new instance.
 * Refreshes are serialized per instance through a single-permit {@link Semaphore}.
 */
@Component
@Scope("prototype")
public class ResourceChangeListenerCache implements IResourceChangeListenerCache {
	private static final Logger ourLog = LoggerFactory.getLogger(ResourceChangeListenerCache.class);

	/**
	 * Max number of retries to do for cache refreshing
	 */
	private static final int MAX_RETRIES = 60;

	/**
	 * Timeout in milliseconds for acquiring the semaphore lock (1 minute)
	 */
	private static final long SEMAPHORE_TIMEOUT_MS = 60000;

	/**
	 * When non-null, {@link #now()} returns this fixed instant instead of the system time.
	 */
	private static Instant ourNowForUnitTests;

	/**
	 * Semaphore used to control access to the cache refresh operation
	 */
	private final Semaphore myCacheSemaphore = new Semaphore(1);

	@Autowired
	IResourceChangeListenerCacheRefresher myResourceChangeListenerCacheRefresher;

	@Autowired
	SearchParamMatcher mySearchParamMatcher;

	private final String myResourceName;
	private final IResourceChangeListener myResourceChangeListener;
	private final SearchParameterMap mySearchParameterMap;
	private final ResourceVersionCache myResourceVersionCache = new ResourceVersionCache();
	private final long myRemoteRefreshIntervalMs;

	private boolean myInitialized = false;

	/**
	 * Time after which the cache is due for a refresh. Declared {@code volatile} because
	 * {@link #requestRefresh()} is meant to be honoured by a different thread than the one calling it,
	 * so the write must be visible to the thread that evaluates {@link #isTimeToRefresh()}.
	 */
	private volatile Instant myNextRefreshTime = Instant.MIN;

	/**
	 * @param theResourceName            the name of the resource type this cache is for
	 * @param theResourceChangeListener  the listener associated with this cache
	 * @param theSearchParameterMap      the search criteria to watch; a copy is taken via
	 *                                   {@link SerializationUtils#clone}, so the caller's instance is not retained
	 * @param theRemoteRefreshIntervalMs number of milliseconds added to the current time to schedule the next
	 *                                   refresh after each refresh attempt
	 */
	public ResourceChangeListenerCache(
			String theResourceName,
			IResourceChangeListener theResourceChangeListener,
			SearchParameterMap theSearchParameterMap,
			long theRemoteRefreshIntervalMs) {
		myResourceName = theResourceName;
		myResourceChangeListener = theResourceChangeListener;
		mySearchParameterMap = SerializationUtils.clone(theSearchParameterMap);
		myRemoteRefreshIntervalMs = theRemoteRefreshIntervalMs;
	}

	/**
	 * Request that the cache be refreshed at the next convenient time (in a different thread)
	 */
	@Override
	public void requestRefresh() {
		// Instant.MIN is before any "now", so the next isTimeToRefresh() check will succeed
		myNextRefreshTime = Instant.MIN;
	}

	/**
	 * Request that a cache be refreshed now, in the current thread
	 *
	 * @return the result of the refresh
	 */
	@Override
	public ResourceChangeResult forceRefresh() {
		requestRefresh();
		return refreshCacheWithRetry();
	}

	/**
	 * Request a refresh of the cache if theResource matches our SearchParameterMap
	 *
	 * @param theResource the resource to test against the SearchParameterMap of this cache
	 */
	public void requestRefreshIfWatching(IBaseResource theResource) {
		if (matches(theResource)) {
			requestRefresh();
		}
	}

	/**
	 * Evaluate theResource against the SearchParameterMap of this cache using the in-memory matcher.
	 *
	 * @param theResource the resource to test
	 * @return {@code true} if the matcher reports a match
	 * @throws IllegalStateException if the matcher reports that the SearchParameterMap is not supported
	 */
	public boolean matches(IBaseResource theResource) {
		InMemoryMatchResult result = mySearchParamMatcher.match(mySearchParameterMap, theResource);
		if (!result.supported()) {
			// This should never happen since we enforce only in-memory SearchParamMaps at registration time
			throw new IllegalStateException(Msg.code(483) + "Search Parameter Map " + mySearchParameterMap
					+ " cannot be processed in-memory: " + result.getUnsupportedReason());
		}
		return result.matched();
	}

	/**
	 * Refresh the cache only if the scheduled refresh time has passed.
	 *
	 * @return the result of the refresh, or a newly constructed {@link ResourceChangeResult} if no refresh
	 * was due
	 */
	@Override
	public ResourceChangeResult refreshCacheIfNecessary() {
		if (isTimeToRefresh()) {
			return refreshCacheWithRetry();
		}
		return new ResourceChangeResult();
	}

	/**
	 * @return {@code true} if the scheduled refresh time is strictly before {@link #now()}
	 */
	protected boolean isTimeToRefresh() {
		return myNextRefreshTime.isBefore(now());
	}

	/**
	 * @return the instant set by {@link #setNowForUnitTests(String)} if one is set, otherwise the current
	 * system time
	 */
	static Instant now() {
		if (ourNowForUnitTests != null) {
			return ourNowForUnitTests;
		}
		return Instant.now();
	}

	/**
	 * Refresh the cache, retrying as needed, and schedule the next refresh.
	 * <p>
	 * The next refresh time is updated whether the refresh succeeds or throws.
	 *
	 * @return the result of the refresh
	 */
	public ResourceChangeResult refreshCacheWithRetry() {
		ResourceChangeResult retval;
		try {
			retval = refreshCacheAndNotifyListenersWithRetry();
		} finally {
			// NOTE(review): this unconditionally overwrites the schedule, so a requestRefresh() issued while
			// the refresh above was running is replaced by "now + interval" - confirm this is intended.
			myNextRefreshTime = now().plus(Duration.ofMillis(myRemoteRefreshIntervalMs));
		}
		return retval;
	}

	@VisibleForTesting
	public void setResourceChangeListenerCacheRefresher(
			IResourceChangeListenerCacheRefresher theResourceChangeListenerCacheRefresher) {
		myResourceChangeListenerCacheRefresher = theResourceChangeListenerCacheRefresher;
	}

	/**
	 * Run {@link #tryRefreshCacheAndNotifyListener()} through a {@link Retrier} configured with
	 * {@link #getMaxRetries()}.
	 */
	private ResourceChangeResult refreshCacheAndNotifyListenersWithRetry() {
		Retrier<ResourceChangeResult> refreshCacheRetrier =
				new Retrier<>(this::tryRefreshCacheAndNotifyListener, getMaxRetries());
		return refreshCacheRetrier.runWithRetry();
	}

	/**
	 * Perform a single refresh attempt while holding the cache semaphore.
	 *
	 * @return the result reported by the cache refresher
	 * @throws InternalErrorException if the semaphore cannot be acquired within
	 *                                {@link #SEMAPHORE_TIMEOUT_MS}, or if the thread is interrupted while waiting
	 */
	private ResourceChangeResult tryRefreshCacheAndNotifyListener() {
		boolean acquired = false;
		try {
			// Try to acquire the semaphore with a timeout
			acquired = myCacheSemaphore.tryAcquire(SEMAPHORE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
			if (acquired) {
				return myResourceChangeListenerCacheRefresher.refreshCacheAndNotifyListener(this);
			} else {
				ourLog.warn(
						"Timed out waiting {} ms to acquire lock for refreshing cache for resource {}",
						SEMAPHORE_TIMEOUT_MS,
						myResourceName);
				throw new InternalErrorException(
						Msg.code(2702) + "Timed out waiting to acquire lock for refreshing cache");
			}
		} catch (InterruptedException e) {
			// Restore the interrupt flag so callers further up the stack can observe the interruption
			Thread.currentThread().interrupt();
			throw new InternalErrorException(Msg.code(2703) + "Interrupted while waiting to refresh cache", e);
		} finally {
			// Only release a permit that this call actually obtained
			if (acquired) {
				myCacheSemaphore.release();
			}
		}
	}

	@Override
	public Instant getNextRefreshTime() {
		return myNextRefreshTime;
	}

	@Override
	public SearchParameterMap getSearchParameterMap() {
		return mySearchParameterMap;
	}

	@Override
	public boolean isInitialized() {
		return myInitialized;
	}

	/**
	 * @param theInitialized the new value of the initialized flag
	 * @return this instance, to allow call chaining
	 */
	public ResourceChangeListenerCache setInitialized(boolean theInitialized) {
		myInitialized = theInitialized;
		return this;
	}

	@Override
	public String getResourceName() {
		return myResourceName;
	}

	public ResourceVersionCache getResourceVersionCache() {
		return myResourceVersionCache;
	}

	public IResourceChangeListener getResourceChangeListener() {
		return myResourceChangeListener;
	}

	/**
	 * Fix the value returned by {@link #now()} to the given time of day on 2020-11-16 (UTC), or restore
	 * the system time when {@code null} is passed.
	 *
	 * @param theTime has format like "12:34:56" i.e. HH:MM:SS
	 */
	@VisibleForTesting
	public static void setNowForUnitTests(String theTime) {
		if (theTime == null) {
			ourNowForUnitTests = null;
			return;
		}
		String datetime = "2020-11-16T" + theTime + "Z";
		Clock clock = Clock.fixed(Instant.parse(datetime), ZoneId.systemDefault());
		ourNowForUnitTests = Instant.now(clock);
	}

	@VisibleForTesting
	Instant getNextRefreshTimeForUnitTest() {
		return myNextRefreshTime;
	}

	/**
	 * Request a refresh and empty the resource version cache.
	 */
	@VisibleForTesting
	public void clearForUnitTest() {
		requestRefresh();
		myResourceVersionCache.clear();
	}

	@Override
	public String toString() {
		return new ToStringBuilder(this)
				.append("myResourceName", myResourceName)
				.append("mySearchParameterMap", mySearchParameterMap)
				.append("myInitialized", myInitialized)
				.toString();
	}

	/**
	 * @return the maximum number of retries used when refreshing the cache
	 */
	static int getMaxRetries() {
		return MAX_RETRIES;
	}
}