001/*-
002 * #%L
003 * HAPI FHIR Storage api
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.cache;
021
022import ca.uhn.fhir.IHapiBootOrder;
023import ca.uhn.fhir.context.ConfigurationException;
024import ca.uhn.fhir.i18n.Msg;
025import ca.uhn.fhir.interceptor.model.RequestPartitionId;
026import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
027import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
028import ca.uhn.fhir.jpa.cache.IResourceChangeEvent;
029import ca.uhn.fhir.jpa.cache.IResourceChangeListener;
030import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache;
031import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry;
032import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
033import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
034import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
035import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
036import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
037import ca.uhn.fhir.rest.server.util.IResourceRepositoryCache;
038import com.google.common.annotations.VisibleForTesting;
039import jakarta.annotation.Nonnull;
040import jakarta.annotation.PreDestroy;
041import org.apache.commons.lang3.time.DateUtils;
042import org.hl7.fhir.instance.model.api.IBaseResource;
043import org.hl7.fhir.instance.model.api.IIdType;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046import org.springframework.beans.factory.annotation.Autowired;
047import org.springframework.context.event.ContextClosedEvent;
048import org.springframework.context.event.ContextRefreshedEvent;
049import org.springframework.context.event.EventListener;
050import org.springframework.core.annotation.Order;
051
052import java.util.ArrayList;
053import java.util.Collection;
054import java.util.List;
055import java.util.concurrent.Semaphore;
056import java.util.concurrent.TimeUnit;
057
058public abstract class BaseResourceCacheSynchronizer implements IResourceChangeListener, IResourceRepositoryCache {
059        private static final Logger ourLog = LoggerFactory.getLogger(BaseResourceCacheSynchronizer.class);
060        public static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes
061        public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE;
062        private static final long FORCE_REFRESH_TIMEOUT_SECONDS = 300;
063        private final String myResourceName;
064
065        @Autowired
066        private IResourceChangeListenerRegistry myResourceChangeListenerRegistry;
067
068        @Autowired
069        private DaoRegistry myDaoRegistry;
070
071        private SearchParameterMap mySearchParameterMap;
072        private final SystemRequestDetails mySystemRequestDetails = SystemRequestDetails.forAllPartitions();
073        private boolean myStopping;
074        private final Semaphore mySyncResourcesSemaphore = new Semaphore(1);
075        private final Object mySyncResourcesLock = new Object();
076
077        private Integer myMaxRetryCount = null;
078        private boolean myInitialized = false;
079        private final RequestPartitionId myRequestPartitionId;
080
081        protected BaseResourceCacheSynchronizer(String theResourceName, RequestPartitionId theRequestPartitionId) {
082                this(theResourceName, theRequestPartitionId, null, null);
083        }
084
085        protected BaseResourceCacheSynchronizer(
086                        String theResourceName,
087                        RequestPartitionId theRequestPartitionId,
088                        IResourceChangeListenerRegistry theResourceChangeListenerRegistry,
089                        DaoRegistry theDaoRegistry) {
090                myResourceName = theResourceName;
091                myRequestPartitionId = theRequestPartitionId;
092                myResourceChangeListenerRegistry = theResourceChangeListenerRegistry;
093                myDaoRegistry = theDaoRegistry;
094        }
095
096        /**
097         * This method performs a search in the DB, so use the {@link ContextRefreshedEvent}
098         * to ensure that it runs after the database initializer
099         */
100        @EventListener(classes = ContextRefreshedEvent.class)
101        @Order(IHapiBootOrder.AFTER_SUBSCRIPTION_INITIALIZED)
102        public void registerListener() {
103                if (myInitialized) {
104                        return;
105                }
106                if (myDaoRegistry.getResourceDaoOrNull(myResourceName) == null) {
107                        ourLog.info("No resource DAO found for resource type {}, not registering listener", myResourceName);
108                        return;
109                }
110
111                IResourceChangeListenerCache resourceCache =
112                                myResourceChangeListenerRegistry.registerResourceResourceChangeListener(
113                                                myResourceName, myRequestPartitionId, provideSearchParameterMap(), this, REFRESH_INTERVAL);
114                resourceCache.forceRefresh();
115                myInitialized = true;
116        }
117
118        private SearchParameterMap provideSearchParameterMap() {
119                SearchParameterMap searchParameterMap = mySearchParameterMap;
120                if (searchParameterMap == null) {
121                        searchParameterMap = getSearchParameterMap();
122                        mySearchParameterMap = searchParameterMap;
123                }
124                return searchParameterMap;
125        }
126
127        @PreDestroy
128        public void unregisterListener() {
129                myResourceChangeListenerRegistry.unregisterResourceResourceChangeListener(this);
130        }
131
132        private boolean daoNotAvailable() {
133                return myDaoRegistry == null || !myDaoRegistry.isResourceTypeSupported(myResourceName);
134        }
135
136        @Override
137        public void requestRefresh() {
138                if (daoNotAvailable()) {
139                        return;
140                }
141                if (!mySyncResourcesSemaphore.tryAcquire()) {
142                        return;
143                }
144                try {
145                        doSyncResourcesWithRetry();
146                } finally {
147                        mySyncResourcesSemaphore.release();
148                }
149        }
150
151        @VisibleForTesting
152        @Override
153        public void forceRefresh() {
154                if (daoNotAvailable()) {
155                        throw new ConfigurationException(Msg.code(2652) + "Attempt to force refresh without a dao");
156                }
157                try {
158                        if (mySyncResourcesSemaphore.tryAcquire(FORCE_REFRESH_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
159                                doSyncResourcesWithRetry();
160                        } else {
161                                String errorMessage = String.format(
162                                                Msg.code(2653) + "Timed out waiting %s %s to refresh %s cache",
163                                                FORCE_REFRESH_TIMEOUT_SECONDS,
164                                                "seconds",
165                                                myResourceName);
166                                ourLog.error(errorMessage);
167                                throw new ConfigurationException(Msg.code(2663) + errorMessage);
168                        }
169                } catch (InterruptedException e) {
170                        Thread.currentThread().interrupt();
171                        throw new InternalErrorException(Msg.code(2654) + e);
172                } finally {
173                        mySyncResourcesSemaphore.release();
174                }
175        }
176
177        @VisibleForTesting
178        public void acquireSemaphoreForUnitTest() throws InterruptedException {
179                mySyncResourcesSemaphore.acquire();
180        }
181
182        @VisibleForTesting
183        public int doSyncResourcesForUnitTest() {
184                // Two passes for delete flag to take effect
185                int first = doSyncResourcesWithRetry();
186                int second = doSyncResourcesWithRetry();
187                return first + second;
188        }
189
190        synchronized int doSyncResourcesWithRetry() {
191                // retry runs MAX_RETRIES times
192                // and if errors result every time, it will fail
193                Retrier<Integer> syncResourceRetrier = new Retrier<>(this::doSyncResources, getMaxRetries());
194                return syncResourceRetrier.runWithRetry();
195        }
196
197        private int getMaxRetries() {
198                if (myMaxRetryCount != null) {
199                        return myMaxRetryCount;
200                }
201                return MAX_RETRIES;
202        }
203
204        @VisibleForTesting
205        public void setMaxRetries(Integer theMaxRetries) {
206                myMaxRetryCount = theMaxRetries;
207        }
208
209        @SuppressWarnings("unchecked")
210        private int doSyncResources() {
211                if (isStopping()) {
212                        return 0;
213                }
214
215                synchronized (mySyncResourcesLock) {
216                        ourLog.debug("Starting sync {}s", myResourceName);
217
218                        List<IBaseResource> resourceList = (List<IBaseResource>)
219                                        getResourceDao().searchForResources(provideSearchParameterMap(), mySystemRequestDetails);
220                        return syncResourcesIntoCache(resourceList);
221                }
222        }
223
224        protected abstract int syncResourcesIntoCache(@Nonnull List<IBaseResource> resourceList);
225
226        @EventListener(ContextRefreshedEvent.class)
227        public void start() {
228                myStopping = false;
229        }
230
231        @EventListener(ContextClosedEvent.class)
232        public void shutdown() {
233                myStopping = true;
234        }
235
236        private boolean isStopping() {
237                return myStopping;
238        }
239
240        private IFhirResourceDao<?> getResourceDao() {
241                return myDaoRegistry.getResourceDao(myResourceName);
242        }
243
244        @Override
245        public void handleInit(@Nonnull Collection<IIdType> theResourceIds) {
246                if (daoNotAvailable()) {
247                        ourLog.warn(
248                                        "The resource type {} is enabled on this server, but there is no {} DAO configured.",
249                                        myResourceName,
250                                        myResourceName);
251                        return;
252                }
253                IFhirResourceDao<?> resourceDao = getResourceDao();
254                SystemRequestDetails systemRequestDetails = SystemRequestDetails.forAllPartitions();
255                List<IBaseResource> resourceList = new ArrayList<>();
256
257                /*
258                 * We are pretty lenient here, because any failure will block the server
259                 * from starting up. We should generally never fail to load here, but it
260                 * can potentially happen if a resource is manually deleted in the database
261                 * (ie. marked with a deletion time manually) but the indexes aren't cleaned
262                 * up.
263                 */
264                for (IIdType id : theResourceIds) {
265                        IBaseResource read;
266                        try {
267                                read = resourceDao.read(id, systemRequestDetails);
268                        } catch (BaseServerResponseException e) {
269                                ourLog.warn("Unable to fetch resource {}", id, e);
270                                continue;
271                        }
272                        resourceList.add(read);
273                }
274                handleInit(resourceList);
275        }
276
277        protected abstract void handleInit(@Nonnull List<IBaseResource> resourceList);
278
279        @Override
280        public void handleChange(@Nonnull IResourceChangeEvent theResourceChangeEvent) {
281                // For now ignore the contents of theResourceChangeEvent.  In the future, consider updating the registry based
282                // on
283                // known resources that have been created, updated & deleted
284                requestRefresh();
285        }
286
287        @Nonnull
288        protected abstract SearchParameterMap getSearchParameterMap();
289}