/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.search.cache;

import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao;
import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
import ca.uhn.fhir.jpa.dao.data.SearchIdAndResultSize;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService.IExecutionBuilder;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
import ca.uhn.fhir.system.HapiSystemProperties;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import jakarta.persistence.EntityManager;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.DateUtils;
import org.hibernate.Session;
import org.hl7.fhir.dstu3.model.InstantType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.sql.Connection;
import java.time.Instant;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class DatabaseSearchCacheSvcImpl implements ISearchCacheSvc {
	/*
	 * Be careful increasing this number! We use the number of params here in a
	 * DELETE FROM foo WHERE params IN (term,term,term...)
	 * type query and this can fail if we have 1000s of params
	 */
	public static final int DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT = 500;
	public static final int DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS = 50000;
	public static final long SEARCH_CLEANUP_JOB_INTERVAL_MILLIS = DateUtils.MILLIS_PER_MINUTE;
	public static final int DEFAULT_MAX_DELETE_CANDIDATES_TO_FIND = 2000;
	private static final Logger ourLog = LoggerFactory.getLogger(DatabaseSearchCacheSvcImpl.class);
	private static int ourMaximumResultsToDeleteInOneStatement = DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT;
	private static int ourMaximumResultsToDeleteInOneCommit = DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS;
	private static Long ourNowForUnitTests;
	/*
	 * We give a bit of extra leeway just to avoid race conditions where a query result
	 * is being reused (because a new client request came in with the same params) right before
	 * the result is to be deleted
	 */
	private long myCutoffSlack = SEARCH_CLEANUP_JOB_INTERVAL_MILLIS;

	@Autowired
	private ISearchDao mySearchDao;

	@Autowired
	private EntityManager myEntityManager;

	@Autowired
	private ISearchResultDao mySearchResultDao;

	@Autowired
	private ISearchIncludeDao mySearchIncludeDao;

	@Autowired
	private IHapiTransactionService myTransactionService;

	@Autowired
	private JpaStorageSettings myStorageSettings;

	@VisibleForTesting
	public void setCutoffSlackForUnitTest(long theCutoffSlack) {
		myCutoffSlack = theCutoffSlack;
	}

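	/**
	 * Persist the given {@link Search} entity on the supplied partition.
	 */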
	@Override
	public Search save(Search theSearch, RequestPartitionId theRequestPartitionId) {
		return myTransactionService
				.withSystemRequestOnPartition(theRequestPartitionId)
				.execute(() -> mySearchDao.save(theSearch));
	}

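	/**
	 * Load a {@link Search} by its UUID, eagerly fetching its includes.
	 */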
	@Override
	@Transactional(propagation = Propagation.REQUIRED)
	public Optional<Search> fetchByUuid(String theUuid, RequestPartitionId theRequestPartitionId) {
		Validate.notBlank(theUuid);
		return myTransactionService
				.withSystemRequestOnPartition(theRequestPartitionId)
				.execute(() -> mySearchDao.findByUuidAndFetchIncludes(theUuid));
	}

	void setSearchDaoForUnitTest(ISearchDao theSearchDao) {
		mySearchDao = theSearchDao;
	}

	void setTransactionServiceForUnitTest(IHapiTransactionService theTransactionService) {
		myTransactionService = theTransactionService;
	}

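	/**
	 * Attempt to move a search from {@link SearchStatusEnum#PASSCMPLET} to {@link SearchStatusEnum#LOADING}
	 * in a new transaction. Returns an empty Optional if the transition fails, e.g. because another
	 * process claimed the search first.
	 */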
	@Override
	public Optional<Search> tryToMarkSearchAsInProgress(Search theSearch, RequestPartitionId theRequestPartitionId) {
		ourLog.trace(
				"Going to try to change search status from {} to {}", theSearch.getStatus(), SearchStatusEnum.LOADING);
		try {

			return myTransactionService
					.withSystemRequest()
					.withRequestPartitionId(theRequestPartitionId)
					.withPropagation(Propagation.REQUIRES_NEW)
					.execute(t -> {
						Search search = mySearchDao.findById(theSearch.getId()).orElse(theSearch);

						if (search.getStatus() != SearchStatusEnum.PASSCMPLET) {
							throw new IllegalStateException(
									Msg.code(1167) + "Can't change to LOADING because state is " + search.getStatus());
						}
						search.setStatus(SearchStatusEnum.LOADING);
						Search newSearch = mySearchDao.save(search);
						return Optional.of(newSearch);
					});
		} catch (Exception e) {
			ourLog.warn("Failed to activate search: {}", e.toString());
			ourLog.trace("Failed to activate search", e);
			return Optional.empty();
		}
	}

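	/**
	 * Look for an existing cached search that can be reused: the normalized query string must match
	 * exactly, and the search must have been created after {@code theCreatedAfter}.
	 */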
	@Override
	public Optional<Search> findCandidatesForReuse(
			String theResourceType,
			String theQueryString,
			Instant theCreatedAfter,
			RequestPartitionId theRequestPartitionId) {
		HapiTransactionService.requireTransaction();

		String queryString = Search.createSearchQueryStringForStorage(theQueryString, theRequestPartitionId);

		int hashCode = queryString.hashCode();
		Collection<Search> candidates =
				mySearchDao.findWithCutoffOrExpiry(theResourceType, hashCode, Date.from(theCreatedAfter));

		for (Search nextCandidateSearch : candidates) {
			// We should only reuse our search if it was created within the permitted window.
			// Date.after() is unreliable; Instant.isAfter() always works.
			if (queryString.equals(nextCandidateSearch.getSearchQueryString())
					&& nextCandidateSearch.getCreated().toInstant().isAfter(theCreatedAfter)) {
				return Optional.of(nextCandidateSearch);
			}
		}

		return Optional.empty();
	}

	/**
	 * A transient worker for a single pass through stale-search deletion.
	 */
	class DeleteRun {
		final RequestPartitionId myRequestPartitionId;
		final Instant myDeadline;
		final Date myCutoffForDeletion;
		final Set<Long> myUpdateDeletedFlagBatch = new HashSet<>();
		final Set<Long> myDeleteSearchBatch = new HashSet<>();
		/** the Search pids of the SearchResults we plan to delete in a chunk */
		final Set<Long> myDeleteSearchResultsBatch = new HashSet<>();
		/**
		 * Number of results we have queued up in myDeleteSearchResultsBatch to delete.
		 * We try to keep this to a reasonable size to avoid long transactions that may escalate to a table lock.
		 */
		private int myDeleteSearchResultsBatchCount = 0;

		DeleteRun(Instant theDeadline, Date theCutoffForDeletion, RequestPartitionId theRequestPartitionId) {
			myDeadline = theDeadline;
			myCutoffForDeletion = theCutoffForDeletion;
			myRequestPartitionId = theRequestPartitionId;
		}

		/**
		 * Mark all ids in the myUpdateDeletedFlagBatch buffer as deleted, and clear the buffer.
		 */
		public void flushDeleteMarks() {
			if (myUpdateDeletedFlagBatch.isEmpty()) {
				return;
			}
			ourLog.debug("Marking {} searches as deleted", myUpdateDeletedFlagBatch.size());
			mySearchDao.updateDeleted(myUpdateDeletedFlagBatch, true);
			myUpdateDeletedFlagBatch.clear();
			commitOpenChanges();
		}

		/**
		 * Dig into the guts of our Hibernate session, flush any changes in the session, and commit the underlying connection.
		 */
		private void commitOpenChanges() {
			// flush to force Hibernate to actually get a connection from the pool
			myEntityManager.flush();
			// get our connection from the underlying Hibernate session, and commit
			myEntityManager.unwrap(Session.class).doWork(Connection::commit);
		}

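		/**
		 * Abort the current pass by throwing a {@link DeadlineException} once we are past our deadline.
		 */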
		void throwIfDeadlineExpired() {
			boolean result = Instant.ofEpochMilli(now()).isAfter(myDeadline);
			if (result) {
				throw new DeadlineException(
						Msg.code(2443) + "Deadline expired while cleaning Search cache - " + myDeadline);
			}
		}

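		/**
		 * Stream through the searches already marked as deleted, buffering their pids and purging
		 * their SearchResult, SearchInclude, and Search rows in batches.
		 *
		 * @return the number of Search entities processed
		 */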
		private int deleteMarkedSearchesInBatches() {
			AtomicInteger deletedCounter = new AtomicInteger(0);

			try (final Stream<SearchIdAndResultSize> toDelete = mySearchDao.findDeleted()) {
				assert toDelete != null;

				toDelete.forEach(nextSearchToDelete -> {
					throwIfDeadlineExpired();

					deleteSearchAndResults(nextSearchToDelete.searchId, nextSearchToDelete.size);

					deletedCounter.incrementAndGet();
				});
			}

			// flush anything left in the buffers
			flushSearchResultDeletes();
			flushSearchAndIncludeDeletes();

			int deletedCount = deletedCounter.get();
			if (deletedCount > 0) {
				ourLog.debug("Deleted {} expired searches", deletedCount);
			}

			return deletedCount;
		}

		/**
		 * Schedule theSearchPid for deletion assuming it has theNumberOfResults SearchResults attached.
		 * <p>
		 * We accumulate a batch of search pids for deletion, and then do a bulk DML as we reach a threshold number
		 * of SearchResults.
		 * </p>
		 *
		 * @param theSearchPid pk of the Search
		 * @param theNumberOfResults the number of SearchResults attached
		 */
		private void deleteSearchAndResults(long theSearchPid, int theNumberOfResults) {
			ourLog.trace("Buffering deletion of search pid {} and {} results", theSearchPid, theNumberOfResults);

			myDeleteSearchBatch.add(theSearchPid);

			if (theNumberOfResults > ourMaximumResultsToDeleteInOneCommit) {
				// don't buffer this one - do it inline
				deleteSearchResultsByChunk(theSearchPid, theNumberOfResults);
				return;
			}
			myDeleteSearchResultsBatch.add(theSearchPid);
			myDeleteSearchResultsBatchCount += theNumberOfResults;

			if (myDeleteSearchResultsBatchCount > ourMaximumResultsToDeleteInOneCommit) {
				flushSearchResultDeletes();
			}

			if (myDeleteSearchBatch.size() > ourMaximumResultsToDeleteInOneStatement) {
				// flush the results to make sure we don't have any references.
				flushSearchResultDeletes();

				flushSearchAndIncludeDeletes();
			}
		}

		/**
		 * If this Search has more results than our max delete size,
		 * delete its results by themselves in range chunks.
		 * @param theSearchPid the target Search pid
		 * @param theNumberOfResults the number of search results present
		 */
		private void deleteSearchResultsByChunk(long theSearchPid, int theNumberOfResults) {
			ourLog.debug(
					"Search {} is large: has {} results.  Deleting results in chunks.",
					theSearchPid,
					theNumberOfResults);
			for (int rangeEnd = theNumberOfResults; rangeEnd >= 0; rangeEnd -= ourMaximumResultsToDeleteInOneCommit) {
				int rangeStart = rangeEnd - ourMaximumResultsToDeleteInOneCommit;
				ourLog.trace("Deleting results for search {}: {} - {}", theSearchPid, rangeStart, rangeEnd);
				mySearchResultDao.deleteBySearchIdInRange(theSearchPid, rangeStart, rangeEnd);
				commitOpenChanges();
			}
		}

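		/**
		 * Delete the buffered Search rows (and their SearchInclude rows first, for referential integrity),
		 * then commit.
		 */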
		private void flushSearchAndIncludeDeletes() {
			if (myDeleteSearchBatch.isEmpty()) {
				return;
			}
			ourLog.debug("Deleting {} Search records", myDeleteSearchBatch.size());
			// referential integrity requires we delete includes before the search
			mySearchIncludeDao.deleteForSearch(myDeleteSearchBatch);
			mySearchDao.deleteByPids(myDeleteSearchBatch);
			myDeleteSearchBatch.clear();
			commitOpenChanges();
		}

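		/**
		 * Delete the buffered SearchResult rows for the queued search pids, then commit and reset the counters.
		 */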
		private void flushSearchResultDeletes() {
			if (myDeleteSearchResultsBatch.isEmpty()) {
				return;
			}
			ourLog.debug(
					"Deleting {} Search Results from {} searches",
					myDeleteSearchResultsBatchCount,
					myDeleteSearchResultsBatch.size());
			mySearchResultDao.deleteBySearchIds(myDeleteSearchResultsBatch);
			myDeleteSearchResultsBatch.clear();
			myDeleteSearchResultsBatchCount = 0;
			commitOpenChanges();
		}

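		/**
		 * Build a system-request execution builder bound to this run's partition.
		 */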
		IExecutionBuilder getTxBuilder() {
			return myTransactionService.withSystemRequest().withRequestPartitionId(myRequestPartitionId);
		}

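		/**
		 * Execute one cleanup pass: mark stale searches as deleted, then physically delete everything
		 * marked as deleted, stopping early if the deadline expires.
		 */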
		private void run() {
			ourLog.debug("Searching for searches which are before {}", myCutoffForDeletion);

			// this tx builder is not really for tx management.
			// Instead, it is used to bind a Hibernate session + connection to this thread.
			// We will run a streaming query to look for work, and then commit changes in batches during the loops.
			getTxBuilder().execute(theStatus -> {
				try {
					markDeletedInBatches();

					throwIfDeadlineExpired();

					// Delete searches that are marked as deleted
					int deletedCount = deleteMarkedSearchesInBatches();

					throwIfDeadlineExpired();

					if ((ourLog.isDebugEnabled() || HapiSystemProperties.isTestModeEnabled()) && (deletedCount > 0)) {
						Long total = mySearchDao.count();
						ourLog.debug("Deleted {} searches, {} remaining", deletedCount, total);
					}
				} catch (DeadlineException theTimeoutException) {
					ourLog.warn(theTimeoutException.getMessage());
				}

				return null;
			});
		}

		/**
		 * Stream through a list of pids before our cutoff, and set myDeleted=true in batches in a DML statement.
		 */
		private void markDeletedInBatches() {

			try (Stream<Long> toMarkDeleted =
					mySearchDao.findWhereCreatedBefore(myCutoffForDeletion, new Date(now()))) {
				assert toMarkDeleted != null;

				toMarkDeleted.forEach(nextSearchToDelete -> {
					throwIfDeadlineExpired();

					if (myUpdateDeletedFlagBatch.size() >= ourMaximumResultsToDeleteInOneStatement) {
						flushDeleteMarks();
					}
					ourLog.trace("Marking search with PID {} as ready for deletion", nextSearchToDelete);
					myUpdateDeletedFlagBatch.add(nextSearchToDelete);
				});

				flushDeleteMarks();
			}
		}
	}

	/**
	 * Marker to abandon our delete run when we are over time.
	 */
	private static class DeadlineException extends RuntimeException {
		public DeadlineException(String message) {
			super(message);
		}
	}

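	/**
	 * Entry point for the scheduled cleanup job: if search expiry is enabled, compute the deletion
	 * cutoff and run a single {@link DeleteRun} pass against the given partition.
	 */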
	@Override
	public void pollForStaleSearchesAndDeleteThem(RequestPartitionId theRequestPartitionId, Instant theDeadline) {
		HapiTransactionService.noTransactionAllowed();

		if (!myStorageSettings.isExpireSearchResults()) {
			return;
		}

		final Date cutoff = getCutoff();

		final DeleteRun run = new DeleteRun(theDeadline, cutoff, theRequestPartitionId);

		run.run();
	}

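	/**
	 * Compute the point in time before which searches are considered stale: now minus the expiry window,
	 * the cache-reuse window (if configured), and the cleanup-job slack.
	 */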
	@Nonnull
	private Date getCutoff() {
		long cutoffMillis = myStorageSettings.getExpireSearchResultsAfterMillis();
		if (myStorageSettings.getReuseCachedSearchResultsForMillis() != null) {
			cutoffMillis = cutoffMillis + myStorageSettings.getReuseCachedSearchResultsForMillis();
		}
		final Date cutoff = new Date((now() - cutoffMillis) - myCutoffSlack);

		if (ourNowForUnitTests != null) {
			ourLog.info(
					"Searching for searches which are before {} - now is {}",
					new InstantType(cutoff),
					new InstantType(new Date(now())));
		}
		return cutoff;
	}

	@VisibleForTesting
	public static void setMaximumResultsToDeleteInOnePassForUnitTest(int theMaximumResultsToDeleteInOnePass) {
		ourMaximumResultsToDeleteInOneCommit = theMaximumResultsToDeleteInOnePass;
	}

	@VisibleForTesting
	public static void setMaximumResultsToDeleteInOneStatement(int theMaximumResultsToDelete) {
		ourMaximumResultsToDeleteInOneStatement = theMaximumResultsToDelete;
	}

	/**
	 * This is for unit tests only, do not call otherwise
	 */
	@VisibleForTesting
	public static void setNowForUnitTests(Long theNowForUnitTests) {
		ourNowForUnitTests = theNowForUnitTests;
	}

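	/**
	 * Current time in millis, or the fixed value supplied via {@link #setNowForUnitTests(Long)} when testing.
	 */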
	public static long now() {
		if (ourNowForUnitTests != null) {
			return ourNowForUnitTests;
		}
		return System.currentTimeMillis();
	}
}