001/*-
002 * #%L
003 * HAPI FHIR JPA - Search Parameters
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.cache;
021
022import ca.uhn.fhir.i18n.Msg;
023import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
024import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
025import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher;
026import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
027import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
028import com.google.common.annotations.VisibleForTesting;
029import org.apache.commons.lang3.SerializationUtils;
030import org.apache.commons.lang3.builder.ToStringBuilder;
031import org.hl7.fhir.instance.model.api.IBaseResource;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034import org.springframework.beans.factory.annotation.Autowired;
035import org.springframework.context.annotation.Scope;
036import org.springframework.stereotype.Component;
037
038import java.time.Clock;
039import java.time.Duration;
040import java.time.Instant;
041import java.time.ZoneId;
042import java.util.concurrent.Semaphore;
043import java.util.concurrent.TimeUnit;
044
045@Component
046@Scope("prototype")
047public class ResourceChangeListenerCache implements IResourceChangeListenerCache {
048        private static final Logger ourLog = LoggerFactory.getLogger(ResourceChangeListenerCache.class);
049        /**
050         * Max number of retries to do for cache refreshing
051         */
052        private static final int MAX_RETRIES = 60;
053
054        /**
055         * Timeout in milliseconds for acquiring the semaphore lock (1 minute)
056         */
057        private static final long SEMAPHORE_TIMEOUT_MS = 60000;
058
059        private static Instant ourNowForUnitTests;
060
061        /**
062         * Semaphore used to control access to the cache refresh operation
063         */
064        private final Semaphore myCacheSemaphore = new Semaphore(1);
065
066        @Autowired
067        IResourceChangeListenerCacheRefresher myResourceChangeListenerCacheRefresher;
068
069        @Autowired
070        SearchParamMatcher mySearchParamMatcher;
071
072        private final String myResourceName;
073        private final IResourceChangeListener myResourceChangeListener;
074        private final SearchParameterMap mySearchParameterMap;
075        private final ResourceVersionCache myResourceVersionCache = new ResourceVersionCache();
076        private final long myRemoteRefreshIntervalMs;
077
078        private boolean myInitialized = false;
079        private Instant myNextRefreshTime = Instant.MIN;
080
081        public ResourceChangeListenerCache(
082                        String theResourceName,
083                        IResourceChangeListener theResourceChangeListener,
084                        SearchParameterMap theSearchParameterMap,
085                        long theRemoteRefreshIntervalMs) {
086                myResourceName = theResourceName;
087                myResourceChangeListener = theResourceChangeListener;
088                mySearchParameterMap = SerializationUtils.clone(theSearchParameterMap);
089                myRemoteRefreshIntervalMs = theRemoteRefreshIntervalMs;
090        }
091
092        /**
093         * Request that the cache be refreshed at the next convenient time (in a different thread)
094         */
095        @Override
096        public void requestRefresh() {
097                myNextRefreshTime = Instant.MIN;
098        }
099
100        /**
101         * Request that a cache be refreshed now, in the current thread
102         */
103        @Override
104        public ResourceChangeResult forceRefresh() {
105                requestRefresh();
106                return refreshCacheWithRetry();
107        }
108
109        /**
110         * Refresh the cache if theResource matches our SearchParameterMap
111         *
112         * @param theResource
113         */
114        public void requestRefreshIfWatching(IBaseResource theResource) {
115                if (matches(theResource)) {
116                        requestRefresh();
117                }
118        }
119
120        public boolean matches(IBaseResource theResource) {
121                InMemoryMatchResult result = mySearchParamMatcher.match(mySearchParameterMap, theResource);
122                if (!result.supported()) {
123                        // This should never happen since we enforce only in-memory SearchParamMaps at registration time
124                        throw new IllegalStateException(Msg.code(483) + "Search Parameter Map " + mySearchParameterMap
125                                        + " cannot be processed in-memory: " + result.getUnsupportedReason());
126                }
127                return result.matched();
128        }
129
130        @Override
131        public ResourceChangeResult refreshCacheIfNecessary() {
132                ResourceChangeResult retval = new ResourceChangeResult();
133                if (isTimeToRefresh()) {
134                        retval = refreshCacheWithRetry();
135                }
136                return retval;
137        }
138
139        protected boolean isTimeToRefresh() {
140                return myNextRefreshTime.isBefore(now());
141        }
142
143        static Instant now() {
144                if (ourNowForUnitTests != null) {
145                        return ourNowForUnitTests;
146                }
147                return Instant.now();
148        }
149
150        public ResourceChangeResult refreshCacheWithRetry() {
151                ResourceChangeResult retval;
152                try {
153                        retval = refreshCacheAndNotifyListenersWithRetry();
154                } finally {
155                        myNextRefreshTime = now().plus(Duration.ofMillis(myRemoteRefreshIntervalMs));
156                }
157                return retval;
158        }
159
160        @VisibleForTesting
161        public void setResourceChangeListenerCacheRefresher(
162                        IResourceChangeListenerCacheRefresher theResourceChangeListenerCacheRefresher) {
163                myResourceChangeListenerCacheRefresher = theResourceChangeListenerCacheRefresher;
164        }
165
166        private ResourceChangeResult refreshCacheAndNotifyListenersWithRetry() {
167                Retrier<ResourceChangeResult> refreshCacheRetrier =
168                                new Retrier<>(this::tryRefreshCacheAndNotifyListener, getMaxRetries());
169                return refreshCacheRetrier.runWithRetry();
170        }
171
172        private ResourceChangeResult tryRefreshCacheAndNotifyListener() {
173                boolean acquired = false;
174                try {
175                        // Try to acquire the semaphore with a timeout
176                        acquired = myCacheSemaphore.tryAcquire(SEMAPHORE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
177                        if (acquired) {
178                                return myResourceChangeListenerCacheRefresher.refreshCacheAndNotifyListener(this);
179                        } else {
180                                ourLog.warn(
181                                                "Timed out waiting {} ms to acquire lock for refreshing cache for resource {}",
182                                                SEMAPHORE_TIMEOUT_MS,
183                                                myResourceName);
184                                throw new InternalErrorException(
185                                                Msg.code(2702) + "Timed out waiting to acquire lock for refreshing cache");
186                        }
187                } catch (InterruptedException e) {
188                        Thread.currentThread().interrupt();
189                        throw new InternalErrorException(Msg.code(2703) + "Interrupted while waiting to refresh cache", e);
190                } finally {
191                        if (acquired) {
192                                myCacheSemaphore.release();
193                        }
194                }
195        }
196
197        @Override
198        public Instant getNextRefreshTime() {
199                return myNextRefreshTime;
200        }
201
202        @Override
203        public SearchParameterMap getSearchParameterMap() {
204                return mySearchParameterMap;
205        }
206
207        @Override
208        public boolean isInitialized() {
209                return myInitialized;
210        }
211
212        public ResourceChangeListenerCache setInitialized(boolean theInitialized) {
213                myInitialized = theInitialized;
214                return this;
215        }
216
217        @Override
218        public String getResourceName() {
219                return myResourceName;
220        }
221
222        public ResourceVersionCache getResourceVersionCache() {
223                return myResourceVersionCache;
224        }
225
226        public IResourceChangeListener getResourceChangeListener() {
227                return myResourceChangeListener;
228        }
229
230        /**
231         * @param theTime has format like "12:34:56" i.e. HH:MM:SS
232         */
233        @VisibleForTesting
234        public static void setNowForUnitTests(String theTime) {
235                if (theTime == null) {
236                        ourNowForUnitTests = null;
237                        return;
238                }
239                String datetime = "2020-11-16T" + theTime + "Z";
240                Clock clock = Clock.fixed(Instant.parse(datetime), ZoneId.systemDefault());
241                ourNowForUnitTests = Instant.now(clock);
242        }
243
244        @VisibleForTesting
245        Instant getNextRefreshTimeForUnitTest() {
246                return myNextRefreshTime;
247        }
248
249        @VisibleForTesting
250        public void clearForUnitTest() {
251                requestRefresh();
252                myResourceVersionCache.clear();
253        }
254
255        @Override
256        public String toString() {
257                return new ToStringBuilder(this)
258                                .append("myResourceName", myResourceName)
259                                .append("mySearchParameterMap", mySearchParameterMap)
260                                .append("myInitialized", myInitialized)
261                                .toString();
262        }
263
264        static int getMaxRetries() {
265                return MAX_RETRIES;
266        }
267}