001/*-
002 * #%L
003 * HAPI FHIR Storage api
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.cache;
021
022import ca.uhn.fhir.IHapiBootOrder;
023import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
024import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
025import ca.uhn.fhir.jpa.cache.IResourceChangeEvent;
026import ca.uhn.fhir.jpa.cache.IResourceChangeListener;
027import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache;
028import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry;
029import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
030import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
031import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
032import com.google.common.annotations.VisibleForTesting;
033import jakarta.annotation.Nonnull;
034import jakarta.annotation.PreDestroy;
035import org.apache.commons.lang3.time.DateUtils;
036import org.hl7.fhir.instance.model.api.IBaseResource;
037import org.hl7.fhir.instance.model.api.IIdType;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040import org.springframework.beans.factory.annotation.Autowired;
041import org.springframework.context.event.ContextClosedEvent;
042import org.springframework.context.event.ContextRefreshedEvent;
043import org.springframework.context.event.ContextStartedEvent;
044import org.springframework.context.event.EventListener;
045import org.springframework.core.annotation.Order;
046
047import java.util.Collection;
048import java.util.List;
049import java.util.concurrent.Semaphore;
050import java.util.stream.Collectors;
051
052public abstract class BaseResourceCacheSynchronizer implements IResourceChangeListener {
053        private static final Logger ourLog = LoggerFactory.getLogger(BaseResourceCacheSynchronizer.class);
054        public static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes
055        public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE;
056        private final String myResourceName;
057
058        @Autowired
059        private IResourceChangeListenerRegistry myResourceChangeListenerRegistry;
060
061        @Autowired
062        private DaoRegistry myDaoRegistry;
063
064        private SearchParameterMap mySearchParameterMap;
065        private SystemRequestDetails mySystemRequestDetails;
066        private boolean myStopping;
067        private final Semaphore mySyncResourcesSemaphore = new Semaphore(1);
068        private final Object mySyncResourcesLock = new Object();
069
070        private Integer myMaxRetryCount = null;
071
072        protected BaseResourceCacheSynchronizer(String theResourceName) {
073                myResourceName = theResourceName;
074        }
075
076        protected BaseResourceCacheSynchronizer(
077                        String theResourceName,
078                        IResourceChangeListenerRegistry theResourceChangeListenerRegistry,
079                        DaoRegistry theDaoRegistry) {
080                myResourceName = theResourceName;
081                myDaoRegistry = theDaoRegistry;
082                myResourceChangeListenerRegistry = theResourceChangeListenerRegistry;
083        }
084
085        /**
086         * This method performs a search in the DB, so use the {@link ContextStartedEvent}
087         * to ensure that it runs after the database initializer
088         */
089        @EventListener(classes = ContextRefreshedEvent.class)
090        @Order(IHapiBootOrder.AFTER_SUBSCRIPTION_INITIALIZED)
091        public void registerListener() {
092                if (myDaoRegistry.getResourceDaoOrNull(myResourceName) == null) {
093                        ourLog.info("No resource DAO found for resource type {}, not registering listener", myResourceName);
094                        return;
095                }
096                mySystemRequestDetails = SystemRequestDetails.forAllPartitions();
097
098                IResourceChangeListenerCache resourceCache =
099                                myResourceChangeListenerRegistry.registerResourceResourceChangeListener(
100                                                myResourceName, provideSearchParameterMap(), this, REFRESH_INTERVAL);
101                resourceCache.forceRefresh();
102        }
103
104        private SearchParameterMap provideSearchParameterMap() {
105                SearchParameterMap searchParameterMap = mySearchParameterMap;
106                if (searchParameterMap == null) {
107                        searchParameterMap = getSearchParameterMap();
108                        mySearchParameterMap = searchParameterMap;
109                }
110                return searchParameterMap;
111        }
112
113        @PreDestroy
114        public void unregisterListener() {
115                myResourceChangeListenerRegistry.unregisterResourceResourceChangeListener(this);
116        }
117
118        private boolean resourceDaoExists() {
119                return myDaoRegistry != null && myDaoRegistry.isResourceTypeSupported(myResourceName);
120        }
121
122        /**
123         * Read the existing resources from the database
124         */
125        public void syncDatabaseToCache() {
126                if (!resourceDaoExists()) {
127                        return;
128                }
129                if (!mySyncResourcesSemaphore.tryAcquire()) {
130                        return;
131                }
132                try {
133                        doSyncResourcesWithRetry();
134                } finally {
135                        mySyncResourcesSemaphore.release();
136                }
137        }
138
139        @VisibleForTesting
140        public void acquireSemaphoreForUnitTest() throws InterruptedException {
141                mySyncResourcesSemaphore.acquire();
142        }
143
144        @VisibleForTesting
145        public int doSyncResourcesForUnitTest() {
146                // Two passes for delete flag to take effect
147                int first = doSyncResourcesWithRetry();
148                int second = doSyncResourcesWithRetry();
149                return first + second;
150        }
151
152        synchronized int doSyncResourcesWithRetry() {
153                // retry runs MAX_RETRIES times
154                // and if errors result every time, it will fail
155                Retrier<Integer> syncResourceRetrier = new Retrier<>(this::doSyncResources, getMaxRetries());
156                return syncResourceRetrier.runWithRetry();
157        }
158
159        private int getMaxRetries() {
160                if (myMaxRetryCount != null) {
161                        return myMaxRetryCount;
162                }
163                return MAX_RETRIES;
164        }
165
166        @VisibleForTesting
167        public void setMaxRetries(Integer theMaxRetries) {
168                myMaxRetryCount = theMaxRetries;
169        }
170
171        @SuppressWarnings("unchecked")
172        private int doSyncResources() {
173                if (isStopping()) {
174                        return 0;
175                }
176
177                synchronized (mySyncResourcesLock) {
178                        ourLog.debug("Starting sync {}s", myResourceName);
179
180                        List<IBaseResource> resourceList = (List<IBaseResource>)
181                                        getResourceDao().searchForResources(provideSearchParameterMap(), mySystemRequestDetails);
182                        return syncResourcesIntoCache(resourceList);
183                }
184        }
185
186        protected abstract int syncResourcesIntoCache(List<IBaseResource> resourceList);
187
188        @EventListener(ContextRefreshedEvent.class)
189        public void start() {
190                myStopping = false;
191        }
192
193        @EventListener(ContextClosedEvent.class)
194        public void shutdown() {
195                myStopping = true;
196        }
197
198        private boolean isStopping() {
199                return myStopping;
200        }
201
202        private IFhirResourceDao<?> getResourceDao() {
203                return myDaoRegistry.getResourceDao(myResourceName);
204        }
205
206        @Override
207        public void handleInit(Collection<IIdType> theResourceIds) {
208                if (!resourceDaoExists()) {
209                        ourLog.warn(
210                                        "The resource type {} is enabled on this server, but there is no {} DAO configured.",
211                                        myResourceName,
212                                        myResourceName);
213                        return;
214                }
215                IFhirResourceDao<?> resourceDao = getResourceDao();
216                SystemRequestDetails systemRequestDetails = SystemRequestDetails.forAllPartitions();
217                List<IBaseResource> resourceList = theResourceIds.stream()
218                                .map(n -> resourceDao.read(n, systemRequestDetails))
219                                .collect(Collectors.toList());
220                handleInit(resourceList);
221        }
222
223        protected abstract void handleInit(List<IBaseResource> resourceList);
224
225        @Override
226        public void handleChange(IResourceChangeEvent theResourceChangeEvent) {
227                // For now ignore the contents of theResourceChangeEvent.  In the future, consider updating the registry based
228                // on
229                // known resources that have been created, updated & deleted
230                syncDatabaseToCache();
231        }
232
233        @Nonnull
234        protected abstract SearchParameterMap getSearchParameterMap();
235}