/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.search.reindex;

import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.dao.data.IResourceReindexJobDao;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import jakarta.persistence.PersistenceContextType;
import jakarta.persistence.Query;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.r4.model.InstantType;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Slice;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;

import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import static org.apache.commons.lang3.StringUtils.isNotBlank;

/**
 * @deprecated Use the Batch2 {@link ca.uhn.fhir.batch2.api.IJobCoordinator#startInstance(JobInstanceStartRequest)} instead.
 */
@SuppressWarnings({"removal", "DeprecatedIsStillUsed"})
@Deprecated
public class ResourceReindexingSvcImpl implements IResourceReindexingSvc, IHasScheduledJobs {
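	/*
	 * Migration note (non-normative sketch): the Batch2 reindex job referenced in the class-level
	 * javadoc above is typically started along these lines. The job definition id and the
	 * ReindexAppCtx / ReindexJobParameters names below are assumptions based on the batch2-jobs
	 * module and may differ between HAPI FHIR versions:
	 *
	 *   JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
	 *   startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX); // assumed to be "REINDEX"
	 *   startRequest.setParameters(new ReindexJobParameters());
	 *   myJobCoordinator.startInstance(startRequest);
	 */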

	private static final Date BEGINNING_OF_TIME = new Date(0);
	private static final Logger ourLog = LoggerFactory.getLogger(ResourceReindexingSvcImpl.class);
	private static final int PASS_SIZE = 25000;
	private final ReentrantLock myIndexingLock = new ReentrantLock();

	@Autowired
	private IResourceReindexJobDao myReindexJobDao;

	@Autowired
	private JpaStorageSettings myStorageSettings;

	@Autowired
	private PlatformTransactionManager myTxManager;

	private TransactionTemplate myTxTemplate;
	private final ThreadFactory myReindexingThreadFactory =
			new BasicThreadFactory.Builder().namingPattern("ResourceReindex-%d").build();
	private ThreadPoolExecutor myTaskExecutor;

	@Autowired
	private IResourceTableDao myResourceTableDao;

	@Autowired
	private FhirContext myContext;

	@PersistenceContext(type = PersistenceContextType.TRANSACTION)
	private EntityManager myEntityManager;

	@Autowired
	private ISearchParamRegistry mySearchParamRegistry;

	@Autowired
	private ResourceReindexer myResourceReindexer;

	@VisibleForTesting
	void setStorageSettingsForUnitTest(JpaStorageSettings theStorageSettings) {
		myStorageSettings = theStorageSettings;
	}

	@VisibleForTesting
	void setContextForUnitTest(FhirContext theContext) {
		myContext = theContext;
	}

	@PostConstruct
	public void start() {
		myTxTemplate = new TransactionTemplate(myTxManager);
		initExecutor();
	}

	public void initExecutor() {
		// Create the threadpool executor used for reindex jobs
		int reindexThreadCount = myStorageSettings.getReindexThreadCount();
		RejectedExecutionHandler rejectHandler = new BlockPolicy();
		myTaskExecutor = new ThreadPoolExecutor(
				0,
				reindexThreadCount,
				0L,
				TimeUnit.MILLISECONDS,
				new LinkedBlockingQueue<>(100),
				myReindexingThreadFactory,
				rejectHandler);
	}
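	/*
	 * The executor above is deliberately bounded: at most getReindexThreadCount() workers, a
	 * 100-element queue, and BlockPolicy so that submitters wait for queue space instead of
	 * dropping work. As a rough, hedged illustration (not the actual BlockPolicy source), a
	 * blocking rejection handler can be implemented like this:
	 *
	 *   RejectedExecutionHandler blockingHandler = (runnable, executor) -> {
	 *       try {
	 *           executor.getQueue().put(runnable); // block until the queue has room
	 *       } catch (InterruptedException e) {
	 *           Thread.currentThread().interrupt();
	 *           throw new java.util.concurrent.RejectedExecutionException(e);
	 *       }
	 *   };
	 */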

	@Override
	public void scheduleJobs(ISchedulerService theSchedulerService) {
		ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
		jobDetail.setId(getClass().getName());
		jobDetail.setJobClass(Job.class);
		theSchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRED)
	public Long markAllResourcesForReindexing() {
		return markAllResourcesForReindexing(null);
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRED)
	public Long markAllResourcesForReindexing(String theType) {

		String typeDesc;
		if (isNotBlank(theType)) {
			try {
				myContext.getResourceType(theType);
			} catch (DataFormatException e) {
				throw new InvalidRequestException(Msg.code(1170) + "Unknown resource type: " + theType);
			}
			myReindexJobDao.markAllOfTypeAsDeleted(theType);
			typeDesc = theType;
		} else {
			myReindexJobDao.markAllOfTypeAsDeleted();
			typeDesc = "(any)";
		}

		ResourceReindexJobEntity job = new ResourceReindexJobEntity();
		job.setResourceType(theType);
		job.setThresholdHigh(DateUtils.addMinutes(new Date(), 5));
		job = myReindexJobDao.saveAndFlush(job);

		ourLog.info("Marking all resources of type {} for reindexing - Got job ID[{}]", typeDesc, job.getId());
		return job.getId();
	}
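	/*
	 * Typical caller-side usage (illustrative only; the injected field name is an assumption):
	 *
	 *   @Autowired
	 *   private IResourceReindexingSvc myResourceReindexingSvc;
	 *
	 *   Long patientJobId = myResourceReindexingSvc.markAllResourcesForReindexing("Patient");
	 *   Long allTypesJobId = myResourceReindexingSvc.markAllResourcesForReindexing();
	 *
	 * The scheduled pass (runReindexingPass below) then works through the marked resources in
	 * PASS_SIZE pages.
	 */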

	public static class Job implements HapiJob {
		@Autowired
		private IResourceReindexingSvc myTarget;

		@Override
		public void execute(JobExecutionContext theContext) {
			myTarget.runReindexingPass();
		}
	}

	@VisibleForTesting
	ReentrantLock getIndexingLockForUnitTest() {
		return myIndexingLock;
	}

	@Override
	@Transactional(propagation = Propagation.NEVER)
	public Integer runReindexingPass() {
		if (myStorageSettings.isSchedulingDisabled() || !myStorageSettings.isEnableTaskResourceReindexPass()) {
			return null;
		}
		if (myIndexingLock.tryLock()) {
			try {
				return doReindexingPassInsideLock();
			} finally {
				myIndexingLock.unlock();
			}
		}
		return null;
	}
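	/*
	 * Note: runReindexingPass() is a no-op unless scheduling is enabled and the reindex task is
	 * enabled in JpaStorageSettings. A hedged configuration sketch (setter names assumed to mirror
	 * the getters used above):
	 *
	 *   JpaStorageSettings settings = new JpaStorageSettings();
	 *   settings.setSchedulingDisabled(false);
	 *   settings.setEnableTaskResourceReindexPass(true);
	 */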

	private int doReindexingPassInsideLock() {
		expungeJobsMarkedAsDeleted();
		return runReindexJobs();
	}

	@Override
	public int forceReindexingPass() {
		myIndexingLock.lock();
		try {
			return doReindexingPassInsideLock();
		} finally {
			myIndexingLock.unlock();
		}
	}

	@Override
	public void cancelAndPurgeAllJobs() {
		ourLog.info("Cancelling and purging all resource reindexing jobs");
		myIndexingLock.lock();
		try {
			myTxTemplate.execute(t -> {
				myReindexJobDao.markAllOfTypeAsDeleted();
				return null;
			});

			myTaskExecutor.shutdown();
			initExecutor();

			expungeJobsMarkedAsDeleted();
		} finally {
			myIndexingLock.unlock();
		}
	}

	private int runReindexJobs() {
		Collection<ResourceReindexJobEntity> jobs = getResourceReindexJobEntities();

		if (!jobs.isEmpty()) {
			ourLog.info("Running {} reindex jobs: {}", jobs.size(), jobs);
		} else {
			ourLog.debug("Running 0 reindex jobs");
			return 0;
		}

		int count = 0;
		for (ResourceReindexJobEntity next : jobs) {

			if (next.getThresholdLow() != null
					&& next.getThresholdLow().getTime()
							>= next.getThresholdHigh().getTime()) {
				markJobAsDeleted(next);
				continue;
			}

			count += runReindexJob(next);
		}
		return count;
	}

	@Override
	public int countReindexJobs() {
		return getResourceReindexJobEntities().size();
	}

	private Collection<ResourceReindexJobEntity> getResourceReindexJobEntities() {
		Collection<ResourceReindexJobEntity> jobs =
				myTxTemplate.execute(t -> myReindexJobDao.findAll(PageRequest.of(0, 10), false));
		assert jobs != null;
		return jobs;
	}

	private void markJobAsDeleted(ResourceReindexJobEntity theJob) {
		ourLog.info("Marking reindexing job ID[{}] as deleted", theJob.getId());
		myTxTemplate.execute(t -> {
			myReindexJobDao.markAsDeletedById(theJob.getId());
			return null;
		});
	}

	@VisibleForTesting
	public void setResourceReindexerForUnitTest(ResourceReindexer theResourceReindexer) {
		myResourceReindexer = theResourceReindexer;
	}

	private int runReindexJob(ResourceReindexJobEntity theJob) {
		if (theJob.getSuspendedUntil() != null) {
			if (theJob.getSuspendedUntil().getTime() > System.currentTimeMillis()) {
				return 0;
			}
		}

		ourLog.info("Performing reindex pass for JOB[{}]", theJob.getId());
		StopWatch sw = new StopWatch();
		AtomicInteger counter = new AtomicInteger();

		/*
		 * On the first time we run a particular reindex job, let's make sure we
		 * have the latest search parameters loaded. A common reason to
		 * be reindexing is that the search parameters have changed in some way, so
		 * this makes sure we're on the latest versions
		 */
		if (theJob.getThresholdLow() == null) {
			mySearchParamRegistry.forceRefresh();
		}

		// Calculate range
		Date low = theJob.getThresholdLow() != null ? theJob.getThresholdLow() : BEGINNING_OF_TIME;
		Date high = theJob.getThresholdHigh();

		// Query for resources within threshold
		StopWatch pageSw = new StopWatch();
		Slice<Long> range = myTxTemplate.execute(t -> {
			PageRequest page = PageRequest.of(0, PASS_SIZE);
			if (isNotBlank(theJob.getResourceType())) {
				return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(
						page, theJob.getResourceType(), low, high);
			} else {
				return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(page, low, high);
			}
		});
		Validate.notNull(range);
		int count = range.getNumberOfElements();
		ourLog.info("Loaded {} resources for reindexing in {}", count, pageSw);

		// If we didn't find any results at all, mark as deleted
		if (count == 0) {
			markJobAsDeleted(theJob);
			return 0;
		}

		// Submit each resource requiring reindexing
		List<Future<Date>> futures = range.stream()
				.map(t -> myTaskExecutor.submit(new ResourceReindexingTask(JpaPid.fromId(t), counter)))
				.collect(Collectors.toList());

		Date latestDate = null;
		for (Future<Date> next : futures) {
			Date nextDate;
			try {
				nextDate = next.get();
			} catch (Exception e) {
				ourLog.error("Failure reindexing", e);
				Date suspendedUntil = DateUtils.addMinutes(new Date(), 1);
				myTxTemplate.execute(t -> {
					myReindexJobDao.setSuspendedUntil(suspendedUntil);
					return null;
				});
				return counter.get();
			}

			if (nextDate != null) {
				if (latestDate == null || latestDate.getTime() < nextDate.getTime()) {
					latestDate = new Date(nextDate.getTime());
				}
			}
		}

		Validate.notNull(latestDate);
		Date newLow;
		if (latestDate.getTime() == low.getTime()) {
			if (count == PASS_SIZE) {
				// Just in case we end up in some sort of infinite loop. This shouldn't happen, and couldn't really
				// happen unless there were PASS_SIZE (25000) resources with the exact same update time down to the
				// millisecond.
				ourLog.error(
						"Final pass time for reindex JOB[{}] has same ending low value: {}",
						theJob.getId(),
						latestDate);
			}

			newLow = new Date(latestDate.getTime() + 1);
		} else {
			newLow = latestDate;
		}
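		// Worked example of the window advance above, with illustrative timestamps: if this pass
		// scanned [low = 10:00:00.000, thresholdHigh] and the newest "updated" value it reindexed
		// was latestDate = 10:07:31.250, the next pass resumes from newLow = 10:07:31.250. Only when
		// latestDate did not move past low is newLow bumped by one millisecond instead, so the job
		// still makes forward progress (the error above additionally requires a full PASS_SIZE page).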

		myTxTemplate.execute(t -> {
			myReindexJobDao.setThresholdLow(theJob.getId(), newLow);
			Integer existingCount =
					myReindexJobDao.getReindexCount(theJob.getId()).orElse(0);
			int newCount = existingCount + counter.get();
			myReindexJobDao.setReindexCount(theJob.getId(), newCount);
			return null;
		});

		ourLog.info(
				"Completed pass of reindex JOB[{}] - Indexed {} resources in {} ({} / sec) - Have indexed until: {}",
				theJob.getId(),
				count,
				sw,
				sw.formatThroughput(count, TimeUnit.SECONDS),
				new InstantType(newLow));
		return counter.get();
	}

	private void expungeJobsMarkedAsDeleted() {
		myTxTemplate.execute(t -> {
			Collection<ResourceReindexJobEntity> toDelete = myReindexJobDao.findAll(PageRequest.of(0, 10), true);
			toDelete.forEach(job -> {
				ourLog.info("Purging deleted job[{}]", job.getId());
				myReindexJobDao.deleteById(job.getId());
			});
			return null;
		});
	}

	private void markResourceAsIndexingFailed(final JpaPid theId) {
		TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
		txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
		txTemplate.execute((TransactionCallback<Void>) theStatus -> {
			ourLog.info("Marking resource with PID {} as indexing_failed", theId);

			myResourceTableDao.updateIndexStatus(theId.getId(), BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED);

			Query q = myEntityManager.createQuery("DELETE FROM ResourceTag t WHERE t.myResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery(
					"DELETE FROM ResourceIndexedSearchParamCoords t WHERE t.myResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery(
					"DELETE FROM ResourceIndexedSearchParamDate t WHERE t.myResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery(
					"DELETE FROM ResourceIndexedSearchParamNumber t WHERE t.myResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery(
					"DELETE FROM ResourceIndexedSearchParamQuantity t WHERE t.myResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery(
					"DELETE FROM ResourceIndexedSearchParamQuantityNormalized t WHERE t.myResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery(
					"DELETE FROM ResourceIndexedSearchParamString t WHERE t.myResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery(
					"DELETE FROM ResourceIndexedSearchParamToken t WHERE t.myResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery(
					"DELETE FROM ResourceIndexedSearchParamUri t WHERE t.myResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.mySourceResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.myTargetResource.myId = :id");
			q.setParameter("id", theId.getId());
			q.executeUpdate();

			return null;
		});
	}

	private class ResourceReindexingTask implements Callable<Date> {
		private final JpaPid myNextId;
		private final AtomicInteger myCounter;
		private Date myUpdated;

		ResourceReindexingTask(JpaPid theNextId, AtomicInteger theCounter) {
			myNextId = theNextId;
			myCounter = theCounter;
		}

		@Override
		public Date call() {
			Throwable reindexFailure;

			try {
				reindexFailure = readResourceAndReindex();
			} catch (ResourceVersionConflictException e) {
				/*
				 * We reindex in multiple threads, so it's technically possible that two threads try
				 * to index resources that cause a constraint error now (i.e. because a unique index has been
				 * added that didn't previously exist). In this case, one of the threads would succeed and
				 * not get this error, so we'll let the other one fail and try
				 * again later.
				 */
				ourLog.info(
						"Failed to reindex because of a version conflict. Leaving in unindexed state: {}",
						e.getMessage());
				reindexFailure = null;
			}

			if (reindexFailure != null) {
				ourLog.info("Setting resource PID[{}] status to ERRORED", myNextId);
				markResourceAsIndexingFailed(myNextId);
			}

			return myUpdated;
		}

		@Nullable
		private Throwable readResourceAndReindex() {
			Throwable reindexFailure;
			reindexFailure = myTxTemplate.execute(t -> {
				ResourceTable resourceTable =
						myResourceTableDao.findById(myNextId.getId()).orElseThrow(IllegalStateException::new);
				myUpdated = resourceTable.getUpdatedDate();

				try {
					myResourceReindexer.reindexResourceEntity(resourceTable);
					myCounter.incrementAndGet();
					return null;

				} catch (Exception e) {
					ourLog.error("Failed to index resource {}: {}", resourceTable.getIdDt(), e, e);
					t.setRollbackOnly();
					return e;
				}
			});
			return reindexFailure;
		}
	}
}