
/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.term;

import ca.uhn.fhir.jpa.dao.data.ITermConceptDao;
import ca.uhn.fhir.jpa.dao.data.ITermConceptParentChildLinkDao;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc;
import ca.uhn.fhir.jpa.term.api.ITermReindexingSvc;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import jakarta.annotation.Nonnull;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.DateUtils;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.apache.commons.lang3.StringUtils.isBlank;

/**
 * Reindexes {@link TermConcept} entities that the concept DAO reports as requiring reindexing.
 * <p>
 * Each pass loads one page of concepts, fills in the space-separated ancestor PID string for any
 * concept whose string is blank, and saves every concept in the page. The pass is triggered by the
 * nested {@link Job}, which is registered in {@link #scheduleJobs(ISchedulerService)}.
 */
public class TermReindexingSvcImpl implements ITermReindexingSvc, IHasScheduledJobs {
	private static final Logger ourLog = LoggerFactory.getLogger(TermReindexingSvcImpl.class);

	/** Cache marker recorded for a concept whose parent lookup returned no links. */
	private static final Long NO_PARENTS_MARKER = -1L;

	/** Page size requested from the DAO for a single reindexing pass. */
	private static final int REINDEX_BATCH_SIZE = 1000;

	private static boolean ourForceSaveDeferredAlwaysForUnitTest;

	@Autowired
	protected ITermConceptDao myConceptDao;

	/**
	 * Child PID to parent PIDs, filled lazily while reindexing. It is created on the first pass that
	 * finds work and dropped again on the first pass that finds none.
	 */
	private ArrayListMultimap<Long, Long> myChildToParentPidCache;

	@Autowired
	private PlatformTransactionManager myTransactionMgr;

	@Autowired
	private ITermConceptParentChildLinkDao myConceptParentChildLinkDao;

	@Autowired
	private ITermDeferredStorageSvc myDeferredStorageSvc;

	@Autowired
	private TermConceptDaoSvc myTermConceptDaoSvc;

	/**
	 * Runs one reindexing pass inside a new ({@code PROPAGATION_REQUIRES_NEW}) transaction.
	 * <p>
	 * The pass is skipped while the deferred storage queue is not empty, unless the unit-test
	 * override flag has been set.
	 */
	@Override
	public void processReindexing() {
		if (!myDeferredStorageSvc.isStorageQueueEmpty(true) && !ourForceSaveDeferredAlwaysForUnitTest) {
			return;
		}

		TransactionTemplate tt = new TransactionTemplate(myTransactionMgr);
		tt.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
		tt.execute(new TransactionCallbackWithoutResult() {
			@Override
			protected void doInTransactionWithoutResult(@Nonnull TransactionStatus theStatus) {
				reindexNextBatch();
			}
		});
	}

	/**
	 * Loads up to {@link #REINDEX_BATCH_SIZE} concepts requiring reindexing, computes the parent PID
	 * string for those that lack one, and saves each concept.
	 */
	private void reindexNextBatch() {
		Page<TermConcept> concepts =
				myConceptDao.findResourcesRequiringReindexing(PageRequest.of(0, REINDEX_BATCH_SIZE));
		if (!concepts.hasContent()) {
			// Nothing left to do: release the cache so it does not linger between reindexing runs
			if (myChildToParentPidCache != null) {
				ourLog.info("Clearing parent concept cache");
				myChildToParentPidCache = null;
			}
			return;
		}

		if (myChildToParentPidCache == null) {
			myChildToParentPidCache = ArrayListMultimap.create();
		}

		ourLog.info("Indexing {} / {} concepts", concepts.getContent().size(), concepts.getTotalElements());

		int count = 0;
		StopWatch stopwatch = new StopWatch();

		for (TermConcept nextConcept : concepts) {
			if (isBlank(nextConcept.getParentPidsAsString())) {
				StringBuilder parentsBuilder = new StringBuilder();
				appendAncestorPids(parentsBuilder, nextConcept.getId(), new HashSet<>());
				nextConcept.setParentPids(parentsBuilder.toString());
			}

			myTermConceptDaoSvc.saveConcept(nextConcept);
			count++;
		}

		ourLog.info(
				"Indexed {} / {} concepts in {}ms - Avg {}ms / resource",
				count,
				concepts.getContent().size(),
				stopwatch.getMillis(),
				stopwatch.getMillisPerOperation(count));
	}

	/**
	 * Appends the PIDs of all ancestors of the given concept to the builder, separated by single
	 * spaces, walking parent links depth-first. Parent links are read from the cache and loaded from
	 * the link DAO on a cache miss.
	 *
	 * @param theParentsBuilder receives the ancestor PIDs
	 * @param theConceptPid     the concept whose ancestors are appended; must not be null
	 * @param thePidsOnPath     PIDs of the concepts currently being expanded on this recursion path.
	 *                          A parent already on the path is skipped, because expanding it again
	 *                          would recurse without bound if the stored links form a cycle.
	 */
	private void appendAncestorPids(StringBuilder theParentsBuilder, Long theConceptPid, Set<Long> thePidsOnPath) {
		Validate.notNull(theConceptPid, "theConceptPid must not be null");

		// Multimap.get() returns a live view, so the puts below are visible through this list
		List<Long> parents = myChildToParentPidCache.get(theConceptPid);
		if (parents.contains(NO_PARENTS_MARKER)) {
			return;
		}

		if (parents.isEmpty()) {
			Collection<Long> parentLinks = myConceptParentChildLinkDao.findAllWithChild(theConceptPid);
			if (parentLinks.isEmpty()) {
				myChildToParentPidCache.put(theConceptPid, NO_PARENTS_MARKER);
				ourLog.info(
						"Found {} parent concepts of concept {} (cache has {})",
						0,
						theConceptPid,
						myChildToParentPidCache.size());
				return;
			}

			for (Long next : parentLinks) {
				myChildToParentPidCache.put(theConceptPid, next);
			}
			ourLog.info(
					"Found {} parent concepts of concept {} (cache has {})",
					parents.size(),
					theConceptPid,
					myChildToParentPidCache.size());
		}

		thePidsOnPath.add(theConceptPid);
		try {
			for (Long nextParent : parents) {
				if (thePidsOnPath.contains(nextParent)) {
					ourLog.warn(
							"Circular parent link detected between concept {} and concept {}, not following it",
							theConceptPid,
							nextParent);
					continue;
				}
				if (theParentsBuilder.length() > 0) {
					theParentsBuilder.append(' ');
				}
				theParentsBuilder.append(nextParent);
				appendAncestorPids(theParentsBuilder, nextParent, thePidsOnPath);
			}
		} finally {
			// Only the current path is tracked, so a concept reachable through several branches
			// (a diamond) is still expanded once per branch, exactly as before
			thePidsOnPath.remove(theConceptPid);
		}
	}

	/**
	 * Registers {@link Job} as a local scheduled job, passing {@link DateUtils#MILLIS_PER_MINUTE}
	 * as the interval argument.
	 *
	 * @param theSchedulerService the scheduler to register the job with
	 */
	@Override
	public void scheduleJobs(ISchedulerService theSchedulerService) {
		// Register the scheduled job that triggers concept reindexing.
		// In the future it would be great to make this a cluster-aware task somehow
		ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
		jobDefinition.setId(this.getClass().getName());
		jobDefinition.setJobClass(Job.class);
		theSchedulerService.scheduleLocalJob(DateUtils.MILLIS_PER_MINUTE, jobDefinition);
	}

	/**
	 * Scheduled job that delegates to {@link ITermReindexingSvc#processReindexing()}.
	 */
	public static class Job implements HapiJob {
		@Autowired
		private ITermReindexingSvc myTermReindexingSvc;

		@Override
		public void execute(JobExecutionContext theContext) {
			myTermReindexingSvc.processReindexing();
		}
	}

	/**
	 * This method is present only for unit tests, do not call from client code
	 */
	@VisibleForTesting
	public static void setForceSaveDeferredAlwaysForUnitTest(boolean theForceSaveDeferredAlwaysForUnitTest) {
		ourForceSaveDeferredAlwaysForUnitTest = theForceSaveDeferredAlwaysForUnitTest;
	}
}