
001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.sp; 021 022import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 023import ca.uhn.fhir.jpa.cache.ISearchParamIdentityCacheSvc; 024import ca.uhn.fhir.jpa.dao.data.IResourceIndexedSearchParamIdentityDao; 025import ca.uhn.fhir.jpa.model.entity.IndexedSearchParamIdentity; 026import ca.uhn.fhir.jpa.util.MemoryCacheService; 027import ca.uhn.fhir.util.ThreadPoolUtil; 028import jakarta.annotation.PostConstruct; 029import jakarta.annotation.PreDestroy; 030import org.slf4j.Logger; 031import org.springframework.dao.DataIntegrityViolationException; 032import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; 033import org.springframework.stereotype.Service; 034import org.springframework.transaction.PlatformTransactionManager; 035import org.springframework.transaction.TransactionDefinition; 036import org.springframework.transaction.support.TransactionSynchronization; 037import org.springframework.transaction.support.TransactionSynchronizationManager; 038import org.springframework.transaction.support.TransactionTemplate; 039 040import java.util.Collection; 041import java.util.Map; 042import java.util.Objects; 043import java.util.concurrent.Callable; 044import java.util.concurrent.ConcurrentHashMap; 045import java.util.concurrent.ExecutionException; 046import java.util.concurrent.Future; 047import java.util.concurrent.FutureTask; 048import java.util.concurrent.ThreadPoolExecutor; 049 050import static org.slf4j.LoggerFactory.getLogger; 051 052@Service 053public class SearchParamIdentityCacheSvcImpl implements ISearchParamIdentityCacheSvc { 054 private static final Logger ourLog = getLogger(SearchParamIdentityCacheSvcImpl.class); 055 056 private static final int MAX_RETRY_COUNT = 20; 057 private static final String CACHE_THREAD_PREFIX = "search-parameter-identity-cache-"; 058 private static final int THREAD_POOL_QUEUE_SIZE = 5000; 059 private final JpaStorageSettings myStorageSettings; 060 private final IResourceIndexedSearchParamIdentityDao mySearchParamIdentityDao; 061 private final TransactionTemplate myTxTemplate; 062 private final ThreadPoolTaskExecutor myThreadPoolTaskExecutor; 063 private final MemoryCacheService myMemoryCacheService; 064 private final UniqueTaskExecutor myUniqueTaskExecutor; 065 066 public SearchParamIdentityCacheSvcImpl( 067 JpaStorageSettings theStorageSettings, 068 IResourceIndexedSearchParamIdentityDao theResourceIndexedSearchParamIdentityDao, 069 PlatformTransactionManager theTxManager, 070 MemoryCacheService theMemoryCacheService) { 071 myStorageSettings = theStorageSettings; 072 mySearchParamIdentityDao = theResourceIndexedSearchParamIdentityDao; 073 myTxTemplate = new TransactionTemplate(theTxManager); 074 myTxTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); 075 myThreadPoolTaskExecutor = createExecutor(); 076 myMemoryCacheService = theMemoryCacheService; 077 myUniqueTaskExecutor = new UniqueTaskExecutor(myThreadPoolTaskExecutor); 078 } 079 080 /** 081 * Creates a thread pool executor for asynchronously executing 082 * {@link PersistSearchParameterIdentityTask} instances. 083 * <p> 084 * Uses a fixed pool size of 1 and a bounded queue with a capacity of 5000. 085 * <p> 086 * If the queue is full and all threads are busy, new tasks are silently 087 * discarded using {@link ThreadPoolExecutor.DiscardPolicy}. 088 */ 089 private ThreadPoolTaskExecutor createExecutor() { 090 return ThreadPoolUtil.newThreadPool( 091 1, 1, CACHE_THREAD_PREFIX, THREAD_POOL_QUEUE_SIZE, new ThreadPoolExecutor.DiscardPolicy()); 092 } 093 094 @PostConstruct 095 public void start() { 096 initCache(); 097 } 098 099 @PreDestroy 100 public void preDestroy() { 101 myThreadPoolTaskExecutor.shutdown(); 102 } 103 104 /** 105 * Initializes the cache by preloading search parameter identities {@link IndexedSearchParamIdentity}. 106 */ 107 protected void initCache() { 108 // populate cache with IndexedSearchParamIdentities from database 109 Collection<Object[]> spIdentities = 110 Objects.requireNonNull(myTxTemplate.execute(t -> mySearchParamIdentityDao.getAllHashIdentities())); 111 spIdentities.forEach( 112 i -> CacheUtils.putSearchParamIdentityToCache(myMemoryCacheService, (Long) i[0], (Integer) i[1])); 113 } 114 115 private void submitPersistSearchParameterIdentityTask( 116 Long hashIdentity, String theResourceType, String theSearchParamName) { 117 PersistSearchParameterIdentityTask persistSpIdentityTask = new PersistSearchParameterIdentityTask.Builder() 118 .hashIdentity(hashIdentity) 119 .resourceType(theResourceType) 120 .paramName(theSearchParamName) 121 .memoryCacheService(myMemoryCacheService) 122 .txTemplate(myTxTemplate) 123 .searchParamIdentityDao(mySearchParamIdentityDao) 124 .build(); 125 126 myUniqueTaskExecutor.submitIfAbsent(persistSpIdentityTask); 127 } 128 129 /** 130 * Asynchronously ensures that a {@link IndexedSearchParamIdentity} exists for the given 131 * hash identity, parameter name, and resource type. If the identity is already present 132 * in the in-memory cache, no action is taken. 133 * 134 * <p>If the identity is missing, a {@link PersistSearchParameterIdentityTask} is created 135 * and submitted for asynchronous execution. To avoid modifying the cache during an 136 * active transaction, task submission is deferred until after the transaction is committed. 137 * 138 * @param theHashIdentity The hash identity representing the search parameter. 139 * @param theResourceType The resource type (e.g., "Patient", "Observation"). 140 * @param theParamName The search parameter name. 141 * 142 * @see PersistSearchParameterIdentityTask 143 */ 144 public void findOrCreateSearchParamIdentity(Long theHashIdentity, String theResourceType, String theParamName) { 145 if (!myStorageSettings.isWriteToSearchParamIdentityTable()) { 146 return; 147 } 148 149 // check if SearchParamIdentity is already in cache 150 Integer spIdentityId = CacheUtils.getSearchParamIdentityFromCache(myMemoryCacheService, theHashIdentity); 151 152 if (spIdentityId != null) { 153 return; 154 } 155 156 // cache miss, submit PersistSearchParameterIdentityTask to execute it in separate thread 157 if (TransactionSynchronizationManager.isSynchronizationActive()) { 158 TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { 159 @Override 160 public void afterCommit() { 161 submitPersistSearchParameterIdentityTask(theHashIdentity, theResourceType, theParamName); 162 } 163 }); 164 } else { 165 submitPersistSearchParameterIdentityTask(theHashIdentity, theResourceType, theParamName); 166 } 167 } 168 169 /** 170 * This class is responsible for ensuring that a unique {@link IndexedSearchParamIdentity} 171 * exists for a given hash identity (parameter name and resource type). This task is 172 * executed asynchronously to avoid blocking the main thread during persistence. 173 * 174 * <p> 175 * This task checks the in-memory cache for the given hash identity and, if missing, 176 * attempts to create or retrieve the corresponding {@link IndexedSearchParamIdentity} 177 * from the database. The result is then added to the cache. 178 * 179 * <p> 180 * Up to 20 retries are permitted in case of a {@link DataIntegrityViolationException}, 181 * which can occur due to concurrent insert attempts for the same identity. If all retries 182 * fail, the {@link IndexedSearchParamIdentity} will not be saved during this execution, 183 * but the task may be retried later when submitted again. 184 * 185 * @see IndexedSearchParamIdentity 186 * @see MemoryCacheService 187 */ 188 public static class PersistSearchParameterIdentityTask implements Callable<Void> { 189 190 private final Long myHashIdentity; 191 private final String myResourceType; 192 private final String myParamName; 193 private final TransactionTemplate myTxTemplate; 194 private final MemoryCacheService myMemoryCacheService; 195 private final IResourceIndexedSearchParamIdentityDao myResourceIndexedSearchParamIdentityDao; 196 197 private PersistSearchParameterIdentityTask(Builder theBuilder) { 198 this.myHashIdentity = theBuilder.myHashIdentity; 199 this.myResourceType = theBuilder.myResourceType; 200 this.myParamName = theBuilder.myParamName; 201 this.myTxTemplate = theBuilder.myTxTemplate; 202 this.myMemoryCacheService = theBuilder.myMemoryCacheService; 203 this.myResourceIndexedSearchParamIdentityDao = theBuilder.mySearchParamIdentityDao; 204 } 205 206 public Long getHashIdentity() { 207 return myHashIdentity; 208 } 209 210 public String getMyResourceType() { 211 return myResourceType; 212 } 213 214 public String getMyParamName() { 215 return myParamName; 216 } 217 218 public static class Builder { 219 private Long myHashIdentity; 220 private String myResourceType; 221 private String myParamName; 222 private TransactionTemplate myTxTemplate; 223 private MemoryCacheService myMemoryCacheService; 224 private IResourceIndexedSearchParamIdentityDao mySearchParamIdentityDao; 225 226 public Builder hashIdentity(Long theHashIdentity) { 227 this.myHashIdentity = theHashIdentity; 228 return this; 229 } 230 231 public Builder resourceType(String theResourceType) { 232 this.myResourceType = theResourceType; 233 return this; 234 } 235 236 public Builder paramName(String theParamName) { 237 this.myParamName = theParamName; 238 return this; 239 } 240 241 public Builder txTemplate(TransactionTemplate theTxTemplate) { 242 this.myTxTemplate = theTxTemplate; 243 return this; 244 } 245 246 public Builder memoryCacheService(MemoryCacheService theMemoryCacheService) { 247 this.myMemoryCacheService = theMemoryCacheService; 248 return this; 249 } 250 251 public Builder searchParamIdentityDao(IResourceIndexedSearchParamIdentityDao theSearchParamIdentityDao) { 252 this.mySearchParamIdentityDao = theSearchParamIdentityDao; 253 return this; 254 } 255 256 public PersistSearchParameterIdentityTask build() { 257 return new PersistSearchParameterIdentityTask(this); 258 } 259 } 260 261 @Override 262 public Void call() throws Exception { 263 Integer spIdentityId; 264 int retry = 0; 265 while (retry++ < MAX_RETRY_COUNT) { 266 spIdentityId = CacheUtils.getSearchParamIdentityFromCache(myMemoryCacheService, myHashIdentity); 267 268 if (spIdentityId != null) { 269 return null; 270 } 271 272 try { 273 // try to retrieve search parameter identity from db, create if missing, update cache 274 spIdentityId = findOrCreateIndexedSearchParamIdentity(myHashIdentity, myParamName, myResourceType); 275 CacheUtils.putSearchParamIdentityToCache(myMemoryCacheService, myHashIdentity, spIdentityId); 276 return null; 277 } catch (DataIntegrityViolationException theDataIntegrityViolationException) { 278 // retryable exception - unique search parameter identity or hash identity constraint violation 279 ourLog.trace( 280 "Failed to save SearchParamIndexIdentity for search parameter with hash identity: {}, " 281 + "resourceType: {}, paramName: {}, retry attempt: {}", 282 myHashIdentity, 283 myResourceType, 284 myParamName, 285 retry, 286 theDataIntegrityViolationException); 287 } 288 } 289 ourLog.warn( 290 "Failed saving IndexedSearchParamIdentity with hash identity: {}, resourceType: {}, paramName: {}", 291 myHashIdentity, 292 myResourceType, 293 myParamName); 294 return null; 295 } 296 297 private Integer findOrCreateIndexedSearchParamIdentity( 298 Long theHashIdentity, String theParamName, String theResourceType) { 299 300 return myTxTemplate.execute(tx -> { 301 IndexedSearchParamIdentity identity = 302 myResourceIndexedSearchParamIdentityDao.getSearchParameterIdByHashIdentity(theHashIdentity); 303 if (identity != null) { 304 ourLog.trace( 305 "DB read success IndexedSearchParamIdentity with hash identity: {}, resourceType: {}, paramName: {}", 306 theHashIdentity, 307 theResourceType, 308 theParamName); 309 return identity.getSpIdentityId(); 310 } 311 312 IndexedSearchParamIdentity indexedSearchParamIdentity = new IndexedSearchParamIdentity(); 313 indexedSearchParamIdentity.setHashIdentity(theHashIdentity); 314 indexedSearchParamIdentity.setParamName(theParamName); 315 indexedSearchParamIdentity.setResourceType(theResourceType); 316 317 myResourceIndexedSearchParamIdentityDao.save(indexedSearchParamIdentity); 318 ourLog.trace( 319 "Write success IndexedSearchParamIdentity with hash identity: {}, resourceType: {}, paramName: {},", 320 theHashIdentity, 321 theResourceType, 322 theParamName); 323 324 return indexedSearchParamIdentity.getSpIdentityId(); 325 }); 326 } 327 } 328 329 public static class CacheUtils { 330 331 private CacheUtils() {} 332 333 public static Integer getSearchParamIdentityFromCache( 334 MemoryCacheService memoryCacheService, Long hashIdentity) { 335 return memoryCacheService.getIfPresent( 336 MemoryCacheService.CacheEnum.HASH_IDENTITY_TO_SEARCH_PARAM_IDENTITY, hashIdentity); 337 } 338 339 public static void putSearchParamIdentityToCache( 340 MemoryCacheService memoryCacheService, Long theHashIdentity, Integer theSpIdentityId) { 341 memoryCacheService.put( 342 MemoryCacheService.CacheEnum.HASH_IDENTITY_TO_SEARCH_PARAM_IDENTITY, 343 theHashIdentity, 344 theSpIdentityId); 345 } 346 } 347 348 /** 349 * Ensures only one instance of the PersistSearchParameterIdentityTask is running per hash identity. 350 * If a task is already in progress, it will not be scheduled again. 351 */ 352 private static class UniqueTaskExecutor { 353 private final ThreadPoolTaskExecutor myTaskExecutor; 354 private final Map<Long, Future<Void>> myInFlightTasks = new ConcurrentHashMap<>(); 355 356 public UniqueTaskExecutor(ThreadPoolTaskExecutor theTaskExecutor) { 357 myTaskExecutor = theTaskExecutor; 358 } 359 360 public void submitIfAbsent(PersistSearchParameterIdentityTask theTask) { 361 Long hashIdentity = theTask.getHashIdentity(); 362 363 // already have a task with same hashIdentity - skip scheduling 364 Future<Void> existing = myInFlightTasks.get(hashIdentity); 365 if (existing != null) { 366 return; 367 } 368 369 // put FutureTask in the map. If another thread already put it - skip scheduling. 370 FutureTask<Void> futureTask = new LoggingFutureTask(theTask); 371 existing = myInFlightTasks.putIfAbsent(hashIdentity, futureTask); 372 if (existing != null) { 373 return; 374 } 375 376 myTaskExecutor.execute(() -> { 377 try { 378 futureTask.run(); 379 } finally { 380 // remove from the cache once done or failed 381 myInFlightTasks.remove(hashIdentity, futureTask); 382 } 383 }); 384 } 385 } 386 387 /** 388 * A {@link FutureTask} implementation that logs any exception thrown 389 * during execution of a {@link PersistSearchParameterIdentityTask}. 390 * <p> 391 * Since {@link PersistSearchParameterIdentityTask} runs asynchronously in a 392 * separate thread, any exception it throws can only be observed by calling {@link #get()}. 393 * <p> 394 * This class overrides {@link #done()} to call {@code get()}, 395 * and log {@link ExecutionException} or {@link InterruptedException}. 396 */ 397 private static class LoggingFutureTask extends FutureTask<Void> { 398 private final Long myHashIdentity; 399 private final String myResourceType; 400 private final String myParamName; 401 402 public LoggingFutureTask(PersistSearchParameterIdentityTask theTask) { 403 super(theTask); 404 this.myHashIdentity = theTask.getHashIdentity(); 405 this.myResourceType = theTask.getMyResourceType(); 406 this.myParamName = theTask.getMyParamName(); 407 } 408 409 @Override 410 protected void done() { 411 try { 412 get(); 413 } catch (ExecutionException theException) { 414 ourLog.error( 415 "PersistSearchParameterIdentityTask failed. Hash identity: {}, resourceType: {}, paramName: {}, ", 416 myHashIdentity, 417 myResourceType, 418 myParamName, 419 theException.getCause()); 420 } catch (InterruptedException theInterruptedException) { 421 Thread.currentThread().interrupt(); 422 ourLog.warn( 423 "PersistSearchParameterIdentityTask was interrupted. Hash identity: {}, resourceType: {}, paramName: {}, ", 424 myHashIdentity, 425 myResourceType, 426 myParamName, 427 theInterruptedException); 428 } 429 } 430 } 431}