001/*-
002 * #%L
003 * HAPI FHIR Storage api
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.cache;
021
022import ca.uhn.fhir.IHapiBootOrder;
023import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
024import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
025import ca.uhn.fhir.jpa.cache.IResourceChangeEvent;
026import ca.uhn.fhir.jpa.cache.IResourceChangeListener;
027import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache;
028import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry;
029import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
030import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
031import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
032import com.google.common.annotations.VisibleForTesting;
033import jakarta.annotation.Nonnull;
034import jakarta.annotation.PreDestroy;
035import org.apache.commons.lang3.time.DateUtils;
036import org.hl7.fhir.instance.model.api.IBaseResource;
037import org.hl7.fhir.instance.model.api.IIdType;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040import org.springframework.beans.factory.annotation.Autowired;
041import org.springframework.context.event.ContextClosedEvent;
042import org.springframework.context.event.ContextRefreshedEvent;
043import org.springframework.context.event.ContextStartedEvent;
044import org.springframework.context.event.EventListener;
045import org.springframework.core.annotation.Order;
046
047import java.util.Collection;
048import java.util.List;
049import java.util.concurrent.Semaphore;
050import java.util.stream.Collectors;
051
052public abstract class BaseResourceCacheSynchronizer implements IResourceChangeListener {
053        private static final Logger ourLog = LoggerFactory.getLogger(BaseResourceCacheSynchronizer.class);
054        public static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes
055        public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE;
056        private final String myResourceName;
057
058        @Autowired
059        private IResourceChangeListenerRegistry myResourceChangeListenerRegistry;
060
061        @Autowired
062        DaoRegistry myDaoRegistry;
063
064        private SearchParameterMap mySearchParameterMap;
065        private SystemRequestDetails mySystemRequestDetails;
066        private boolean myStopping;
067        private final Semaphore mySyncResourcesSemaphore = new Semaphore(1);
068        private final Object mySyncResourcesLock = new Object();
069
070        protected BaseResourceCacheSynchronizer(String theResourceName) {
071                myResourceName = theResourceName;
072        }
073
074        protected BaseResourceCacheSynchronizer(
075                        String theResourceName,
076                        IResourceChangeListenerRegistry theResourceChangeListenerRegistry,
077                        DaoRegistry theDaoRegistry) {
078                myResourceName = theResourceName;
079                myDaoRegistry = theDaoRegistry;
080                myResourceChangeListenerRegistry = theResourceChangeListenerRegistry;
081        }
082
083        /**
084         * This method performs a search in the DB, so use the {@link ContextStartedEvent}
085         * to ensure that it runs after the database initializer
086         */
087        @EventListener(classes = ContextRefreshedEvent.class)
088        @Order(IHapiBootOrder.AFTER_SUBSCRIPTION_INITIALIZED)
089        public void registerListener() {
090                if (myDaoRegistry.getResourceDaoOrNull(myResourceName) == null) {
091                        ourLog.info("No resource DAO found for resource type {}, not registering listener", myResourceName);
092                        return;
093                }
094                mySystemRequestDetails = SystemRequestDetails.forAllPartitions();
095
096                IResourceChangeListenerCache resourceCache =
097                                myResourceChangeListenerRegistry.registerResourceResourceChangeListener(
098                                                myResourceName, provideSearchParameterMap(), this, REFRESH_INTERVAL);
099                resourceCache.forceRefresh();
100        }
101
102        private SearchParameterMap provideSearchParameterMap() {
103                SearchParameterMap searchParameterMap = mySearchParameterMap;
104                if (searchParameterMap == null) {
105                        searchParameterMap = getSearchParameterMap();
106                        mySearchParameterMap = searchParameterMap;
107                }
108                return searchParameterMap;
109        }
110
111        @PreDestroy
112        public void unregisterListener() {
113                myResourceChangeListenerRegistry.unregisterResourceResourceChangeListener(this);
114        }
115
116        private boolean resourceDaoExists() {
117                return myDaoRegistry != null && myDaoRegistry.isResourceTypeSupported(myResourceName);
118        }
119
120        /**
121         * Read the existing resources from the database
122         */
123        public void syncDatabaseToCache() {
124                if (!resourceDaoExists()) {
125                        return;
126                }
127                if (!mySyncResourcesSemaphore.tryAcquire()) {
128                        return;
129                }
130                try {
131                        doSyncResourcesWithRetry();
132                } finally {
133                        mySyncResourcesSemaphore.release();
134                }
135        }
136
137        @VisibleForTesting
138        public void acquireSemaphoreForUnitTest() throws InterruptedException {
139                mySyncResourcesSemaphore.acquire();
140        }
141
142        @VisibleForTesting
143        public int doSyncResourcesForUnitTest() {
144                // Two passes for delete flag to take effect
145                int first = doSyncResourcesWithRetry();
146                int second = doSyncResourcesWithRetry();
147                return first + second;
148        }
149
150        synchronized int doSyncResourcesWithRetry() {
151                // retry runs MAX_RETRIES times
152                // and if errors result every time, it will fail
153                Retrier<Integer> syncResourceRetrier = new Retrier<>(this::doSyncResources, MAX_RETRIES);
154                return syncResourceRetrier.runWithRetry();
155        }
156
157        @SuppressWarnings("unchecked")
158        private int doSyncResources() {
159                if (isStopping()) {
160                        return 0;
161                }
162
163                synchronized (mySyncResourcesLock) {
164                        ourLog.debug("Starting sync {}s", myResourceName);
165
166                        List<IBaseResource> resourceList = (List<IBaseResource>)
167                                        getResourceDao().searchForResources(provideSearchParameterMap(), mySystemRequestDetails);
168                        return syncResourcesIntoCache(resourceList);
169                }
170        }
171
172        protected abstract int syncResourcesIntoCache(List<IBaseResource> resourceList);
173
174        @EventListener(ContextRefreshedEvent.class)
175        public void start() {
176                myStopping = false;
177        }
178
179        @EventListener(ContextClosedEvent.class)
180        public void shutdown() {
181                myStopping = true;
182        }
183
184        private boolean isStopping() {
185                return myStopping;
186        }
187
188        private IFhirResourceDao<?> getResourceDao() {
189                return myDaoRegistry.getResourceDao(myResourceName);
190        }
191
192        @Override
193        public void handleInit(Collection<IIdType> theResourceIds) {
194                if (!resourceDaoExists()) {
195                        ourLog.warn(
196                                        "The resource type {} is enabled on this server, but there is no {} DAO configured.",
197                                        myResourceName,
198                                        myResourceName);
199                        return;
200                }
201                IFhirResourceDao<?> resourceDao = getResourceDao();
202                SystemRequestDetails systemRequestDetails = SystemRequestDetails.forAllPartitions();
203                List<IBaseResource> resourceList = theResourceIds.stream()
204                                .map(n -> resourceDao.read(n, systemRequestDetails))
205                                .collect(Collectors.toList());
206                handleInit(resourceList);
207        }
208
209        protected abstract void handleInit(List<IBaseResource> resourceList);
210
211        @Override
212        public void handleChange(IResourceChangeEvent theResourceChangeEvent) {
213                // For now ignore the contents of theResourceChangeEvent.  In the future, consider updating the registry based
214                // on
215                // known resources that have been created, updated & deleted
216                syncDatabaseToCache();
217        }
218
219        @Nonnull
220        protected abstract SearchParameterMap getSearchParameterMap();
221}