001/*-
002 * #%L
003 * HAPI FHIR JPA - Search Parameters
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.cache;
021
022import ca.uhn.fhir.jpa.model.sched.HapiJob;
023import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
024import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
025import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
026import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
027import com.google.common.annotations.VisibleForTesting;
028import jakarta.transaction.Transactional;
029import org.apache.commons.lang3.time.DateUtils;
030import org.hl7.fhir.instance.model.api.IIdType;
031import org.quartz.JobExecutionContext;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034import org.springframework.beans.factory.annotation.Autowired;
035import org.springframework.context.event.ContextClosedEvent;
036import org.springframework.context.event.ContextRefreshedEvent;
037import org.springframework.context.event.EventListener;
038import org.springframework.stereotype.Service;
039
040import java.util.ArrayList;
041import java.util.Iterator;
042import java.util.List;
043
044/**
045 * This service refreshes the {@link IResourceChangeListenerCache} caches and notifies their listener when
046 * those caches change.
047 *
048 * Think of it like a Ferris Wheel that completes a full rotation once every 10 seconds.
049 * Every time a chair passes the bottom it checks to see if it's time to refresh that seat.  If so,
050 * the Ferris Wheel stops, removes the riders, and loads a fresh cache for that chair, and calls the listener
051 * if any entries in the new cache are different from the last time that cache was loaded.
052 */
053@Service
054public class ResourceChangeListenerCacheRefresherImpl
055                implements IResourceChangeListenerCacheRefresher, IHasScheduledJobs {
056        private static final Logger ourLog = LoggerFactory.getLogger(ResourceChangeListenerCacheRefresherImpl.class);
057
058        /**
059         * All cache entries are checked at this interval to see if they need to be refreshed
060         */
061        static final long LOCAL_REFRESH_INTERVAL_MS = 10 * DateUtils.MILLIS_PER_SECOND;
062
063        @Autowired
064        private IResourceVersionSvc myResourceVersionSvc;
065
066        @Autowired
067        private ResourceChangeListenerRegistryImpl myResourceChangeListenerRegistry;
068
069        private boolean myStopping = false;
070
071        @Override
072        public void scheduleJobs(ISchedulerService theSchedulerService) {
073                ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
074                jobDetail.setId(getClass().getName());
075                jobDetail.setJobClass(Job.class);
076                theSchedulerService.scheduleLocalJob(LOCAL_REFRESH_INTERVAL_MS, jobDetail);
077        }
078
079        public static class Job implements HapiJob {
080                @Autowired
081                private IResourceChangeListenerCacheRefresher myTarget;
082
083                @Override
084                public void execute(JobExecutionContext theContext) {
085                        myTarget.refreshExpiredCachesAndNotifyListeners();
086                }
087        }
088
089        @Override
090        public ResourceChangeResult refreshExpiredCachesAndNotifyListeners() {
091                ResourceChangeResult retval = new ResourceChangeResult();
092                Iterator<ResourceChangeListenerCache> iterator = myResourceChangeListenerRegistry.iterator();
093                while (iterator.hasNext()) {
094                        ResourceChangeListenerCache entry = iterator.next();
095                        retval = retval.plus(entry.refreshCacheIfNecessary());
096                }
097                return retval;
098        }
099
100        @VisibleForTesting
101        public ResourceChangeResult forceRefreshAllCachesForUnitTest() {
102                ResourceChangeResult retval = new ResourceChangeResult();
103                Iterator<ResourceChangeListenerCache> iterator = myResourceChangeListenerRegistry.iterator();
104                while (iterator.hasNext()) {
105                        IResourceChangeListenerCache entry = iterator.next();
106                        retval = retval.plus(entry.forceRefresh());
107                }
108                return retval;
109        }
110
111        @VisibleForTesting
112        public void setResourceChangeListenerRegistry(
113                        ResourceChangeListenerRegistryImpl theResourceChangeListenerRegistry) {
114                myResourceChangeListenerRegistry = theResourceChangeListenerRegistry;
115        }
116
117        @VisibleForTesting
118        public void setResourceVersionSvc(IResourceVersionSvc theResourceVersionSvc) {
119                myResourceVersionSvc = theResourceVersionSvc;
120        }
121
122        @EventListener(ContextRefreshedEvent.class)
123        public void start() {
124                myStopping = false;
125        }
126
127        @EventListener(ContextClosedEvent.class)
128        public void shutdown() {
129                myStopping = true;
130        }
131
132        public boolean isStopping() {
133                return myStopping;
134        }
135
136        @Override
137        // Suspend any current transaction while we sync with the db.
138        // This avoids lock conflicts while reading the resource versions.
139        @Transactional(Transactional.TxType.NOT_SUPPORTED)
140        public ResourceChangeResult refreshCacheAndNotifyListener(IResourceChangeListenerCache theCache) {
141                ResourceChangeResult retVal = new ResourceChangeResult();
142
143                if (isStopping()) {
144                        ourLog.info("Context is stopping, aborting cache refresh");
145                        return retVal;
146                }
147                if (!myResourceChangeListenerRegistry.contains(theCache)) {
148                        ourLog.warn("Requesting cache refresh for unregistered listener {}.  Aborting.", theCache);
149                        return retVal;
150                }
151                SearchParameterMap searchParamMap = theCache.getSearchParameterMap();
152                ResourceVersionMap newResourceVersionMap =
153                                myResourceVersionSvc.getVersionMap(theCache.getResourceName(), searchParamMap);
154
155                retVal = retVal.plus(notifyListener(theCache, newResourceVersionMap));
156
157                return retVal;
158        }
159
160        /**
161         * Notify a listener with all matching resources if it hasn't been initialized yet, otherwise only notify it if
162         * any resources have changed
163         * @param theCache the target
164         * @param theNewResourceVersionMap the measured new resources
165         * @return the list of created, updated and deleted ids
166         */
167        ResourceChangeResult notifyListener(
168                        IResourceChangeListenerCache theCache, ResourceVersionMap theNewResourceVersionMap) {
169                ResourceChangeResult retval;
170                ResourceChangeListenerCache cache = (ResourceChangeListenerCache) theCache;
171                IResourceChangeListener resourceChangeListener = cache.getResourceChangeListener();
172                if (theCache.isInitialized()) {
173                        retval = compareLastVersionMapToNewVersionMapAndNotifyListenerOfChanges(
174                                        resourceChangeListener, cache.getResourceVersionCache(), theNewResourceVersionMap);
175                } else {
176                        cache.getResourceVersionCache().initialize(theNewResourceVersionMap);
177                        resourceChangeListener.handleInit(theNewResourceVersionMap.getSourceIds());
178                        retval = ResourceChangeResult.fromCreated(theNewResourceVersionMap.size());
179                        cache.setInitialized(true);
180                }
181                return retval;
182        }
183
184        private ResourceChangeResult compareLastVersionMapToNewVersionMapAndNotifyListenerOfChanges(
185                        IResourceChangeListener theListener,
186                        ResourceVersionCache theOldResourceVersionCache,
187                        ResourceVersionMap theNewResourceVersionMap) {
188                // If the new ResourceVersionMap does not have the old key - delete it
189                List<IIdType> deletedIds = new ArrayList<>();
190                theOldResourceVersionCache.keySet().forEach(id -> {
191                        if (!theNewResourceVersionMap.containsKey(id)) {
192                                deletedIds.add(id);
193                        }
194                });
195                deletedIds.forEach(theOldResourceVersionCache::removeResourceId);
196
197                List<IIdType> createdIds = new ArrayList<>();
198                List<IIdType> updatedIds = new ArrayList<>();
199
200                for (IIdType id : theNewResourceVersionMap.keySet()) {
201                        Long previousValue = theOldResourceVersionCache.put(id, theNewResourceVersionMap.get(id));
202                        IIdType newId = id.withVersion(theNewResourceVersionMap.get(id).toString());
203                        if (previousValue == null) {
204                                createdIds.add(newId);
205                        } else if (!theNewResourceVersionMap.get(id).equals(previousValue)) {
206                                updatedIds.add(newId);
207                        }
208                }
209
210                IResourceChangeEvent resourceChangeEvent =
211                                ResourceChangeEvent.fromCreatedUpdatedDeletedResourceIds(createdIds, updatedIds, deletedIds);
212                if (!resourceChangeEvent.isEmpty()) {
213                        theListener.handleChange(resourceChangeEvent);
214                }
215                return ResourceChangeResult.fromResourceChangeEvent(resourceChangeEvent);
216        }
217}