/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.search.cache;

import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.dao.data.ISearchIncludeDao;
import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
import ca.uhn.fhir.jpa.dao.data.SearchIdAndResultSize;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService.IExecutionBuilder;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.model.search.SearchStatusEnum;
import ca.uhn.fhir.system.HapiSystemProperties;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import jakarta.persistence.EntityManager;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.DateUtils;
import org.hibernate.Session;
import org.hl7.fhir.dstu3.model.InstantType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.sql.Connection;
import java.time.Instant;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

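/**
 * Database-backed implementation of {@link ISearchCacheSvc}. Persists {@link Search} entities so
 * that result sets can be reused across requests, and periodically marks stale searches as
 * deleted and purges them (with their results and includes) in batches.
 */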
public class DatabaseSearchCacheSvcImpl implements ISearchCacheSvc {
	/*
	 * Be careful increasing this number! We use the number of params here in a
	 * DELETE FROM foo WHERE params IN (term,term,term...)
	 * type query and this can fail if we have 1000s of params
	 */
	public static final int DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT = 500;
	public static final int DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS = 50000;
	public static final long SEARCH_CLEANUP_JOB_INTERVAL_MILLIS = DateUtils.MILLIS_PER_MINUTE;
	public static final int DEFAULT_MAX_DELETE_CANDIDATES_TO_FIND = 2000;
	private static final Logger ourLog = LoggerFactory.getLogger(DatabaseSearchCacheSvcImpl.class);
	private static int ourMaximumResultsToDeleteInOneStatement = DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_STMT;
	private static int ourMaximumResultsToDeleteInOneCommit = DEFAULT_MAX_RESULTS_TO_DELETE_IN_ONE_PAS;
	private static Long ourNowForUnitTests;
	/*
	 * We give a bit of extra leeway just to avoid race conditions where a query result
	 * is being reused (because a new client request came in with the same params) right before
	 * the result is to be deleted
	 */
	private long myCutoffSlack = SEARCH_CLEANUP_JOB_INTERVAL_MILLIS;

	@Autowired
	private ISearchDao mySearchDao;

	@Autowired
	private EntityManager myEntityManager;

	@Autowired
	private ISearchResultDao mySearchResultDao;

	@Autowired
	private ISearchIncludeDao mySearchIncludeDao;

	@Autowired
	private IHapiTransactionService myTransactionService;

	@Autowired
	private JpaStorageSettings myStorageSettings;

	@VisibleForTesting
	public void setCutoffSlackForUnitTest(long theCutoffSlack) {
		myCutoffSlack = theCutoffSlack;
	}

	@Override
	public Search save(Search theSearch, RequestPartitionId theRequestPartitionId) {
		return myTransactionService
				.withSystemRequestOnPartition(theRequestPartitionId)
				.execute(() -> mySearchDao.save(theSearch));
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRED)
	public Optional<Search> fetchByUuid(String theUuid, RequestPartitionId theRequestPartitionId) {
		Validate.notBlank(theUuid);
		return myTransactionService
				.withSystemRequestOnPartition(theRequestPartitionId)
				.execute(() -> mySearchDao.findByUuidAndFetchIncludes(theUuid));
	}

	void setSearchDaoForUnitTest(ISearchDao theSearchDao) {
		mySearchDao = theSearchDao;
	}

	void setTransactionServiceForUnitTest(IHapiTransactionService theTransactionService) {
		myTransactionService = theTransactionService;
	}

	@Override
	public Optional<Search> tryToMarkSearchAsInProgress(Search theSearch, RequestPartitionId theRequestPartitionId) {
		ourLog.trace(
				"Going to try to change search status from {} to {}", theSearch.getStatus(), SearchStatusEnum.LOADING);
		try {

			return myTransactionService
					.withSystemRequest()
					.withRequestPartitionId(theRequestPartitionId)
					.withPropagation(Propagation.REQUIRES_NEW)
					.execute(t -> {
						Search search = mySearchDao.findById(theSearch.getId()).orElse(theSearch);

						if (search.getStatus() != SearchStatusEnum.PASSCMPLET) {
							throw new IllegalStateException(
									Msg.code(1167) + "Can't change to LOADING because state is " + search.getStatus());
						}
						search.setStatus(SearchStatusEnum.LOADING);
						Search newSearch = mySearchDao.save(search);
						return Optional.of(newSearch);
					});
		} catch (Exception e) {
			ourLog.warn("Failed to activate search: {}", e.toString());
			ourLog.trace("Failed to activate search", e);
			return Optional.empty();
		}
	}

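	/**
	 * Look for a cached Search of theResourceType whose normalized query string matches
	 * theQueryString and which was created after theCreatedAfter, so its result set can be
	 * reused instead of re-running the query.
	 */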
	@Override
	public Optional<Search> findCandidatesForReuse(
			String theResourceType,
			String theQueryString,
			Instant theCreatedAfter,
			RequestPartitionId theRequestPartitionId) {
		HapiTransactionService.requireTransaction();

		String queryString = Search.createSearchQueryStringForStorage(theQueryString, theRequestPartitionId);

		int hashCode = queryString.hashCode();
		Collection<Search> candidates =
				mySearchDao.findWithCutoffOrExpiry(theResourceType, hashCode, Date.from(theCreatedAfter));

		for (Search nextCandidateSearch : candidates) {
			// We should only reuse our search if it was created within the permitted window
			// Date.after() is unreliable. Instant.isAfter() always works.
			if (queryString.equals(nextCandidateSearch.getSearchQueryString())
					&& nextCandidateSearch.getCreated().toInstant().isAfter(theCreatedAfter)) {
				return Optional.of(nextCandidateSearch);
			}
		}

		return Optional.empty();
	}

	/**
	 * A transient worker for a single pass through stale-search deletion.
	 */
	class DeleteRun {
		final RequestPartitionId myRequestPartitionId;
		final Instant myDeadline;
		final Date myCutoffForDeletion;
		final Set<Long> myUpdateDeletedFlagBatch = new HashSet<>();
		final Set<Long> myDeleteSearchBatch = new HashSet<>();
		/** the Search pids of the SearchResults we plan to delete in a chunk */
		final Set<Long> myDeleteSearchResultsBatch = new HashSet<>();
		/**
		 * Number of results we have queued up in myDeleteSearchResultsBatch to delete.
		 * We try to keep this to a reasonable size to avoid long transactions that may escalate to a table lock.
		 */
		private int myDeleteSearchResultsBatchCount = 0;

		DeleteRun(Instant theDeadline, Date theCutoffForDeletion, RequestPartitionId theRequestPartitionId) {
			myDeadline = theDeadline;
			myCutoffForDeletion = theCutoffForDeletion;
			myRequestPartitionId = theRequestPartitionId;
		}

		/**
		 * Mark all ids in the myUpdateDeletedFlagBatch buffer as deleted, and clear the buffer.
		 */
		public void flushDeleteMarks() {
			if (myUpdateDeletedFlagBatch.isEmpty()) {
				return;
			}
			ourLog.debug("Marking {} searches as deleted", myUpdateDeletedFlagBatch.size());
			mySearchDao.updateDeleted(myUpdateDeletedFlagBatch, true);
			myUpdateDeletedFlagBatch.clear();
			commitOpenChanges();
		}

		/**
		 * Dig into the guts of our Hibernate session, flush any changes in the session, and
		 * commit the underlying connection.
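		 * <p>
		 * This deliberately bypasses Spring transaction management: committing after each batch keeps
		 * individual transactions short, which (as the batch-count docs above note) avoids long
		 * transactions that may escalate to a table lock.
		 * </p>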
		 */
		private void commitOpenChanges() {
			// flush to force Hibernate to actually get a connection from the pool
			myEntityManager.flush();
			// get our connection from the underlying Hibernate session, and commit
			myEntityManager.unwrap(Session.class).doWork(Connection::commit);
		}

		void throwIfDeadlineExpired() {
			boolean result = Instant.ofEpochMilli(now()).isAfter(myDeadline);
			if (result) {
				throw new DeadlineException(
						Msg.code(2443) + "Deadline expired while cleaning Search cache - " + myDeadline);
			}
		}

		private int deleteMarkedSearchesInBatches() {
			AtomicInteger deletedCounter = new AtomicInteger(0);

			try (final Stream<SearchIdAndResultSize> toDelete = mySearchDao.findDeleted()) {
				assert toDelete != null;

				toDelete.forEach(nextSearchToDelete -> {
					throwIfDeadlineExpired();

					deleteSearchAndResults(nextSearchToDelete.searchId, nextSearchToDelete.size);

					deletedCounter.incrementAndGet();
				});
			}

			// flush anything left in the buffers
			flushSearchResultDeletes();
			flushSearchAndIncludeDeletes();

			int deletedCount = deletedCounter.get();
			if (deletedCount > 0) {
				ourLog.debug("Deleted {} expired searches", deletedCount);
			}

			return deletedCount;
		}

		/**
		 * Schedule theSearchPid for deletion assuming it has theNumberOfResults SearchResults attached.
		 * <p>
		 * We accumulate a batch of search pids for deletion, and then do a bulk DML as we reach a threshold number
		 * of SearchResults.
		 * </p>
		 *
		 * @param theSearchPid pk of the Search
		 * @param theNumberOfResults the number of SearchResults attached
		 */
		private void deleteSearchAndResults(long theSearchPid, int theNumberOfResults) {
			ourLog.trace("Buffering deletion of search pid {} and {} results", theSearchPid, theNumberOfResults);

			myDeleteSearchBatch.add(theSearchPid);

			if (theNumberOfResults > ourMaximumResultsToDeleteInOneCommit) {
				// don't buffer this one - do it inline
				deleteSearchResultsByChunk(theSearchPid, theNumberOfResults);
				return;
			}
			myDeleteSearchResultsBatch.add(theSearchPid);
			myDeleteSearchResultsBatchCount += theNumberOfResults;

			if (myDeleteSearchResultsBatchCount > ourMaximumResultsToDeleteInOneCommit) {
				flushSearchResultDeletes();
			}

			if (myDeleteSearchBatch.size() > ourMaximumResultsToDeleteInOneStatement) {
				// flush the results to make sure we don't have any references.
				flushSearchResultDeletes();

				flushSearchAndIncludeDeletes();
			}
		}

		/**
		 * If this Search has more results than our max delete size,
		 * delete it by itself in range chunks.
		 * @param theSearchPid the target Search pid
		 * @param theNumberOfResults the number of search results present
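		 * <p>
		 * Worked example (hypothetical numbers): with 120,000 results and a 50,000-result commit
		 * limit, the loop below deletes the ranges (70000 to 120000), (20000 to 70000), then
		 * (-30000 to 20000), committing after each chunk; a negative lower bound is assumed to be
		 * harmless here since result indexes start at zero.
		 * </p>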
		 */
		private void deleteSearchResultsByChunk(long theSearchPid, int theNumberOfResults) {
			ourLog.debug(
					"Search {} is large: has {} results. Deleting results in chunks.",
					theSearchPid,
					theNumberOfResults);
			for (int rangeEnd = theNumberOfResults; rangeEnd >= 0; rangeEnd -= ourMaximumResultsToDeleteInOneCommit) {
				int rangeStart = rangeEnd - ourMaximumResultsToDeleteInOneCommit;
				ourLog.trace("Deleting results for search {}: {} - {}", theSearchPid, rangeStart, rangeEnd);
				mySearchResultDao.deleteBySearchIdInRange(theSearchPid, rangeStart, rangeEnd);
				commitOpenChanges();
			}
		}

		private void flushSearchAndIncludeDeletes() {
			if (myDeleteSearchBatch.isEmpty()) {
				return;
			}
			ourLog.debug("Deleting {} Search records", myDeleteSearchBatch.size());
			// referential integrity requires we delete includes before the search
			mySearchIncludeDao.deleteForSearch(myDeleteSearchBatch);
			mySearchDao.deleteByPids(myDeleteSearchBatch);
			myDeleteSearchBatch.clear();
			commitOpenChanges();
		}

		private void flushSearchResultDeletes() {
			if (myDeleteSearchResultsBatch.isEmpty()) {
				return;
			}
			ourLog.debug(
					"Deleting {} Search Results from {} searches",
					myDeleteSearchResultsBatchCount,
					myDeleteSearchResultsBatch.size());
			mySearchResultDao.deleteBySearchIds(myDeleteSearchResultsBatch);
			myDeleteSearchResultsBatch.clear();
			myDeleteSearchResultsBatchCount = 0;
			commitOpenChanges();
		}

		IExecutionBuilder getTxBuilder() {
			return myTransactionService.withSystemRequest().withRequestPartitionId(myRequestPartitionId);
		}

		private void run() {
			ourLog.debug("Searching for searches which are before {}", myCutoffForDeletion);

			// this tx builder is not really for tx management.
			// Instead, it is used to bind a Hibernate session + connection to this thread.
			// We will run a streaming query to look for work, and then commit changes in batches during the loops.
			getTxBuilder().execute(theStatus -> {
				try {
					markDeletedInBatches();

					throwIfDeadlineExpired();

					// Delete searches that are marked as deleted
					int deletedCount = deleteMarkedSearchesInBatches();

					throwIfDeadlineExpired();

					if ((ourLog.isDebugEnabled() || HapiSystemProperties.isTestModeEnabled()) && (deletedCount > 0)) {
						Long total = mySearchDao.count();
						ourLog.debug("Deleted {} searches, {} remaining", deletedCount, total);
					}
				} catch (DeadlineException theTimeoutException) {
					ourLog.warn(theTimeoutException.getMessage());
				}

				return null;
			});
		}

		/**
		 * Stream through a list of pids before our cutoff, and set myDeleted=true in batches in a DML statement.
		 */
		private void markDeletedInBatches() {

			try (Stream<Long> toMarkDeleted =
					mySearchDao.findWhereCreatedBefore(myCutoffForDeletion, new Date(now()))) {
				assert toMarkDeleted != null;

				toMarkDeleted.forEach(nextSearchToDelete -> {
					throwIfDeadlineExpired();

					if (myUpdateDeletedFlagBatch.size() >= ourMaximumResultsToDeleteInOneStatement) {
						flushDeleteMarks();
					}
					ourLog.trace("Marking search with PID {} as ready for deletion", nextSearchToDelete);
					myUpdateDeletedFlagBatch.add(nextSearchToDelete);
				});

				flushDeleteMarks();
			}
		}
	}

	/**
	 * Marker to abandon our delete run when we are over time.
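	 * Caught by {@link DeleteRun#run()}, which logs a warning and ends the pass cleanly rather
	 * than overrunning the cleanup interval.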
	 */
	private static class DeadlineException extends RuntimeException {
		public DeadlineException(String message) {
			super(message);
		}
	}

	@Override
	public void pollForStaleSearchesAndDeleteThem(RequestPartitionId theRequestPartitionId, Instant theDeadline) {
		HapiTransactionService.noTransactionAllowed();

		if (!myStorageSettings.isExpireSearchResults()) {
			return;
		}

		final Date cutoff = getCutoff();

		final DeleteRun run = new DeleteRun(theDeadline, cutoff, theRequestPartitionId);

		run.run();
	}

	@Nonnull
	private Date getCutoff() {
		long cutoffMillis = myStorageSettings.getExpireSearchResultsAfterMillis();
		if (myStorageSettings.getReuseCachedSearchResultsForMillis() != null) {
			cutoffMillis = cutoffMillis + myStorageSettings.getReuseCachedSearchResultsForMillis();
		}
		final Date cutoff = new Date((now() - cutoffMillis) - myCutoffSlack);

		if (ourNowForUnitTests != null) {
			ourLog.info(
					"Searching for searches which are before {} - now is {}",
					new InstantType(cutoff),
					new InstantType(new Date(now())));
		}
		return cutoff;
	}

	@VisibleForTesting
	public static void setMaximumResultsToDeleteInOnePassForUnitTest(int theMaximumResultsToDeleteInOnePass) {
		ourMaximumResultsToDeleteInOneCommit = theMaximumResultsToDeleteInOnePass;
	}

	@VisibleForTesting
	public static void setMaximumResultsToDeleteInOneStatement(int theMaximumResultsToDelete) {
		ourMaximumResultsToDeleteInOneStatement = theMaximumResultsToDelete;
	}

	/**
	 * This is for unit tests only, do not call otherwise
	 */
	@VisibleForTesting
	public static void setNowForUnitTests(Long theNowForUnitTests) {
		ourNowForUnitTests = theNowForUnitTests;
	}

	public static long now() {
		if (ourNowForUnitTests != null) {
			return ourNowForUnitTests;
		}
		return System.currentTimeMillis();
	}
}