/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.search.reindex;

import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.dao.data.IResourceReindexJobDao;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import jakarta.persistence.PersistenceContextType;
import jakarta.persistence.Query;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.r4.model.InstantType;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Slice;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;

import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import static org.apache.commons.lang3.StringUtils.isNotBlank;

/**
 * Legacy reindexing service. Reindex requests are persisted as {@link ResourceReindexJobEntity}
 * rows; each pass loads up to {@link #PASS_SIZE} resource IDs per job, reindexes them on a
 * dedicated thread pool and then advances the job's low threshold so that the next pass resumes
 * after the newest resource handled so far.
 *
 * @see ca.uhn.fhir.jpa.reindex.job.ReindexJobConfig
 * @deprecated Use the Batch2 {@link ca.uhn.fhir.batch2.api.IJobCoordinator#startInstance(JobInstanceStartRequest)} instead.
 */
@Deprecated
public class ResourceReindexingSvcImpl implements IResourceReindexingSvc, IHasScheduledJobs {

	/** Lower bound used for a job that has not yet recorded a low threshold. */
	private static final Date BEGINNING_OF_TIME = new Date(0);

	private static final Logger ourLog = LoggerFactory.getLogger(ResourceReindexingSvcImpl.class);

	/** Maximum number of resource IDs loaded for a single job in a single pass. */
	private static final int PASS_SIZE = 25000;

	/**
	 * JPQL statements run, in this order, by {@link #markResourceAsIndexingFailed(long)} to remove the
	 * rows that refer to a resource. Every statement takes a single {@code :id} parameter.
	 */
	private static final String[] INDEX_ROW_DELETE_STATEMENTS = {
		"DELETE FROM ResourceTag t WHERE t.myResourceId = :id",
		"DELETE FROM ResourceIndexedSearchParamCoords t WHERE t.myResourcePid = :id",
		"DELETE FROM ResourceIndexedSearchParamDate t WHERE t.myResourcePid = :id",
		"DELETE FROM ResourceIndexedSearchParamNumber t WHERE t.myResourcePid = :id",
		"DELETE FROM ResourceIndexedSearchParamQuantity t WHERE t.myResourcePid = :id",
		"DELETE FROM ResourceIndexedSearchParamQuantityNormalized t WHERE t.myResourcePid = :id",
		"DELETE FROM ResourceIndexedSearchParamString t WHERE t.myResourcePid = :id",
		"DELETE FROM ResourceIndexedSearchParamToken t WHERE t.myResourcePid = :id",
		"DELETE FROM ResourceIndexedSearchParamUri t WHERE t.myResourcePid = :id",
		"DELETE FROM ResourceLink t WHERE t.mySourceResourcePid = :id",
		"DELETE FROM ResourceLink t WHERE t.myTargetResourcePid = :id"
	};

	/** Held for the duration of a reindexing pass and of {@link #cancelAndPurgeAllJobs()}. */
	private final ReentrantLock myIndexingLock = new ReentrantLock();

	@Autowired
	private IResourceReindexJobDao myReindexJobDao;

	@Autowired
	private JpaStorageSettings myStorageSettings;

	@Autowired
	private PlatformTransactionManager myTxManager;

	/** Created in {@link #start()} from {@link #myTxManager}. */
	private TransactionTemplate myTxTemplate;

	private final ThreadFactory myReindexingThreadFactory =
			new BasicThreadFactory.Builder().namingPattern("ResourceReindex-%d").build();

	/** Created by {@link #initExecutor()}; replaced with a fresh instance when jobs are cancelled. */
	private ThreadPoolExecutor myTaskExecutor;

	@Autowired
	private IResourceTableDao myResourceTableDao;

	@Autowired
	private DaoRegistry myDaoRegistry;

	@Autowired
	private FhirContext myContext;

	@PersistenceContext(type = PersistenceContextType.TRANSACTION)
	private EntityManager myEntityManager;

	@Autowired
	private ISearchParamRegistry mySearchParamRegistry;

	@Autowired
	private ResourceReindexer myResourceReindexer;

	@VisibleForTesting
	void setStorageSettingsForUnitTest(JpaStorageSettings theStorageSettings) {
		myStorageSettings = theStorageSettings;
	}

	@VisibleForTesting
	void setContextForUnitTest(FhirContext theContext) {
		myContext = theContext;
	}

	/**
	 * Builds the transaction template and the reindexing executor once dependencies are injected.
	 */
	@PostConstruct
	public void start() {
		myTxTemplate = new TransactionTemplate(myTxManager);
		initExecutor();
	}

	/**
	 * Creates the thread pool executor used for reindex jobs, replacing any previous one. The
	 * maximum pool size is taken from {@link JpaStorageSettings#getReindexThreadCount()} and the work
	 * queue holds at most 100 tasks; rejected submissions are handed to a {@code BlockPolicy}.
	 */
	public void initExecutor() {
		int reindexThreadCount = myStorageSettings.getReindexThreadCount();
		RejectedExecutionHandler rejectHandler = new BlockPolicy();
		myTaskExecutor = new ThreadPoolExecutor(
				0,
				reindexThreadCount,
				0L,
				TimeUnit.MILLISECONDS,
				new LinkedBlockingQueue<>(100),
				myReindexingThreadFactory,
				rejectHandler);
	}

	/**
	 * Registers {@link Job} as a clustered scheduled job, using this class name as the job ID and
	 * {@code 10 * DateUtils.MILLIS_PER_SECOND} as the scheduling argument.
	 */
	@Override
	public void scheduleJobs(ISchedulerService theSchedulerService) {
		ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
		jobDetail.setId(getClass().getName());
		jobDetail.setJobClass(Job.class);
		theSchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
	}

	/**
	 * Creates a reindex job covering every resource type.
	 *
	 * @return the ID of the newly created job
	 */
	@Override
	@Transactional(propagation = Propagation.REQUIRED)
	public Long markAllResourcesForReindexing() {
		return markAllResourcesForReindexing(null);
	}

	/**
	 * Creates a reindex job for one resource type, or for all types when {@code theType} is blank.
	 * Existing jobs for the same scope are marked as deleted first. The new job's high threshold is
	 * set five minutes after the current time.
	 *
	 * @param theType the resource type name, or {@code null}/blank for all types
	 * @return the ID of the newly created job
	 * @throws InvalidRequestException if {@code theType} is not blank and is not a resource type
	 *                                 known to the {@link FhirContext}
	 */
	@Override
	@Transactional(propagation = Propagation.REQUIRED)
	public Long markAllResourcesForReindexing(String theType) {

		String typeDesc;
		if (isNotBlank(theType)) {
			try {
				// Called only for validation; the result is not needed
				myContext.getResourceType(theType);
			} catch (DataFormatException e) {
				throw new InvalidRequestException(Msg.code(1170) + "Unknown resource type: " + theType);
			}
			myReindexJobDao.markAllOfTypeAsDeleted(theType);
			typeDesc = theType;
		} else {
			myReindexJobDao.markAllOfTypeAsDeleted();
			typeDesc = "(any)";
		}

		ResourceReindexJobEntity job = new ResourceReindexJobEntity();
		job.setResourceType(theType);
		job.setThresholdHigh(DateUtils.addMinutes(new Date(), 5));
		job = myReindexJobDao.saveAndFlush(job);

		ourLog.info("Marking all resources of type {} for reindexing - Got job ID[{}]", typeDesc, job.getId());
		return job.getId();
	}

	/**
	 * Scheduler entry point: delegates to {@link IResourceReindexingSvc#runReindexingPass()}.
	 */
	public static class Job implements HapiJob {
		@Autowired
		private IResourceReindexingSvc myTarget;

		@Override
		public void execute(JobExecutionContext theContext) {
			myTarget.runReindexingPass();
		}
	}

	@VisibleForTesting
	ReentrantLock getIndexingLockForUnitTest() {
		return myIndexingLock;
	}

	/**
	 * Runs one reindexing pass unless a pass is already running.
	 *
	 * @return the number of resources reindexed, or {@code null} if the pass was skipped because
	 *         scheduling is disabled, the settings check below failed, or the indexing lock is
	 *         currently held
	 */
	@Override
	@Transactional(propagation = Propagation.NEVER)
	public Integer runReindexingPass() {
		// NOTE(review): this gates reindexing on the "pre-expand value sets" task flag, which looks
		// unrelated to reindexing - confirm whether a reindex-specific setting was intended.
		if (myStorageSettings.isSchedulingDisabled() || !myStorageSettings.isEnableTaskPreExpandValueSets()) {
			return null;
		}
		if (myIndexingLock.tryLock()) {
			try {
				return doReindexingPassInsideLock();
			} finally {
				myIndexingLock.unlock();
			}
		}
		return null;
	}

	/**
	 * Purges jobs already marked as deleted, then runs the remaining jobs. The caller must hold
	 * {@link #myIndexingLock}.
	 */
	private int doReindexingPassInsideLock() {
		expungeJobsMarkedAsDeleted();
		return runReindexJobs();
	}

	/**
	 * Runs one reindexing pass, waiting for the indexing lock if another pass is in progress. Unlike
	 * {@link #runReindexingPass()}, no storage settings are consulted.
	 *
	 * @return the number of resources reindexed
	 */
	@Override
	public int forceReindexingPass() {
		myIndexingLock.lock();
		try {
			return doReindexingPassInsideLock();
		} finally {
			myIndexingLock.unlock();
		}
	}

	/**
	 * Marks every job as deleted, shuts down the current executor and replaces it with a new one,
	 * then purges the jobs marked as deleted. Waits for the indexing lock first.
	 */
	@Override
	public void cancelAndPurgeAllJobs() {
		ourLog.info("Cancelling and purging all resource reindexing jobs");
		myIndexingLock.lock();
		try {
			myTxTemplate.execute(t -> {
				myReindexJobDao.markAllOfTypeAsDeleted();
				return null;
			});

			myTaskExecutor.shutdown();
			initExecutor();

			expungeJobsMarkedAsDeleted();
		} finally {
			myIndexingLock.unlock();
		}
	}

	/**
	 * Runs every job returned by {@link #getResourceReindexJobEntities()}. A job whose low threshold
	 * has reached its high threshold is marked as deleted instead of being run.
	 *
	 * @return the total number of resources reindexed across all jobs
	 */
	private int runReindexJobs() {
		Collection<ResourceReindexJobEntity> jobs = getResourceReindexJobEntities();

		if (jobs.isEmpty()) {
			ourLog.debug("Running {} reindex jobs: {}", jobs.size(), jobs);
			return 0;
		}
		ourLog.info("Running {} reindex jobs: {}", jobs.size(), jobs);

		int count = 0;
		for (ResourceReindexJobEntity next : jobs) {

			// A previous job in this pass may have been interrupted; do not start further work
			if (Thread.currentThread().isInterrupted()) {
				break;
			}

			if (next.getThresholdLow() != null
					&& next.getThresholdLow().getTime()
							>= next.getThresholdHigh().getTime()) {
				markJobAsDeleted(next);
				continue;
			}

			count += runReindexJob(next);
		}
		return count;
	}

	/**
	 * @return the number of jobs in the first page (at most 10) of jobs not marked as deleted
	 */
	@Override
	public int countReindexJobs() {
		return getResourceReindexJobEntities().size();
	}

	/**
	 * Loads, in a transaction, the first page (at most 10) of jobs not marked as deleted.
	 */
	private Collection<ResourceReindexJobEntity> getResourceReindexJobEntities() {
		Collection<ResourceReindexJobEntity> jobs =
				myTxTemplate.execute(t -> myReindexJobDao.findAll(PageRequest.of(0, 10), false));
		assert jobs != null;
		return jobs;
	}

	/**
	 * Marks the given job as deleted in its own transaction.
	 */
	private void markJobAsDeleted(ResourceReindexJobEntity theJob) {
		ourLog.info("Marking reindexing job ID[{}] as deleted", theJob.getId());
		myTxTemplate.execute(t -> {
			myReindexJobDao.markAsDeletedById(theJob.getId());
			return null;
		});
	}

	@VisibleForTesting
	public void setResourceReindexerForUnitTest(ResourceReindexer theResourceReindexer) {
		myResourceReindexer = theResourceReindexer;
	}

	/**
	 * Performs one pass of a single job: loads up to {@link #PASS_SIZE} resource IDs between the
	 * job's thresholds, reindexes them on the task executor, and advances the job's low threshold
	 * to the latest updated date seen.
	 *
	 * <p>If waiting on any task fails, the suspended-until time is pushed one minute into the
	 * future and the pass ends early without advancing the threshold.
	 *
	 * @param theJob the job to run
	 * @return the number of resources successfully reindexed in this pass
	 */
	private int runReindexJob(ResourceReindexJobEntity theJob) {
		Date suspendedUntil = theJob.getSuspendedUntil();
		if (suspendedUntil != null && suspendedUntil.getTime() > System.currentTimeMillis()) {
			return 0;
		}

		ourLog.info("Performing reindex pass for JOB[{}]", theJob.getId());
		StopWatch sw = new StopWatch();
		AtomicInteger counter = new AtomicInteger();

		/*
		 * On the first time we run a particular reindex job, let's make sure we
		 * have the latest search parameters loaded. A common reason to
		 * be reindexing is that the search parameters have changed in some way, so
		 * this makes sure we're on the latest versions
		 */
		if (theJob.getThresholdLow() == null) {
			mySearchParamRegistry.forceRefresh();
		}

		// Calculate range
		Date low = theJob.getThresholdLow() != null ? theJob.getThresholdLow() : BEGINNING_OF_TIME;
		Date high = theJob.getThresholdHigh();

		// Query for resources within threshold
		StopWatch pageSw = new StopWatch();
		Slice<Long> range = loadResourceIdsForPass(theJob, low, high);
		Validate.notNull(range, "Resource ID query for reindex JOB[%s] returned null", theJob.getId());
		int count = range.getNumberOfElements();
		ourLog.info("Loaded {} resources for reindexing in {}", count, pageSw);

		// If we didn't find any results at all, mark as deleted
		if (count == 0) {
			markJobAsDeleted(theJob);
			return 0;
		}

		// Submit each resource requiring reindexing
		List<Future<Date>> futures = range.stream()
				.map(t -> myTaskExecutor.submit(new ResourceReindexingTask(t, counter)))
				.collect(Collectors.toList());

		Date latestDate = null;
		for (Future<Date> next : futures) {
			Date nextDate;
			try {
				nextDate = next.get();
			} catch (Exception e) {
				ourLog.error("Failure reindexing", e);
				try {
					suspendJobsForOneMinute();
				} finally {
					if (e instanceof InterruptedException) {
						// Future.get() cleared the interrupt flag; restore it for our callers, but
						// only after the update above so that it runs uninterrupted
						Thread.currentThread().interrupt();
					}
				}
				return counter.get();
			}

			if (nextDate != null) {
				if (latestDate == null || latestDate.getTime() < nextDate.getTime()) {
					latestDate = new Date(nextDate.getTime());
				}
			}
		}

		Validate.notNull(latestDate, "No updated date was reported for reindex JOB[%s]", theJob.getId());
		Date newLow;
		if (latestDate.getTime() == low.getTime()) {
			if (count == PASS_SIZE) {
				// Just in case we end up in some sort of infinite loop. This shouldn't happen, and couldn't
				// really happen unless there were PASS_SIZE resources with the exact same update time down
				// to the millisecond.
				ourLog.error(
						"Final pass time for reindex JOB[{}] has same ending low value: {}",
						theJob.getId(),
						latestDate);
			}

			// Nudge the threshold forward so the next pass does not reload the same range start
			newLow = new Date(latestDate.getTime() + 1);
		} else {
			newLow = latestDate;
		}

		myTxTemplate.execute(t -> {
			myReindexJobDao.setThresholdLow(theJob.getId(), newLow);
			Integer existingCount =
					myReindexJobDao.getReindexCount(theJob.getId()).orElse(0);
			int newCount = existingCount + counter.get();
			myReindexJobDao.setReindexCount(theJob.getId(), newCount);
			return null;
		});

		ourLog.info(
				"Completed pass of reindex JOB[{}] - Indexed {} resources in {} ({} / sec) - Have indexed until: {}",
				theJob.getId(),
				count,
				sw,
				sw.formatThroughput(count, TimeUnit.SECONDS),
				new InstantType(newLow));
		return counter.get();
	}

	/**
	 * Loads, in a transaction, the first page of up to {@link #PASS_SIZE} resource IDs for the
	 * given range, restricted to the job's resource type when one is set.
	 */
	private Slice<Long> loadResourceIdsForPass(ResourceReindexJobEntity theJob, Date theLow, Date theHigh) {
		return myTxTemplate.execute(t -> {
			PageRequest page = PageRequest.of(0, PASS_SIZE);
			if (isNotBlank(theJob.getResourceType())) {
				return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(
						page, theJob.getResourceType(), theLow, theHigh);
			} else {
				return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(
						page, theLow, theHigh);
			}
		});
	}

	/**
	 * Sets the suspended-until time to one minute from now, in its own transaction. The DAO call
	 * takes no job ID.
	 */
	private void suspendJobsForOneMinute() {
		Date suspendedUntil = DateUtils.addMinutes(new Date(), 1);
		myTxTemplate.execute(t -> {
			myReindexJobDao.setSuspendedUntil(suspendedUntil);
			return null;
		});
	}

	/**
	 * Deletes, in one transaction, the first page (at most 10) of jobs marked as deleted.
	 */
	private void expungeJobsMarkedAsDeleted() {
		myTxTemplate.execute(t -> {
			Collection<ResourceReindexJobEntity> toDelete = myReindexJobDao.findAll(PageRequest.of(0, 10), true);
			toDelete.forEach(job -> {
				ourLog.info("Purging deleted job[{}]", job.getId());
				myReindexJobDao.deleteById(job.getId());
			});
			return null;
		});
	}

	/**
	 * Flags a resource as {@link BaseHapiFhirDao#INDEX_STATUS_INDEXING_FAILED} and removes the rows
	 * targeted by {@link #INDEX_ROW_DELETE_STATEMENTS}, all inside a new transaction
	 * ({@code PROPAGATION_REQUIRES_NEW}).
	 *
	 * @param theId the PID of the resource that could not be reindexed
	 */
	private void markResourceAsIndexingFailed(final long theId) {
		TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
		txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
		txTemplate.execute((TransactionCallback<Void>) theStatus -> {
			ourLog.info("Marking resource with PID {} as indexing_failed", theId);

			myResourceTableDao.updateIndexStatus(theId, BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED);

			for (String statement : INDEX_ROW_DELETE_STATEMENTS) {
				Query q = myEntityManager.createQuery(statement);
				q.setParameter("id", theId);
				q.executeUpdate();
			}

			return null;
		});
	}

	/**
	 * Reindexes a single resource and reports the updated date it read from the resource's row.
	 */
	private class ResourceReindexingTask implements Callable<Date> {
		private final Long myNextId;
		private final AtomicInteger myCounter;

		/** Set inside the transaction in {@link #readResourceAndReindex()}. */
		private Date myUpdated;

		/**
		 * @param theNextId  the PID of the resource to reindex
		 * @param theCounter incremented once when the resource is reindexed successfully
		 */
		ResourceReindexingTask(Long theNextId, AtomicInteger theCounter) {
			myNextId = theNextId;
			myCounter = theCounter;
		}

		/**
		 * Reindexes the resource. A failure reported by {@link #readResourceAndReindex()} causes
		 * the resource to be marked as indexing-failed; a version conflict is only logged.
		 *
		 * @return the resource's updated date as read before reindexing, or {@code null} if it was
		 *         never read
		 */
		@Override
		public Date call() {
			Throwable reindexFailure;

			try {
				reindexFailure = readResourceAndReindex();
			} catch (ResourceVersionConflictException e) {
				/*
				 * We reindex in multiple threads, so it's technically possible that two threads try
				 * to index resources that cause a constraint error now (i.e. because a unique index has been
				 * added that didn't previously exist). In this case, one of the threads would succeed and
				 * not get this error, so we'll let the other one fail and try
				 * again later.
				 */
				ourLog.info(
						"Failed to reindex because of a version conflict. Leaving in unindexed state: {}",
						e.getMessage());
				reindexFailure = null;
			}

			if (reindexFailure != null) {
				ourLog.info("Setting resource PID[{}] status to ERRORED", myNextId);
				markResourceAsIndexingFailed(myNextId);
			}

			return myUpdated;
		}

		/**
		 * Loads the resource row and reindexes it inside a transaction. If reindexing throws, the
		 * transaction is marked rollback-only and the exception is returned rather than thrown.
		 *
		 * @return the exception raised while reindexing, or {@code null} on success
		 * @throws IllegalStateException if no row exists for the resource PID
		 */
		@Nullable
		private Throwable readResourceAndReindex() {
			return myTxTemplate.execute(t -> {
				ResourceTable resourceTable =
						myResourceTableDao.findById(myNextId).orElseThrow(IllegalStateException::new);
				myUpdated = resourceTable.getUpdatedDate();

				try {
					myResourceReindexer.reindexResourceEntity(resourceTable);
					myCounter.incrementAndGet();
					return null;

				} catch (Exception e) {
					ourLog.error("Failed to index resource {}: {}", resourceTable.getIdDt(), e, e);
					t.setRollbackOnly();
					return e;
				}
			});
		}
	}
}