001/*-
002 * #%L
003 * HAPI FHIR Storage api
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.cache;
021
022import ca.uhn.fhir.IHapiBootOrder;
023import ca.uhn.fhir.context.ConfigurationException;
024import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
025import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
026import ca.uhn.fhir.jpa.cache.IResourceChangeEvent;
027import ca.uhn.fhir.jpa.cache.IResourceChangeListener;
028import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache;
029import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry;
030import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
031import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
032import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
033import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
034import ca.uhn.fhir.rest.server.util.IResourceRepositoryCache;
035import com.google.common.annotations.VisibleForTesting;
036import jakarta.annotation.Nonnull;
037import jakarta.annotation.PreDestroy;
038import org.apache.commons.lang3.time.DateUtils;
039import org.hl7.fhir.instance.model.api.IBaseResource;
040import org.hl7.fhir.instance.model.api.IIdType;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043import org.springframework.beans.factory.annotation.Autowired;
044import org.springframework.context.event.ContextClosedEvent;
045import org.springframework.context.event.ContextRefreshedEvent;
046import org.springframework.context.event.ContextStartedEvent;
047import org.springframework.context.event.EventListener;
048import org.springframework.core.annotation.Order;
049
050import java.util.Collection;
051import java.util.List;
052import java.util.concurrent.Semaphore;
053import java.util.concurrent.TimeUnit;
054import java.util.stream.Collectors;
055
056public abstract class BaseResourceCacheSynchronizer implements IResourceChangeListener, IResourceRepositoryCache {
057        private static final Logger ourLog = LoggerFactory.getLogger(BaseResourceCacheSynchronizer.class);
058        public static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes
059        public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE;
060        private static final long FORCE_REFRESH_TIMEOUT_SECONDS = 300;
061        private final String myResourceName;
062
063        @Autowired
064        private IResourceChangeListenerRegistry myResourceChangeListenerRegistry;
065
066        @Autowired
067        private DaoRegistry myDaoRegistry;
068
069        private SearchParameterMap mySearchParameterMap;
070        private SystemRequestDetails mySystemRequestDetails;
071        private boolean myStopping;
072        private final Semaphore mySyncResourcesSemaphore = new Semaphore(1);
073        private final Object mySyncResourcesLock = new Object();
074
075        private Integer myMaxRetryCount = null;
076
077        protected BaseResourceCacheSynchronizer(String theResourceName) {
078                myResourceName = theResourceName;
079        }
080
081        protected BaseResourceCacheSynchronizer(
082                        String theResourceName,
083                        IResourceChangeListenerRegistry theResourceChangeListenerRegistry,
084                        DaoRegistry theDaoRegistry) {
085                myResourceName = theResourceName;
086                myDaoRegistry = theDaoRegistry;
087                myResourceChangeListenerRegistry = theResourceChangeListenerRegistry;
088        }
089
090        /**
091         * This method performs a search in the DB, so use the {@link ContextStartedEvent}
092         * to ensure that it runs after the database initializer
093         */
094        @EventListener(classes = ContextRefreshedEvent.class)
095        @Order(IHapiBootOrder.AFTER_SUBSCRIPTION_INITIALIZED)
096        public void registerListener() {
097                if (myDaoRegistry.getResourceDaoOrNull(myResourceName) == null) {
098                        ourLog.info("No resource DAO found for resource type {}, not registering listener", myResourceName);
099                        return;
100                }
101                mySystemRequestDetails = SystemRequestDetails.forAllPartitions();
102
103                IResourceChangeListenerCache resourceCache =
104                                myResourceChangeListenerRegistry.registerResourceResourceChangeListener(
105                                                myResourceName, provideSearchParameterMap(), this, REFRESH_INTERVAL);
106                resourceCache.forceRefresh();
107        }
108
109        private SearchParameterMap provideSearchParameterMap() {
110                SearchParameterMap searchParameterMap = mySearchParameterMap;
111                if (searchParameterMap == null) {
112                        searchParameterMap = getSearchParameterMap();
113                        mySearchParameterMap = searchParameterMap;
114                }
115                return searchParameterMap;
116        }
117
118        @PreDestroy
119        public void unregisterListener() {
120                myResourceChangeListenerRegistry.unregisterResourceResourceChangeListener(this);
121        }
122
123        private boolean daoNotAvailable() {
124                return myDaoRegistry == null || !myDaoRegistry.isResourceTypeSupported(myResourceName);
125        }
126
127        @Override
128        public void requestRefresh() {
129                if (daoNotAvailable()) {
130                        return;
131                }
132                if (!mySyncResourcesSemaphore.tryAcquire()) {
133                        return;
134                }
135                try {
136                        doSyncResourcesWithRetry();
137                } finally {
138                        mySyncResourcesSemaphore.release();
139                }
140        }
141
142        @VisibleForTesting
143        @Override
144        public void forceRefresh() {
145                if (daoNotAvailable()) {
146                        throw new ConfigurationException("Attempt to force refresh without a dao");
147                }
148                try {
149                        if (mySyncResourcesSemaphore.tryAcquire(FORCE_REFRESH_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
150                                doSyncResourcesWithRetry();
151                        } else {
152                                String errorMessage = String.format(
153                                                "Timed out waiting %s %s to refresh %s cache",
154                                                FORCE_REFRESH_TIMEOUT_SECONDS, "seconds", myResourceName);
155                                ourLog.error(errorMessage);
156                                throw new ConfigurationException(errorMessage);
157                        }
158                } catch (InterruptedException e) {
159                        Thread.currentThread().interrupt();
160                        throw new InternalErrorException(e);
161                } finally {
162                        mySyncResourcesSemaphore.release();
163                }
164        }
165
166        @VisibleForTesting
167        public void acquireSemaphoreForUnitTest() throws InterruptedException {
168                mySyncResourcesSemaphore.acquire();
169        }
170
171        @VisibleForTesting
172        public int doSyncResourcesForUnitTest() {
173                // Two passes for delete flag to take effect
174                int first = doSyncResourcesWithRetry();
175                int second = doSyncResourcesWithRetry();
176                return first + second;
177        }
178
179        synchronized int doSyncResourcesWithRetry() {
180                // retry runs MAX_RETRIES times
181                // and if errors result every time, it will fail
182                Retrier<Integer> syncResourceRetrier = new Retrier<>(this::doSyncResources, getMaxRetries());
183                return syncResourceRetrier.runWithRetry();
184        }
185
186        private int getMaxRetries() {
187                if (myMaxRetryCount != null) {
188                        return myMaxRetryCount;
189                }
190                return MAX_RETRIES;
191        }
192
193        @VisibleForTesting
194        public void setMaxRetries(Integer theMaxRetries) {
195                myMaxRetryCount = theMaxRetries;
196        }
197
198        @SuppressWarnings("unchecked")
199        private int doSyncResources() {
200                if (isStopping()) {
201                        return 0;
202                }
203
204                synchronized (mySyncResourcesLock) {
205                        ourLog.debug("Starting sync {}s", myResourceName);
206
207                        List<IBaseResource> resourceList = (List<IBaseResource>)
208                                        getResourceDao().searchForResources(provideSearchParameterMap(), mySystemRequestDetails);
209                        return syncResourcesIntoCache(resourceList);
210                }
211        }
212
213        protected abstract int syncResourcesIntoCache(@Nonnull List<IBaseResource> resourceList);
214
215        @EventListener(ContextRefreshedEvent.class)
216        public void start() {
217                myStopping = false;
218        }
219
220        @EventListener(ContextClosedEvent.class)
221        public void shutdown() {
222                myStopping = true;
223        }
224
225        private boolean isStopping() {
226                return myStopping;
227        }
228
229        private IFhirResourceDao<?> getResourceDao() {
230                return myDaoRegistry.getResourceDao(myResourceName);
231        }
232
233        @Override
234        public void handleInit(@Nonnull Collection<IIdType> theResourceIds) {
235                if (daoNotAvailable()) {
236                        ourLog.warn(
237                                        "The resource type {} is enabled on this server, but there is no {} DAO configured.",
238                                        myResourceName,
239                                        myResourceName);
240                        return;
241                }
242                IFhirResourceDao<?> resourceDao = getResourceDao();
243                SystemRequestDetails systemRequestDetails = SystemRequestDetails.forAllPartitions();
244                List<IBaseResource> resourceList = theResourceIds.stream()
245                                .map(n -> resourceDao.read(n, systemRequestDetails))
246                                .collect(Collectors.toList());
247                handleInit(resourceList);
248        }
249
250        protected abstract void handleInit(@Nonnull List<IBaseResource> resourceList);
251
252        @Override
253        public void handleChange(@Nonnull IResourceChangeEvent theResourceChangeEvent) {
254                // For now ignore the contents of theResourceChangeEvent.  In the future, consider updating the registry based
255                // on
256                // known resources that have been created, updated & deleted
257                requestRefresh();
258        }
259
260        @Nonnull
261        protected abstract SearchParameterMap getSearchParameterMap();
262}