001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.search.cache;
021
022import ca.uhn.fhir.i18n.Msg;
023import ca.uhn.fhir.interceptor.model.RequestPartitionId;
024import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
025import ca.uhn.fhir.jpa.dao.data.ISearchDao;
026import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao;
027import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
028import ca.uhn.fhir.jpa.dao.data.SearchIdAndResultSize;
029import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
030import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
031import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService.IExecutionBuilder;
032import ca.uhn.fhir.jpa.entity.Search;
033import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
034import ca.uhn.fhir.system.HapiSystemProperties;
035import com.google.common.annotations.VisibleForTesting;
036import jakarta.annotation.Nonnull;
037import jakarta.persistence.EntityManager;
038import org.apache.commons.lang3.Validate;
039import org.apache.commons.lang3.time.DateUtils;
040import org.hibernate.Session;
041import org.hl7.fhir.dstu3.model.InstantType;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044import org.springframework.beans.factory.annotation.Autowired;
045import org.springframework.transaction.annotation.Propagation;
046import org.springframework.transaction.annotation.Transactional;
047
048import java.sql.Connection;
049import java.time.Instant;
050import java.util.Collection;
051import java.util.Date;
052import java.util.HashSet;
053import java.util.Optional;
054import java.util.Set;
055import java.util.concurrent.atomic.AtomicInteger;
056import java.util.stream.Stream;
057
058public class DatabaseSearchCacheSvcImpl implements ISearchCacheSvc {
059        /*
060         * Be careful increasing this number! We use the number of params here in a
061         * DELETE FROM foo WHERE params IN (term,term,term...)
062         * type query and this can fail if we have 1000s of params
063         */
064        public static final int DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT = 500;
065        public static final int DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS = 50000;
066        public static final long SEARCH_CLEANUP_JOB_INTERVAL_MILLIS = DateUtils.MILLIS_PER_MINUTE;
067        public static final int DEFAULT_MAX_DELETE_CANDIDATES_TO_FIND = 2000;
068        private static final Logger ourLog = LoggerFactory.getLogger(DatabaseSearchCacheSvcImpl.class);
069        private static int ourMaximumResultsToDeleteInOneStatement = DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT;
070        private static int ourMaximumResultsToDeleteInOneCommit = DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS;
071        private static Long ourNowForUnitTests;
072        /*
073         * We give a bit of extra leeway just to avoid race conditions where a query result
074         * is being reused (because a new client request came in with the same params) right before
075         * the result is to be deleted
076         */
077        private long myCutoffSlack = SEARCH_CLEANUP_JOB_INTERVAL_MILLIS;
078
079        @Autowired
080        private ISearchDao mySearchDao;
081
082        @Autowired
083        private EntityManager myEntityManager;
084
085        @Autowired
086        private ISearchResultDao mySearchResultDao;
087
088        @Autowired
089        private ISearchIncludeDao mySearchIncludeDao;
090
091        @Autowired
092        private IHapiTransactionService myTransactionService;
093
094        @Autowired
095        private JpaStorageSettings myStorageSettings;
096
097        @VisibleForTesting
098        public void setCutoffSlackForUnitTest(long theCutoffSlack) {
099                myCutoffSlack = theCutoffSlack;
100        }
101
102        @Override
103        public Search save(Search theSearch, RequestPartitionId theRequestPartitionId) {
104                return myTransactionService
105                                .withSystemRequestOnPartition(theRequestPartitionId)
106                                .execute(() -> mySearchDao.save(theSearch));
107        }
108
109        @Override
110        @Transactional(propagation = Propagation.REQUIRED)
111        public Optional<Search> fetchByUuid(String theUuid, RequestPartitionId theRequestPartitionId) {
112                Validate.notBlank(theUuid);
113                return myTransactionService
114                                .withSystemRequestOnPartition(theRequestPartitionId)
115                                .execute(() -> mySearchDao.findByUuidAndFetchIncludes(theUuid));
116        }
117
118        void setSearchDaoForUnitTest(ISearchDao theSearchDao) {
119                mySearchDao = theSearchDao;
120        }
121
122        void setTransactionServiceForUnitTest(IHapiTransactionService theTransactionService) {
123                myTransactionService = theTransactionService;
124        }
125
126        @Override
127        public Optional<Search> tryToMarkSearchAsInProgress(Search theSearch, RequestPartitionId theRequestPartitionId) {
128                ourLog.trace(
129                                "Going to try to change search status from {} to {}", theSearch.getStatus(), SearchStatusEnum.LOADING);
130                try {
131
132                        return myTransactionService
133                                        .withSystemRequest()
134                                        .withRequestPartitionId(theRequestPartitionId)
135                                        .withPropagation(Propagation.REQUIRES_NEW)
136                                        .execute(t -> {
137                                                Search search = mySearchDao.findById(theSearch.getId()).orElse(theSearch);
138
139                                                if (search.getStatus() != SearchStatusEnum.PASSCMPLET) {
140                                                        throw new IllegalStateException(
141                                                                        Msg.code(1167) + "Can't change to LOADING because state is " + search.getStatus());
142                                                }
143                                                search.setStatus(SearchStatusEnum.LOADING);
144                                                Search newSearch = mySearchDao.save(search);
145                                                return Optional.of(newSearch);
146                                        });
147                } catch (Exception e) {
148                        ourLog.warn("Failed to activate search: {}", e.toString());
149                        ourLog.trace("Failed to activate search", e);
150                        return Optional.empty();
151                }
152        }
153
154        @Override
155        public Optional<Search> findCandidatesForReuse(
156                        String theResourceType,
157                        String theQueryString,
158                        Instant theCreatedAfter,
159                        RequestPartitionId theRequestPartitionId) {
160                HapiTransactionService.requireTransaction();
161
162                String queryString = Search.createSearchQueryStringForStorage(theQueryString, theRequestPartitionId);
163
164                int hashCode = queryString.hashCode();
165                Collection<Search> candidates =
166                                mySearchDao.findWithCutoffOrExpiry(theResourceType, hashCode, Date.from(theCreatedAfter));
167
168                for (Search nextCandidateSearch : candidates) {
169                        // We should only reuse our search if it was created within the permitted window
170                        // Date.after() is unreliable.  Instant.isAfter() always works.
171                        if (queryString.equals(nextCandidateSearch.getSearchQueryString())
172                                        && nextCandidateSearch.getCreated().toInstant().isAfter(theCreatedAfter)) {
173                                return Optional.of(nextCandidateSearch);
174                        }
175                }
176
177                return Optional.empty();
178        }
179
180        /**
181         * A transient worker for a single pass through stale-search deletion.
182         */
183        class DeleteRun {
184                final RequestPartitionId myRequestPartitionId;
185                final Instant myDeadline;
186                final Date myCutoffForDeletion;
187                final Set<Long> myUpdateDeletedFlagBatch = new HashSet<>();
188                final Set<Long> myDeleteSearchBatch = new HashSet<>();
189                /** the Search pids of the SearchResults we plan to delete in a chunk */
190                final Set<Long> myDeleteSearchResultsBatch = new HashSet<>();
                /**
                 * Total number of SearchResults belonging to the searches currently queued in myDeleteSearchResultsBatch.
                 * We try to keep this to a reasonable size to avoid long transactions that may escalate to a table lock.
                 */
195                private int myDeleteSearchResultsBatchCount = 0;
196
197                DeleteRun(Instant theDeadline, Date theCutoffForDeletion, RequestPartitionId theRequestPartitionId) {
198                        myDeadline = theDeadline;
199                        myCutoffForDeletion = theCutoffForDeletion;
200                        myRequestPartitionId = theRequestPartitionId;
201                }
202
203                /**
204                 * Mark all ids in the mySearchesToMarkForDeletion buffer as deleted, and clear the buffer.
205                 */
206                public void flushDeleteMarks() {
207                        if (myUpdateDeletedFlagBatch.isEmpty()) {
208                                return;
209                        }
210                        ourLog.debug("Marking {} searches as deleted", myUpdateDeletedFlagBatch.size());
211                        mySearchDao.updateDeleted(myUpdateDeletedFlagBatch, true);
212                        myUpdateDeletedFlagBatch.clear();
213                        commitOpenChanges();
214                }
215
216                /**
217                 * Dig into the guts of our Hibernate session, flush any changes in the session, and commit the underlying connection.
218                 */
219                private void commitOpenChanges() {
220                        // flush to force Hibernate to actually get a connection from the pool
221                        myEntityManager.flush();
222                        // get our connection from the underlying Hibernate session, and commit
223                        //noinspection resource
224                        myEntityManager.unwrap(Session.class).doWork(Connection::commit);
225                }
226
227                void throwIfDeadlineExpired() {
228                        boolean result = Instant.ofEpochMilli(now()).isAfter(myDeadline);
229                        if (result) {
230                                throw new DeadlineException(
231                                                Msg.code(2443) + "Deadline expired while cleaning Search cache - " + myDeadline);
232                        }
233                }
234
235                private int deleteMarkedSearchesInBatches() {
236                        AtomicInteger deletedCounter = new AtomicInteger(0);
237
238                        try (final Stream<SearchIdAndResultSize> toDelete = mySearchDao.findDeleted()) {
239                                assert toDelete != null;
240
241                                toDelete.forEach(nextSearchToDelete -> {
242                                        throwIfDeadlineExpired();
243
244                                        deleteSearchAndResults(nextSearchToDelete.searchId, nextSearchToDelete.size);
245
246                                        deletedCounter.incrementAndGet();
247                                });
248                        }
249
250                        // flush anything left in the buffers
251                        flushSearchResultDeletes();
252                        flushSearchAndIncludeDeletes();
253
254                        int deletedCount = deletedCounter.get();
255
256                        ourLog.info("Deleted {} expired searches", deletedCount);
257
258                        return deletedCount;
259                }
260
261                /**
262                 * Schedule theSearchPid for deletion assuming it has theNumberOfResults SearchResults attached.
263                 *
264                 * We accumulate a batch of search pids for deletion, and then do a bulk DML as we reach a threshold number
265                 * of SearchResults.
266                 *
267                 * @param theSearchPid pk of the Search
268                 * @param theNumberOfResults the number of SearchResults attached
269                 */
270                private void deleteSearchAndResults(long theSearchPid, int theNumberOfResults) {
271                        ourLog.trace("Buffering deletion of search pid {} and {} results", theSearchPid, theNumberOfResults);
272
273                        myDeleteSearchBatch.add(theSearchPid);
274
275                        if (theNumberOfResults > ourMaximumResultsToDeleteInOneCommit) {
276                                // don't buffer this one - do it inline
277                                deleteSearchResultsByChunk(theSearchPid, theNumberOfResults);
278                                return;
279                        }
280                        myDeleteSearchResultsBatch.add(theSearchPid);
281                        myDeleteSearchResultsBatchCount += theNumberOfResults;
282
283                        if (myDeleteSearchResultsBatchCount > ourMaximumResultsToDeleteInOneCommit) {
284                                flushSearchResultDeletes();
285                        }
286
287                        if (myDeleteSearchBatch.size() > ourMaximumResultsToDeleteInOneStatement) {
288                                // flush the results to make sure we don't have any references.
289                                flushSearchResultDeletes();
290
291                                flushSearchAndIncludeDeletes();
292                        }
293                }
294
295                /**
296                 * If this Search has more results than our max delete size,
297                 * delete in by itself in range chunks.
298                 * @param theSearchPid the target Search pid
299                 * @param theNumberOfResults the number of search results present
300                 */
301                private void deleteSearchResultsByChunk(long theSearchPid, int theNumberOfResults) {
302                        ourLog.debug(
303                                        "Search {} is large: has {} results.  Deleting results in chunks.",
304                                        theSearchPid,
305                                        theNumberOfResults);
306                        for (int rangeEnd = theNumberOfResults; rangeEnd >= 0; rangeEnd -= ourMaximumResultsToDeleteInOneCommit) {
307                                int rangeStart = rangeEnd - ourMaximumResultsToDeleteInOneCommit;
308                                ourLog.trace("Deleting results for search {}: {} - {}", theSearchPid, rangeStart, rangeEnd);
309                                mySearchResultDao.deleteBySearchIdInRange(theSearchPid, rangeStart, rangeEnd);
310                                commitOpenChanges();
311                        }
312                }
313
314                private void flushSearchAndIncludeDeletes() {
315                        if (myDeleteSearchBatch.isEmpty()) {
316                                return;
317                        }
318                        ourLog.debug("Deleting {} Search records", myDeleteSearchBatch.size());
319                        // referential integrity requires we delete includes before the search
320                        mySearchIncludeDao.deleteForSearch(myDeleteSearchBatch);
321                        mySearchDao.deleteByPids(myDeleteSearchBatch);
322                        myDeleteSearchBatch.clear();
323                        commitOpenChanges();
324                }
325
326                private void flushSearchResultDeletes() {
327                        if (myDeleteSearchResultsBatch.isEmpty()) {
328                                return;
329                        }
330                        ourLog.debug(
331                                        "Deleting {} Search Results from {} searches",
332                                        myDeleteSearchResultsBatchCount,
333                                        myDeleteSearchResultsBatch.size());
334                        mySearchResultDao.deleteBySearchIds(myDeleteSearchResultsBatch);
335                        myDeleteSearchResultsBatch.clear();
336                        myDeleteSearchResultsBatchCount = 0;
337                        commitOpenChanges();
338                }
339
340                IExecutionBuilder getTxBuilder() {
341                        return myTransactionService.withSystemRequest().withRequestPartitionId(myRequestPartitionId);
342                }
343
344                private void run() {
345                        ourLog.debug("Searching for searches which are before {}", myCutoffForDeletion);
346
347                        // this tx builder is not really for tx management.
348                        // Instead, it is used bind a Hibernate session + connection to this thread.
349                        // We will run a streaming query to look for work, and then commit changes in batches during the loops.
350                        getTxBuilder().execute(theStatus -> {
351                                try {
352                                        markDeletedInBatches();
353
354                                        throwIfDeadlineExpired();
355
356                                        // Delete searches that are marked as deleted
357                                        int deletedCount = deleteMarkedSearchesInBatches();
358
359                                        throwIfDeadlineExpired();
360
361                                        if ((ourLog.isDebugEnabled() || HapiSystemProperties.isTestModeEnabled()) && (deletedCount > 0)) {
362                                                Long total = mySearchDao.count();
363                                                ourLog.debug("Deleted {} searches, {} remaining", deletedCount, total);
364                                        }
365                                } catch (DeadlineException theTimeoutException) {
366                                        ourLog.warn(theTimeoutException.getMessage());
367                                }
368
369                                return null;
370                        });
371                }
372
373                /**
374                 * Stream through a list of pids before our cutoff, and set myDeleted=true in batches in a DML statement.
375                 */
376                private void markDeletedInBatches() {
377
378                        try (Stream<Long> toMarkDeleted =
379                                        mySearchDao.findWhereCreatedBefore(myCutoffForDeletion, new Date(now()))) {
380                                assert toMarkDeleted != null;
381
382                                toMarkDeleted.forEach(nextSearchToDelete -> {
383                                        throwIfDeadlineExpired();
384
385                                        if (myUpdateDeletedFlagBatch.size() >= ourMaximumResultsToDeleteInOneStatement) {
386                                                flushDeleteMarks();
387                                        }
388                                        ourLog.trace("Marking search with PID {} as ready for deletion", nextSearchToDelete);
389                                        myUpdateDeletedFlagBatch.add(nextSearchToDelete);
390                                });
391
392                                flushDeleteMarks();
393                        }
394                }
395        }
396
397        /**
398         * Marker to abandon our delete run when we are over time.
399         */
400        private static class DeadlineException extends RuntimeException {
401                public DeadlineException(String message) {
402                        super(message);
403                }
404        }
405
406        @Override
407        public void pollForStaleSearchesAndDeleteThem(RequestPartitionId theRequestPartitionId, Instant theDeadline) {
408                HapiTransactionService.noTransactionAllowed();
409
410                if (!myStorageSettings.isExpireSearchResults()) {
411                        return;
412                }
413
414                final Date cutoff = getCutoff();
415
416                final DeleteRun run = new DeleteRun(theDeadline, cutoff, theRequestPartitionId);
417
418                run.run();
419        }
420
421        @Nonnull
422        private Date getCutoff() {
423                long cutoffMillis = myStorageSettings.getExpireSearchResultsAfterMillis();
424                if (myStorageSettings.getReuseCachedSearchResultsForMillis() != null) {
425                        cutoffMillis = cutoffMillis + myStorageSettings.getReuseCachedSearchResultsForMillis();
426                }
427                final Date cutoff = new Date((now() - cutoffMillis) - myCutoffSlack);
428
429                if (ourNowForUnitTests != null) {
430                        ourLog.info(
431                                        "Searching for searches which are before {} - now is {}",
432                                        new InstantType(cutoff),
433                                        new InstantType(new Date(now())));
434                }
435                return cutoff;
436        }
437
438        @VisibleForTesting
439        public static void setMaximumResultsToDeleteInOnePassForUnitTest(int theMaximumResultsToDeleteInOnePass) {
440                ourMaximumResultsToDeleteInOneCommit = theMaximumResultsToDeleteInOnePass;
441        }
442
443        @VisibleForTesting
444        public static void setMaximumResultsToDeleteInOneStatement(int theMaximumResultsToDelete) {
445                ourMaximumResultsToDeleteInOneStatement = theMaximumResultsToDelete;
446        }
447
448        /**
449         * This is for unit tests only, do not call otherwise
450         */
451        @VisibleForTesting
452        public static void setNowForUnitTests(Long theNowForUnitTests) {
453                ourNowForUnitTests = theNowForUnitTests;
454        }
455
456        public static long now() {
457                if (ourNowForUnitTests != null) {
458                        return ourNowForUnitTests;
459                }
460                return System.currentTimeMillis();
461        }
462}