001/*-
002 * #%L
003 * HAPI FHIR Search Parameters
004 * %%
005 * Copyright (C) 2014 - 2023 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.cache;
021
022import ca.uhn.fhir.jpa.model.sched.HapiJob;
023import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
024import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
025import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
026import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
027import com.google.common.annotations.VisibleForTesting;
028import org.apache.commons.lang3.time.DateUtils;
029import org.hl7.fhir.instance.model.api.IIdType;
030import org.quartz.JobExecutionContext;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033import org.springframework.beans.factory.annotation.Autowired;
034import org.springframework.context.event.ContextClosedEvent;
035import org.springframework.context.event.ContextRefreshedEvent;
036import org.springframework.context.event.EventListener;
037import org.springframework.stereotype.Service;
038
039import java.util.ArrayList;
040import java.util.Iterator;
041import java.util.List;
042
043/**
044 * This service refreshes the {@link IResourceChangeListenerCache} caches and notifies their listener when
045 * those caches change.
046 *
047 * Think of it like a Ferris Wheel that completes a full rotation once every 10 seconds.
048 * Every time a chair passes the bottom it checks to see if it's time to refresh that seat.  If so,
049 * the Ferris Wheel stops, removes the riders, and loads a fresh cache for that chair, and calls the listener
050 * if any entries in the new cache are different from the last time that cache was loaded.
051 */
052@Service
053public class ResourceChangeListenerCacheRefresherImpl implements IResourceChangeListenerCacheRefresher, IHasScheduledJobs {
054        private static final Logger ourLog = LoggerFactory.getLogger(ResourceChangeListenerCacheRefresherImpl.class);
055
056        /**
057         * All cache entries are checked at this interval to see if they need to be refreshed
058         */
059        static long LOCAL_REFRESH_INTERVAL_MS = 10 * DateUtils.MILLIS_PER_SECOND;
060
061        @Autowired
062        private IResourceVersionSvc myResourceVersionSvc;
063        @Autowired
064        private ResourceChangeListenerRegistryImpl myResourceChangeListenerRegistry;
065        private boolean myStopping = false;
066
067        @Override
068        public void scheduleJobs(ISchedulerService theSchedulerService) {
069                ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
070                jobDetail.setId(getClass().getName());
071                jobDetail.setJobClass(Job.class);
072                theSchedulerService.scheduleLocalJob(LOCAL_REFRESH_INTERVAL_MS, jobDetail);
073        }
074
075        public static class Job implements HapiJob {
076                @Autowired
077                private IResourceChangeListenerCacheRefresher myTarget;
078
079                @Override
080                public void execute(JobExecutionContext theContext) {
081                        myTarget.refreshExpiredCachesAndNotifyListeners();
082                }
083        }
084
085        @Override
086        public ResourceChangeResult refreshExpiredCachesAndNotifyListeners() {
087                ResourceChangeResult retval = new ResourceChangeResult();
088                Iterator<ResourceChangeListenerCache> iterator = myResourceChangeListenerRegistry.iterator();
089                while (iterator.hasNext()) {
090                        ResourceChangeListenerCache entry = iterator.next();
091                        retval = retval.plus(entry.refreshCacheIfNecessary());
092                }
093                return retval;
094        }
095
096        @VisibleForTesting
097        public ResourceChangeResult forceRefreshAllCachesForUnitTest() {
098                ResourceChangeResult retval = new ResourceChangeResult();
099                Iterator<ResourceChangeListenerCache> iterator = myResourceChangeListenerRegistry.iterator();
100                while (iterator.hasNext()) {
101                        IResourceChangeListenerCache entry = iterator.next();
102                        retval = retval.plus(entry.forceRefresh());
103                }
104                return retval;
105        }
106
107        @VisibleForTesting
108        public void setResourceChangeListenerRegistry(ResourceChangeListenerRegistryImpl theResourceChangeListenerRegistry) {
109                myResourceChangeListenerRegistry = theResourceChangeListenerRegistry;
110        }
111
112        @VisibleForTesting
113        public void setResourceVersionSvc(IResourceVersionSvc theResourceVersionSvc) {
114                myResourceVersionSvc = theResourceVersionSvc;
115        }
116
117
118        @EventListener(ContextRefreshedEvent.class)
119        public void start() {
120                myStopping = false;
121        }
122
123        @EventListener(ContextClosedEvent.class)
124        public void shutdown() {
125                myStopping = true;
126        }
127
128        public boolean isStopping() {
129                return myStopping;
130        }
131
132        @Override
133        public ResourceChangeResult refreshCacheAndNotifyListener(IResourceChangeListenerCache theCache) {
134                ResourceChangeResult retVal = new ResourceChangeResult();
135                if (isStopping()) {
136                        ourLog.info("Context is stopping, aborting cache refresh");
137                        return retVal;
138                }
139                if (!myResourceChangeListenerRegistry.contains(theCache)) {
140                        ourLog.warn("Requesting cache refresh for unregistered listener {}.  Aborting.", theCache);
141                        return retVal;
142                }
143                SearchParameterMap searchParamMap = theCache.getSearchParameterMap();
144                ResourceVersionMap newResourceVersionMap = myResourceVersionSvc.getVersionMap(theCache.getResourceName(), searchParamMap);
145                retVal = retVal.plus(notifyListener(theCache, newResourceVersionMap));
146
147                return retVal;
148        }
149
150        /**
151         * Notify a listener with all matching resources if it hasn't been initialized yet, otherwise only notify it if
152         * any resources have changed
153         * @param theCache
154         * @param theNewResourceVersionMap the measured new resources
155         * @return the list of created, updated and deleted ids
156         */
157        ResourceChangeResult notifyListener(IResourceChangeListenerCache theCache, ResourceVersionMap theNewResourceVersionMap) {
158                ResourceChangeResult retval;
159                ResourceChangeListenerCache cache = (ResourceChangeListenerCache) theCache;
160                IResourceChangeListener resourceChangeListener = cache.getResourceChangeListener();
161                if (theCache.isInitialized()) {
162                        retval = compareLastVersionMapToNewVersionMapAndNotifyListenerOfChanges(resourceChangeListener, cache.getResourceVersionCache(), theNewResourceVersionMap);
163                } else {
164                        cache.getResourceVersionCache().initialize(theNewResourceVersionMap);
165                        resourceChangeListener.handleInit(theNewResourceVersionMap.getSourceIds());
166                        retval = ResourceChangeResult.fromCreated(theNewResourceVersionMap.size());
167                        cache.setInitialized(true);
168                }
169                return retval;
170        }
171
172        private ResourceChangeResult compareLastVersionMapToNewVersionMapAndNotifyListenerOfChanges(IResourceChangeListener theListener, ResourceVersionCache theOldResourceVersionCache, ResourceVersionMap theNewResourceVersionMap) {
173                // If the new ResourceVersionMap does not have the old key - delete it
174                List<IIdType> deletedIds = new ArrayList<>();
175                theOldResourceVersionCache.keySet()
176                        .forEach(id -> {
177                                if (!theNewResourceVersionMap.containsKey(id)) {
178                                        deletedIds.add(id);
179                                }
180                        });
181                deletedIds.forEach(theOldResourceVersionCache::removeResourceId);
182
183                List<IIdType> createdIds = new ArrayList<>();
184                List<IIdType> updatedIds = new ArrayList<>();
185
186                for (IIdType id : theNewResourceVersionMap.keySet()) {
187                        Long previousValue = theOldResourceVersionCache.put(id, theNewResourceVersionMap.get(id));
188                        IIdType newId = id.withVersion(theNewResourceVersionMap.get(id).toString());
189                        if (previousValue == null) {
190                                createdIds.add(newId);
191                        } else if (!theNewResourceVersionMap.get(id).equals(previousValue)) {
192                                updatedIds.add(newId);
193                        }
194                }
195
196                IResourceChangeEvent resourceChangeEvent = ResourceChangeEvent.fromCreatedUpdatedDeletedResourceIds(createdIds, updatedIds, deletedIds);
197                if (!resourceChangeEvent.isEmpty()) {
198                        theListener.handleChange(resourceChangeEvent);
199                }
200                return ResourceChangeResult.fromResourceChangeEvent(resourceChangeEvent);
201        }
202}