001/*-
002 * #%L
003 * HAPI FHIR Storage api
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.cache;
021
022import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
023import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
024import ca.uhn.fhir.jpa.cache.IResourceChangeEvent;
025import ca.uhn.fhir.jpa.cache.IResourceChangeListener;
026import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache;
027import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry;
028import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
029import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
030import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
031import com.google.common.annotations.VisibleForTesting;
032import jakarta.annotation.Nonnull;
033import jakarta.annotation.PostConstruct;
034import jakarta.annotation.PreDestroy;
035import org.apache.commons.lang3.time.DateUtils;
036import org.hl7.fhir.instance.model.api.IBaseResource;
037import org.hl7.fhir.instance.model.api.IIdType;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040import org.springframework.beans.factory.annotation.Autowired;
041import org.springframework.context.event.ContextClosedEvent;
042import org.springframework.context.event.ContextRefreshedEvent;
043import org.springframework.context.event.EventListener;
044
045import java.util.Collection;
046import java.util.List;
047import java.util.concurrent.Semaphore;
048import java.util.stream.Collectors;
049
050public abstract class BaseResourceCacheSynchronizer implements IResourceChangeListener {
051        private static final Logger ourLog = LoggerFactory.getLogger(BaseResourceCacheSynchronizer.class);
052        public static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes
053        public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE;
054        private final String myResourceName;
055
056        @Autowired
057        private IResourceChangeListenerRegistry myResourceChangeListenerRegistry;
058
059        @Autowired
060        DaoRegistry myDaoRegistry;
061
062        private SearchParameterMap mySearchParameterMap;
063        private SystemRequestDetails mySystemRequestDetails;
064        private boolean myStopping;
065        private final Semaphore mySyncResourcesSemaphore = new Semaphore(1);
066        private final Object mySyncResourcesLock = new Object();
067
068        protected BaseResourceCacheSynchronizer(String theResourceName) {
069                myResourceName = theResourceName;
070        }
071
072        protected BaseResourceCacheSynchronizer(
073                        String theResourceName,
074                        IResourceChangeListenerRegistry theResourceChangeListenerRegistry,
075                        DaoRegistry theDaoRegistry) {
076                myResourceName = theResourceName;
077                myDaoRegistry = theDaoRegistry;
078                myResourceChangeListenerRegistry = theResourceChangeListenerRegistry;
079        }
080
081        @PostConstruct
082        public void registerListener() {
083                if (myDaoRegistry.getResourceDaoOrNull(myResourceName) == null) {
084                        ourLog.info("No resource DAO found for resource type {}, not registering listener", myResourceName);
085                        return;
086                }
087                mySearchParameterMap = getSearchParameterMap();
088                mySystemRequestDetails = SystemRequestDetails.forAllPartitions();
089
090                IResourceChangeListenerCache resourceCache =
091                                myResourceChangeListenerRegistry.registerResourceResourceChangeListener(
092                                                myResourceName, mySearchParameterMap, this, REFRESH_INTERVAL);
093                resourceCache.forceRefresh();
094        }
095
096        @PreDestroy
097        public void unregisterListener() {
098                myResourceChangeListenerRegistry.unregisterResourceResourceChangeListener(this);
099        }
100
101        private boolean resourceDaoExists() {
102                return myDaoRegistry != null && myDaoRegistry.isResourceTypeSupported(myResourceName);
103        }
104
105        /**
106         * Read the existing resources from the database
107         */
108        public void syncDatabaseToCache() {
109                if (!resourceDaoExists()) {
110                        return;
111                }
112                if (!mySyncResourcesSemaphore.tryAcquire()) {
113                        return;
114                }
115                try {
116                        doSyncResourcesWithRetry();
117                } finally {
118                        mySyncResourcesSemaphore.release();
119                }
120        }
121
122        @VisibleForTesting
123        public void acquireSemaphoreForUnitTest() throws InterruptedException {
124                mySyncResourcesSemaphore.acquire();
125        }
126
127        @VisibleForTesting
128        public int doSyncResourcesForUnitTest() {
129                // Two passes for delete flag to take effect
130                int first = doSyncResourcesWithRetry();
131                int second = doSyncResourcesWithRetry();
132                return first + second;
133        }
134
135        synchronized int doSyncResourcesWithRetry() {
136                // retry runs MAX_RETRIES times
137                // and if errors result every time, it will fail
138                Retrier<Integer> syncResourceRetrier = new Retrier<>(this::doSyncResources, MAX_RETRIES);
139                return syncResourceRetrier.runWithRetry();
140        }
141
142        @SuppressWarnings("unchecked")
143        private int doSyncResources() {
144                if (isStopping()) {
145                        return 0;
146                }
147
148                synchronized (mySyncResourcesLock) {
149                        ourLog.debug("Starting sync {}s", myResourceName);
150
151                        List<IBaseResource> resourceList = (List<IBaseResource>)
152                                        getResourceDao().searchForResources(mySearchParameterMap, mySystemRequestDetails);
153                        return syncResourcesIntoCache(resourceList);
154                }
155        }
156
157        protected abstract int syncResourcesIntoCache(List<IBaseResource> resourceList);
158
159        @EventListener(ContextRefreshedEvent.class)
160        public void start() {
161                myStopping = false;
162        }
163
164        @EventListener(ContextClosedEvent.class)
165        public void shutdown() {
166                myStopping = true;
167        }
168
169        private boolean isStopping() {
170                return myStopping;
171        }
172
173        private IFhirResourceDao<?> getResourceDao() {
174                return myDaoRegistry.getResourceDao(myResourceName);
175        }
176
177        @Override
178        public void handleInit(Collection<IIdType> theResourceIds) {
179                if (!resourceDaoExists()) {
180                        ourLog.warn(
181                                        "The resource type {} is enabled on this server, but there is no {} DAO configured.",
182                                        myResourceName,
183                                        myResourceName);
184                        return;
185                }
186                IFhirResourceDao<?> resourceDao = getResourceDao();
187                SystemRequestDetails systemRequestDetails = SystemRequestDetails.forAllPartitions();
188                List<IBaseResource> resourceList = theResourceIds.stream()
189                                .map(n -> resourceDao.read(n, systemRequestDetails))
190                                .collect(Collectors.toList());
191                handleInit(resourceList);
192        }
193
194        protected abstract void handleInit(List<IBaseResource> resourceList);
195
196        @Override
197        public void handleChange(IResourceChangeEvent theResourceChangeEvent) {
198                // For now ignore the contents of theResourceChangeEvent.  In the future, consider updating the registry based
199                // on
200                // known resources that have been created, updated & deleted
201                syncDatabaseToCache();
202        }
203
204        @Nonnull
205        protected abstract SearchParameterMap getSearchParameterMap();
206}