001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.sp;
021
022import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
023import ca.uhn.fhir.jpa.cache.ISearchParamIdentityCacheSvc;
024import ca.uhn.fhir.jpa.dao.data.IResourceIndexedSearchParamIdentityDao;
025import ca.uhn.fhir.jpa.model.entity.IndexedSearchParamIdentity;
026import ca.uhn.fhir.jpa.util.MemoryCacheService;
027import ca.uhn.fhir.util.ThreadPoolUtil;
028import jakarta.annotation.PostConstruct;
029import jakarta.annotation.PreDestroy;
030import org.slf4j.Logger;
031import org.springframework.dao.DataIntegrityViolationException;
032import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
033import org.springframework.stereotype.Service;
034import org.springframework.transaction.PlatformTransactionManager;
035import org.springframework.transaction.TransactionDefinition;
036import org.springframework.transaction.support.TransactionSynchronization;
037import org.springframework.transaction.support.TransactionSynchronizationManager;
038import org.springframework.transaction.support.TransactionTemplate;
039
040import java.util.Collection;
041import java.util.Map;
042import java.util.Objects;
043import java.util.concurrent.Callable;
044import java.util.concurrent.ConcurrentHashMap;
045import java.util.concurrent.ExecutionException;
046import java.util.concurrent.Future;
047import java.util.concurrent.FutureTask;
048import java.util.concurrent.ThreadPoolExecutor;
049
050import static org.slf4j.LoggerFactory.getLogger;
051
052@Service
053public class SearchParamIdentityCacheSvcImpl implements ISearchParamIdentityCacheSvc {
054        private static final Logger ourLog = getLogger(SearchParamIdentityCacheSvcImpl.class);
055
056        private static final int MAX_RETRY_COUNT = 20;
057        private static final String CACHE_THREAD_PREFIX = "search-parameter-identity-cache-";
058        private static final int THREAD_POOL_QUEUE_SIZE = 5000;
059        private final JpaStorageSettings myStorageSettings;
060        private final IResourceIndexedSearchParamIdentityDao mySearchParamIdentityDao;
061        private final TransactionTemplate myTxTemplate;
062        private final ThreadPoolTaskExecutor myThreadPoolTaskExecutor;
063        private final MemoryCacheService myMemoryCacheService;
064        private final UniqueTaskExecutor myUniqueTaskExecutor;
065
066        public SearchParamIdentityCacheSvcImpl(
067                        JpaStorageSettings theStorageSettings,
068                        IResourceIndexedSearchParamIdentityDao theResourceIndexedSearchParamIdentityDao,
069                        PlatformTransactionManager theTxManager,
070                        MemoryCacheService theMemoryCacheService) {
071                myStorageSettings = theStorageSettings;
072                mySearchParamIdentityDao = theResourceIndexedSearchParamIdentityDao;
073                myTxTemplate = new TransactionTemplate(theTxManager);
074                myTxTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
075                myThreadPoolTaskExecutor = createExecutor();
076                myMemoryCacheService = theMemoryCacheService;
077                myUniqueTaskExecutor = new UniqueTaskExecutor(myThreadPoolTaskExecutor);
078        }
079
080        /**
081         * Creates a thread pool executor for asynchronously executing
082         * {@link PersistSearchParameterIdentityTask} instances.
083         * <p>
084         * Uses a fixed pool size of 1 and a bounded queue with a capacity of 5000.
085         * <p>
086         * If the queue is full and all threads are busy, new tasks are silently
087         * discarded using {@link ThreadPoolExecutor.DiscardPolicy}.
088         */
089        private ThreadPoolTaskExecutor createExecutor() {
090                return ThreadPoolUtil.newThreadPool(
091                                1, 1, CACHE_THREAD_PREFIX, THREAD_POOL_QUEUE_SIZE, new ThreadPoolExecutor.DiscardPolicy());
092        }
093
094        @PostConstruct
095        public void start() {
096                initCache();
097        }
098
099        @PreDestroy
100        public void preDestroy() {
101                myThreadPoolTaskExecutor.shutdown();
102        }
103
104        /**
105         * Initializes the cache by preloading search parameter identities {@link IndexedSearchParamIdentity}.
106         */
107        protected void initCache() {
108                // populate cache with IndexedSearchParamIdentities from database
109                Collection<Object[]> spIdentities =
110                                Objects.requireNonNull(myTxTemplate.execute(t -> mySearchParamIdentityDao.getAllHashIdentities()));
111                spIdentities.forEach(
112                                i -> CacheUtils.putSearchParamIdentityToCache(myMemoryCacheService, (Long) i[0], (Integer) i[1]));
113        }
114
115        private void submitPersistSearchParameterIdentityTask(
116                        Long hashIdentity, String theResourceType, String theSearchParamName) {
117                PersistSearchParameterIdentityTask persistSpIdentityTask = new PersistSearchParameterIdentityTask.Builder()
118                                .hashIdentity(hashIdentity)
119                                .resourceType(theResourceType)
120                                .paramName(theSearchParamName)
121                                .memoryCacheService(myMemoryCacheService)
122                                .txTemplate(myTxTemplate)
123                                .searchParamIdentityDao(mySearchParamIdentityDao)
124                                .build();
125
126                myUniqueTaskExecutor.submitIfAbsent(persistSpIdentityTask);
127        }
128
129        /**
130         * Asynchronously ensures that a {@link IndexedSearchParamIdentity} exists for the given
131         * hash identity, parameter name, and resource type. If the identity is already present
132         * in the in-memory cache, no action is taken.
133         *
134         * <p>If the identity is missing, a {@link PersistSearchParameterIdentityTask} is created
135         * and submitted for asynchronous execution. To avoid modifying the cache during an
136         * active transaction, task submission is deferred until after the transaction is committed.
137         *
138         * @param theHashIdentity The hash identity representing the search parameter.
139         * @param theResourceType The resource type (e.g., "Patient", "Observation").
140         * @param theParamName    The search parameter name.
141         *
142         * @see PersistSearchParameterIdentityTask
143         */
144        public void findOrCreateSearchParamIdentity(Long theHashIdentity, String theResourceType, String theParamName) {
145                if (!myStorageSettings.isWriteToSearchParamIdentityTable()) {
146                        return;
147                }
148
149                // check if SearchParamIdentity is already in cache
150                Integer spIdentityId = CacheUtils.getSearchParamIdentityFromCache(myMemoryCacheService, theHashIdentity);
151
152                if (spIdentityId != null) {
153                        return;
154                }
155
156                // cache miss, submit PersistSearchParameterIdentityTask to execute it in separate thread
157                if (TransactionSynchronizationManager.isSynchronizationActive()) {
158                        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
159                                @Override
160                                public void afterCommit() {
161                                        submitPersistSearchParameterIdentityTask(theHashIdentity, theResourceType, theParamName);
162                                }
163                        });
164                } else {
165                        submitPersistSearchParameterIdentityTask(theHashIdentity, theResourceType, theParamName);
166                }
167        }
168
169        /**
170         * This class is responsible for ensuring that a unique {@link IndexedSearchParamIdentity}
171         * exists for a given hash identity (parameter name and resource type). This task is
172         * executed asynchronously to avoid blocking the main thread during persistence.
173         *
174         * <p>
175         * This task checks the in-memory cache for the given hash identity and, if missing,
176         * attempts to create or retrieve the corresponding {@link IndexedSearchParamIdentity}
177         * from the database. The result is then added to the cache.
178         *
179         * <p>
180         * Up to 20 retries are permitted in case of a {@link DataIntegrityViolationException},
181         * which can occur due to concurrent insert attempts for the same identity. If all retries
182         * fail, the {@link IndexedSearchParamIdentity} will not be saved during this execution,
183         * but the task may be retried later when submitted again.
184         *
185         * @see IndexedSearchParamIdentity
186         * @see MemoryCacheService
187         */
188        public static class PersistSearchParameterIdentityTask implements Callable<Void> {
189
190                private final Long myHashIdentity;
191                private final String myResourceType;
192                private final String myParamName;
193                private final TransactionTemplate myTxTemplate;
194                private final MemoryCacheService myMemoryCacheService;
195                private final IResourceIndexedSearchParamIdentityDao myResourceIndexedSearchParamIdentityDao;
196
197                private PersistSearchParameterIdentityTask(Builder theBuilder) {
198                        this.myHashIdentity = theBuilder.myHashIdentity;
199                        this.myResourceType = theBuilder.myResourceType;
200                        this.myParamName = theBuilder.myParamName;
201                        this.myTxTemplate = theBuilder.myTxTemplate;
202                        this.myMemoryCacheService = theBuilder.myMemoryCacheService;
203                        this.myResourceIndexedSearchParamIdentityDao = theBuilder.mySearchParamIdentityDao;
204                }
205
206                public Long getHashIdentity() {
207                        return myHashIdentity;
208                }
209
210                public String getMyResourceType() {
211                        return myResourceType;
212                }
213
214                public String getMyParamName() {
215                        return myParamName;
216                }
217
218                public static class Builder {
219                        private Long myHashIdentity;
220                        private String myResourceType;
221                        private String myParamName;
222                        private TransactionTemplate myTxTemplate;
223                        private MemoryCacheService myMemoryCacheService;
224                        private IResourceIndexedSearchParamIdentityDao mySearchParamIdentityDao;
225
226                        public Builder hashIdentity(Long theHashIdentity) {
227                                this.myHashIdentity = theHashIdentity;
228                                return this;
229                        }
230
231                        public Builder resourceType(String theResourceType) {
232                                this.myResourceType = theResourceType;
233                                return this;
234                        }
235
236                        public Builder paramName(String theParamName) {
237                                this.myParamName = theParamName;
238                                return this;
239                        }
240
241                        public Builder txTemplate(TransactionTemplate theTxTemplate) {
242                                this.myTxTemplate = theTxTemplate;
243                                return this;
244                        }
245
246                        public Builder memoryCacheService(MemoryCacheService theMemoryCacheService) {
247                                this.myMemoryCacheService = theMemoryCacheService;
248                                return this;
249                        }
250
251                        public Builder searchParamIdentityDao(IResourceIndexedSearchParamIdentityDao theSearchParamIdentityDao) {
252                                this.mySearchParamIdentityDao = theSearchParamIdentityDao;
253                                return this;
254                        }
255
256                        public PersistSearchParameterIdentityTask build() {
257                                return new PersistSearchParameterIdentityTask(this);
258                        }
259                }
260
261                @Override
262                public Void call() throws Exception {
263                        Integer spIdentityId;
264                        int retry = 0;
265                        while (retry++ < MAX_RETRY_COUNT) {
266                                spIdentityId = CacheUtils.getSearchParamIdentityFromCache(myMemoryCacheService, myHashIdentity);
267
268                                if (spIdentityId != null) {
269                                        return null;
270                                }
271
272                                try {
273                                        // try to retrieve search parameter identity from db, create if missing, update cache
274                                        spIdentityId = findOrCreateIndexedSearchParamIdentity(myHashIdentity, myParamName, myResourceType);
275                                        CacheUtils.putSearchParamIdentityToCache(myMemoryCacheService, myHashIdentity, spIdentityId);
276                                        return null;
277                                } catch (DataIntegrityViolationException theDataIntegrityViolationException) {
278                                        // retryable exception - unique search parameter identity or hash identity constraint violation
279                                        ourLog.trace(
280                                                        "Failed to save SearchParamIndexIdentity for search parameter with hash identity: {}, "
281                                                                        + "resourceType: {}, paramName: {}, retry attempt: {}",
282                                                        myHashIdentity,
283                                                        myResourceType,
284                                                        myParamName,
285                                                        retry,
286                                                        theDataIntegrityViolationException);
287                                }
288                        }
289                        ourLog.warn(
290                                        "Failed saving IndexedSearchParamIdentity with hash identity: {}, resourceType: {}, paramName: {}",
291                                        myHashIdentity,
292                                        myResourceType,
293                                        myParamName);
294                        return null;
295                }
296
297                private Integer findOrCreateIndexedSearchParamIdentity(
298                                Long theHashIdentity, String theParamName, String theResourceType) {
299
300                        return myTxTemplate.execute(tx -> {
301                                IndexedSearchParamIdentity identity =
302                                                myResourceIndexedSearchParamIdentityDao.getSearchParameterIdByHashIdentity(theHashIdentity);
303                                if (identity != null) {
304                                        ourLog.trace(
305                                                        "DB read success IndexedSearchParamIdentity with hash identity: {}, resourceType: {}, paramName: {}",
306                                                        theHashIdentity,
307                                                        theResourceType,
308                                                        theParamName);
309                                        return identity.getSpIdentityId();
310                                }
311
312                                IndexedSearchParamIdentity indexedSearchParamIdentity = new IndexedSearchParamIdentity();
313                                indexedSearchParamIdentity.setHashIdentity(theHashIdentity);
314                                indexedSearchParamIdentity.setParamName(theParamName);
315                                indexedSearchParamIdentity.setResourceType(theResourceType);
316
317                                myResourceIndexedSearchParamIdentityDao.save(indexedSearchParamIdentity);
318                                ourLog.trace(
319                                                "Write success IndexedSearchParamIdentity with hash identity: {}, resourceType: {}, paramName: {},",
320                                                theHashIdentity,
321                                                theResourceType,
322                                                theParamName);
323
324                                return indexedSearchParamIdentity.getSpIdentityId();
325                        });
326                }
327        }
328
329        public static class CacheUtils {
330
331                private CacheUtils() {}
332
333                public static Integer getSearchParamIdentityFromCache(
334                                MemoryCacheService memoryCacheService, Long hashIdentity) {
335                        return memoryCacheService.getIfPresent(
336                                        MemoryCacheService.CacheEnum.HASH_IDENTITY_TO_SEARCH_PARAM_IDENTITY, hashIdentity);
337                }
338
339                public static void putSearchParamIdentityToCache(
340                                MemoryCacheService memoryCacheService, Long theHashIdentity, Integer theSpIdentityId) {
341                        memoryCacheService.put(
342                                        MemoryCacheService.CacheEnum.HASH_IDENTITY_TO_SEARCH_PARAM_IDENTITY,
343                                        theHashIdentity,
344                                        theSpIdentityId);
345                }
346        }
347
348        /**
349         * Ensures only one instance of the PersistSearchParameterIdentityTask is running per hash identity.
350         * If a task is already in progress, it will not be scheduled again.
351         */
352        private static class UniqueTaskExecutor {
353                private final ThreadPoolTaskExecutor myTaskExecutor;
354                private final Map<Long, Future<Void>> myInFlightTasks = new ConcurrentHashMap<>();
355
356                public UniqueTaskExecutor(ThreadPoolTaskExecutor theTaskExecutor) {
357                        myTaskExecutor = theTaskExecutor;
358                }
359
360                public void submitIfAbsent(PersistSearchParameterIdentityTask theTask) {
361                        Long hashIdentity = theTask.getHashIdentity();
362
363                        // already have a task with same hashIdentity - skip scheduling
364                        Future<Void> existing = myInFlightTasks.get(hashIdentity);
365                        if (existing != null) {
366                                return;
367                        }
368
369                        // put FutureTask in the map. If another thread already put it - skip scheduling.
370                        FutureTask<Void> futureTask = new LoggingFutureTask(theTask);
371                        existing = myInFlightTasks.putIfAbsent(hashIdentity, futureTask);
372                        if (existing != null) {
373                                return;
374                        }
375
376                        myTaskExecutor.execute(() -> {
377                                try {
378                                        futureTask.run();
379                                } finally {
380                                        // remove from the cache once done or failed
381                                        myInFlightTasks.remove(hashIdentity, futureTask);
382                                }
383                        });
384                }
385        }
386
387        /**
388         * A {@link FutureTask} implementation that logs any exception thrown
389         * during execution of a {@link PersistSearchParameterIdentityTask}.
390         * <p>
391         * Since {@link PersistSearchParameterIdentityTask} runs asynchronously in a
392         * separate thread, any exception it throws can only be observed by calling {@link #get()}.
393         * <p>
394         * This class overrides {@link #done()} to call {@code get()},
395         * and log {@link ExecutionException} or {@link InterruptedException}.
396         */
397        private static class LoggingFutureTask extends FutureTask<Void> {
398                private final Long myHashIdentity;
399                private final String myResourceType;
400                private final String myParamName;
401
402                public LoggingFutureTask(PersistSearchParameterIdentityTask theTask) {
403                        super(theTask);
404                        this.myHashIdentity = theTask.getHashIdentity();
405                        this.myResourceType = theTask.getMyResourceType();
406                        this.myParamName = theTask.getMyParamName();
407                }
408
409                @Override
410                protected void done() {
411                        try {
412                                get();
413                        } catch (ExecutionException theException) {
414                                ourLog.error(
415                                                "PersistSearchParameterIdentityTask failed. Hash identity: {}, resourceType: {}, paramName: {}, ",
416                                                myHashIdentity,
417                                                myResourceType,
418                                                myParamName,
419                                                theException.getCause());
420                        } catch (InterruptedException theInterruptedException) {
421                                Thread.currentThread().interrupt();
422                                ourLog.warn(
423                                                "PersistSearchParameterIdentityTask was interrupted. Hash identity: {}, resourceType: {}, paramName: {}, ",
424                                                myHashIdentity,
425                                                myResourceType,
426                                                myParamName,
427                                                theInterruptedException);
428                        }
429                }
430        }
431}