001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.search.reindex;
021
022import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
023import ca.uhn.fhir.context.FhirContext;
024import ca.uhn.fhir.i18n.Msg;
025import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
026import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
027import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
028import ca.uhn.fhir.jpa.dao.data.IResourceReindexJobDao;
029import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
030import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity;
031import ca.uhn.fhir.jpa.model.entity.ResourceTable;
032import ca.uhn.fhir.jpa.model.sched.HapiJob;
033import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
034import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
035import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
036import ca.uhn.fhir.parser.DataFormatException;
037import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
038import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
039import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
040import ca.uhn.fhir.util.StopWatch;
041import com.google.common.annotations.VisibleForTesting;
042import jakarta.annotation.Nullable;
043import jakarta.annotation.PostConstruct;
044import jakarta.persistence.EntityManager;
045import jakarta.persistence.PersistenceContext;
046import jakarta.persistence.PersistenceContextType;
047import jakarta.persistence.Query;
048import org.apache.commons.lang3.Validate;
049import org.apache.commons.lang3.concurrent.BasicThreadFactory;
050import org.apache.commons.lang3.time.DateUtils;
051import org.hl7.fhir.r4.model.InstantType;
052import org.quartz.JobExecutionContext;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055import org.springframework.beans.factory.annotation.Autowired;
056import org.springframework.data.domain.PageRequest;
057import org.springframework.data.domain.Slice;
058import org.springframework.transaction.PlatformTransactionManager;
059import org.springframework.transaction.TransactionDefinition;
060import org.springframework.transaction.annotation.Propagation;
061import org.springframework.transaction.annotation.Transactional;
062import org.springframework.transaction.support.TransactionCallback;
063import org.springframework.transaction.support.TransactionTemplate;
064
065import java.util.Collection;
066import java.util.Date;
067import java.util.List;
068import java.util.concurrent.Callable;
069import java.util.concurrent.Future;
070import java.util.concurrent.LinkedBlockingQueue;
071import java.util.concurrent.RejectedExecutionHandler;
072import java.util.concurrent.ThreadFactory;
073import java.util.concurrent.ThreadPoolExecutor;
074import java.util.concurrent.TimeUnit;
075import java.util.concurrent.atomic.AtomicInteger;
076import java.util.concurrent.locks.ReentrantLock;
077import java.util.stream.Collectors;
078
079import static org.apache.commons.lang3.StringUtils.isNotBlank;
080
081/**
082 * @see ca.uhn.fhir.jpa.reindex.job.ReindexJobConfig
083 * @deprecated Use the Batch2 {@link ca.uhn.fhir.batch2.api.IJobCoordinator#startInstance(JobInstanceStartRequest)} instead.
084 */
085@Deprecated
086public class ResourceReindexingSvcImpl implements IResourceReindexingSvc, IHasScheduledJobs {
087
088        private static final Date BEGINNING_OF_TIME = new Date(0);
089        private static final Logger ourLog = LoggerFactory.getLogger(ResourceReindexingSvcImpl.class);
090        private static final int PASS_SIZE = 25000;
091        private final ReentrantLock myIndexingLock = new ReentrantLock();
092
093        @Autowired
094        private IResourceReindexJobDao myReindexJobDao;
095
096        @Autowired
097        private JpaStorageSettings myStorageSettings;
098
099        @Autowired
100        private PlatformTransactionManager myTxManager;
101
102        private TransactionTemplate myTxTemplate;
103        private final ThreadFactory myReindexingThreadFactory =
104                        new BasicThreadFactory.Builder().namingPattern("ResourceReindex-%d").build();
105        private ThreadPoolExecutor myTaskExecutor;
106
107        @Autowired
108        private IResourceTableDao myResourceTableDao;
109
110        @Autowired
111        private DaoRegistry myDaoRegistry;
112
113        @Autowired
114        private FhirContext myContext;
115
116        @PersistenceContext(type = PersistenceContextType.TRANSACTION)
117        private EntityManager myEntityManager;
118
119        @Autowired
120        private ISearchParamRegistry mySearchParamRegistry;
121
122        @Autowired
123        private ResourceReindexer myResourceReindexer;
124
125        @VisibleForTesting
126        void setStorageSettingsForUnitTest(JpaStorageSettings theStorageSettings) {
127                myStorageSettings = theStorageSettings;
128        }
129
130        @VisibleForTesting
131        void setContextForUnitTest(FhirContext theContext) {
132                myContext = theContext;
133        }
134
135        @PostConstruct
136        public void start() {
137                myTxTemplate = new TransactionTemplate(myTxManager);
138                initExecutor();
139        }
140
141        public void initExecutor() {
142                // Create the threadpool executor used for reindex jobs
143                int reindexThreadCount = myStorageSettings.getReindexThreadCount();
144                RejectedExecutionHandler rejectHandler = new BlockPolicy();
145                myTaskExecutor = new ThreadPoolExecutor(
146                                0,
147                                reindexThreadCount,
148                                0L,
149                                TimeUnit.MILLISECONDS,
150                                new LinkedBlockingQueue<>(100),
151                                myReindexingThreadFactory,
152                                rejectHandler);
153        }
154
155        @Override
156        public void scheduleJobs(ISchedulerService theSchedulerService) {
157                ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
158                jobDetail.setId(getClass().getName());
159                jobDetail.setJobClass(Job.class);
160                theSchedulerService.scheduleClusteredJob(10 * DateUtils.MILLIS_PER_SECOND, jobDetail);
161        }
162
163        @Override
164        @Transactional(propagation = Propagation.REQUIRED)
165        public Long markAllResourcesForReindexing() {
166                return markAllResourcesForReindexing(null);
167        }
168
169        @Override
170        @Transactional(propagation = Propagation.REQUIRED)
171        public Long markAllResourcesForReindexing(String theType) {
172
173                String typeDesc;
174                if (isNotBlank(theType)) {
175                        try {
176                                myContext.getResourceType(theType);
177                        } catch (DataFormatException e) {
178                                throw new InvalidRequestException(Msg.code(1170) + "Unknown resource type: " + theType);
179                        }
180                        myReindexJobDao.markAllOfTypeAsDeleted(theType);
181                        typeDesc = theType;
182                } else {
183                        myReindexJobDao.markAllOfTypeAsDeleted();
184                        typeDesc = "(any)";
185                }
186
187                ResourceReindexJobEntity job = new ResourceReindexJobEntity();
188                job.setResourceType(theType);
189                job.setThresholdHigh(DateUtils.addMinutes(new Date(), 5));
190                job = myReindexJobDao.saveAndFlush(job);
191
192                ourLog.info("Marking all resources of type {} for reindexing - Got job ID[{}]", typeDesc, job.getId());
193                return job.getId();
194        }
195
196        public static class Job implements HapiJob {
197                @Autowired
198                private IResourceReindexingSvc myTarget;
199
200                @Override
201                public void execute(JobExecutionContext theContext) {
202                        myTarget.runReindexingPass();
203                }
204        }
205
206        @VisibleForTesting
207        ReentrantLock getIndexingLockForUnitTest() {
208                return myIndexingLock;
209        }
210
211        @Override
212        @Transactional(propagation = Propagation.NEVER)
213        public Integer runReindexingPass() {
214                if (myStorageSettings.isSchedulingDisabled() || !myStorageSettings.isEnableTaskPreExpandValueSets()) {
215                        return null;
216                }
217                if (myIndexingLock.tryLock()) {
218                        try {
219                                return doReindexingPassInsideLock();
220                        } finally {
221                                myIndexingLock.unlock();
222                        }
223                }
224                return null;
225        }
226
227        private int doReindexingPassInsideLock() {
228                expungeJobsMarkedAsDeleted();
229                return runReindexJobs();
230        }
231
232        @Override
233        public int forceReindexingPass() {
234                myIndexingLock.lock();
235                try {
236                        return doReindexingPassInsideLock();
237                } finally {
238                        myIndexingLock.unlock();
239                }
240        }
241
242        @Override
243        public void cancelAndPurgeAllJobs() {
244                ourLog.info("Cancelling and purging all resource reindexing jobs");
245                myIndexingLock.lock();
246                try {
247                        myTxTemplate.execute(t -> {
248                                myReindexJobDao.markAllOfTypeAsDeleted();
249                                return null;
250                        });
251
252                        myTaskExecutor.shutdown();
253                        initExecutor();
254
255                        expungeJobsMarkedAsDeleted();
256                } finally {
257                        myIndexingLock.unlock();
258                }
259        }
260
261        private int runReindexJobs() {
262                Collection<ResourceReindexJobEntity> jobs = getResourceReindexJobEntities();
263
264                if (jobs.size() > 0) {
265                        ourLog.info("Running {} reindex jobs: {}", jobs.size(), jobs);
266                } else {
267                        ourLog.debug("Running {} reindex jobs: {}", jobs.size(), jobs);
268                        return 0;
269                }
270
271                int count = 0;
272                for (ResourceReindexJobEntity next : jobs) {
273
274                        if (next.getThresholdLow() != null
275                                        && next.getThresholdLow().getTime()
276                                                        >= next.getThresholdHigh().getTime()) {
277                                markJobAsDeleted(next);
278                                continue;
279                        }
280
281                        count += runReindexJob(next);
282                }
283                return count;
284        }
285
286        @Override
287        public int countReindexJobs() {
288                return getResourceReindexJobEntities().size();
289        }
290
291        private Collection<ResourceReindexJobEntity> getResourceReindexJobEntities() {
292                Collection<ResourceReindexJobEntity> jobs =
293                                myTxTemplate.execute(t -> myReindexJobDao.findAll(PageRequest.of(0, 10), false));
294                assert jobs != null;
295                return jobs;
296        }
297
298        private void markJobAsDeleted(ResourceReindexJobEntity theJob) {
299                ourLog.info("Marking reindexing job ID[{}] as deleted", theJob.getId());
300                myTxTemplate.execute(t -> {
301                        myReindexJobDao.markAsDeletedById(theJob.getId());
302                        return null;
303                });
304        }
305
306        @VisibleForTesting
307        public void setResourceReindexerForUnitTest(ResourceReindexer theResourceReindexer) {
308                myResourceReindexer = theResourceReindexer;
309        }
310
311        private int runReindexJob(ResourceReindexJobEntity theJob) {
312                if (theJob.getSuspendedUntil() != null) {
313                        if (theJob.getSuspendedUntil().getTime() > System.currentTimeMillis()) {
314                                return 0;
315                        }
316                }
317
318                ourLog.info("Performing reindex pass for JOB[{}]", theJob.getId());
319                StopWatch sw = new StopWatch();
320                AtomicInteger counter = new AtomicInteger();
321
322                /*
323                 * On the first time we run a particular reindex job, let's make sure we
324                 * have the latest search parameters loaded. A common reason to
325                 * be reindexing is that the search parameters have changed in some way, so
326                 * this makes sure we're on the latest versions
327                 */
328                if (theJob.getThresholdLow() == null) {
329                        mySearchParamRegistry.forceRefresh();
330                }
331
332                // Calculate range
333                Date low = theJob.getThresholdLow() != null ? theJob.getThresholdLow() : BEGINNING_OF_TIME;
334                Date high = theJob.getThresholdHigh();
335
336                // Query for resources within threshold
337                StopWatch pageSw = new StopWatch();
338                Slice<Long> range = myTxTemplate.execute(t -> {
339                        PageRequest page = PageRequest.of(0, PASS_SIZE);
340                        if (isNotBlank(theJob.getResourceType())) {
341                                return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(
342                                                page, theJob.getResourceType(), low, high);
343                        } else {
344                                return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(page, low, high);
345                        }
346                });
347                Validate.notNull(range);
348                int count = range.getNumberOfElements();
349                ourLog.info("Loaded {} resources for reindexing in {}", count, pageSw);
350
351                // If we didn't find any results at all, mark as deleted
352                if (count == 0) {
353                        markJobAsDeleted(theJob);
354                        return 0;
355                }
356
357                // Submit each resource requiring reindexing
358                List<Future<Date>> futures = range.stream()
359                                .map(t -> myTaskExecutor.submit(new ResourceReindexingTask(t, counter)))
360                                .collect(Collectors.toList());
361
362                Date latestDate = null;
363                for (Future<Date> next : futures) {
364                        Date nextDate;
365                        try {
366                                nextDate = next.get();
367                        } catch (Exception e) {
368                                ourLog.error("Failure reindexing", e);
369                                Date suspendedUntil = DateUtils.addMinutes(new Date(), 1);
370                                myTxTemplate.execute(t -> {
371                                        myReindexJobDao.setSuspendedUntil(suspendedUntil);
372                                        return null;
373                                });
374                                return counter.get();
375                        }
376
377                        if (nextDate != null) {
378                                if (latestDate == null || latestDate.getTime() < nextDate.getTime()) {
379                                        latestDate = new Date(nextDate.getTime());
380                                }
381                        }
382                }
383
384                Validate.notNull(latestDate);
385                Date newLow;
386                if (latestDate.getTime() == low.getTime()) {
387                        if (count == PASS_SIZE) {
388                                // Just in case we end up in some sort of infinite loop. This shouldn't happen, and couldn't really
389                                // happen unless there were 10000 resources with the exact same update time down to the
390                                // millisecond.
391                                ourLog.error(
392                                                "Final pass time for reindex JOB[{}] has same ending low value: {}",
393                                                theJob.getId(),
394                                                latestDate);
395                        }
396
397                        newLow = new Date(latestDate.getTime() + 1);
398                } else {
399                        newLow = latestDate;
400                }
401
402                myTxTemplate.execute(t -> {
403                        myReindexJobDao.setThresholdLow(theJob.getId(), newLow);
404                        Integer existingCount =
405                                        myReindexJobDao.getReindexCount(theJob.getId()).orElse(0);
406                        int newCount = existingCount + counter.get();
407                        myReindexJobDao.setReindexCount(theJob.getId(), newCount);
408                        return null;
409                });
410
411                ourLog.info(
412                                "Completed pass of reindex JOB[{}] - Indexed {} resources in {} ({} / sec) - Have indexed until: {}",
413                                theJob.getId(),
414                                count,
415                                sw,
416                                sw.formatThroughput(count, TimeUnit.SECONDS),
417                                new InstantType(newLow));
418                return counter.get();
419        }
420
421        private void expungeJobsMarkedAsDeleted() {
422                myTxTemplate.execute(t -> {
423                        Collection<ResourceReindexJobEntity> toDelete = myReindexJobDao.findAll(PageRequest.of(0, 10), true);
424                        toDelete.forEach(job -> {
425                                ourLog.info("Purging deleted job[{}]", job.getId());
426                                myReindexJobDao.deleteById(job.getId());
427                        });
428                        return null;
429                });
430        }
431
432        private void markResourceAsIndexingFailed(final long theId) {
433                TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
434                txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
435                txTemplate.execute((TransactionCallback<Void>) theStatus -> {
436                        ourLog.info("Marking resource with PID {} as indexing_failed", theId);
437
438                        myResourceTableDao.updateIndexStatus(theId, BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED);
439
440                        Query q = myEntityManager.createQuery("DELETE FROM ResourceTag t WHERE t.myResourceId = :id");
441                        q.setParameter("id", theId);
442                        q.executeUpdate();
443
444                        q = myEntityManager.createQuery(
445                                        "DELETE FROM ResourceIndexedSearchParamCoords t WHERE t.myResourcePid = :id");
446                        q.setParameter("id", theId);
447                        q.executeUpdate();
448
449                        q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamDate t WHERE t.myResourcePid = :id");
450                        q.setParameter("id", theId);
451                        q.executeUpdate();
452
453                        q = myEntityManager.createQuery(
454                                        "DELETE FROM ResourceIndexedSearchParamNumber t WHERE t.myResourcePid = :id");
455                        q.setParameter("id", theId);
456                        q.executeUpdate();
457
458                        q = myEntityManager.createQuery(
459                                        "DELETE FROM ResourceIndexedSearchParamQuantity t WHERE t.myResourcePid = :id");
460                        q.setParameter("id", theId);
461                        q.executeUpdate();
462
463                        q = myEntityManager.createQuery(
464                                        "DELETE FROM ResourceIndexedSearchParamQuantityNormalized t WHERE t.myResourcePid = :id");
465                        q.setParameter("id", theId);
466                        q.executeUpdate();
467
468                        q = myEntityManager.createQuery(
469                                        "DELETE FROM ResourceIndexedSearchParamString t WHERE t.myResourcePid = :id");
470                        q.setParameter("id", theId);
471                        q.executeUpdate();
472
473                        q = myEntityManager.createQuery(
474                                        "DELETE FROM ResourceIndexedSearchParamToken t WHERE t.myResourcePid = :id");
475                        q.setParameter("id", theId);
476                        q.executeUpdate();
477
478                        q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamUri t WHERE t.myResourcePid = :id");
479                        q.setParameter("id", theId);
480                        q.executeUpdate();
481
482                        q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.mySourceResourcePid = :id");
483                        q.setParameter("id", theId);
484                        q.executeUpdate();
485
486                        q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.myTargetResourcePid = :id");
487                        q.setParameter("id", theId);
488                        q.executeUpdate();
489
490                        return null;
491                });
492        }
493
494        private class ResourceReindexingTask implements Callable<Date> {
495                private final Long myNextId;
496                private final AtomicInteger myCounter;
497                private Date myUpdated;
498
499                ResourceReindexingTask(Long theNextId, AtomicInteger theCounter) {
500                        myNextId = theNextId;
501                        myCounter = theCounter;
502                }
503
504                @Override
505                public Date call() {
506                        Throwable reindexFailure;
507
508                        try {
509                                reindexFailure = readResourceAndReindex();
510                        } catch (ResourceVersionConflictException e) {
511                                /*
512                                 * We reindex in multiple threads, so it's technically possible that two threads try
513                                 * to index resources that cause a constraint error now (i.e. because a unique index has been
514                                 * added that didn't previously exist). In this case, one of the threads would succeed and
515                                 * not get this error, so we'll let the other one fail and try
516                                 * again later.
517                                 */
518                                ourLog.info(
519                                                "Failed to reindex because of a version conflict. Leaving in unindexed state: {}",
520                                                e.getMessage());
521                                reindexFailure = null;
522                        }
523
524                        if (reindexFailure != null) {
525                                ourLog.info("Setting resource PID[{}] status to ERRORED", myNextId);
526                                markResourceAsIndexingFailed(myNextId);
527                        }
528
529                        return myUpdated;
530                }
531
532                @Nullable
533                private Throwable readResourceAndReindex() {
534                        Throwable reindexFailure;
535                        reindexFailure = myTxTemplate.execute(t -> {
536                                ResourceTable resourceTable =
537                                                myResourceTableDao.findById(myNextId).orElseThrow(IllegalStateException::new);
538                                myUpdated = resourceTable.getUpdatedDate();
539
540                                try {
541                                        myResourceReindexer.reindexResourceEntity(resourceTable);
542                                        myCounter.incrementAndGet();
543                                        return null;
544
545                                } catch (Exception e) {
546                                        ourLog.error("Failed to index resource {}: {}", resourceTable.getIdDt(), e, e);
547                                        t.setRollbackOnly();
548                                        return e;
549                                }
550                        });
551                        return reindexFailure;
552                }
553        }
554}