
001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2025 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.bulk.export.svc; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.context.RuntimeResourceDefinition; 024import ca.uhn.fhir.context.RuntimeSearchParam; 025import ca.uhn.fhir.fhirpath.IFhirPath; 026import ca.uhn.fhir.i18n.Msg; 027import ca.uhn.fhir.interceptor.model.RequestPartitionId; 028import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 029import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 030import ca.uhn.fhir.jpa.api.model.PersistentIdToForcedIdMap; 031import ca.uhn.fhir.jpa.api.svc.IIdHelperService; 032import ca.uhn.fhir.jpa.bulk.export.api.IBulkExportProcessor; 033import ca.uhn.fhir.jpa.bulk.export.model.ExportPIDIteratorParameters; 034import ca.uhn.fhir.jpa.dao.IResultIterator; 035import ca.uhn.fhir.jpa.dao.ISearchBuilder; 036import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; 037import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc; 038import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; 039import ca.uhn.fhir.jpa.entity.MdmLink; 040import ca.uhn.fhir.jpa.model.dao.JpaPid; 041import ca.uhn.fhir.jpa.model.search.SearchBuilderLoadIncludesParameters; 042import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails; 043import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 044import ca.uhn.fhir.mdm.api.MdmMatchResultEnum; 045import ca.uhn.fhir.mdm.dao.IMdmLinkDao; 046import ca.uhn.fhir.mdm.model.MdmPidTuple; 047import ca.uhn.fhir.model.api.Include; 048import ca.uhn.fhir.model.primitive.IdDt; 049import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 050import ca.uhn.fhir.rest.api.server.bulk.BulkExportJobParameters; 051import ca.uhn.fhir.rest.param.HasOrListParam; 052import ca.uhn.fhir.rest.param.HasParam; 053import ca.uhn.fhir.rest.param.ReferenceOrListParam; 054import ca.uhn.fhir.rest.param.ReferenceParam; 055import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; 056import ca.uhn.fhir.util.ExtensionUtil; 057import ca.uhn.fhir.util.HapiExtensions; 058import ca.uhn.fhir.util.Logs; 059import ca.uhn.fhir.util.SearchParameterUtil; 060import ca.uhn.fhir.util.TaskChunker; 061import jakarta.annotation.Nonnull; 062import jakarta.persistence.EntityManager; 063import org.apache.commons.lang3.StringUtils; 064import org.hl7.fhir.instance.model.api.IBaseExtension; 065import org.hl7.fhir.instance.model.api.IBaseReference; 066import org.hl7.fhir.instance.model.api.IBaseResource; 067import org.hl7.fhir.instance.model.api.IIdType; 068import org.slf4j.Logger; 069import org.slf4j.LoggerFactory; 070import org.springframework.beans.factory.annotation.Autowired; 071 072import java.io.IOException; 073import java.util.ArrayList; 074import java.util.HashMap; 075import java.util.HashSet; 076import java.util.Iterator; 077import java.util.LinkedHashSet; 078import java.util.List; 079import java.util.Map; 080import java.util.Optional; 081import java.util.Set; 082import java.util.stream.Collectors; 083 084import static ca.uhn.fhir.rest.api.Constants.PARAM_HAS; 085import static ca.uhn.fhir.rest.api.Constants.PARAM_ID; 086 087public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> { 088 private static final Logger ourLog = LoggerFactory.getLogger(JpaBulkExportProcessor.class); 089 090 public static final int QUERY_CHUNK_SIZE = 100; 091 public static final List<String> PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES = 092 List.of("Practitioner", "Organization"); 093 094 @Autowired 095 private FhirContext myContext; 096 097 @Autowired 098 private BulkExportHelperService myBulkExportHelperSvc; 099 100 @Autowired 101 private JpaStorageSettings myStorageSettings; 102 103 @Autowired 104 private DaoRegistry myDaoRegistry; 105 106 @Autowired 107 protected SearchBuilderFactory<JpaPid> mySearchBuilderFactory; 108 109 @Autowired 110 private IIdHelperService<JpaPid> myIdHelperService; 111 112 @Autowired 113 protected IMdmLinkDao<JpaPid, MdmLink> myMdmLinkDao; 114 115 @Autowired 116 private MdmExpansionCacheSvc myMdmExpansionCacheSvc; 117 118 @Autowired 119 private EntityManager myEntityManager; 120 121 @Autowired 122 private IHapiTransactionService myHapiTransactionService; 123 124 @Autowired 125 private ISearchParamRegistry mySearchParamRegistry; 126 127 private IFhirPath myFhirPath; 128 129 @Override 130 public Iterator<JpaPid> getResourcePidIterator(ExportPIDIteratorParameters theParams) { 131 return myHapiTransactionService 132 .withSystemRequest() 133 .withRequestPartitionId(theParams.getPartitionIdOrAllPartitions()) 134 .readOnly() 135 .execute(() -> { 136 String resourceType = theParams.getResourceType(); 137 String jobId = theParams.getInstanceId(); 138 String chunkId = theParams.getChunkId(); 139 RuntimeResourceDefinition def = myContext.getResourceDefinition(resourceType); 140 141 LinkedHashSet<JpaPid> pids; 142 if (theParams.getExportStyle() == BulkExportJobParameters.ExportStyle.PATIENT) { 143 pids = getPidsForPatientStyleExport(theParams, resourceType, jobId, chunkId, def); 144 } else if (theParams.getExportStyle() == BulkExportJobParameters.ExportStyle.GROUP) { 145 pids = getPidsForGroupStyleExport(theParams, resourceType, def); 146 } else { 147 pids = getPidsForSystemStyleExport(theParams, jobId, chunkId, def); 148 } 149 150 ourLog.debug("Finished expanding resource pids to export, size is {}", pids.size()); 151 return pids.iterator(); 152 }); 153 } 154 155 @SuppressWarnings("unchecked") 156 private LinkedHashSet<JpaPid> getPidsForPatientStyleExport( 157 ExportPIDIteratorParameters theParams, 158 String resourceType, 159 String theJobId, 160 String theChunkId, 161 RuntimeResourceDefinition def) 162 throws IOException { 163 LinkedHashSet<JpaPid> pids = new LinkedHashSet<>(); 164 // Patient 165 if (myStorageSettings.getIndexMissingFields() == JpaStorageSettings.IndexEnabledEnum.DISABLED) { 166 String errorMessage = 167 "You attempted to start a Patient Bulk Export, but the system has `Index Missing Fields` disabled. It must be enabled for Patient Bulk Export"; 168 ourLog.error(errorMessage); 169 throw new IllegalStateException(Msg.code(797) + errorMessage); 170 } 171 172 Set<String> patientSearchParams = 173 SearchParameterUtil.getPatientSearchParamsForResourceType(myContext, theParams.getResourceType()); 174 175 for (String patientSearchParam : patientSearchParams) { 176 List<SearchParameterMap> maps = 177 myBulkExportHelperSvc.createSearchParameterMapsForResourceType(def, theParams, false); 178 for (SearchParameterMap map : maps) { 179 // Ensure users did not monkey with the patient compartment search parameter. 180 validateSearchParametersForPatient(map, theParams); 181 182 ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType(theParams.getResourceType()); 183 184 filterBySpecificPatient(theParams, resourceType, patientSearchParam, map); 185 186 SearchRuntimeDetails searchRuntime = new SearchRuntimeDetails(null, theJobId); 187 188 Logs.getBatchTroubleshootingLog() 189 .debug( 190 "Executing query for bulk export job[{}] chunk[{}]: {}", 191 theJobId, 192 theChunkId, 193 map.toNormalizedQueryString(myContext)); 194 195 try (IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery( 196 map, searchRuntime, new SystemRequestDetails(), theParams.getPartitionIdOrAllPartitions())) { 197 int pidCount = 0; 198 while (resultIterator.hasNext()) { 199 if (pidCount % 10000 == 0) { 200 Logs.getBatchTroubleshootingLog() 201 .debug( 202 "Bulk export job[{}] chunk[{}] has loaded {} pids", 203 theJobId, 204 theChunkId, 205 pidCount); 206 } 207 pidCount++; 208 pids.add(resultIterator.next()); 209 } 210 } 211 } 212 } 213 return pids; 214 } 215 216 private void filterBySpecificPatient( 217 ExportPIDIteratorParameters theParams, 218 String resourceType, 219 String patientSearchParam, 220 SearchParameterMap map) { 221 if (resourceType.equalsIgnoreCase("Patient")) { 222 if (theParams.getPatientIds() != null) { 223 ReferenceOrListParam referenceOrListParam = makeReferenceOrListParam(theParams.getPatientIds()); 224 map.add(PARAM_ID, referenceOrListParam); 225 } 226 } else { 227 if (theParams.getPatientIds() != null) { 228 ReferenceOrListParam referenceOrListParam = makeReferenceOrListParam(theParams.getPatientIds()); 229 map.add(patientSearchParam, referenceOrListParam); 230 } else { 231 map.add(patientSearchParam, new ReferenceParam().setMissing(false)); 232 } 233 } 234 } 235 236 @Nonnull 237 private ReferenceOrListParam makeReferenceOrListParam(@Nonnull List<String> thePatientIds) { 238 final ReferenceOrListParam referenceOrListParam = new ReferenceOrListParam(); 239 thePatientIds.forEach(patientId -> referenceOrListParam.addOr(new ReferenceParam(patientId))); 240 return referenceOrListParam; 241 } 242 243 @SuppressWarnings("unchecked") 244 private LinkedHashSet<JpaPid> getPidsForSystemStyleExport( 245 ExportPIDIteratorParameters theParams, String theJobId, String theChunkId, RuntimeResourceDefinition theDef) 246 throws IOException { 247 LinkedHashSet<JpaPid> pids = new LinkedHashSet<>(); 248 // System 249 List<SearchParameterMap> maps = 250 myBulkExportHelperSvc.createSearchParameterMapsForResourceType(theDef, theParams, true); 251 ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType(theParams.getResourceType()); 252 253 for (SearchParameterMap map : maps) { 254 Logs.getBatchTroubleshootingLog() 255 .debug( 256 "Executing query for bulk export job[{}] chunk[{}]: {}", 257 theJobId, 258 theChunkId, 259 map.toNormalizedQueryString(myContext)); 260 261 // requires a transaction 262 try (IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery( 263 map, new SearchRuntimeDetails(null, theJobId), null, theParams.getPartitionIdOrAllPartitions())) { 264 int pidCount = 0; 265 while (resultIterator.hasNext()) { 266 if (pidCount % 10000 == 0) { 267 Logs.getBatchTroubleshootingLog() 268 .debug( 269 "Bulk export job[{}] chunk[{}] has loaded {} pids", 270 theJobId, 271 theChunkId, 272 pidCount); 273 } 274 pidCount++; 275 pids.add(resultIterator.next()); 276 } 277 } 278 } 279 return pids; 280 } 281 282 private LinkedHashSet<JpaPid> getPidsForGroupStyleExport( 283 ExportPIDIteratorParameters theParams, String theResourceType, RuntimeResourceDefinition theDef) 284 throws IOException { 285 LinkedHashSet<JpaPid> pids; 286 287 if (theResourceType.equalsIgnoreCase("Patient")) { 288 ourLog.info("Expanding Patients of a Group Bulk Export."); 289 pids = getExpandedPatientList(theParams); 290 ourLog.info("Obtained {} PIDs", pids.size()); 291 } else if (theResourceType.equalsIgnoreCase("Group")) { 292 pids = getSingletonGroupList(theParams); 293 } else { 294 pids = getRelatedResourceTypePids(theParams, theDef); 295 } 296 return pids; 297 } 298 299 private LinkedHashSet<JpaPid> getRelatedResourceTypePids( 300 ExportPIDIteratorParameters theParams, RuntimeResourceDefinition theDef) throws IOException { 301 LinkedHashSet<JpaPid> pids = new LinkedHashSet<>(); 302 // Check if the patient compartment search parameter is active to enable export of this resource 303 RuntimeSearchParam activeSearchParam = 304 getActivePatientSearchParamForCurrentResourceType(theParams.getResourceType()); 305 if (activeSearchParam != null) { 306 // expand the group pid -> list of patients in that group (list of patient pids) 307 Set<JpaPid> expandedMemberResourceIds = expandAllPatientPidsFromGroup(theParams); 308 assert !expandedMemberResourceIds.isEmpty(); 309 Logs.getBatchTroubleshootingLog() 310 .debug("{} has been expanded to members:[{}]", theParams.getGroupId(), expandedMemberResourceIds); 311 312 // for each patient pid -> 313 // search for the target resources, with their correct patient references, chunked. 314 // The results will be jammed into myReadPids 315 TaskChunker.chunk(expandedMemberResourceIds, QUERY_CHUNK_SIZE, (idChunk) -> { 316 try { 317 queryResourceTypeWithReferencesToPatients(pids, idChunk, theParams, theDef); 318 } catch (IOException ex) { 319 // we will never see this; 320 // SearchBuilder#QueryIterator does not (nor can ever) throw 321 // an IOException... but Java requires the check, 322 // so we'll put a log here (just in the off chance) 323 ourLog.error("Couldn't close query iterator ", ex); 324 throw new RuntimeException(Msg.code(2346) + "Couldn't close query iterator", ex); 325 } 326 }); 327 } else { 328 ourLog.warn("No active patient compartment search parameter(s) for resource type " 329 + theParams.getResourceType()); 330 } 331 return pids; 332 } 333 334 private LinkedHashSet<JpaPid> getSingletonGroupList(ExportPIDIteratorParameters theParams) { 335 RequestPartitionId partitionId = theParams.getPartitionIdOrAllPartitions(); 336 IBaseResource group = myDaoRegistry 337 .getResourceDao("Group") 338 .read(new IdDt(theParams.getGroupId()), new SystemRequestDetails().setRequestPartitionId(partitionId)); 339 JpaPid pidOrNull = myIdHelperService.getPidOrNull(partitionId, group); 340 LinkedHashSet<JpaPid> pids = new LinkedHashSet<>(); 341 pids.add(pidOrNull); 342 return pids; 343 } 344 345 /** 346 * Get a ISearchBuilder for the given resource type. 347 */ 348 protected ISearchBuilder<JpaPid> getSearchBuilderForResourceType(String theResourceType) { 349 RuntimeResourceDefinition def = myContext.getResourceDefinition(theResourceType); 350 Class<? extends IBaseResource> typeClass = def.getImplementingClass(); 351 return mySearchBuilderFactory.newSearchBuilder(theResourceType, typeClass); 352 } 353 354 protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType(String theResourceType) { 355 RuntimeSearchParam searchParam = null; 356 Optional<RuntimeSearchParam> onlyPatientSearchParamForResourceType = 357 SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, theResourceType); 358 if (onlyPatientSearchParamForResourceType.isPresent()) { 359 searchParam = onlyPatientSearchParamForResourceType.get(); 360 } 361 return searchParam; 362 } 363 364 @Override 365 public void expandMdmResources(List<IBaseResource> theResources) { 366 for (IBaseResource resource : theResources) { 367 if (!PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(resource.fhirType())) { 368 annotateBackwardsReferences(resource); 369 } 370 } 371 } 372 373 /** 374 * For Patient 375 **/ 376 private RuntimeSearchParam validateSearchParametersForPatient( 377 SearchParameterMap expandedSpMap, ExportPIDIteratorParameters theParams) { 378 RuntimeSearchParam runtimeSearchParam = 379 getPatientSearchParamForCurrentResourceType(theParams.getResourceType()); 380 if (expandedSpMap.get(runtimeSearchParam.getName()) != null) { 381 throw new IllegalArgumentException(Msg.code(796) 382 + String.format( 383 "Patient Bulk Export manually modifies the Search Parameter called [%s], so you may not include this search parameter in your _typeFilter!", 384 runtimeSearchParam.getName())); 385 } 386 return runtimeSearchParam; 387 } 388 389 /** 390 * for group exports 391 **/ 392 private void validateSearchParametersForGroup(SearchParameterMap expandedSpMap, String theResourceType) { 393 // we only validate for certain types 394 if (!PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(theResourceType)) { 395 RuntimeSearchParam runtimeSearchParam = getPatientSearchParamForCurrentResourceType(theResourceType); 396 if (expandedSpMap.get(runtimeSearchParam.getName()) != null) { 397 throw new IllegalArgumentException(Msg.code(792) 398 + String.format( 399 "Group Bulk Export manually modifies the Search Parameter called [%s], so you may not include this search parameter in your _typeFilter!", 400 runtimeSearchParam.getName())); 401 } 402 } 403 } 404 405 /** 406 * In case we are doing a Group Bulk Export and resourceType `Patient` is requested, we can just return the group members, 407 * possibly expanded by MDM, and don't have to go and fetch other resource DAOs. 408 */ 409 @SuppressWarnings("unchecked") 410 private LinkedHashSet<JpaPid> getExpandedPatientList(ExportPIDIteratorParameters theParameters) throws IOException { 411 List<JpaPid> members = getMembersFromGroupWithFilter(theParameters, true); 412 List<IIdType> ids = 413 members.stream().map(member -> new IdDt("Patient/" + member)).collect(Collectors.toList()); 414 ourLog.info("While extracting patients from a group, we found {} patients.", ids.size()); 415 ourLog.info("Found patients: {}", ids.stream().map(id -> id.getValue()).collect(Collectors.joining(", "))); 416 417 List<JpaPid> pidsOrThrowException = members; 418 LinkedHashSet<JpaPid> patientPidsToExport = new LinkedHashSet<>(pidsOrThrowException); 419 420 if (theParameters.isExpandMdm()) { 421 RequestPartitionId partitionId = theParameters.getPartitionIdOrAllPartitions(); 422 SystemRequestDetails srd = new SystemRequestDetails().setRequestPartitionId(partitionId); 423 IBaseResource group = myDaoRegistry.getResourceDao("Group").read(new IdDt(theParameters.getGroupId()), srd); 424 JpaPid pidOrNull = myIdHelperService.getPidOrNull(partitionId, group); 425 List<MdmPidTuple<JpaPid>> goldenPidSourcePidTuple = 426 myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); 427 goldenPidSourcePidTuple.forEach(tuple -> { 428 patientPidsToExport.add(tuple.getGoldenPid()); 429 patientPidsToExport.add(tuple.getSourcePid()); 430 }); 431 populateMdmResourceCache(goldenPidSourcePidTuple); 432 } 433 return patientPidsToExport; 434 } 435 436 /** 437 * Given the parameters, find all members' patient references in the group with the typeFilter applied. 438 * 439 * @return A list of strings representing the Patient IDs of the members (e.g. ["P1", "P2", "P3"] 440 */ 441 @SuppressWarnings("unchecked") 442 private List<JpaPid> getMembersFromGroupWithFilter( 443 ExportPIDIteratorParameters theParameters, boolean theConsiderDateRange) throws IOException { 444 final List<SearchParameterMap> maps = makeSearchParameterMaps(theParameters, theConsiderDateRange); 445 final List<JpaPid> resPids = new ArrayList<>(); 446 for (SearchParameterMap map : maps) { 447 ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType("Patient"); 448 ourLog.debug( 449 "Searching for members of group {} with job instance {} with map {}", 450 theParameters.getGroupId(), 451 theParameters.getInstanceId(), 452 map); 453 try (IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery( 454 map, 455 new SearchRuntimeDetails(null, theParameters.getInstanceId()), 456 null, 457 theParameters.getPartitionIdOrAllPartitions())) { 458 459 while (resultIterator.hasNext()) { 460 resPids.add(resultIterator.next()); 461 } 462 } 463 } 464 return resPids; 465 } 466 467 @Nonnull 468 private List<SearchParameterMap> makeSearchParameterMaps( 469 @Nonnull ExportPIDIteratorParameters theParameters, boolean theConsiderDateRange) { 470 final RuntimeResourceDefinition def = myContext.getResourceDefinition("Patient"); 471 final List<SearchParameterMap> maps = myBulkExportHelperSvc.createSearchParameterMapsForResourceType( 472 def, theParameters, theConsiderDateRange); 473 maps.forEach(map -> { 474 map.add(PARAM_HAS, makeGroupMemberHasOrListParam(theParameters.getGroupId())); 475 final List<String> patientIds = theParameters.getPatientIds(); 476 if (patientIds != null && !patientIds.isEmpty()) { 477 map.add(PARAM_ID, makeReferenceOrListParam(patientIds)); 478 } 479 }); 480 return maps; 481 } 482 483 @Nonnull 484 private HasOrListParam makeGroupMemberHasOrListParam(@Nonnull String theGroupId) { 485 final HasParam hasParam = new HasParam("Group", "member", "_id", theGroupId); 486 return new HasOrListParam().addOr(hasParam); 487 } 488 489 /** 490 * @param thePidTuples 491 */ 492 @SuppressWarnings({"unchecked", "rawtypes"}) 493 private void populateMdmResourceCache(List<MdmPidTuple<JpaPid>> thePidTuples) { 494 if (myMdmExpansionCacheSvc.hasBeenPopulated()) { 495 return; 496 } 497 // First, convert this zipped set of tuples to a map of 498 // { 499 // patient/gold-1 -> [patient/1, patient/2] 500 // patient/gold-2 -> [patient/3, patient/4] 501 // } 502 Map<JpaPid, Set<JpaPid>> goldenResourceToSourcePidMap = new HashMap<>(); 503 extract(thePidTuples, goldenResourceToSourcePidMap); 504 505 // Next, lets convert it to an inverted index for fast lookup 506 // { 507 // patient/1 -> patient/gold-1 508 // patient/2 -> patient/gold-1 509 // patient/3 -> patient/gold-2 510 // patient/4 -> patient/gold-2 511 // } 512 Map<String, String> sourceResourceIdToGoldenResourceIdMap = new HashMap<>(); 513 goldenResourceToSourcePidMap.forEach((key, value) -> { 514 String goldenResourceId = 515 myIdHelperService.translatePidIdToForcedIdWithCache(key).orElse(key.toString()); 516 PersistentIdToForcedIdMap pidsToForcedIds = myIdHelperService.translatePidsToForcedIds(value); 517 518 Set<String> sourceResourceIds = pidsToForcedIds.getResolvedResourceIds(); 519 520 sourceResourceIds.forEach( 521 sourceResourceId -> sourceResourceIdToGoldenResourceIdMap.put(sourceResourceId, goldenResourceId)); 522 }); 523 524 // Now that we have built our cached expansion, store it. 525 myMdmExpansionCacheSvc.setCacheContents(sourceResourceIdToGoldenResourceIdMap); 526 } 527 528 private void extract( 529 List<MdmPidTuple<JpaPid>> theGoldenPidTargetPidTuples, 530 Map<JpaPid, Set<JpaPid>> theGoldenResourceToSourcePidMap) { 531 for (MdmPidTuple<JpaPid> goldenPidTargetPidTuple : theGoldenPidTargetPidTuples) { 532 JpaPid goldenPid = goldenPidTargetPidTuple.getGoldenPid(); 533 JpaPid sourcePid = goldenPidTargetPidTuple.getSourcePid(); 534 theGoldenResourceToSourcePidMap 535 .computeIfAbsent(goldenPid, key -> new HashSet<>()) 536 .add(sourcePid); 537 } 538 } 539 540 // gets all the resources related to each patient provided in the list of thePatientPids 541 @SuppressWarnings("unchecked") 542 private void queryResourceTypeWithReferencesToPatients( 543 Set<JpaPid> theReadPids, 544 List<JpaPid> thePatientPids, 545 ExportPIDIteratorParameters theParams, 546 RuntimeResourceDefinition theDef) 547 throws IOException { 548 549 // Convert Resource Persistent IDs to actual client IDs. 550 Set<JpaPid> pidSet = new HashSet<>(thePatientPids); 551 Set<String> patientIds = myIdHelperService.translatePidsToFhirResourceIds(pidSet); 552 553 // Build SP map 554 // First, inject the _typeFilters and _since from the export job 555 List<SearchParameterMap> expandedSpMaps = 556 myBulkExportHelperSvc.createSearchParameterMapsForResourceType(theDef, theParams, true); 557 for (SearchParameterMap expandedSpMap : expandedSpMaps) { 558 559 // Since we are in a bulk job, we have to ensure the user didn't jam in a patient search param, since we 560 // need to manually set that. 561 validateSearchParametersForGroup(expandedSpMap, theParams.getResourceType()); 562 563 // Fetch and cache a search builder for this resource type 564 // filter by ResourceType 565 ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType(theParams.getResourceType()); 566 567 // Now, further filter the query with patient references defined by the chunk of IDs we have. 568 // filter by PatientIds 569 if (PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(theParams.getResourceType())) { 570 filterSearchByHasParam(patientIds, expandedSpMap, theParams); 571 } else { 572 filterSearchByResourceIds(patientIds, expandedSpMap, theParams); 573 } 574 575 // Execute query and all found pids to our local iterator. 576 RequestPartitionId partitionId = theParams.getPartitionIdOrAllPartitions(); 577 try (IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery( 578 expandedSpMap, new SearchRuntimeDetails(null, theParams.getInstanceId()), null, partitionId)) { 579 while (resultIterator.hasNext()) { 580 theReadPids.add(resultIterator.next()); 581 } 582 } 583 584 // Construct our Includes filter 585 // We use this to recursively fetch resources of interest 586 // (but should only request those the user has requested/can see) 587 Set<Include> includes = new HashSet<>(); 588 for (String resourceType : theParams.getRequestedResourceTypes()) { 589 includes.add(new Include(resourceType + ":*", true)); 590 } 591 592 SystemRequestDetails requestDetails = new SystemRequestDetails().setRequestPartitionId(partitionId); 593 SearchBuilderLoadIncludesParameters<JpaPid> loadIncludesParameters = 594 new SearchBuilderLoadIncludesParameters<>(); 595 loadIncludesParameters.setFhirContext(myContext); 596 loadIncludesParameters.setMatches(theReadPids); 597 loadIncludesParameters.setEntityManager(myEntityManager); 598 loadIncludesParameters.setRequestDetails(requestDetails); 599 loadIncludesParameters.setIncludeFilters(includes); 600 loadIncludesParameters.setReverseMode(false); 601 loadIncludesParameters.setLastUpdated(expandedSpMap.getLastUpdated()); 602 loadIncludesParameters.setSearchIdOrDescription(theParams.getInstanceId()); 603 loadIncludesParameters.setDesiredResourceTypes(theParams.getRequestedResourceTypes()); 604 Set<JpaPid> includeIds = searchBuilder.loadIncludes(loadIncludesParameters); 605 606 // gets rid of the Patient duplicates 607 theReadPids.addAll(includeIds.stream() 608 .filter((id) -> !id.getResourceType().equals("Patient")) 609 .collect(Collectors.toSet())); 610 } 611 } 612 613 private RuntimeSearchParam getActivePatientSearchParamForCurrentResourceType(String theResourceType) { 614 String activeSearchParamName = ""; 615 String resourceToCheck = theResourceType; 616 if (!PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES.contains(theResourceType)) { 617 activeSearchParamName = 618 getPatientSearchParamForCurrentResourceType(theResourceType).getName(); 619 } else if ("Practitioner".equalsIgnoreCase(theResourceType)) { 620 resourceToCheck = "Patient"; 621 activeSearchParamName = "general-practitioner"; 622 } else if ("Organization".equalsIgnoreCase(theResourceType)) { 623 resourceToCheck = "Patient"; 624 activeSearchParamName = "organization"; 625 } 626 return mySearchParamRegistry.getActiveSearchParam( 627 resourceToCheck, activeSearchParamName, ISearchParamRegistry.SearchParamLookupContextEnum.SEARCH); 628 } 629 630 /** 631 * Must not be called for resources types listed in PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES 632 * 633 * @param idChunk 634 * @param expandedSpMap 635 * @param theParams 636 */ 637 private void filterSearchByResourceIds( 638 Set<String> idChunk, SearchParameterMap expandedSpMap, ExportPIDIteratorParameters theParams) { 639 ReferenceOrListParam orList = new ReferenceOrListParam(); 640 idChunk.forEach(id -> orList.add(new ReferenceParam(id))); 641 RuntimeSearchParam patientSearchParamForCurrentResourceType = 642 getPatientSearchParamForCurrentResourceType(theParams.getResourceType()); 643 expandedSpMap.add(patientSearchParamForCurrentResourceType.getName(), orList); 644 } 645 646 /** 647 * @param idChunk 648 * @param expandedSpMap 649 */ 650 private void filterSearchByHasParam( 651 Set<String> idChunk, SearchParameterMap expandedSpMap, ExportPIDIteratorParameters theParams) { 652 HasOrListParam hasOrListParam = new HasOrListParam(); 653 idChunk.stream().forEach(id -> hasOrListParam.addOr(buildHasParam(id, theParams.getResourceType()))); 654 expandedSpMap.add("_has", hasOrListParam); 655 } 656 657 private HasParam buildHasParam(String theResourceId, String theResourceType) { 658 if ("Practitioner".equalsIgnoreCase(theResourceType)) { 659 return new HasParam("Patient", "general-practitioner", "_id", theResourceId); 660 } else if ("Organization".equalsIgnoreCase(theResourceType)) { 661 return new HasParam("Patient", "organization", "_id", theResourceId); 662 } else { 663 throw new IllegalArgumentException( 664 Msg.code(2077) + " We can't handle forward references onto type " + theResourceType); 665 } 666 } 667 668 /** 669 * Given the local myGroupId, perform an expansion to retrieve all resource IDs of member patients. 670 * if myMdmEnabled is set to true, we also reach out to the IMdmLinkDao to attempt to also expand it into matched 671 * patients. 672 * 673 * @return a Set of Strings representing the resource IDs of all members of a group. 674 */ 675 private Set<JpaPid> expandAllPatientPidsFromGroup(ExportPIDIteratorParameters theParams) throws IOException { 676 Set<JpaPid> expandedIds = new HashSet<>(); 677 RequestPartitionId partitionId = theParams.getPartitionIdOrAllPartitions(); 678 SystemRequestDetails requestDetails = new SystemRequestDetails().setRequestPartitionId(partitionId); 679 IBaseResource group = 680 myDaoRegistry.getResourceDao("Group").read(new IdDt(theParams.getGroupId()), requestDetails); 681 JpaPid pidOrNull = myIdHelperService.getPidOrNull(partitionId, group); 682 683 // Attempt to perform MDM Expansion of membership 684 if (theParams.isExpandMdm()) { 685 expandedIds.addAll(performMembershipExpansionViaMdmTable(pidOrNull)); 686 } 687 688 // Now manually add the members of the group (its possible even with mdm expansion that some members dont have 689 // MDM matches, 690 // so would be otherwise skipped 691 List<JpaPid> membersFromGroupWithFilter = getMembersFromGroupWithFilter(theParams, false); 692 ourLog.debug("Group with ID [{}] has been expanded to: {}", theParams.getGroupId(), membersFromGroupWithFilter); 693 expandedIds.addAll(membersFromGroupWithFilter); 694 695 return expandedIds; 696 } 697 698 @SuppressWarnings({"rawtypes", "unchecked"}) 699 private Set<JpaPid> performMembershipExpansionViaMdmTable(JpaPid pidOrNull) { 700 List<MdmPidTuple<JpaPid>> goldenPidTargetPidTuples = 701 myMdmLinkDao.expandPidsFromGroupPidGivenMatchResult(pidOrNull, MdmMatchResultEnum.MATCH); 702 // Now lets translate these pids into resource IDs 703 Set<JpaPid> uniquePids = new HashSet<>(); 704 goldenPidTargetPidTuples.forEach(tuple -> { 705 uniquePids.add(tuple.getGoldenPid()); 706 uniquePids.add(tuple.getSourcePid()); 707 }); 708 PersistentIdToForcedIdMap pidToForcedIdMap = myIdHelperService.translatePidsToForcedIds(uniquePids); 709 710 Map<JpaPid, Set<JpaPid>> goldenResourceToSourcePidMap = new HashMap<>(); 711 extract(goldenPidTargetPidTuples, goldenResourceToSourcePidMap); 712 populateMdmResourceCache(goldenPidTargetPidTuples); 713 714 return uniquePids; 715 } 716 717 /* Mdm Expansion */ 718 719 private RuntimeSearchParam getRuntimeSearchParam(IBaseResource theResource) { 720 Optional<RuntimeSearchParam> oPatientSearchParam = 721 SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, theResource.fhirType()); 722 if (!oPatientSearchParam.isPresent()) { 723 String errorMessage = String.format( 724 "[%s] has no search parameters that are for patients, so it is invalid for Group Bulk Export!", 725 theResource.fhirType()); 726 throw new IllegalArgumentException(Msg.code(2242) + errorMessage); 727 } else { 728 return oPatientSearchParam.get(); 729 } 730 } 731 732 private void annotateBackwardsReferences(IBaseResource iBaseResource) { 733 Optional<String> patientReference = getPatientReference(iBaseResource); 734 if (patientReference.isPresent()) { 735 addGoldenResourceExtension(iBaseResource, patientReference.get()); 736 } else { 737 ourLog.error( 738 "Failed to find the patient reference information for resource {}. This is a bug, " 739 + "as all resources which can be exported via Group Bulk Export must reference a patient.", 740 iBaseResource); 741 } 742 } 743 744 private Optional<String> getPatientReference(IBaseResource iBaseResource) { 745 String fhirPath; 746 747 RuntimeSearchParam runtimeSearchParam = getRuntimeSearchParam(iBaseResource); 748 fhirPath = getPatientFhirPath(runtimeSearchParam); 749 750 if (iBaseResource.fhirType().equalsIgnoreCase("Patient")) { 751 return Optional.of(iBaseResource.getIdElement().getIdPart()); 752 } else { 753 Optional<IBaseReference> optionalReference = 754 getFhirParser().evaluateFirst(iBaseResource, fhirPath, IBaseReference.class); 755 if (optionalReference.isPresent()) { 756 return optionalReference.map(theIBaseReference -> 757 theIBaseReference.getReferenceElement().getIdPart()); 758 } else { 759 return Optional.empty(); 760 } 761 } 762 } 763 764 private void addGoldenResourceExtension(IBaseResource iBaseResource, String sourceResourceId) { 765 String goldenResourceId = myMdmExpansionCacheSvc.getGoldenResourceId(sourceResourceId); 766 IBaseExtension<?, ?> extension = ExtensionUtil.getOrCreateExtension( 767 iBaseResource, HapiExtensions.ASSOCIATED_GOLDEN_RESOURCE_EXTENSION_URL); 768 if (!StringUtils.isBlank(goldenResourceId)) { 769 ExtensionUtil.setExtension(myContext, extension, "reference", prefixPatient(goldenResourceId)); 770 } 771 } 772 773 private String prefixPatient(String theResourceId) { 774 return "Patient/" + theResourceId; 775 } 776 777 private IFhirPath getFhirParser() { 778 if (myFhirPath == null) { 779 myFhirPath = myContext.newFhirPath(); 780 } 781 return myFhirPath; 782 } 783 784 private String getPatientFhirPath(RuntimeSearchParam theRuntimeParam) { 785 String path = theRuntimeParam.getPath(); 786 // GGG: Yes this is a stupid hack, but by default this runtime search param will return stuff like 787 // Observation.subject.where(resolve() is Patient) which unfortunately our FHIRpath evaluator doesn't play 788 // nicely with 789 // our FHIRPath evaluator. 790 if (path.contains(".where")) { 791 path = path.substring(0, path.indexOf(".where")); 792 } 793 return path; 794 } 795}