001/*- 002 * #%L 003 * HAPI FHIR Storage api 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.cache; 021 022import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 023import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 024import ca.uhn.fhir.jpa.cache.IResourceChangeEvent; 025import ca.uhn.fhir.jpa.cache.IResourceChangeListener; 026import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache; 027import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry; 028import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 029import ca.uhn.fhir.jpa.searchparam.retry.Retrier; 030import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 031import com.google.common.annotations.VisibleForTesting; 032import jakarta.annotation.Nonnull; 033import jakarta.annotation.PostConstruct; 034import jakarta.annotation.PreDestroy; 035import org.apache.commons.lang3.time.DateUtils; 036import org.hl7.fhir.instance.model.api.IBaseResource; 037import org.hl7.fhir.instance.model.api.IIdType; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040import org.springframework.beans.factory.annotation.Autowired; 041import org.springframework.context.event.ContextClosedEvent; 042import org.springframework.context.event.ContextRefreshedEvent; 043import org.springframework.context.event.EventListener; 044 045import java.util.Collection; 046import java.util.List; 047import java.util.concurrent.Semaphore; 048import java.util.stream.Collectors; 049 050public abstract class BaseResourceCacheSynchronizer implements IResourceChangeListener { 051 private static final Logger ourLog = LoggerFactory.getLogger(BaseResourceCacheSynchronizer.class); 052 public static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes 053 public static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE; 054 private final String myResourceName; 055 056 @Autowired 057 private IResourceChangeListenerRegistry myResourceChangeListenerRegistry; 058 059 @Autowired 060 DaoRegistry myDaoRegistry; 061 062 private SearchParameterMap mySearchParameterMap; 063 private SystemRequestDetails mySystemRequestDetails; 064 private boolean myStopping; 065 private final Semaphore mySyncResourcesSemaphore = new Semaphore(1); 066 private final Object mySyncResourcesLock = new Object(); 067 068 protected BaseResourceCacheSynchronizer(String theResourceName) { 069 myResourceName = theResourceName; 070 } 071 072 protected BaseResourceCacheSynchronizer( 073 String theResourceName, 074 IResourceChangeListenerRegistry theResourceChangeListenerRegistry, 075 DaoRegistry theDaoRegistry) { 076 myResourceName = theResourceName; 077 myDaoRegistry = theDaoRegistry; 078 myResourceChangeListenerRegistry = theResourceChangeListenerRegistry; 079 } 080 081 @PostConstruct 082 public void registerListener() { 083 if (myDaoRegistry.getResourceDaoOrNull(myResourceName) == null) { 084 ourLog.info("No resource DAO found for resource type {}, not registering listener", myResourceName); 085 return; 086 } 087 mySearchParameterMap = getSearchParameterMap(); 088 mySystemRequestDetails = SystemRequestDetails.forAllPartitions(); 089 090 IResourceChangeListenerCache resourceCache = 091 myResourceChangeListenerRegistry.registerResourceResourceChangeListener( 092 myResourceName, mySearchParameterMap, this, REFRESH_INTERVAL); 093 resourceCache.forceRefresh(); 094 } 095 096 @PreDestroy 097 public void unregisterListener() { 098 myResourceChangeListenerRegistry.unregisterResourceResourceChangeListener(this); 099 } 100 101 private boolean resourceDaoExists() { 102 return myDaoRegistry != null && myDaoRegistry.isResourceTypeSupported(myResourceName); 103 } 104 105 /** 106 * Read the existing resources from the database 107 */ 108 public void syncDatabaseToCache() { 109 if (!resourceDaoExists()) { 110 return; 111 } 112 if (!mySyncResourcesSemaphore.tryAcquire()) { 113 return; 114 } 115 try { 116 doSyncResourcesWithRetry(); 117 } finally { 118 mySyncResourcesSemaphore.release(); 119 } 120 } 121 122 @VisibleForTesting 123 public void acquireSemaphoreForUnitTest() throws InterruptedException { 124 mySyncResourcesSemaphore.acquire(); 125 } 126 127 @VisibleForTesting 128 public int doSyncResourcesForUnitTest() { 129 // Two passes for delete flag to take effect 130 int first = doSyncResourcesWithRetry(); 131 int second = doSyncResourcesWithRetry(); 132 return first + second; 133 } 134 135 synchronized int doSyncResourcesWithRetry() { 136 // retry runs MAX_RETRIES times 137 // and if errors result every time, it will fail 138 Retrier<Integer> syncResourceRetrier = new Retrier<>(this::doSyncResources, MAX_RETRIES); 139 return syncResourceRetrier.runWithRetry(); 140 } 141 142 @SuppressWarnings("unchecked") 143 private int doSyncResources() { 144 if (isStopping()) { 145 return 0; 146 } 147 148 synchronized (mySyncResourcesLock) { 149 ourLog.debug("Starting sync {}s", myResourceName); 150 151 List<IBaseResource> resourceList = (List<IBaseResource>) 152 getResourceDao().searchForResources(mySearchParameterMap, mySystemRequestDetails); 153 return syncResourcesIntoCache(resourceList); 154 } 155 } 156 157 protected abstract int syncResourcesIntoCache(List<IBaseResource> resourceList); 158 159 @EventListener(ContextRefreshedEvent.class) 160 public void start() { 161 myStopping = false; 162 } 163 164 @EventListener(ContextClosedEvent.class) 165 public void shutdown() { 166 myStopping = true; 167 } 168 169 private boolean isStopping() { 170 return myStopping; 171 } 172 173 private IFhirResourceDao<?> getResourceDao() { 174 return myDaoRegistry.getResourceDao(myResourceName); 175 } 176 177 @Override 178 public void handleInit(Collection<IIdType> theResourceIds) { 179 if (!resourceDaoExists()) { 180 ourLog.warn( 181 "The resource type {} is enabled on this server, but there is no {} DAO configured.", 182 myResourceName, 183 myResourceName); 184 return; 185 } 186 IFhirResourceDao<?> resourceDao = getResourceDao(); 187 SystemRequestDetails systemRequestDetails = SystemRequestDetails.forAllPartitions(); 188 List<IBaseResource> resourceList = theResourceIds.stream() 189 .map(n -> resourceDao.read(n, systemRequestDetails)) 190 .collect(Collectors.toList()); 191 handleInit(resourceList); 192 } 193 194 protected abstract void handleInit(List<IBaseResource> resourceList); 195 196 @Override 197 public void handleChange(IResourceChangeEvent theResourceChangeEvent) { 198 // For now ignore the contents of theResourceChangeEvent. In the future, consider updating the registry based 199 // on 200 // known resources that have been created, updated & deleted 201 syncDatabaseToCache(); 202 } 203 204 @Nonnull 205 protected abstract SearchParameterMap getSearchParameterMap(); 206}