001/*-
002 * #%L
003 * HAPI FHIR Storage api
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.cache;
021
022import ca.uhn.fhir.IHapiBootOrder;
023import ca.uhn.fhir.context.ConfigurationException;
024import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
025import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
026import ca.uhn.fhir.jpa.cache.IResourceChangeEvent;
027import ca.uhn.fhir.jpa.cache.IResourceChangeListener;
028import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache;
029import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry;
030import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
031import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
032import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
033import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
034import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
035import ca.uhn.fhir.rest.server.util.IResourceRepositoryCache;
036import com.google.common.annotations.VisibleForTesting;
037import jakarta.annotation.Nonnull;
038import jakarta.annotation.PreDestroy;
039import org.apache.commons.lang3.time.DateUtils;
040import org.hl7.fhir.instance.model.api.IBaseResource;
041import org.hl7.fhir.instance.model.api.IIdType;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044import org.springframework.beans.factory.annotation.Autowired;
045import org.springframework.context.event.ContextClosedEvent;
046import org.springframework.context.event.ContextRefreshedEvent;
047import org.springframework.context.event.ContextStartedEvent;
048import org.springframework.context.event.EventListener;
049import org.springframework.core.annotation.Order;
050
051import java.util.ArrayList;
052import java.util.Collection;
053import java.util.List;
054import java.util.concurrent.Semaphore;
055import java.util.concurrent.TimeUnit;
056
057public abstract class BaseResourceCacheSynchronizer implements IResourceChangeListener, IResourceRepositoryCache {
058        private static final Logger ourLog = LoggerFactory.getLogger(BaseResourceCacheSynchronizer.class);
059        public static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes
060        public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE;
061        private static final long FORCE_REFRESH_TIMEOUT_SECONDS = 300;
062        private final String myResourceName;
063
064        @Autowired
065        private IResourceChangeListenerRegistry myResourceChangeListenerRegistry;
066
067        @Autowired
068        private DaoRegistry myDaoRegistry;
069
070        private SearchParameterMap mySearchParameterMap;
071        private SystemRequestDetails mySystemRequestDetails;
072        private boolean myStopping;
073        private final Semaphore mySyncResourcesSemaphore = new Semaphore(1);
074        private final Object mySyncResourcesLock = new Object();
075
076        private Integer myMaxRetryCount = null;
077
078        protected BaseResourceCacheSynchronizer(String theResourceName) {
079                myResourceName = theResourceName;
080        }
081
082        protected BaseResourceCacheSynchronizer(
083                        String theResourceName,
084                        IResourceChangeListenerRegistry theResourceChangeListenerRegistry,
085                        DaoRegistry theDaoRegistry) {
086                myResourceName = theResourceName;
087                myDaoRegistry = theDaoRegistry;
088                myResourceChangeListenerRegistry = theResourceChangeListenerRegistry;
089        }
090
091        /**
092         * This method performs a search in the DB, so use the {@link ContextStartedEvent}
093         * to ensure that it runs after the database initializer
094         */
095        @EventListener(classes = ContextRefreshedEvent.class)
096        @Order(IHapiBootOrder.AFTER_SUBSCRIPTION_INITIALIZED)
097        public void registerListener() {
098                if (myDaoRegistry.getResourceDaoOrNull(myResourceName) == null) {
099                        ourLog.info("No resource DAO found for resource type {}, not registering listener", myResourceName);
100                        return;
101                }
102                mySystemRequestDetails = SystemRequestDetails.forAllPartitions();
103
104                IResourceChangeListenerCache resourceCache =
105                                myResourceChangeListenerRegistry.registerResourceResourceChangeListener(
106                                                myResourceName, provideSearchParameterMap(), this, REFRESH_INTERVAL);
107                resourceCache.forceRefresh();
108        }
109
110        private SearchParameterMap provideSearchParameterMap() {
111                SearchParameterMap searchParameterMap = mySearchParameterMap;
112                if (searchParameterMap == null) {
113                        searchParameterMap = getSearchParameterMap();
114                        mySearchParameterMap = searchParameterMap;
115                }
116                return searchParameterMap;
117        }
118
119        @PreDestroy
120        public void unregisterListener() {
121                myResourceChangeListenerRegistry.unregisterResourceResourceChangeListener(this);
122        }
123
124        private boolean daoNotAvailable() {
125                return myDaoRegistry == null || !myDaoRegistry.isResourceTypeSupported(myResourceName);
126        }
127
128        @Override
129        public void requestRefresh() {
130                if (daoNotAvailable()) {
131                        return;
132                }
133                if (!mySyncResourcesSemaphore.tryAcquire()) {
134                        return;
135                }
136                try {
137                        doSyncResourcesWithRetry();
138                } finally {
139                        mySyncResourcesSemaphore.release();
140                }
141        }
142
143        @VisibleForTesting
144        @Override
145        public void forceRefresh() {
146                if (daoNotAvailable()) {
147                        throw new ConfigurationException("Attempt to force refresh without a dao");
148                }
149                try {
150                        if (mySyncResourcesSemaphore.tryAcquire(FORCE_REFRESH_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
151                                doSyncResourcesWithRetry();
152                        } else {
153                                String errorMessage = String.format(
154                                                "Timed out waiting %s %s to refresh %s cache",
155                                                FORCE_REFRESH_TIMEOUT_SECONDS, "seconds", myResourceName);
156                                ourLog.error(errorMessage);
157                                throw new ConfigurationException(errorMessage);
158                        }
159                } catch (InterruptedException e) {
160                        Thread.currentThread().interrupt();
161                        throw new InternalErrorException(e);
162                } finally {
163                        mySyncResourcesSemaphore.release();
164                }
165        }
166
167        @VisibleForTesting
168        public void acquireSemaphoreForUnitTest() throws InterruptedException {
169                mySyncResourcesSemaphore.acquire();
170        }
171
172        @VisibleForTesting
173        public int doSyncResourcesForUnitTest() {
174                // Two passes for delete flag to take effect
175                int first = doSyncResourcesWithRetry();
176                int second = doSyncResourcesWithRetry();
177                return first + second;
178        }
179
180        synchronized int doSyncResourcesWithRetry() {
181                // retry runs MAX_RETRIES times
182                // and if errors result every time, it will fail
183                Retrier<Integer> syncResourceRetrier = new Retrier<>(this::doSyncResources, getMaxRetries());
184                return syncResourceRetrier.runWithRetry();
185        }
186
187        private int getMaxRetries() {
188                if (myMaxRetryCount != null) {
189                        return myMaxRetryCount;
190                }
191                return MAX_RETRIES;
192        }
193
194        @VisibleForTesting
195        public void setMaxRetries(Integer theMaxRetries) {
196                myMaxRetryCount = theMaxRetries;
197        }
198
199        @SuppressWarnings("unchecked")
200        private int doSyncResources() {
201                if (isStopping()) {
202                        return 0;
203                }
204
205                synchronized (mySyncResourcesLock) {
206                        ourLog.debug("Starting sync {}s", myResourceName);
207
208                        List<IBaseResource> resourceList = (List<IBaseResource>)
209                                        getResourceDao().searchForResources(provideSearchParameterMap(), mySystemRequestDetails);
210                        return syncResourcesIntoCache(resourceList);
211                }
212        }
213
214        protected abstract int syncResourcesIntoCache(@Nonnull List<IBaseResource> resourceList);
215
216        @EventListener(ContextRefreshedEvent.class)
217        public void start() {
218                myStopping = false;
219        }
220
221        @EventListener(ContextClosedEvent.class)
222        public void shutdown() {
223                myStopping = true;
224        }
225
226        private boolean isStopping() {
227                return myStopping;
228        }
229
230        private IFhirResourceDao<?> getResourceDao() {
231                return myDaoRegistry.getResourceDao(myResourceName);
232        }
233
234        @Override
235        public void handleInit(@Nonnull Collection<IIdType> theResourceIds) {
236                if (daoNotAvailable()) {
237                        ourLog.warn(
238                                        "The resource type {} is enabled on this server, but there is no {} DAO configured.",
239                                        myResourceName,
240                                        myResourceName);
241                        return;
242                }
243                IFhirResourceDao<?> resourceDao = getResourceDao();
244                SystemRequestDetails systemRequestDetails = SystemRequestDetails.forAllPartitions();
245                List<IBaseResource> resourceList = new ArrayList<>();
246
247                /*
248                 * We are pretty lenient here, because any failure will block the server
249                 * from starting up. We should generally never fail to load here, but it
250                 * can potentially happen if a resource is manually deleted in the database
251                 * (ie. marked with a deletion time manually) but the indexes aren't cleaned
252                 * up.
253                 */
254                for (IIdType id : theResourceIds) {
255                        IBaseResource read;
256                        try {
257                                read = resourceDao.read(id, systemRequestDetails);
258                        } catch (BaseServerResponseException e) {
259                                ourLog.warn("Unable to fetch resource {}", id, e);
260                                continue;
261                        }
262                        resourceList.add(read);
263                }
264                handleInit(resourceList);
265        }
266
267        protected abstract void handleInit(@Nonnull List<IBaseResource> resourceList);
268
269        @Override
270        public void handleChange(@Nonnull IResourceChangeEvent theResourceChangeEvent) {
271                // For now ignore the contents of theResourceChangeEvent.  In the future, consider updating the registry based
272                // on
273                // known resources that have been created, updated & deleted
274                requestRefresh();
275        }
276
277        @Nonnull
278        protected abstract SearchParameterMap getSearchParameterMap();
279}