001/*-
002 * #%L
003 * HAPI FHIR Storage api
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.cache;
021
022import ca.uhn.fhir.IHapiBootOrder;
023import ca.uhn.fhir.context.ConfigurationException;
024import ca.uhn.fhir.i18n.Msg;
025import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
026import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
027import ca.uhn.fhir.jpa.cache.IResourceChangeEvent;
028import ca.uhn.fhir.jpa.cache.IResourceChangeListener;
029import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache;
030import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry;
031import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
032import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
033import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
034import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
035import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
036import ca.uhn.fhir.rest.server.util.IResourceRepositoryCache;
037import com.google.common.annotations.VisibleForTesting;
038import jakarta.annotation.Nonnull;
039import jakarta.annotation.PreDestroy;
040import org.apache.commons.lang3.time.DateUtils;
041import org.hl7.fhir.instance.model.api.IBaseResource;
042import org.hl7.fhir.instance.model.api.IIdType;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045import org.springframework.beans.factory.annotation.Autowired;
046import org.springframework.context.event.ContextClosedEvent;
047import org.springframework.context.event.ContextRefreshedEvent;
048import org.springframework.context.event.EventListener;
049import org.springframework.core.annotation.Order;
050
051import java.util.ArrayList;
052import java.util.Collection;
053import java.util.List;
054import java.util.concurrent.Semaphore;
055import java.util.concurrent.TimeUnit;
056
057public abstract class BaseResourceCacheSynchronizer implements IResourceChangeListener, IResourceRepositoryCache {
058        private static final Logger ourLog = LoggerFactory.getLogger(BaseResourceCacheSynchronizer.class);
059        public static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes
060        public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE;
061        private static final long FORCE_REFRESH_TIMEOUT_SECONDS = 300;
062        private final String myResourceName;
063
064        @Autowired
065        private IResourceChangeListenerRegistry myResourceChangeListenerRegistry;
066
067        @Autowired
068        private DaoRegistry myDaoRegistry;
069
070        private SearchParameterMap mySearchParameterMap;
071        private final SystemRequestDetails mySystemRequestDetails = SystemRequestDetails.forAllPartitions();
072        private boolean myStopping;
073        private final Semaphore mySyncResourcesSemaphore = new Semaphore(1);
074        private final Object mySyncResourcesLock = new Object();
075
076        private Integer myMaxRetryCount = null;
077        private boolean myInitialized = false;
078
079        protected BaseResourceCacheSynchronizer(String theResourceName) {
080                myResourceName = theResourceName;
081        }
082
083        protected BaseResourceCacheSynchronizer(
084                        String theResourceName,
085                        IResourceChangeListenerRegistry theResourceChangeListenerRegistry,
086                        DaoRegistry theDaoRegistry) {
087                myResourceName = theResourceName;
088                myDaoRegistry = theDaoRegistry;
089                myResourceChangeListenerRegistry = theResourceChangeListenerRegistry;
090        }
091
092        /**
093         * This method performs a search in the DB, so use the {@link ContextRefreshedEvent}
094         * to ensure that it runs after the database initializer
095         */
096        @EventListener(classes = ContextRefreshedEvent.class)
097        @Order(IHapiBootOrder.AFTER_SUBSCRIPTION_INITIALIZED)
098        public void registerListener() {
099                if (myInitialized) {
100                        return;
101                }
102                if (myDaoRegistry.getResourceDaoOrNull(myResourceName) == null) {
103                        ourLog.info("No resource DAO found for resource type {}, not registering listener", myResourceName);
104                        return;
105                }
106
107                IResourceChangeListenerCache resourceCache =
108                                myResourceChangeListenerRegistry.registerResourceResourceChangeListener(
109                                                myResourceName, provideSearchParameterMap(), this, REFRESH_INTERVAL);
110                resourceCache.forceRefresh();
111                myInitialized = true;
112        }
113
114        private SearchParameterMap provideSearchParameterMap() {
115                SearchParameterMap searchParameterMap = mySearchParameterMap;
116                if (searchParameterMap == null) {
117                        searchParameterMap = getSearchParameterMap();
118                        mySearchParameterMap = searchParameterMap;
119                }
120                return searchParameterMap;
121        }
122
123        @PreDestroy
124        public void unregisterListener() {
125                myResourceChangeListenerRegistry.unregisterResourceResourceChangeListener(this);
126        }
127
128        private boolean daoNotAvailable() {
129                return myDaoRegistry == null || !myDaoRegistry.isResourceTypeSupported(myResourceName);
130        }
131
132        @Override
133        public void requestRefresh() {
134                if (daoNotAvailable()) {
135                        return;
136                }
137                if (!mySyncResourcesSemaphore.tryAcquire()) {
138                        return;
139                }
140                try {
141                        doSyncResourcesWithRetry();
142                } finally {
143                        mySyncResourcesSemaphore.release();
144                }
145        }
146
147        @VisibleForTesting
148        @Override
149        public void forceRefresh() {
150                if (daoNotAvailable()) {
151                        throw new ConfigurationException(Msg.code(2652) + "Attempt to force refresh without a dao");
152                }
153                try {
154                        if (mySyncResourcesSemaphore.tryAcquire(FORCE_REFRESH_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
155                                doSyncResourcesWithRetry();
156                        } else {
157                                String errorMessage = String.format(
158                                                Msg.code(2653) + "Timed out waiting %s %s to refresh %s cache",
159                                                FORCE_REFRESH_TIMEOUT_SECONDS,
160                                                "seconds",
161                                                myResourceName);
162                                ourLog.error(errorMessage);
163                                throw new ConfigurationException(Msg.code(2663) + errorMessage);
164                        }
165                } catch (InterruptedException e) {
166                        Thread.currentThread().interrupt();
167                        throw new InternalErrorException(Msg.code(2654) + e);
168                } finally {
169                        mySyncResourcesSemaphore.release();
170                }
171        }
172
173        @VisibleForTesting
174        public void acquireSemaphoreForUnitTest() throws InterruptedException {
175                mySyncResourcesSemaphore.acquire();
176        }
177
178        @VisibleForTesting
179        public int doSyncResourcesForUnitTest() {
180                // Two passes for delete flag to take effect
181                int first = doSyncResourcesWithRetry();
182                int second = doSyncResourcesWithRetry();
183                return first + second;
184        }
185
186        synchronized int doSyncResourcesWithRetry() {
187                // retry runs MAX_RETRIES times
188                // and if errors result every time, it will fail
189                Retrier<Integer> syncResourceRetrier = new Retrier<>(this::doSyncResources, getMaxRetries());
190                return syncResourceRetrier.runWithRetry();
191        }
192
193        private int getMaxRetries() {
194                if (myMaxRetryCount != null) {
195                        return myMaxRetryCount;
196                }
197                return MAX_RETRIES;
198        }
199
200        @VisibleForTesting
201        public void setMaxRetries(Integer theMaxRetries) {
202                myMaxRetryCount = theMaxRetries;
203        }
204
205        @SuppressWarnings("unchecked")
206        private int doSyncResources() {
207                if (isStopping()) {
208                        return 0;
209                }
210
211                synchronized (mySyncResourcesLock) {
212                        ourLog.debug("Starting sync {}s", myResourceName);
213
214                        List<IBaseResource> resourceList = (List<IBaseResource>)
215                                        getResourceDao().searchForResources(provideSearchParameterMap(), mySystemRequestDetails);
216                        return syncResourcesIntoCache(resourceList);
217                }
218        }
219
220        protected abstract int syncResourcesIntoCache(@Nonnull List<IBaseResource> resourceList);
221
222        @EventListener(ContextRefreshedEvent.class)
223        public void start() {
224                myStopping = false;
225        }
226
227        @EventListener(ContextClosedEvent.class)
228        public void shutdown() {
229                myStopping = true;
230        }
231
232        private boolean isStopping() {
233                return myStopping;
234        }
235
236        private IFhirResourceDao<?> getResourceDao() {
237                return myDaoRegistry.getResourceDao(myResourceName);
238        }
239
240        @Override
241        public void handleInit(@Nonnull Collection<IIdType> theResourceIds) {
242                if (daoNotAvailable()) {
243                        ourLog.warn(
244                                        "The resource type {} is enabled on this server, but there is no {} DAO configured.",
245                                        myResourceName,
246                                        myResourceName);
247                        return;
248                }
249                IFhirResourceDao<?> resourceDao = getResourceDao();
250                SystemRequestDetails systemRequestDetails = SystemRequestDetails.forAllPartitions();
251                List<IBaseResource> resourceList = new ArrayList<>();
252
253                /*
254                 * We are pretty lenient here, because any failure will block the server
255                 * from starting up. We should generally never fail to load here, but it
256                 * can potentially happen if a resource is manually deleted in the database
257                 * (ie. marked with a deletion time manually) but the indexes aren't cleaned
258                 * up.
259                 */
260                for (IIdType id : theResourceIds) {
261                        IBaseResource read;
262                        try {
263                                read = resourceDao.read(id, systemRequestDetails);
264                        } catch (BaseServerResponseException e) {
265                                ourLog.warn("Unable to fetch resource {}", id, e);
266                                continue;
267                        }
268                        resourceList.add(read);
269                }
270                handleInit(resourceList);
271        }
272
273        protected abstract void handleInit(@Nonnull List<IBaseResource> resourceList);
274
275        @Override
276        public void handleChange(@Nonnull IResourceChangeEvent theResourceChangeEvent) {
277                // For now ignore the contents of theResourceChangeEvent.  In the future, consider updating the registry based
278                // on
279                // known resources that have been created, updated & deleted
280                requestRefresh();
281        }
282
283        @Nonnull
284        protected abstract SearchParameterMap getSearchParameterMap();
285}