
/*-
 * #%L
 * HAPI FHIR Storage api
 * %%
 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.dao;

import ca.uhn.fhir.context.BaseRuntimeChildDefinition;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.model.valueset.BundleTypeEnum;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.util.BundleUtil;
import ca.uhn.fhir.util.FhirTerser;
import ca.uhn.fhir.util.ResourceReferenceInfo;
import jakarta.annotation.Nonnull;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBase;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

/**
 * This class invokes the {@link Pointcut#STORAGE_TRANSACTION_PRE_PARTITION} hook to slice a transaction request
 * bundle into partitions. It then executes each partition as a separate transaction and aggregates the results
 * into a single cohesive response bundle. The response bundle will have entries which correspond to the original
 * request bundle entries.
 * <p>
 * Note that this does break the FHIR transaction semantics, since it is possible for one partition to succeed
 * and then a later partition to fail. This should therefore only be used in cases where this is understood
 * and desired.
 */
@SuppressWarnings("ClassCanBeRecord")
public class TransactionPartitionProcessor<BUNDLE extends IBaseBundle> {

	/**
	 * Non-null placeholder value stored against every key of the identity map used by
	 * {@link #execute(IBaseBundle)} to track which original entries have been seen.
	 */
	public static final Object EMPTY_OBJECT = new Object();

	private final RequestDetails myRequestDetails;
	private final boolean myNestedMode;
	private final IInterceptorBroadcaster myInterceptorBroadcaster;
	private final String myActionName;
	private final FhirContext myFhirContext;
	private final BaseTransactionProcessor myTransactionProcessor;

	/**
	 * Constructor
	 *
	 * @param theTransactionProcessor The processor used to execute each partition as a sub-request
	 * @param theFhirContext          The context used to inspect and build Bundle resources
	 * @param theRequestDetails       The request details, passed to the hook and to every sub-request
	 * @param theNestedMode           Passed through unchanged to every sub-request
	 * @param theCompositeBroadcaster The broadcaster used to invoke the pre-partition hook
	 * @param theActionName           Passed through unchanged to every sub-request
	 */
	public TransactionPartitionProcessor(
			BaseTransactionProcessor theTransactionProcessor,
			FhirContext theFhirContext,
			RequestDetails theRequestDetails,
			boolean theNestedMode,
			IInterceptorBroadcaster theCompositeBroadcaster,
			String theActionName) {
		myTransactionProcessor = theTransactionProcessor;
		myFhirContext = theFhirContext;
		myRequestDetails = theRequestDetails;
		myNestedMode = theNestedMode;
		myInterceptorBroadcaster = theCompositeBroadcaster;
		myActionName = theActionName;
	}

	/**
	 * Invoke the {@link Pointcut#STORAGE_TRANSACTION_PRE_PARTITION} hook to slice the transaction request
	 * bundle into partitions, execute each slice, and aggregate the results.
	 *
	 * @param theRequest The transaction request bundle
	 * @return A new response bundle whose entries are positioned to match the entries of {@code theRequest}
	 * @throws IllegalArgumentException If the hook returns no bundles, changes the entry list of the original
	 *                                  bundle, returns an entry that is duplicated or was not part of the
	 *                                  original bundle, or omits an original entry
	 */
	public BUNDLE execute(BUNDLE theRequest) {

		// Stash a copy of the entries in the bundle before we call the hook, so we
		// can check later that the hook didn't make any illegal changes. Entries are
		// tracked by identity, not by equals().
		IdentityHashMap<IBase, Object> originalEntrySet = new IdentityHashMap<>();
		FhirTerser terser = myFhirContext.newTerser();
		List<IBase> originalEntries = terser.getValues(theRequest, "entry");
		for (IBase theOriginalEntry : originalEntries) {
			originalEntrySet.put(theOriginalEntry, EMPTY_OBJECT);
		}

		// Invoke the hook method
		HookParams hookParams = new HookParams()
				.add(RequestDetails.class, myRequestDetails)
				.addIfMatchesType(ServletRequestDetails.class, myRequestDetails)
				.add(IBaseBundle.class, theRequest);
		TransactionPrePartitionResponse partitionResponse =
				(TransactionPrePartitionResponse) myInterceptorBroadcaster.callHooksAndReturnObject(
						Pointcut.STORAGE_TRANSACTION_PRE_PARTITION, hookParams);
		Validate.isTrue(
				partitionResponse != null
						&& partitionResponse.getSplitBundles() != null
						&& !partitionResponse.getSplitBundles().isEmpty(),
				"Hook for pointcut STORAGE_TRANSACTION_PRE_PARTITION did not return a value");
		List<IBaseBundle> partitionRequestBundles = partitionResponse.getSplitBundles();

		// Verify that the hook didn't modify the original bundle
		List<IBase> originalEntriesAfter = terser.getValues(theRequest, "entry");
		Validate.isTrue(
				originalEntries.equals(originalEntriesAfter),
				"Interceptor for Pointcut %s must not make changes to the original bundle",
				Pointcut.STORAGE_TRANSACTION_PRE_PARTITION);

		// Verify that the output has only entries appropriate to the input. Each entry
		// is removed from the tracking map as it is seen, so a second sighting (or an
		// unknown entry) finds nothing to remove and fails the check.
		for (IBaseBundle outputBundle : partitionRequestBundles) {
			for (IBase outputBundleEntry : terser.getValues(outputBundle, "entry")) {
				Validate.isTrue(
						originalEntrySet.remove(outputBundleEntry) != null,
						"Interceptor for Pointcut %s must not return Bundles containing duplicates or entries which were not present in the original Bundle",
						Pointcut.STORAGE_TRANSACTION_PRE_PARTITION);
			}
		}

		// Anything still left in the tracking map was not placed in any partition
		Validate.isTrue(
				originalEntrySet.isEmpty(),
				"Interceptor for Pointcut %s must include all entries from the original Bundle in the partitioned Bundles",
				Pointcut.STORAGE_TRANSACTION_PRE_PARTITION);

		return processPartitionedBundles(theRequest, partitionRequestBundles);
	}

	/**
	 * Executes each non-empty partition bundle as a sub-request, in list order, and assembles a single
	 * response bundle in which each response entry sits at the index of the request entry it answers.
	 * <p>
	 * Placeholder IDs resolved by an earlier partition are substituted into the references of later
	 * partitions before those are executed. Note that this rewrites references on the request resources
	 * in place.
	 *
	 * @param theOriginalBundle      The original, unpartitioned request bundle
	 * @param thePartitionedRequests The partition bundles returned by the hook
	 * @return A new bundle of type transaction-response
	 * @throws NullPointerException     If a partition contains an entry which is not an entry of
	 *                                  {@code theOriginalBundle}
	 * @throws InternalErrorException   If the same original entry appears in more than one place in the
	 *                                  partitions
	 * @throws IllegalArgumentException If a partition response has a different number of entries than
	 *                                  its request
	 */
	@Nonnull
	@SuppressWarnings("unchecked")
	private BUNDLE processPartitionedBundles(BUNDLE theOriginalBundle, List<IBaseBundle> thePartitionedRequests) {

		RuntimeResourceDefinition bundleDefinition = myFhirContext.getResourceDefinition("Bundle");
		BaseRuntimeChildDefinition bundleEntryChild = bundleDefinition.getChildByName("entry");
		FhirTerser terser = myFhirContext.newTerser();

		// Map each original entry (by identity) to its position in the original bundle
		IdentityHashMap<IBase, Integer> originalEntryToIndex = new IdentityHashMap<>();
		List<IBase> originalEntries = bundleEntryChild.getAccessor().getValues(theOriginalBundle);
		for (int i = 0; i < originalEntries.size(); i++) {
			originalEntryToIndex.put(originalEntries.get(i), i);
		}

		List<Boolean> entryFoundInPartitions = createListOfNulls(originalEntries.size());

		List<IBase> responseEntries = createListOfNulls(originalEntries.size());

		// Partitions with no entries are not executed
		List<IBaseBundle> partitionedRequests = thePartitionedRequests.stream()
				.filter(t -> !bundleEntryChild.getAccessor().getValues(t).isEmpty())
				.toList();

		// First pass: make sure no original entry was assigned to more than one partition slot
		for (IBaseBundle singlePartitionRequest : partitionedRequests) {
			for (IBase requestEntry : bundleEntryChild.getAccessor().getValues(singlePartitionRequest)) {
				Integer originalEntryIndex = originalEntryToIndex.get(requestEntry);

				// Fail with a descriptive message instead of an anonymous NullPointerException
				// from auto-unboxing when the entry is unknown. The exception type is the same.
				Validate.notNull(
						originalEntryIndex,
						"Interceptor for Pointcut %s returned a Bundle entry which was not present in the original Bundle",
						Pointcut.STORAGE_TRANSACTION_PRE_PARTITION);

				Boolean previousValue = entryFoundInPartitions.set(originalEntryIndex, true);
				if (previousValue != null) {
					throw new InternalErrorException(
							Msg.code(2816) + "Interceptor for pointcut " + Pointcut.STORAGE_TRANSACTION_PRE_PARTITION
									+ " produced multiple partitions for Bundle.entry[" + originalEntryIndex + "]");
				}
			}
		}

		// Second pass: execute the partitions. The same TransactionDetails instance and
		// placeholder substitution map are shared across all partitions.
		TransactionDetails transactionDetails = new TransactionDetails();
		Map<String, IIdType> idSubstitutions = new HashMap<>();
		for (IBaseBundle singlePartitionRequest : partitionedRequests) {

			// Apply any placeholder ID substitutions from previous partition executions
			for (IBaseResource resource : terser.getAllEmbeddedResources(singlePartitionRequest, true)) {
				List<ResourceReferenceInfo> allRefs = terser.getAllResourceReferences(resource);
				for (ResourceReferenceInfo reference : allRefs) {
					IIdType refElement = reference.getResourceReference().getReferenceElement();
					IIdType substitution = idSubstitutions.get(refElement.getValue());
					if (substitution != null) {
						reference.getResourceReference().setReference(substitution.getValue());
					}
				}
			}

			IBaseBundle singlePartitionResponse = myTransactionProcessor.processTransactionAsSubRequest(
					myRequestDetails, transactionDetails, singlePartitionRequest, myActionName, myNestedMode);

			// Capture any placeholder ID substitutions from this partition
			TransactionUtil.TransactionResponse singlePartitionResponseParsed =
					TransactionUtil.parseTransactionResponse(
							myFhirContext, singlePartitionRequest, singlePartitionResponse);
			for (TransactionUtil.StorageOutcome outcome : singlePartitionResponseParsed.getStorageOutcomes()) {
				if (outcome.getSourceId() != null && outcome.getSourceId().isUuid()) {
					idSubstitutions.put(outcome.getSourceId().getValue(), outcome.getTargetId());
				}
			}

			// Response entries are matched to request entries by position
			List<IBase> partitionRequestEntries = bundleEntryChild.getAccessor().getValues(singlePartitionRequest);
			List<IBase> partitionResponseEntries =
					bundleEntryChild.getAccessor().getValues(singlePartitionResponse);
			Validate.isTrue(
					partitionRequestEntries.size() == partitionResponseEntries.size(),
					"Partitioned request and response bundles have different number of entries");

			for (int i = 0; i < partitionRequestEntries.size(); i++) {
				IBase partitionRequestEntry = partitionRequestEntries.get(i);
				IBase partitionResponseEntry = partitionResponseEntries.get(i);
				Integer originalIndex = originalEntryToIndex.get(partitionRequestEntry);
				// NOTE(review): the request entries are re-read here after the sub-request has
				// run; presumably this null check covers the case where execution changed the
				// partition's entry list - confirm against processTransactionAsSubRequest.
				if (originalIndex != null) {
					responseEntries.set(originalIndex, partitionResponseEntry);
				}
			}
		}

		// Assemble the aggregate response in original entry order
		BUNDLE response = (BUNDLE) bundleDefinition.newInstance();
		BundleUtil.setBundleType(myFhirContext, response, BundleTypeEnum.TRANSACTION_RESPONSE.getCode());
		for (IBase responseEntry : responseEntries) {
			bundleEntryChild.getMutator().addValue(response, responseEntry);
		}
		return response;
	}

	/**
	 * Creates a mutable list containing {@code theSize} {@code null} elements, so that
	 * positions can later be filled in any order using {@link List#set(int, Object)}.
	 *
	 * @param theSize The number of {@code null} elements to add; values of zero or less produce an empty list
	 *                only when non-negative, since a negative value is rejected by the {@link ArrayList}
	 *                constructor
	 * @return A new list of size {@code theSize}
	 */
	@Nonnull
	private static <T> List<T> createListOfNulls(int theSize) {
		List<T> list = new ArrayList<>(theSize);
		while (list.size() < theSize) {
			list.add(null);
		}
		return list;
	}
}