001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.search.cache; 021 022import ca.uhn.fhir.i18n.Msg; 023import ca.uhn.fhir.interceptor.model.RequestPartitionId; 024import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 025import ca.uhn.fhir.jpa.dao.data.ISearchDao; 026import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao; 027import ca.uhn.fhir.jpa.dao.data.ISearchResultDao; 028import ca.uhn.fhir.jpa.dao.data.SearchIdAndResultSize; 029import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 030import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; 031import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService.IExecutionBuilder; 032import ca.uhn.fhir.jpa.entity.Search; 033import ca.uhn.fhir.jpa.model.search.SearchStatusEnum; 034import ca.uhn.fhir.system.HapiSystemProperties; 035import com.google.common.annotations.VisibleForTesting; 036import jakarta.annotation.Nonnull; 037import jakarta.persistence.EntityManager; 038import org.apache.commons.lang3.Validate; 039import org.apache.commons.lang3.time.DateUtils; 040import org.hibernate.Session; 041import org.hl7.fhir.dstu3.model.InstantType; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044import org.springframework.beans.factory.annotation.Autowired; 045import org.springframework.transaction.annotation.Propagation; 046import 
org.springframework.transaction.annotation.Transactional;

import java.sql.Connection;
import java.time.Instant;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

/**
 * {@link ISearchCacheSvc} implementation that persists cached searches through {@link ISearchDao},
 * {@link ISearchResultDao} and {@link ISearchIncludeDao}, and removes stale searches in batches
 * when {@link #pollForStaleSearchesAndDeleteThem(RequestPartitionId, Instant)} is invoked.
 */
public class DatabaseSearchCacheSvcImpl implements ISearchCacheSvc {
	/*
	 * Be careful increasing this number! We use the number of params here in a
	 * DELETE FROM foo WHERE params IN (term,term,term...)
	 * type query and this can fail if we have 1000s of params
	 */
	public static final int DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT = 500;
	/** Default for the number of SearchResult rows we aim to delete per commit. */
	public static final int DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS = 50000;
	public static final long SEARCH_CLEANUP_JOB_INTERVAL_MILLIS = DateUtils.MILLIS_PER_MINUTE;
	public static final int DEFAULT_MAX_DELETE_CANDIDATES_TO_FIND = 2000;
	private static final Logger ourLog = LoggerFactory.getLogger(DatabaseSearchCacheSvcImpl.class);
	private static int ourMaximumResultsToDeleteInOneStatement = DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT;
	private static int ourMaximumResultsToDeleteInOneCommit = DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS;
	private static Long ourNowForUnitTests;
	/*
	 * We give a bit of extra leeway just to avoid race conditions where a query result
	 * is being reused (because a new client request came in with the same params) right before
	 * the result is to be deleted
	 */
	private long myCutoffSlack = SEARCH_CLEANUP_JOB_INTERVAL_MILLIS;

	@Autowired
	private ISearchDao mySearchDao;

	@Autowired
	private EntityManager myEntityManager;

	@Autowired
	private ISearchResultDao mySearchResultDao;

	@Autowired
	private ISearchIncludeDao mySearchIncludeDao;

	@Autowired
	private IHapiTransactionService myTransactionService;

	@Autowired
	private JpaStorageSettings myStorageSettings;

	@VisibleForTesting
	public void setCutoffSlackForUnitTest(long theCutoffSlack) {
		myCutoffSlack = theCutoffSlack;
	}

	/**
	 * Saves the given Search inside a system-request transaction bound to the given partition.
	 *
	 * @return the entity returned by {@link ISearchDao#save}
	 */
	@Override
	public Search save(Search theSearch, RequestPartitionId theRequestPartitionId) {
		return myTransactionService
				.withSystemRequestOnPartition(theRequestPartitionId)
				.execute(() -> mySearchDao.save(theSearch));
	}

	/**
	 * Looks up a Search (with its includes fetched) by UUID.
	 *
	 * @param theUuid the search UUID; must not be blank
	 */
	@Override
	@Transactional(propagation = Propagation.REQUIRED)
	public Optional<Search> fetchByUuid(String theUuid, RequestPartitionId theRequestPartitionId) {
		Validate.notBlank(theUuid);
		return myTransactionService
				.withSystemRequestOnPartition(theRequestPartitionId)
				.execute(() -> mySearchDao.findByUuidAndFetchIncludes(theUuid));
	}

	void setSearchDaoForUnitTest(ISearchDao theSearchDao) {
		mySearchDao = theSearchDao;
	}

	void setTransactionServiceForUnitTest(IHapiTransactionService theTransactionService) {
		myTransactionService = theTransactionService;
	}

	/**
	 * In a new transaction, re-reads the Search and moves it from PASSCMPLET to LOADING.
	 *
	 * @return the saved Search, or {@link Optional#empty()} if the Search was not in PASSCMPLET
	 *         state or any other exception occurred (the exception is logged, not rethrown)
	 */
	@Override
	public Optional<Search> tryToMarkSearchAsInProgress(Search theSearch, RequestPartitionId theRequestPartitionId) {
		ourLog.trace(
				"Going to try to change search status from {} to {}", theSearch.getStatus(), SearchStatusEnum.LOADING);
		try {

			return myTransactionService
					.withSystemRequest()
					.withRequestPartitionId(theRequestPartitionId)
					.withPropagation(Propagation.REQUIRES_NEW)
					.execute(t -> {
						// fall back to the caller's copy if the row can't be found
						Search search = mySearchDao.findById(theSearch.getId()).orElse(theSearch);

						if (search.getStatus() != SearchStatusEnum.PASSCMPLET) {
							throw new IllegalStateException(
									Msg.code(1167) + "Can't change to LOADING because state is " + search.getStatus());
						}
						search.setStatus(SearchStatusEnum.LOADING);
						Search newSearch = mySearchDao.save(search);
						return Optional.of(newSearch);
					});
		} catch (Exception e) {
			ourLog.warn("Failed to activate search: {}", e.toString());
			ourLog.trace("Failed to activate search", e);
			return Optional.empty();
		}
	}

	/**
	 * Finds a previously stored Search with the same resource type and (partition-qualified) query
	 * string that was created strictly after theCreatedAfter. Must be called inside a transaction.
	 *
	 * @return the first matching candidate, or empty if none matches
	 */
	@Override
	public Optional<Search> findCandidatesForReuse(
			String theResourceType,
			String theQueryString,
			Instant theCreatedAfter,
			RequestPartitionId theRequestPartitionId) {
		HapiTransactionService.requireTransaction();

		String queryString = Search.createSearchQueryStringForStorage(theQueryString, theRequestPartitionId);

		// the DAO narrows by hash; we then confirm the exact string to rule out hash collisions
		int hashCode = queryString.hashCode();
		Collection<Search> candidates =
				mySearchDao.findWithCutoffOrExpiry(theResourceType, hashCode, Date.from(theCreatedAfter));

		for (Search nextCandidateSearch : candidates) {
			// We should only reuse our search if it was created within the permitted window
			// Date.after() is unreliable.  Instant.isAfter() always works.
			if (queryString.equals(nextCandidateSearch.getSearchQueryString())
					&& nextCandidateSearch.getCreated().toInstant().isAfter(theCreatedAfter)) {
				return Optional.of(nextCandidateSearch);
			}
		}

		return Optional.empty();
	}

	/**
	 * A transient worker for a single pass through stale-search deletion.
	 */
	class DeleteRun {
		final RequestPartitionId myRequestPartitionId;
		final Instant myDeadline;
		final Date myCutoffForDeletion;
		/** Search pids waiting to have their deleted flag set to true. */
		final Set<Long> myUpdateDeletedFlagBatch = new HashSet<>();
		/** Search pids waiting to have their Search and SearchInclude rows deleted. */
		final Set<Long> myDeleteSearchBatch = new HashSet<>();
		/** the Search pids of the SearchResults we plan to delete in a chunk */
		final Set<Long> myDeleteSearchResultsBatch = new HashSet<>();
		/**
		 * Number of results we have queued up in myDeleteSearchResultsBatch to delete.
		 * We try to keep this to a reasonable size to avoid long transactions that may escalate to a table lock.
		 */
		private int myDeleteSearchResultsBatchCount = 0;

		DeleteRun(Instant theDeadline, Date theCutoffForDeletion, RequestPartitionId theRequestPartitionId) {
			myDeadline = theDeadline;
			myCutoffForDeletion = theCutoffForDeletion;
			myRequestPartitionId = theRequestPartitionId;
		}

		/**
		 * Mark all ids in the myUpdateDeletedFlagBatch buffer as deleted, clear the buffer, and commit.
		 */
		public void flushDeleteMarks() {
			if (myUpdateDeletedFlagBatch.isEmpty()) {
				return;
			}
			ourLog.debug("Marking {} searches as deleted", myUpdateDeletedFlagBatch.size());
			mySearchDao.updateDeleted(myUpdateDeletedFlagBatch, true);
			myUpdateDeletedFlagBatch.clear();
			commitOpenChanges();
		}

		/**
		 * Dig into the guts of our Hibernate session, flush any changes in the session, and commit the underlying connection.
		 */
		private void commitOpenChanges() {
			// flush to force Hibernate to actually get a connection from the pool
			myEntityManager.flush();
			// get our connection from the underlying Hibernate session, and commit
			//noinspection resource
			myEntityManager.unwrap(Session.class).doWork(Connection::commit);
		}

		/**
		 * @throws DeadlineException if the current time (see {@link #now()}) is after myDeadline
		 */
		void throwIfDeadlineExpired() {
			boolean result = Instant.ofEpochMilli(now()).isAfter(myDeadline);
			if (result) {
				throw new DeadlineException(
						Msg.code(2443) + "Deadline expired while cleaning Search cache - " + myDeadline);
			}
		}

		/**
		 * Stream all Searches flagged as deleted, buffer their deletion, and flush the buffers at the end.
		 *
		 * @return the number of Searches processed
		 */
		private int deleteMarkedSearchesInBatches() {
			// AtomicInteger only because the counter is mutated inside a lambda
			AtomicInteger deletedCounter = new AtomicInteger(0);

			try (final Stream<SearchIdAndResultSize> toDelete = mySearchDao.findDeleted()) {
				assert toDelete != null;

				toDelete.forEach(nextSearchToDelete -> {
					throwIfDeadlineExpired();

					deleteSearchAndResults(nextSearchToDelete.searchId, nextSearchToDelete.size);

					deletedCounter.incrementAndGet();
				});
			}

			// flush anything left in the buffers
			flushSearchResultDeletes();
			flushSearchAndIncludeDeletes();

			int deletedCount = deletedCounter.get();

			ourLog.info("Deleted {} expired searches", deletedCount);

			return deletedCount;
		}

		/**
		 * Schedule theSearchPid for deletion assuming it has theNumberOfResults SearchResults attached.
		 *
		 * We accumulate a batch of search pids for deletion, and then do a bulk DML as we reach a threshold number
		 * of SearchResults, or when the batch of Search pids reaches the IN-clause statement limit.
		 *
		 * @param theSearchPid pk of the Search
		 * @param theNumberOfResults the number of SearchResults attached
		 */
		private void deleteSearchAndResults(long theSearchPid, int theNumberOfResults) {
			ourLog.trace("Buffering deletion of search pid {} and {} results", theSearchPid, theNumberOfResults);

			myDeleteSearchBatch.add(theSearchPid);

			if (theNumberOfResults > ourMaximumResultsToDeleteInOneCommit) {
				// don't buffer this one - do it inline
				deleteSearchResultsByChunk(theSearchPid, theNumberOfResults);
			} else {
				myDeleteSearchResultsBatch.add(theSearchPid);
				myDeleteSearchResultsBatchCount += theNumberOfResults;

				if (myDeleteSearchResultsBatchCount > ourMaximumResultsToDeleteInOneCommit) {
					flushSearchResultDeletes();
				}
			}

			// This check must run for large (inline-deleted) searches too; otherwise a run of large
			// searches would grow myDeleteSearchBatch past the IN-clause limit. Use >= so the batch
			// never exceeds ourMaximumResultsToDeleteInOneStatement entries.
			if (myDeleteSearchBatch.size() >= ourMaximumResultsToDeleteInOneStatement) {
				// flush the results to make sure we don't have any references.
				flushSearchResultDeletes();

				flushSearchAndIncludeDeletes();
			}
		}

		/**
		 * If this Search has more results than our max delete size,
		 * delete its results by themselves in range chunks, committing after each chunk.
		 * @param theSearchPid the target Search pid
		 * @param theNumberOfResults the number of search results present
		 */
		private void deleteSearchResultsByChunk(long theSearchPid, int theNumberOfResults) {
			ourLog.debug(
					"Search {} is large: has {} results. Deleting results in chunks.",
					theSearchPid,
					theNumberOfResults);
			// walk downward from theNumberOfResults; the final chunk's rangeStart may be negative
			for (int rangeEnd = theNumberOfResults; rangeEnd >= 0; rangeEnd -= ourMaximumResultsToDeleteInOneCommit) {
				int rangeStart = rangeEnd - ourMaximumResultsToDeleteInOneCommit;
				ourLog.trace("Deleting results for search {}: {} - {}", theSearchPid, rangeStart, rangeEnd);
				mySearchResultDao.deleteBySearchIdInRange(theSearchPid, rangeStart, rangeEnd);
				commitOpenChanges();
			}
		}

		/**
		 * Delete the SearchInclude and Search rows for every pid in myDeleteSearchBatch, clear it, and commit.
		 */
		private void flushSearchAndIncludeDeletes() {
			if (myDeleteSearchBatch.isEmpty()) {
				return;
			}
			ourLog.debug("Deleting {} Search records", myDeleteSearchBatch.size());
			// referential integrity requires we delete includes before the search
			mySearchIncludeDao.deleteForSearch(myDeleteSearchBatch);
			mySearchDao.deleteByPids(myDeleteSearchBatch);
			myDeleteSearchBatch.clear();
			commitOpenChanges();
		}

		/**
		 * Delete the SearchResult rows of every pid in myDeleteSearchResultsBatch, reset the batch, and commit.
		 */
		private void flushSearchResultDeletes() {
			if (myDeleteSearchResultsBatch.isEmpty()) {
				return;
			}
			ourLog.debug(
					"Deleting {} Search Results from {} searches",
					myDeleteSearchResultsBatchCount,
					myDeleteSearchResultsBatch.size());
			mySearchResultDao.deleteBySearchIds(myDeleteSearchResultsBatch);
			myDeleteSearchResultsBatch.clear();
			myDeleteSearchResultsBatchCount = 0;
			commitOpenChanges();
		}

		IExecutionBuilder getTxBuilder() {
			return myTransactionService.withSystemRequest().withRequestPartitionId(myRequestPartitionId);
		}

		/**
		 * Perform the pass: mark expired searches as deleted, then delete marked searches.
		 * Stops early (logging a warning) when the deadline passes.
		 */
		private void run() {
			ourLog.debug("Searching for searches which are before {}", myCutoffForDeletion);

			// this tx builder is not really for tx management.
			// Instead, it is used to bind a Hibernate session + connection to this thread.
			// We will run a streaming query to look for work, and then commit changes in batches during the loops.
			getTxBuilder().execute(theStatus -> {
				try {
					markDeletedInBatches();

					throwIfDeadlineExpired();

					// Delete searches that are marked as deleted
					int deletedCount = deleteMarkedSearchesInBatches();

					throwIfDeadlineExpired();

					if ((ourLog.isDebugEnabled() || HapiSystemProperties.isTestModeEnabled()) && (deletedCount > 0)) {
						Long total = mySearchDao.count();
						ourLog.debug("Deleted {} searches, {} remaining", deletedCount, total);
					}
				} catch (DeadlineException theTimeoutException) {
					ourLog.warn(theTimeoutException.getMessage());
				}

				return null;
			});
		}

		/**
		 * Stream through a list of pids before our cutoff, and set myDeleted=true in batches in a DML statement.
		 */
		private void markDeletedInBatches() {

			try (Stream<Long> toMarkDeleted =
					mySearchDao.findWhereCreatedBefore(myCutoffForDeletion, new Date(now()))) {
				assert toMarkDeleted != null;

				toMarkDeleted.forEach(nextSearchToDelete -> {
					throwIfDeadlineExpired();

					// flush before adding so the batch never exceeds the statement limit
					if (myUpdateDeletedFlagBatch.size() >= ourMaximumResultsToDeleteInOneStatement) {
						flushDeleteMarks();
					}
					ourLog.trace("Marking search with PID {} as ready for deletion", nextSearchToDelete);
					myUpdateDeletedFlagBatch.add(nextSearchToDelete);
				});

				flushDeleteMarks();
			}
		}
	}

	/**
	 * Marker to abandon our delete run when we are over time.
	 */
	private static class DeadlineException extends RuntimeException {
		public DeadlineException(String message) {
			super(message);
		}
	}

	/**
	 * Runs one stale-search deletion pass, unless search result expiry is disabled in the storage settings.
	 * Must not be called inside an existing transaction.
	 *
	 * @param theRequestPartitionId the partition the deletion transaction is bound to
	 * @param theDeadline the pass stops (with a warning log) once this instant has passed
	 */
	@Override
	public void pollForStaleSearchesAndDeleteThem(RequestPartitionId theRequestPartitionId, Instant theDeadline) {
		HapiTransactionService.noTransactionAllowed();

		if (!myStorageSettings.isExpireSearchResults()) {
			return;
		}

		final Date cutoff = getCutoff();

		final DeleteRun run = new DeleteRun(theDeadline, cutoff, theRequestPartitionId);

		run.run();
	}

	/**
	 * Compute the creation cutoff: now - (expire-after + reuse-window, if set) - cutoff slack.
	 */
	@Nonnull
	private Date getCutoff() {
		long cutoffMillis = myStorageSettings.getExpireSearchResultsAfterMillis();
		Long reuseCachedSearchResultsForMillis = myStorageSettings.getReuseCachedSearchResultsForMillis();
		if (reuseCachedSearchResultsForMillis != null) {
			cutoffMillis = cutoffMillis + reuseCachedSearchResultsForMillis;
		}
		final Date cutoff = new Date((now() - cutoffMillis) - myCutoffSlack);

		if (ourNowForUnitTests != null) {
			ourLog.info(
					"Searching for searches which are before {} - now is {}",
					new InstantType(cutoff),
					new InstantType(new Date(now())));
		}
		return cutoff;
	}

	@VisibleForTesting
	public static void setMaximumResultsToDeleteInOnePassForUnitTest(int theMaximumResultsToDeleteInOnePass) {
		ourMaximumResultsToDeleteInOneCommit = theMaximumResultsToDeleteInOnePass;
	}

	@VisibleForTesting
	public static void setMaximumResultsToDeleteInOneStatement(int theMaximumResultsToDelete) {
		ourMaximumResultsToDeleteInOneStatement = theMaximumResultsToDelete;
	}

	/**
	 * This is for unit tests only, do not call otherwise
	 */
	@VisibleForTesting
	public static void setNowForUnitTests(Long theNowForUnitTests) {
		ourNowForUnitTests = theNowForUnitTests;
	}

	/**
	 * @return the unit-test override time if one is set, otherwise {@link System#currentTimeMillis()}
	 */
	public static long now() {
		if (ourNowForUnitTests != null) {
			return ourNowForUnitTests;
		}
		return System.currentTimeMillis();
	}
}