/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.term;

import ca.uhn.fhir.jpa.dao.data.ITermConceptDao;
import ca.uhn.fhir.jpa.dao.data.ITermConceptParentChildLinkDao;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc;
import ca.uhn.fhir.jpa.term.api.ITermReindexingSvc;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.DateUtils;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;

import java.util.Collection;
import java.util.List;

import static org.apache.commons.lang3.StringUtils.isBlank;

/**
 * Fills in the "parent PIDs" string of {@link TermConcept} rows that are reported by
 * {@link ITermConceptDao#findResourcesRequiringReindexing} and re-saves them, one page
 * per invocation of {@link #processReindexing()}.
 *
 * <p>The work is triggered by the nested {@link Job}, which {@link #scheduleJobs} registers
 * as a local job with a one-minute interval.
 */
public class TermReindexingSvcImpl implements ITermReindexingSvc, IHasScheduledJobs {
	private static final Logger ourLog = LoggerFactory.getLogger(TermReindexingSvcImpl.class);

	/** Number of concepts requested from the DAO on each reindexing pass. */
	private static final int REINDEX_BATCH_SIZE = 1000;

	/**
	 * Marker value stored in {@link #myChildToParentPidCache} for a concept whose parent
	 * lookup returned nothing, so that the lookup is not repeated.
	 */
	private static final Long NO_PARENTS_MARKER = -1L;

	private static boolean ourForceSaveDeferredAlwaysForUnitTest;

	@Autowired
	protected ITermConceptDao myConceptDao;

	/**
	 * Child concept PID -> parent concept PIDs. Created lazily when a pass finds work and
	 * dropped again (set to {@code null}) by the first pass that finds nothing to do.
	 */
	private ArrayListMultimap<Long, Long> myChildToParentPidCache;

	@Autowired
	private PlatformTransactionManager myTransactionMgr;

	@Autowired
	private ITermConceptParentChildLinkDao myConceptParentChildLinkDao;

	@Autowired
	private ITermDeferredStorageSvc myDeferredStorageSvc;

	@Autowired
	private TermConceptDaoSvc myTermConceptDaoSvc;

	/**
	 * Runs a single reindexing pass inside a new ({@code PROPAGATION_REQUIRES_NEW}) transaction.
	 *
	 * <p>The pass is skipped when the deferred storage service reports a non-empty storage
	 * queue, unless the unit-test override flag has been set.
	 */
	@Override
	public void processReindexing() {
		// The queue check is always evaluated first, exactly as in the short-circuit form.
		boolean storageQueueEmpty = myDeferredStorageSvc.isStorageQueueEmpty(true);
		if (!(storageQueueEmpty || ourForceSaveDeferredAlwaysForUnitTest)) {
			return;
		}

		TransactionTemplate txTemplate = new TransactionTemplate(myTransactionMgr);
		txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
		txTemplate.execute(new TransactionCallbackWithoutResult() {
			@Override
			protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
				reindexNextBatch();
			}
		});
	}

	/**
	 * Loads one page of concepts requiring reindexing, computes the parent PID string for
	 * those that have a blank one, and saves every concept in the page.
	 *
	 * <p>When the page has no content, the parent cache (if any) is discarded and nothing
	 * else happens.
	 */
	private void reindexNextBatch() {
		Page<TermConcept> pending =
				myConceptDao.findResourcesRequiringReindexing(PageRequest.of(0, REINDEX_BATCH_SIZE));

		if (!pending.hasContent()) {
			if (myChildToParentPidCache != null) {
				ourLog.info("Clearing parent concept cache");
				myChildToParentPidCache = null;
			}
			return;
		}

		if (myChildToParentPidCache == null) {
			myChildToParentPidCache = ArrayListMultimap.create();
		}

		ourLog.info("Indexing {} / {} concepts", pending.getContent().size(), pending.getTotalElements());

		int savedCount = 0;
		StopWatch timer = new StopWatch();

		for (TermConcept concept : pending) {
			if (isBlank(concept.getParentPidsAsString())) {
				StringBuilder parentPids = new StringBuilder();
				createParentsString(parentPids, concept.getId());
				concept.setParentPids(parentPids.toString());
			}

			myTermConceptDaoSvc.saveConcept(concept);
			savedCount++;
		}

		ourLog.info(
				"Indexed {} / {} concepts in {}ms - Avg {}ms / resource",
				savedCount,
				pending.getContent().size(),
				timer.getMillis(),
				timer.getMillisPerOperation(savedCount));
	}

	/**
	 * Appends the PIDs of all ancestors of the given concept to the builder, separated by
	 * single spaces, visiting each parent and then recursing into that parent's own parents.
	 *
	 * <p>Parent links are read from {@link #myChildToParentPidCache}; on a cache miss they are
	 * fetched via {@link ITermConceptParentChildLinkDao#findAllWithChild} and cached.
	 *
	 * <p>NOTE(review): the recursion has no visited-set, so it relies on the parent/child
	 * links being acyclic - confirm that this is guaranteed upstream.
	 *
	 * @param theParentsBuilder receives the space-separated ancestor PIDs
	 * @param theConceptPid the concept whose ancestors are collected; must not be null
	 */
	private void createParentsString(StringBuilder theParentsBuilder, Long theConceptPid) {
		Validate.notNull(theConceptPid, "theConceptPid must not be null");

		// Multimap.get() returns a live view: entries put for this key below become
		// visible through this same list.
		List<Long> parentPids = myChildToParentPidCache.get(theConceptPid);

		if (parentPids.contains(NO_PARENTS_MARKER)) {
			return;
		}

		if (parentPids.isEmpty()) {
			Collection<Long> fetchedParents = myConceptParentChildLinkDao.findAllWithChild(theConceptPid);

			if (fetchedParents.isEmpty()) {
				myChildToParentPidCache.put(theConceptPid, NO_PARENTS_MARKER);
				ourLog.info(
						"Found {} parent concepts of concept {} (cache has {})",
						0,
						theConceptPid,
						myChildToParentPidCache.size());
				return;
			}

			myChildToParentPidCache.putAll(theConceptPid, fetchedParents);
			int parentCount = myChildToParentPidCache.get(theConceptPid).size();
			ourLog.info(
					"Found {} parent concepts of concept {} (cache has {})",
					parentCount,
					theConceptPid,
					myChildToParentPidCache.size());
		}

		for (Long parentPid : parentPids) {
			if (theParentsBuilder.length() > 0) {
				theParentsBuilder.append(' ');
			}
			theParentsBuilder.append(parentPid);
			createParentsString(theParentsBuilder, parentPid);
		}
	}

	/**
	 * Registers {@link Job} with the scheduler as a local job that fires once per minute.
	 * The job id is this class's fully-qualified name.
	 *
	 * @param theSchedulerService the scheduler to register with
	 */
	@Override
	public void scheduleJobs(ISchedulerService theSchedulerService) {
		// TODO KHS what does this mean?
		// Register scheduled job to save deferred concepts
		// In the future it would be great to make this a cluster-aware task somehow
		ScheduledJobDefinition reindexJob = new ScheduledJobDefinition();
		reindexJob.setId(this.getClass().getName());
		reindexJob.setJobClass(Job.class);
		theSchedulerService.scheduleLocalJob(DateUtils.MILLIS_PER_MINUTE, reindexJob);
	}

	/**
	 * Scheduled job that delegates each execution to
	 * {@link ITermReindexingSvc#processReindexing()}.
	 */
	public static class Job implements HapiJob {
		@Autowired
		private ITermReindexingSvc myTermReindexingSvc;

		@Override
		public void execute(JobExecutionContext theContext) {
			myTermReindexingSvc.processReindexing();
		}
	}

	/**
	 * This method is present only for unit tests, do not call from client code
	 */
	@VisibleForTesting
	public static void setForceSaveDeferredAlwaysForUnitTest(boolean theForceSaveDeferredAlwaysForUnitTest) {
		ourForceSaveDeferredAlwaysForUnitTest = theForceSaveDeferredAlwaysForUnitTest;
	}
}