001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.sp;
021
022import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
023import ca.uhn.fhir.jpa.cache.ISearchParamIdentityCacheSvc;
024import ca.uhn.fhir.jpa.dao.data.IResourceIndexedSearchParamIdentityDao;
025import ca.uhn.fhir.jpa.model.entity.IndexedSearchParamIdentity;
026import ca.uhn.fhir.jpa.util.MemoryCacheService;
027import ca.uhn.fhir.util.ThreadPoolUtil;
028import jakarta.annotation.PostConstruct;
029import jakarta.annotation.PreDestroy;
030import org.slf4j.Logger;
031import org.springframework.dao.DataIntegrityViolationException;
032import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
033import org.springframework.stereotype.Service;
034import org.springframework.transaction.PlatformTransactionManager;
035import org.springframework.transaction.TransactionDefinition;
036import org.springframework.transaction.support.TransactionSynchronization;
037import org.springframework.transaction.support.TransactionSynchronizationManager;
038import org.springframework.transaction.support.TransactionTemplate;
039
040import java.util.Collection;
041import java.util.Map;
042import java.util.Objects;
043import java.util.concurrent.Callable;
044import java.util.concurrent.ConcurrentHashMap;
045import java.util.concurrent.ExecutionException;
046import java.util.concurrent.Future;
047import java.util.concurrent.FutureTask;
048import java.util.concurrent.ThreadPoolExecutor;
049
050import static org.slf4j.LoggerFactory.getLogger;
051
052@Service
053public class SearchParamIdentityCacheSvcImpl implements ISearchParamIdentityCacheSvc {
054        private static final Logger ourLog = getLogger(SearchParamIdentityCacheSvcImpl.class);
055
056        private static final int MAX_RETRY_COUNT = 20;
057        private static final String CACHE_THREAD_PREFIX = "search-parameter-identity-cache-";
058        private static final int THREAD_POOL_QUEUE_SIZE = 5000;
059        private final JpaStorageSettings myStorageSettings;
060        private final IResourceIndexedSearchParamIdentityDao mySearchParamIdentityDao;
061        private final TransactionTemplate myTxTemplate;
062        private final ThreadPoolTaskExecutor myThreadPoolTaskExecutor;
063        private final MemoryCacheService myMemoryCacheService;
064        private final UniqueTaskExecutor myUniqueTaskExecutor;
065
066        public SearchParamIdentityCacheSvcImpl(
067                        JpaStorageSettings theStorageSettings,
068                        IResourceIndexedSearchParamIdentityDao theResourceIndexedSearchParamIdentityDao,
069                        PlatformTransactionManager theTxManager,
070                        MemoryCacheService theMemoryCacheService) {
071                myStorageSettings = theStorageSettings;
072                mySearchParamIdentityDao = theResourceIndexedSearchParamIdentityDao;
073                myTxTemplate = new TransactionTemplate(theTxManager);
074                myTxTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
075                myThreadPoolTaskExecutor = createExecutor();
076                myMemoryCacheService = theMemoryCacheService;
077                myUniqueTaskExecutor = new UniqueTaskExecutor(myThreadPoolTaskExecutor);
078        }
079
080        /**
081         * Creates a thread pool executor for asynchronously executing
082         * {@link PersistSearchParameterIdentityTask} instances.
083         * <p>
084         * Uses a fixed pool size of 1 and a bounded queue with a capacity of 5000.
085         * <p>
086         * If the queue is full and all threads are busy, new tasks are silently
087         * discarded using {@link ThreadPoolExecutor.DiscardPolicy}.
088         */
089        private ThreadPoolTaskExecutor createExecutor() {
090                return ThreadPoolUtil.newThreadPool(
091                                1, 1, CACHE_THREAD_PREFIX, THREAD_POOL_QUEUE_SIZE, new ThreadPoolExecutor.DiscardPolicy());
092        }
093
094        @PostConstruct
095        public void start() {
096                initCache();
097        }
098
099        @PreDestroy
100        public void preDestroy() {
101                myThreadPoolTaskExecutor.shutdown();
102        }
103
104        /**
105         * Initializes the cache by preloading search parameter identities {@link IndexedSearchParamIdentity}.
106         */
107        protected void initCache() {
108                // populate cache with IndexedSearchParamIdentities from database
109                Collection<Object[]> spIdentities =
110                                Objects.requireNonNull(myTxTemplate.execute(t -> mySearchParamIdentityDao.getAllHashIdentities()));
111                spIdentities.forEach(
112                                i -> CacheUtils.putSearchParamIdentityToCache(myMemoryCacheService, (Long) i[0], (Integer) i[1]));
113        }
114
115        private void submitPersistSearchParameterIdentityTask(
116                        Long hashIdentity, String theResourceType, String theSearchParamName) {
117                PersistSearchParameterIdentityTask persistSpIdentityTask = new PersistSearchParameterIdentityTask.Builder()
118                                .hashIdentity(hashIdentity)
119                                .resourceType(theResourceType)
120                                .paramName(theSearchParamName)
121                                .memoryCacheService(myMemoryCacheService)
122                                .txTemplate(myTxTemplate)
123                                .searchParamIdentityDao(mySearchParamIdentityDao)
124                                .build();
125
126                myUniqueTaskExecutor.submitIfAbsent(persistSpIdentityTask);
127        }
128
129        @Override
130        public boolean hasInFlightTasks() {
131                return myUniqueTaskExecutor.hasInFlightTasks();
132        }
133
134        /**
135         * Asynchronously ensures that a {@link IndexedSearchParamIdentity} exists for the given
136         * hash identity, parameter name, and resource type. If the identity is already present
137         * in the in-memory cache, no action is taken.
138         *
139         * <p>If the identity is missing, a {@link PersistSearchParameterIdentityTask} is created
140         * and submitted for asynchronous execution. To avoid modifying the cache during an
141         * active transaction, task submission is deferred until after the transaction is committed.
142         *
143         * @param theHashIdentity The hash identity representing the search parameter.
144         * @param theResourceType The resource type (e.g., "Patient", "Observation").
145         * @param theParamName    The search parameter name.
146         *
147         * @see PersistSearchParameterIdentityTask
148         */
149        public void findOrCreateSearchParamIdentity(Long theHashIdentity, String theResourceType, String theParamName) {
150                if (!myStorageSettings.isWriteToSearchParamIdentityTable()) {
151                        return;
152                }
153
154                // check if SearchParamIdentity is already in cache
155                Integer spIdentityId = CacheUtils.getSearchParamIdentityFromCache(myMemoryCacheService, theHashIdentity);
156
157                if (spIdentityId != null) {
158                        return;
159                }
160
161                // cache miss, submit PersistSearchParameterIdentityTask to execute it in separate thread
162                if (TransactionSynchronizationManager.isSynchronizationActive()) {
163                        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
164                                @Override
165                                public void afterCommit() {
166                                        submitPersistSearchParameterIdentityTask(theHashIdentity, theResourceType, theParamName);
167                                }
168                        });
169                } else {
170                        submitPersistSearchParameterIdentityTask(theHashIdentity, theResourceType, theParamName);
171                }
172        }
173
174        /**
175         * This class is responsible for ensuring that a unique {@link IndexedSearchParamIdentity}
176         * exists for a given hash identity (parameter name and resource type). This task is
177         * executed asynchronously to avoid blocking the main thread during persistence.
178         *
179         * <p>
180         * This task checks the in-memory cache for the given hash identity and, if missing,
181         * attempts to create or retrieve the corresponding {@link IndexedSearchParamIdentity}
182         * from the database. The result is then added to the cache.
183         *
184         * <p>
185         * Up to 20 retries are permitted in case of a {@link DataIntegrityViolationException},
186         * which can occur due to concurrent insert attempts for the same identity. If all retries
187         * fail, the {@link IndexedSearchParamIdentity} will not be saved during this execution,
188         * but the task may be retried later when submitted again.
189         *
190         * @see IndexedSearchParamIdentity
191         * @see MemoryCacheService
192         */
193        public static class PersistSearchParameterIdentityTask implements Callable<Void> {
194
195                private final Long myHashIdentity;
196                private final String myResourceType;
197                private final String myParamName;
198                private final TransactionTemplate myTxTemplate;
199                private final MemoryCacheService myMemoryCacheService;
200                private final IResourceIndexedSearchParamIdentityDao myResourceIndexedSearchParamIdentityDao;
201
202                private PersistSearchParameterIdentityTask(Builder theBuilder) {
203                        this.myHashIdentity = theBuilder.myHashIdentity;
204                        this.myResourceType = theBuilder.myResourceType;
205                        this.myParamName = theBuilder.myParamName;
206                        this.myTxTemplate = theBuilder.myTxTemplate;
207                        this.myMemoryCacheService = theBuilder.myMemoryCacheService;
208                        this.myResourceIndexedSearchParamIdentityDao = theBuilder.mySearchParamIdentityDao;
209                }
210
211                public Long getHashIdentity() {
212                        return myHashIdentity;
213                }
214
215                public String getMyResourceType() {
216                        return myResourceType;
217                }
218
219                public String getMyParamName() {
220                        return myParamName;
221                }
222
223                public static class Builder {
224                        private Long myHashIdentity;
225                        private String myResourceType;
226                        private String myParamName;
227                        private TransactionTemplate myTxTemplate;
228                        private MemoryCacheService myMemoryCacheService;
229                        private IResourceIndexedSearchParamIdentityDao mySearchParamIdentityDao;
230
231                        public Builder hashIdentity(Long theHashIdentity) {
232                                this.myHashIdentity = theHashIdentity;
233                                return this;
234                        }
235
236                        public Builder resourceType(String theResourceType) {
237                                this.myResourceType = theResourceType;
238                                return this;
239                        }
240
241                        public Builder paramName(String theParamName) {
242                                this.myParamName = theParamName;
243                                return this;
244                        }
245
246                        public Builder txTemplate(TransactionTemplate theTxTemplate) {
247                                this.myTxTemplate = theTxTemplate;
248                                return this;
249                        }
250
251                        public Builder memoryCacheService(MemoryCacheService theMemoryCacheService) {
252                                this.myMemoryCacheService = theMemoryCacheService;
253                                return this;
254                        }
255
256                        public Builder searchParamIdentityDao(IResourceIndexedSearchParamIdentityDao theSearchParamIdentityDao) {
257                                this.mySearchParamIdentityDao = theSearchParamIdentityDao;
258                                return this;
259                        }
260
261                        public PersistSearchParameterIdentityTask build() {
262                                return new PersistSearchParameterIdentityTask(this);
263                        }
264                }
265
266                @Override
267                public Void call() throws Exception {
268                        Integer spIdentityId;
269                        int retry = 0;
270                        while (retry++ < MAX_RETRY_COUNT) {
271                                spIdentityId = CacheUtils.getSearchParamIdentityFromCache(myMemoryCacheService, myHashIdentity);
272
273                                if (spIdentityId != null) {
274                                        return null;
275                                }
276
277                                try {
278                                        // try to retrieve search parameter identity from db, create if missing, update cache
279                                        spIdentityId = findOrCreateIndexedSearchParamIdentity(myHashIdentity, myParamName, myResourceType);
280                                        CacheUtils.putSearchParamIdentityToCache(myMemoryCacheService, myHashIdentity, spIdentityId);
281                                        return null;
282                                } catch (DataIntegrityViolationException theDataIntegrityViolationException) {
283                                        // retryable exception - unique search parameter identity or hash identity constraint violation
284                                        ourLog.trace(
285                                                        "Failed to save SearchParamIndexIdentity for search parameter with hash identity: {}, "
286                                                                        + "resourceType: {}, paramName: {}, retry attempt: {}",
287                                                        myHashIdentity,
288                                                        myResourceType,
289                                                        myParamName,
290                                                        retry,
291                                                        theDataIntegrityViolationException);
292                                }
293                        }
294                        ourLog.warn(
295                                        "Failed saving IndexedSearchParamIdentity with hash identity: {}, resourceType: {}, paramName: {}",
296                                        myHashIdentity,
297                                        myResourceType,
298                                        myParamName);
299                        return null;
300                }
301
302                private Integer findOrCreateIndexedSearchParamIdentity(
303                                Long theHashIdentity, String theParamName, String theResourceType) {
304
305                        return myTxTemplate.execute(tx -> {
306                                IndexedSearchParamIdentity identity =
307                                                myResourceIndexedSearchParamIdentityDao.getSearchParameterIdByHashIdentity(theHashIdentity);
308                                if (identity != null) {
309                                        ourLog.trace(
310                                                        "DB read success IndexedSearchParamIdentity with hash identity: {}, resourceType: {}, paramName: {}",
311                                                        theHashIdentity,
312                                                        theResourceType,
313                                                        theParamName);
314                                        return identity.getSpIdentityId();
315                                }
316
317                                IndexedSearchParamIdentity indexedSearchParamIdentity = new IndexedSearchParamIdentity();
318                                indexedSearchParamIdentity.setHashIdentity(theHashIdentity);
319                                indexedSearchParamIdentity.setParamName(theParamName);
320                                indexedSearchParamIdentity.setResourceType(theResourceType);
321
322                                myResourceIndexedSearchParamIdentityDao.save(indexedSearchParamIdentity);
323                                ourLog.trace(
324                                                "Write success IndexedSearchParamIdentity with hash identity: {}, resourceType: {}, paramName: {},",
325                                                theHashIdentity,
326                                                theResourceType,
327                                                theParamName);
328
329                                return indexedSearchParamIdentity.getSpIdentityId();
330                        });
331                }
332        }
333
334        public static class CacheUtils {
335
336                private CacheUtils() {}
337
338                public static Integer getSearchParamIdentityFromCache(
339                                MemoryCacheService memoryCacheService, Long hashIdentity) {
340                        return memoryCacheService.getIfPresent(
341                                        MemoryCacheService.CacheEnum.HASH_IDENTITY_TO_SEARCH_PARAM_IDENTITY, hashIdentity);
342                }
343
344                public static void putSearchParamIdentityToCache(
345                                MemoryCacheService memoryCacheService, Long theHashIdentity, Integer theSpIdentityId) {
346                        memoryCacheService.put(
347                                        MemoryCacheService.CacheEnum.HASH_IDENTITY_TO_SEARCH_PARAM_IDENTITY,
348                                        theHashIdentity,
349                                        theSpIdentityId);
350                }
351        }
352
353        /**
354         * Ensures only one instance of the PersistSearchParameterIdentityTask is running per hash identity.
355         * If a task is already in progress, it will not be scheduled again.
356         */
357        private static class UniqueTaskExecutor {
358                private final ThreadPoolTaskExecutor myTaskExecutor;
359                private final Map<Long, Future<Void>> myInFlightTasks = new ConcurrentHashMap<>();
360
361                public UniqueTaskExecutor(ThreadPoolTaskExecutor theTaskExecutor) {
362                        myTaskExecutor = theTaskExecutor;
363                }
364
365                public void submitIfAbsent(PersistSearchParameterIdentityTask theTask) {
366                        Long hashIdentity = theTask.getHashIdentity();
367
368                        // already have a task with same hashIdentity - skip scheduling
369                        Future<Void> existing = myInFlightTasks.get(hashIdentity);
370                        if (existing != null) {
371                                return;
372                        }
373
374                        // put FutureTask in the map. If another thread already put it - skip scheduling.
375                        FutureTask<Void> futureTask = new LoggingFutureTask(theTask);
376                        existing = myInFlightTasks.putIfAbsent(hashIdentity, futureTask);
377                        if (existing != null) {
378                                return;
379                        }
380
381                        myTaskExecutor.execute(() -> {
382                                try {
383                                        futureTask.run();
384                                } finally {
385                                        // remove from the cache once done or failed
386                                        myInFlightTasks.remove(hashIdentity, futureTask);
387                                }
388                        });
389                }
390
391                public synchronized boolean hasInFlightTasks() {
392                        return !myInFlightTasks.isEmpty();
393                }
394        }
395
396        /**
397         * A {@link FutureTask} implementation that logs any exception thrown
398         * during execution of a {@link PersistSearchParameterIdentityTask}.
399         * <p>
400         * Since {@link PersistSearchParameterIdentityTask} runs asynchronously in a
401         * separate thread, any exception it throws can only be observed by calling {@link #get()}.
402         * <p>
403         * This class overrides {@link #done()} to call {@code get()},
404         * and log {@link ExecutionException} or {@link InterruptedException}.
405         */
406        private static class LoggingFutureTask extends FutureTask<Void> {
407                private final Long myHashIdentity;
408                private final String myResourceType;
409                private final String myParamName;
410
411                public LoggingFutureTask(PersistSearchParameterIdentityTask theTask) {
412                        super(theTask);
413                        this.myHashIdentity = theTask.getHashIdentity();
414                        this.myResourceType = theTask.getMyResourceType();
415                        this.myParamName = theTask.getMyParamName();
416                }
417
418                @Override
419                protected void done() {
420                        try {
421                                get();
422                        } catch (ExecutionException theException) {
423                                ourLog.error(
424                                                "PersistSearchParameterIdentityTask failed. Hash identity: {}, resourceType: {}, paramName: {}, ",
425                                                myHashIdentity,
426                                                myResourceType,
427                                                myParamName,
428                                                theException.getCause());
429                        } catch (InterruptedException theInterruptedException) {
430                                Thread.currentThread().interrupt();
431                                ourLog.warn(
432                                                "PersistSearchParameterIdentityTask was interrupted. Hash identity: {}, resourceType: {}, paramName: {}, ",
433                                                myHashIdentity,
434                                                myResourceType,
435                                                myParamName,
436                                                theInterruptedException);
437                        }
438                }
439        }
440}