
001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.sp; 021 022import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 023import ca.uhn.fhir.jpa.cache.ISearchParamIdentityCacheSvc; 024import ca.uhn.fhir.jpa.dao.data.IResourceIndexedSearchParamIdentityDao; 025import ca.uhn.fhir.jpa.model.entity.IndexedSearchParamIdentity; 026import ca.uhn.fhir.jpa.util.MemoryCacheService; 027import ca.uhn.fhir.util.ThreadPoolUtil; 028import jakarta.annotation.PostConstruct; 029import jakarta.annotation.PreDestroy; 030import org.slf4j.Logger; 031import org.springframework.dao.DataIntegrityViolationException; 032import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; 033import org.springframework.stereotype.Service; 034import org.springframework.transaction.PlatformTransactionManager; 035import org.springframework.transaction.TransactionDefinition; 036import org.springframework.transaction.support.TransactionSynchronization; 037import org.springframework.transaction.support.TransactionSynchronizationManager; 038import org.springframework.transaction.support.TransactionTemplate; 039 040import java.util.Collection; 041import java.util.Map; 042import java.util.Objects; 043import java.util.concurrent.Callable; 044import java.util.concurrent.ConcurrentHashMap; 045import java.util.concurrent.ExecutionException; 046import java.util.concurrent.Future; 047import java.util.concurrent.FutureTask; 048import java.util.concurrent.ThreadPoolExecutor; 049 050import static org.slf4j.LoggerFactory.getLogger; 051 052@Service 053public class SearchParamIdentityCacheSvcImpl implements ISearchParamIdentityCacheSvc { 054 private static final Logger ourLog = getLogger(SearchParamIdentityCacheSvcImpl.class); 055 056 private static final int MAX_RETRY_COUNT = 20; 057 private static final String CACHE_THREAD_PREFIX = "search-parameter-identity-cache-"; 058 private static final int THREAD_POOL_QUEUE_SIZE = 5000; 059 private final JpaStorageSettings myStorageSettings; 060 private final IResourceIndexedSearchParamIdentityDao mySearchParamIdentityDao; 061 private final TransactionTemplate myTxTemplate; 062 private final ThreadPoolTaskExecutor myThreadPoolTaskExecutor; 063 private final MemoryCacheService myMemoryCacheService; 064 private final UniqueTaskExecutor myUniqueTaskExecutor; 065 066 public SearchParamIdentityCacheSvcImpl( 067 JpaStorageSettings theStorageSettings, 068 IResourceIndexedSearchParamIdentityDao theResourceIndexedSearchParamIdentityDao, 069 PlatformTransactionManager theTxManager, 070 MemoryCacheService theMemoryCacheService) { 071 myStorageSettings = theStorageSettings; 072 mySearchParamIdentityDao = theResourceIndexedSearchParamIdentityDao; 073 myTxTemplate = new TransactionTemplate(theTxManager); 074 myTxTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); 075 myThreadPoolTaskExecutor = createExecutor(); 076 myMemoryCacheService = theMemoryCacheService; 077 myUniqueTaskExecutor = new UniqueTaskExecutor(myThreadPoolTaskExecutor); 078 } 079 080 /** 081 * Creates a thread pool executor for asynchronously executing 082 * {@link PersistSearchParameterIdentityTask} instances. 083 * <p> 084 * Uses a fixed pool size of 1 and a bounded queue with a capacity of 5000. 085 * <p> 086 * If the queue is full and all threads are busy, new tasks are silently 087 * discarded using {@link ThreadPoolExecutor.DiscardPolicy}. 088 */ 089 private ThreadPoolTaskExecutor createExecutor() { 090 return ThreadPoolUtil.newThreadPool( 091 1, 1, CACHE_THREAD_PREFIX, THREAD_POOL_QUEUE_SIZE, new ThreadPoolExecutor.DiscardPolicy()); 092 } 093 094 @PostConstruct 095 public void start() { 096 initCache(); 097 } 098 099 @PreDestroy 100 public void preDestroy() { 101 myThreadPoolTaskExecutor.shutdown(); 102 } 103 104 /** 105 * Initializes the cache by preloading search parameter identities {@link IndexedSearchParamIdentity}. 106 */ 107 protected void initCache() { 108 // populate cache with IndexedSearchParamIdentities from database 109 Collection<Object[]> spIdentities = 110 Objects.requireNonNull(myTxTemplate.execute(t -> mySearchParamIdentityDao.getAllHashIdentities())); 111 spIdentities.forEach( 112 i -> CacheUtils.putSearchParamIdentityToCache(myMemoryCacheService, (Long) i[0], (Integer) i[1])); 113 } 114 115 private void submitPersistSearchParameterIdentityTask( 116 Long hashIdentity, String theResourceType, String theSearchParamName) { 117 PersistSearchParameterIdentityTask persistSpIdentityTask = new PersistSearchParameterIdentityTask.Builder() 118 .hashIdentity(hashIdentity) 119 .resourceType(theResourceType) 120 .paramName(theSearchParamName) 121 .memoryCacheService(myMemoryCacheService) 122 .txTemplate(myTxTemplate) 123 .searchParamIdentityDao(mySearchParamIdentityDao) 124 .build(); 125 126 myUniqueTaskExecutor.submitIfAbsent(persistSpIdentityTask); 127 } 128 129 @Override 130 public boolean hasInFlightTasks() { 131 return myUniqueTaskExecutor.hasInFlightTasks(); 132 } 133 134 /** 135 * Asynchronously ensures that a {@link IndexedSearchParamIdentity} exists for the given 136 * hash identity, parameter name, and resource type. If the identity is already present 137 * in the in-memory cache, no action is taken. 138 * 139 * <p>If the identity is missing, a {@link PersistSearchParameterIdentityTask} is created 140 * and submitted for asynchronous execution. To avoid modifying the cache during an 141 * active transaction, task submission is deferred until after the transaction is committed. 142 * 143 * @param theHashIdentity The hash identity representing the search parameter. 144 * @param theResourceType The resource type (e.g., "Patient", "Observation"). 145 * @param theParamName The search parameter name. 146 * 147 * @see PersistSearchParameterIdentityTask 148 */ 149 public void findOrCreateSearchParamIdentity(Long theHashIdentity, String theResourceType, String theParamName) { 150 if (!myStorageSettings.isWriteToSearchParamIdentityTable()) { 151 return; 152 } 153 154 // check if SearchParamIdentity is already in cache 155 Integer spIdentityId = CacheUtils.getSearchParamIdentityFromCache(myMemoryCacheService, theHashIdentity); 156 157 if (spIdentityId != null) { 158 return; 159 } 160 161 // cache miss, submit PersistSearchParameterIdentityTask to execute it in separate thread 162 if (TransactionSynchronizationManager.isSynchronizationActive()) { 163 TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { 164 @Override 165 public void afterCommit() { 166 submitPersistSearchParameterIdentityTask(theHashIdentity, theResourceType, theParamName); 167 } 168 }); 169 } else { 170 submitPersistSearchParameterIdentityTask(theHashIdentity, theResourceType, theParamName); 171 } 172 } 173 174 /** 175 * This class is responsible for ensuring that a unique {@link IndexedSearchParamIdentity} 176 * exists for a given hash identity (parameter name and resource type). This task is 177 * executed asynchronously to avoid blocking the main thread during persistence. 178 * 179 * <p> 180 * This task checks the in-memory cache for the given hash identity and, if missing, 181 * attempts to create or retrieve the corresponding {@link IndexedSearchParamIdentity} 182 * from the database. The result is then added to the cache. 183 * 184 * <p> 185 * Up to 20 retries are permitted in case of a {@link DataIntegrityViolationException}, 186 * which can occur due to concurrent insert attempts for the same identity. If all retries 187 * fail, the {@link IndexedSearchParamIdentity} will not be saved during this execution, 188 * but the task may be retried later when submitted again. 189 * 190 * @see IndexedSearchParamIdentity 191 * @see MemoryCacheService 192 */ 193 public static class PersistSearchParameterIdentityTask implements Callable<Void> { 194 195 private final Long myHashIdentity; 196 private final String myResourceType; 197 private final String myParamName; 198 private final TransactionTemplate myTxTemplate; 199 private final MemoryCacheService myMemoryCacheService; 200 private final IResourceIndexedSearchParamIdentityDao myResourceIndexedSearchParamIdentityDao; 201 202 private PersistSearchParameterIdentityTask(Builder theBuilder) { 203 this.myHashIdentity = theBuilder.myHashIdentity; 204 this.myResourceType = theBuilder.myResourceType; 205 this.myParamName = theBuilder.myParamName; 206 this.myTxTemplate = theBuilder.myTxTemplate; 207 this.myMemoryCacheService = theBuilder.myMemoryCacheService; 208 this.myResourceIndexedSearchParamIdentityDao = theBuilder.mySearchParamIdentityDao; 209 } 210 211 public Long getHashIdentity() { 212 return myHashIdentity; 213 } 214 215 public String getMyResourceType() { 216 return myResourceType; 217 } 218 219 public String getMyParamName() { 220 return myParamName; 221 } 222 223 public static class Builder { 224 private Long myHashIdentity; 225 private String myResourceType; 226 private String myParamName; 227 private TransactionTemplate myTxTemplate; 228 private MemoryCacheService myMemoryCacheService; 229 private IResourceIndexedSearchParamIdentityDao mySearchParamIdentityDao; 230 231 public Builder hashIdentity(Long theHashIdentity) { 232 this.myHashIdentity = theHashIdentity; 233 return this; 234 } 235 236 public Builder resourceType(String theResourceType) { 237 this.myResourceType = theResourceType; 238 return this; 239 } 240 241 public Builder paramName(String theParamName) { 242 this.myParamName = theParamName; 243 return this; 244 } 245 246 public Builder txTemplate(TransactionTemplate theTxTemplate) { 247 this.myTxTemplate = theTxTemplate; 248 return this; 249 } 250 251 public Builder memoryCacheService(MemoryCacheService theMemoryCacheService) { 252 this.myMemoryCacheService = theMemoryCacheService; 253 return this; 254 } 255 256 public Builder searchParamIdentityDao(IResourceIndexedSearchParamIdentityDao theSearchParamIdentityDao) { 257 this.mySearchParamIdentityDao = theSearchParamIdentityDao; 258 return this; 259 } 260 261 public PersistSearchParameterIdentityTask build() { 262 return new PersistSearchParameterIdentityTask(this); 263 } 264 } 265 266 @Override 267 public Void call() throws Exception { 268 Integer spIdentityId; 269 int retry = 0; 270 while (retry++ < MAX_RETRY_COUNT) { 271 spIdentityId = CacheUtils.getSearchParamIdentityFromCache(myMemoryCacheService, myHashIdentity); 272 273 if (spIdentityId != null) { 274 return null; 275 } 276 277 try { 278 // try to retrieve search parameter identity from db, create if missing, update cache 279 spIdentityId = findOrCreateIndexedSearchParamIdentity(myHashIdentity, myParamName, myResourceType); 280 CacheUtils.putSearchParamIdentityToCache(myMemoryCacheService, myHashIdentity, spIdentityId); 281 return null; 282 } catch (DataIntegrityViolationException theDataIntegrityViolationException) { 283 // retryable exception - unique search parameter identity or hash identity constraint violation 284 ourLog.trace( 285 "Failed to save SearchParamIndexIdentity for search parameter with hash identity: {}, " 286 + "resourceType: {}, paramName: {}, retry attempt: {}", 287 myHashIdentity, 288 myResourceType, 289 myParamName, 290 retry, 291 theDataIntegrityViolationException); 292 } 293 } 294 ourLog.warn( 295 "Failed saving IndexedSearchParamIdentity with hash identity: {}, resourceType: {}, paramName: {}", 296 myHashIdentity, 297 myResourceType, 298 myParamName); 299 return null; 300 } 301 302 private Integer findOrCreateIndexedSearchParamIdentity( 303 Long theHashIdentity, String theParamName, String theResourceType) { 304 305 return myTxTemplate.execute(tx -> { 306 IndexedSearchParamIdentity identity = 307 myResourceIndexedSearchParamIdentityDao.getSearchParameterIdByHashIdentity(theHashIdentity); 308 if (identity != null) { 309 ourLog.trace( 310 "DB read success IndexedSearchParamIdentity with hash identity: {}, resourceType: {}, paramName: {}", 311 theHashIdentity, 312 theResourceType, 313 theParamName); 314 return identity.getSpIdentityId(); 315 } 316 317 IndexedSearchParamIdentity indexedSearchParamIdentity = new IndexedSearchParamIdentity(); 318 indexedSearchParamIdentity.setHashIdentity(theHashIdentity); 319 indexedSearchParamIdentity.setParamName(theParamName); 320 indexedSearchParamIdentity.setResourceType(theResourceType); 321 322 myResourceIndexedSearchParamIdentityDao.save(indexedSearchParamIdentity); 323 ourLog.trace( 324 "Write success IndexedSearchParamIdentity with hash identity: {}, resourceType: {}, paramName: {},", 325 theHashIdentity, 326 theResourceType, 327 theParamName); 328 329 return indexedSearchParamIdentity.getSpIdentityId(); 330 }); 331 } 332 } 333 334 public static class CacheUtils { 335 336 private CacheUtils() {} 337 338 public static Integer getSearchParamIdentityFromCache( 339 MemoryCacheService memoryCacheService, Long hashIdentity) { 340 return memoryCacheService.getIfPresent( 341 MemoryCacheService.CacheEnum.HASH_IDENTITY_TO_SEARCH_PARAM_IDENTITY, hashIdentity); 342 } 343 344 public static void putSearchParamIdentityToCache( 345 MemoryCacheService memoryCacheService, Long theHashIdentity, Integer theSpIdentityId) { 346 memoryCacheService.put( 347 MemoryCacheService.CacheEnum.HASH_IDENTITY_TO_SEARCH_PARAM_IDENTITY, 348 theHashIdentity, 349 theSpIdentityId); 350 } 351 } 352 353 /** 354 * Ensures only one instance of the PersistSearchParameterIdentityTask is running per hash identity. 355 * If a task is already in progress, it will not be scheduled again. 356 */ 357 private static class UniqueTaskExecutor { 358 private final ThreadPoolTaskExecutor myTaskExecutor; 359 private final Map<Long, Future<Void>> myInFlightTasks = new ConcurrentHashMap<>(); 360 361 public UniqueTaskExecutor(ThreadPoolTaskExecutor theTaskExecutor) { 362 myTaskExecutor = theTaskExecutor; 363 } 364 365 public void submitIfAbsent(PersistSearchParameterIdentityTask theTask) { 366 Long hashIdentity = theTask.getHashIdentity(); 367 368 // already have a task with same hashIdentity - skip scheduling 369 Future<Void> existing = myInFlightTasks.get(hashIdentity); 370 if (existing != null) { 371 return; 372 } 373 374 // put FutureTask in the map. If another thread already put it - skip scheduling. 375 FutureTask<Void> futureTask = new LoggingFutureTask(theTask); 376 existing = myInFlightTasks.putIfAbsent(hashIdentity, futureTask); 377 if (existing != null) { 378 return; 379 } 380 381 myTaskExecutor.execute(() -> { 382 try { 383 futureTask.run(); 384 } finally { 385 // remove from the cache once done or failed 386 myInFlightTasks.remove(hashIdentity, futureTask); 387 } 388 }); 389 } 390 391 public synchronized boolean hasInFlightTasks() { 392 return !myInFlightTasks.isEmpty(); 393 } 394 } 395 396 /** 397 * A {@link FutureTask} implementation that logs any exception thrown 398 * during execution of a {@link PersistSearchParameterIdentityTask}. 399 * <p> 400 * Since {@link PersistSearchParameterIdentityTask} runs asynchronously in a 401 * separate thread, any exception it throws can only be observed by calling {@link #get()}. 402 * <p> 403 * This class overrides {@link #done()} to call {@code get()}, 404 * and log {@link ExecutionException} or {@link InterruptedException}. 405 */ 406 private static class LoggingFutureTask extends FutureTask<Void> { 407 private final Long myHashIdentity; 408 private final String myResourceType; 409 private final String myParamName; 410 411 public LoggingFutureTask(PersistSearchParameterIdentityTask theTask) { 412 super(theTask); 413 this.myHashIdentity = theTask.getHashIdentity(); 414 this.myResourceType = theTask.getMyResourceType(); 415 this.myParamName = theTask.getMyParamName(); 416 } 417 418 @Override 419 protected void done() { 420 try { 421 get(); 422 } catch (ExecutionException theException) { 423 ourLog.error( 424 "PersistSearchParameterIdentityTask failed. Hash identity: {}, resourceType: {}, paramName: {}, ", 425 myHashIdentity, 426 myResourceType, 427 myParamName, 428 theException.getCause()); 429 } catch (InterruptedException theInterruptedException) { 430 Thread.currentThread().interrupt(); 431 ourLog.warn( 432 "PersistSearchParameterIdentityTask was interrupted. Hash identity: {}, resourceType: {}, paramName: {}, ", 433 myHashIdentity, 434 myResourceType, 435 myParamName, 436 theInterruptedException); 437 } 438 } 439 } 440}