/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.search.reindex;

import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.dao.data.IResourceReindexJobDao;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import jakarta.persistence.PersistenceContextType;
import jakarta.persistence.Query;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.r4.model.InstantType;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Slice;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;

import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import static org.apache.commons.lang3.StringUtils.isNotBlank;

/**
 * Legacy scheduled service that walks all resources (optionally filtered by type) in
 * ascending last-updated order and reindexes them in passes of {@link #PASS_SIZE}
 * resources, tracking progress via {@link ResourceReindexJobEntity} rows.
 *
 * @deprecated Use the Batch2 {@link ca.uhn.fhir.batch2.api.IJobCoordinator#startInstance(JobInstanceStartRequest)} instead.
 */
@SuppressWarnings({"removal", "DeprecatedIsStillUsed"})
@Deprecated
public class ResourceReindexingSvcImpl implements IResourceReindexingSvc, IHasScheduledJobs {

	// Lower bound used when a job has no threshold-low yet (i.e. its first pass)
	private static final Date BEGINNING_OF_TIME = new Date(0);
	private static final Logger ourLog = LoggerFactory.getLogger(ResourceReindexingSvcImpl.class);
	// Maximum number of resources processed per reindexing pass
	private static final int PASS_SIZE = 25000;
	// Guards against concurrent reindex passes within this JVM
	private final ReentrantLock myIndexingLock = new ReentrantLock();

	@Autowired
	private IResourceReindexJobDao myReindexJobDao;

	@Autowired
	private JpaStorageSettings myStorageSettings;

	@Autowired
	private PlatformTransactionManager myTxManager;

	private TransactionTemplate myTxTemplate;
	private final ThreadFactory myReindexingThreadFactory =
			new BasicThreadFactory.Builder().namingPattern("ResourceReindex-%d").build();
	private ThreadPoolExecutor myTaskExecutor;

	@Autowired
	private IResourceTableDao myResourceTableDao;

	@Autowired
	private FhirContext myContext;

	@PersistenceContext(type = PersistenceContextType.TRANSACTION)
	private EntityManager myEntityManager;

	@Autowired
	private ISearchParamRegistry mySearchParamRegistry;

	@Autowired
	private ResourceReindexer myResourceReindexer;

	@VisibleForTesting
	void setStorageSettingsForUnitTest(JpaStorageSettings theStorageSettings) {
		myStorageSettings = theStorageSettings;
	}

	@VisibleForTesting
	void setContextForUnitTest(FhirContext theContext) {
		myContext = theContext;
	}

	/**
	 * Initializes the transaction template and the reindexing thread pool after
	 * dependency injection has completed.
	 */
	@PostConstruct
	public void start() {
		myTxTemplate = new TransactionTemplate(myTxManager);
		initExecutor();
	}

	/**
	 * (Re)creates the thread pool executor used for reindex jobs. The pool grows up to
	 * the configured reindex thread count, queues up to 100 tasks, and uses a blocking
	 * rejection policy so that submitters wait rather than dropping work.
	 */
	public void initExecutor() {
		// Create the threadpool executor used for reindex jobs
		int reindexThreadCount = myStorageSettings.getReindexThreadCount();
		RejectedExecutionHandler rejectHandler = new BlockPolicy();
		myTaskExecutor = new ThreadPoolExecutor(
				0,
				reindexThreadCount,
				0L,
				TimeUnit.MILLISECONDS,
				new LinkedBlockingQueue<>(100),
				myReindexingThreadFactory,
				rejectHandler);
	}

	@Override
	public void scheduleJobs(ISchedulerService theSchedulerService) {
		ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
		jobDetail.setId(getClass().getName());
		jobDetail.setJobClass(Job.class);
		// Fire a reindexing pass every 10 seconds (clustered so only one node runs it)
		theSchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRED)
	public Long markAllResourcesForReindexing() {
		return markAllResourcesForReindexing(null);
	}

	/**
	 * Marks all resources (optionally restricted to a single resource type) for
	 * reindexing by creating a new reindex job. Any existing jobs for the same scope
	 * are marked deleted first so only the new job runs.
	 *
	 * @param theType the resource type to reindex, or {@code null} for all types
	 * @return the ID of the newly created reindex job
	 * @throws InvalidRequestException if {@code theType} is not a valid resource type
	 */
	@Override
	@Transactional(propagation = Propagation.REQUIRED)
	public Long markAllResourcesForReindexing(String theType) {

		String typeDesc;
		if (isNotBlank(theType)) {
			try {
				// Validates the type name; throws DataFormatException if unknown
				myContext.getResourceType(theType);
			} catch (DataFormatException e) {
				throw new InvalidRequestException(Msg.code(1170) + "Unknown resource type: " + theType);
			}
			myReindexJobDao.markAllOfTypeAsDeleted(theType);
			typeDesc = theType;
		} else {
			myReindexJobDao.markAllOfTypeAsDeleted();
			typeDesc = "(any)";
		}

		ResourceReindexJobEntity job = new ResourceReindexJobEntity();
		job.setResourceType(theType);
		// Small buffer past "now" so resources updated while the job is being created
		// are still captured by the upper threshold
		job.setThresholdHigh(DateUtils.addMinutes(new Date(), 5));
		job = myReindexJobDao.saveAndFlush(job);

		ourLog.info("Marking all resources of type {} for reindexing - Got job ID[{}]", typeDesc, job.getId());
		return job.getId();
	}

	public static class Job implements HapiJob {
		@Autowired
		private IResourceReindexingSvc myTarget;

		@Override
		public void execute(JobExecutionContext theContext) {
			myTarget.runReindexingPass();
		}
	}

	@VisibleForTesting
	ReentrantLock getIndexingLockForUnitTest() {
		return myIndexingLock;
	}

	/**
	 * Runs a single scheduled reindexing pass if scheduling and the reindexing task
	 * are enabled and no other pass is currently running in this JVM.
	 *
	 * @return the number of resources reindexed, or {@code null} if the pass was skipped
	 */
	@Override
	@Transactional(propagation = Propagation.NEVER)
	public Integer runReindexingPass() {
		// FIX: this previously checked isEnableTaskPreExpandValueSets(), which gates the
		// unrelated ValueSet pre-expansion task - that silently disabled reindexing
		// whenever pre-expansion was off and ignored the actual reindexing toggle.
		if (myStorageSettings.isSchedulingDisabled() || !myStorageSettings.isEnableTaskResourceReindexing()) {
			return null;
		}
		// Skip silently if another pass is already running
		if (myIndexingLock.tryLock()) {
			try {
				return doReindexingPassInsideLock();
			} finally {
				myIndexingLock.unlock();
			}
		}
		return null;
	}

	// Must only be called while holding myIndexingLock
	private int doReindexingPassInsideLock() {
		expungeJobsMarkedAsDeleted();
		return runReindexJobs();
	}

	@Override
	public int forceReindexingPass() {
		// Unlike runReindexingPass(), this blocks until the lock is available and
		// ignores the scheduling/task-enabled settings
		myIndexingLock.lock();
		try {
			return doReindexingPassInsideLock();
		} finally {
			myIndexingLock.unlock();
		}
	}

	@Override
	public void cancelAndPurgeAllJobs() {
		ourLog.info("Cancelling and purging all resource reindexing jobs");
		myIndexingLock.lock();
		try {
			myTxTemplate.execute(t -> {
				myReindexJobDao.markAllOfTypeAsDeleted();
				return null;
			});

			// Discard any queued/running tasks and start a fresh executor
			myTaskExecutor.shutdown();
			initExecutor();

			expungeJobsMarkedAsDeleted();
		} finally {
			myIndexingLock.unlock();
		}
	}

	/**
	 * Executes one pass for each active reindex job.
	 *
	 * @return the total number of resources reindexed across all jobs
	 */
	private int runReindexJobs() {
		Collection<ResourceReindexJobEntity> jobs = getResourceReindexJobEntities();

		if (!jobs.isEmpty()) {
			ourLog.info("Running {} reindex jobs: {}", jobs.size(), jobs);
		} else {
			ourLog.debug("Running 0 reindex jobs");
			return 0;
		}

		int count = 0;
		for (ResourceReindexJobEntity next : jobs) {

			// A job whose low threshold has caught up to (or passed) its high threshold
			// has covered its whole range and is complete
			if (next.getThresholdLow() != null
					&& next.getThresholdLow().getTime()
							>= next.getThresholdHigh().getTime()) {
				markJobAsDeleted(next);
				continue;
			}

			count += runReindexJob(next);
		}
		return count;
	}

	@Override
	public int countReindexJobs() {
		return getResourceReindexJobEntities().size();
	}

	// Fetches up to 10 non-deleted reindex jobs
	private Collection<ResourceReindexJobEntity> getResourceReindexJobEntities() {
		Collection<ResourceReindexJobEntity> jobs =
				myTxTemplate.execute(t -> myReindexJobDao.findAll(PageRequest.of(0, 10), false));
		assert jobs != null;
		return jobs;
	}

	private void markJobAsDeleted(ResourceReindexJobEntity theJob) {
		ourLog.info("Marking reindexing job ID[{}] as deleted", theJob.getId());
		myTxTemplate.execute(t -> {
			myReindexJobDao.markAsDeletedById(theJob.getId());
			return null;
		});
	}

	@VisibleForTesting
	public void setResourceReindexerForUnitTest(ResourceReindexer theResourceReindexer) {
		myResourceReindexer = theResourceReindexer;
	}

	/**
	 * Executes one pass of a single reindex job: loads up to {@link #PASS_SIZE} resource
	 * IDs within the job's date range, reindexes each on the task executor, then advances
	 * the job's low threshold to the latest update timestamp that was processed.
	 *
	 * @param theJob the job to advance
	 * @return the number of resources successfully reindexed in this pass
	 */
	private int runReindexJob(ResourceReindexJobEntity theJob) {
		// Jobs can be temporarily suspended after a failure (see below)
		if (theJob.getSuspendedUntil() != null) {
			if (theJob.getSuspendedUntil().getTime() > System.currentTimeMillis()) {
				return 0;
			}
		}

		ourLog.info("Performing reindex pass for JOB[{}]", theJob.getId());
		StopWatch sw = new StopWatch();
		AtomicInteger counter = new AtomicInteger();

		/*
		 * On the first time we run a particular reindex job, let's make sure we
		 * have the latest search parameters loaded. A common reason to
		 * be reindexing is that the search parameters have changed in some way, so
		 * this makes sure we're on the latest versions
		 */
		if (theJob.getThresholdLow() == null) {
			mySearchParamRegistry.forceRefresh();
		}

		// Calculate range
		Date low = theJob.getThresholdLow() != null ? theJob.getThresholdLow() : BEGINNING_OF_TIME;
		Date high = theJob.getThresholdHigh();

		// Query for resources within threshold
		StopWatch pageSw = new StopWatch();
		Slice<Long> range = myTxTemplate.execute(t -> {
			PageRequest page = PageRequest.of(0, PASS_SIZE);
			if (isNotBlank(theJob.getResourceType())) {
				return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(
						page, theJob.getResourceType(), low, high);
			} else {
				return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(page, low, high);
			}
		});
		Validate.notNull(range);
		int count = range.getNumberOfElements();
		ourLog.info("Loaded {} resources for reindexing in {}", count, pageSw);

		// If we didn't find any results at all, mark as deleted
		if (count == 0) {
			markJobAsDeleted(theJob);
			return 0;
		}

		// Submit each resource requiring reindexing
		List<Future<Date>> futures = range.stream()
				.map(t -> myTaskExecutor.submit(new ResourceReindexingTask(JpaPid.fromId(t), counter)))
				.collect(Collectors.toList());

		Date latestDate = null;
		for (Future<Date> next : futures) {
			Date nextDate;
			try {
				nextDate = next.get();
			} catch (Exception e) {
				// On any failure, suspend the job for a minute and stop this pass; the
				// next scheduled pass will retry from the same threshold
				ourLog.error("Failure reindexing", e);
				Date suspendedUntil = DateUtils.addMinutes(new Date(), 1);
				myTxTemplate.execute(t -> {
					myReindexJobDao.setSuspendedUntil(suspendedUntil);
					return null;
				});
				return counter.get();
			}

			// Track the newest resource-updated timestamp we processed so the low
			// threshold can be advanced past it
			if (nextDate != null) {
				if (latestDate == null || latestDate.getTime() < nextDate.getTime()) {
					latestDate = new Date(nextDate.getTime());
				}
			}
		}

		Validate.notNull(latestDate);
		Date newLow;
		if (latestDate.getTime() == low.getTime()) {
			if (count == PASS_SIZE) {
				// Just in case we end up in some sort of infinite loop. This shouldn't happen, and couldn't really
				// happen unless there were PASS_SIZE resources with the exact same update time down to the
				// millisecond.
				ourLog.error(
						"Final pass time for reindex JOB[{}] has same ending low value: {}",
						theJob.getId(),
						latestDate);
			}

			// Bump by 1ms so the next pass doesn't re-fetch the same resources forever
			newLow = new Date(latestDate.getTime() + 1);
		} else {
			newLow = latestDate;
		}

		myTxTemplate.execute(t -> {
			myReindexJobDao.setThresholdLow(theJob.getId(), newLow);
			Integer existingCount =
					myReindexJobDao.getReindexCount(theJob.getId()).orElse(0);
			int newCount = existingCount + counter.get();
			myReindexJobDao.setReindexCount(theJob.getId(), newCount);
			return null;
		});

		ourLog.info(
				"Completed pass of reindex JOB[{}] - Indexed {} resources in {} ({} / sec) - Have indexed until: {}",
				theJob.getId(),
				count,
				sw,
				sw.formatThroughput(count, TimeUnit.SECONDS),
				new InstantType(newLow));
		return counter.get();
	}

	// Physically deletes jobs that were previously soft-deleted
	private void expungeJobsMarkedAsDeleted() {
		myTxTemplate.execute(t -> {
			Collection<ResourceReindexJobEntity> toDelete = myReindexJobDao.findAll(PageRequest.of(0, 10), true);
			toDelete.forEach(job -> {
				ourLog.info("Purging deleted job[{}]", job.getId());
				myReindexJobDao.deleteById(job.getId());
			});
			return null;
		});
	}

	/**
	 * Flags a resource as INDEX_STATUS_INDEXING_FAILED and strips all of its index
	 * rows, so the broken state isn't left half-indexed. Runs in a new transaction
	 * because the surrounding reindex transaction will have been rolled back.
	 */
	private void markResourceAsIndexingFailed(final JpaPid theId) {
		TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
		txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
		txTemplate.execute((TransactionCallback<Void>) theStatus -> {
			ourLog.info("Marking resource with PID {} as indexing_failed", theId);

			myResourceTableDao.updateIndexStatus(theId.getId(), BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED);

			Query q = myEntityManager.createQuery("DELETE FROM ResourceTag t WHERE t.myResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery(
					"DELETE FROM ResourceIndexedSearchParamCoords t WHERE t.myResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery(
					"DELETE FROM ResourceIndexedSearchParamDate t WHERE t.myResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery(
					"DELETE FROM ResourceIndexedSearchParamNumber t WHERE t.myResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery(
					"DELETE FROM ResourceIndexedSearchParamQuantity t WHERE t.myResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery(
					"DELETE FROM ResourceIndexedSearchParamQuantityNormalized t WHERE t.myResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery(
					"DELETE FROM ResourceIndexedSearchParamString t WHERE t.myResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery(
					"DELETE FROM ResourceIndexedSearchParamToken t WHERE t.myResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery(
					"DELETE FROM ResourceIndexedSearchParamUri t WHERE t.myResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.mySourceResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.myTargetResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			return null;
		});
	}

	/**
	 * Reindexes a single resource on a worker thread. Returns the resource's updated
	 * date (used to advance the job threshold), or {@code null} if it wasn't read.
	 */
	private class ResourceReindexingTask implements Callable<Date> {
		private final JpaPid myNextId;
		private final AtomicInteger myCounter;
		private Date myUpdated;

		ResourceReindexingTask(JpaPid theNextId, AtomicInteger theCounter) {
			myNextId = theNextId;
			myCounter = theCounter;
		}

		@Override
		public Date call() {
			Throwable reindexFailure;

			try {
				reindexFailure = readResourceAndReindex();
			} catch (ResourceVersionConflictException e) {
				/*
				 * We reindex in multiple threads, so it's technically possible that two threads try
				 * to index resources that cause a constraint error now (i.e. because a unique index has been
				 * added that didn't previously exist). In this case, one of the threads would succeed and
				 * not get this error, so we'll let the other one fail and try
				 * again later.
				 */
				ourLog.info(
						"Failed to reindex because of a version conflict. Leaving in unindexed state: {}",
						e.getMessage());
				reindexFailure = null;
			}

			if (reindexFailure != null) {
				ourLog.info("Setting resource PID[{}] status to ERRORED", myNextId);
				markResourceAsIndexingFailed(myNextId);
			}

			return myUpdated;
		}

		// Returns the failure (if any) rather than throwing, so the caller can decide
		// whether to mark the resource as failed; rolls back the tx on failure
		@Nullable
		private Throwable readResourceAndReindex() {
			Throwable reindexFailure;
			reindexFailure = myTxTemplate.execute(t -> {
				ResourceTable resourceTable =
						myResourceTableDao.findById(myNextId.getId()).orElseThrow(IllegalStateException::new);
				myUpdated = resourceTable.getUpdatedDate();

				try {
					myResourceReindexer.reindexResourceEntity(resourceTable);
					myCounter.incrementAndGet();
					return null;

				} catch (Exception e) {
					ourLog.error("Failed to index resource {}: {}", resourceTable.getIdDt(), e, e);
					t.setRollbackOnly();
					return e;
				}
			});
			return reindexFailure;
		}
	}
}