
/*-
 * #%L
 * HAPI FHIR Storage api
 * %%
 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.dao;

import ca.uhn.fhir.context.BaseRuntimeChildDefinition;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.model.valueset.BundleTypeEnum;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.util.BundleUtil;
import ca.uhn.fhir.util.FhirTerser;
import ca.uhn.fhir.util.ResourceReferenceInfo;
import jakarta.annotation.Nonnull;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBase;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

/**
 * This class invokes the {@link Pointcut#STORAGE_TRANSACTION_PRE_PARTITION} hook to slice a transaction request
 * bundle into partitions. It then executes each partition as a separate transaction and aggregates the results
 * into a single cohesive response bundle. The response bundle will have entries which correspond to the original
 * request bundle entries.
 * <p>
 * Note that this does break the FHIR transaction semantics, since it is possible for one partition to succeed
 * and then a later partition to fail. This should therefore only be used in cases where this is understood
 * and desired.
 */
@SuppressWarnings("ClassCanBeRecord")
public class TransactionPartitionProcessor<BUNDLE extends IBaseBundle> {

    /**
     * Placeholder value stored against every key of the identity map that {@link #execute(IBaseBundle)}
     * uses as an identity-based set of bundle entries.
     */
    public static final Object EMPTY_OBJECT = new Object();

    private final RequestDetails myRequestDetails;
    private final boolean myNestedMode;
    private final IInterceptorBroadcaster myInterceptorBroadcaster;
    private final String myActionName;
    private final FhirContext myFhirContext;
    private final BaseTransactionProcessor myTransactionProcessor;

    /**
     * Constructor
     *
     * @param theTransactionProcessor the processor used to execute each partition as a sub-request
     * @param theFhirContext          the context used to obtain the terser and the Bundle definition
     * @param theRequestDetails       the request details passed to the hook and to each sub-request
     * @param theNestedMode           passed through unchanged to each sub-request
     * @param theCompositeBroadcaster the broadcaster used to invoke the pre-partition hook
     * @param theActionName           passed through unchanged to each sub-request
     */
    public TransactionPartitionProcessor(
            BaseTransactionProcessor theTransactionProcessor,
            FhirContext theFhirContext,
            RequestDetails theRequestDetails,
            boolean theNestedMode,
            IInterceptorBroadcaster theCompositeBroadcaster,
            String theActionName) {
        myTransactionProcessor = theTransactionProcessor;
        myFhirContext = theFhirContext;
        myRequestDetails = theRequestDetails;
        myNestedMode = theNestedMode;
        myInterceptorBroadcaster = theCompositeBroadcaster;
        myActionName = theActionName;
    }

    /**
     * Invoke the {@link Pointcut#STORAGE_TRANSACTION_PRE_PARTITION} hook to slice the transaction request
     * bundle into partitions, execute each slice, and aggregate the results.
     *
     * @param theRequest the transaction request bundle
     * @return a transaction-response bundle whose entries are in the same order as the request entries
     * @throws IllegalArgumentException if the hook returns nothing, modifies the entry list of the original
     *                                  bundle, returns entries that are duplicated or were not part of the
     *                                  original bundle, or omits an original entry
     */
    public BUNDLE execute(BUNDLE theRequest) {
        FhirTerser terser = myFhirContext.newTerser();

        // Stash a copy of the entries in the bundle before we call the hook, so we
        // can check later that the hook didn't make any illegal changes
        IdentityHashMap<IBase, Object> originalEntrySet = new IdentityHashMap<>();
        List<IBase> originalEntries = terser.getValues(theRequest, "entry");
        for (IBase originalEntry : originalEntries) {
            originalEntrySet.put(originalEntry, EMPTY_OBJECT);
        }

        List<IBaseBundle> partitionRequestBundles = invokePrePartitionHook(theRequest);

        // Verify that the hook didn't modify the original bundle
        List<IBase> originalEntriesAfter = terser.getValues(theRequest, "entry");
        Validate.isTrue(
                originalEntries.equals(originalEntriesAfter),
                "Interceptor for Pointcut %s must not make changes to the original bundle",
                Pointcut.STORAGE_TRANSACTION_PRE_PARTITION);

        // Verify that the output has only entries appropriate to the input. Each entry is
        // removed from the set as it is seen, so a second sighting (or an unknown entry)
        // makes remove() return null.
        for (IBaseBundle outputBundle : partitionRequestBundles) {
            for (IBase outputBundleEntry : terser.getValues(outputBundle, "entry")) {
                Validate.isTrue(
                        originalEntrySet.remove(outputBundleEntry) != null,
                        "Interceptor for Pointcut %s must not return Bundles containing duplicates or entries which were not present in the original Bundle",
                        Pointcut.STORAGE_TRANSACTION_PRE_PARTITION);
            }
        }

        // Anything still left in the set was not placed in any partition
        Validate.isTrue(
                originalEntrySet.isEmpty(),
                "Interceptor for Pointcut %s must include all entries from the original Bundle in the partitioned Bundles",
                Pointcut.STORAGE_TRANSACTION_PRE_PARTITION);

        return processPartitionedBundles(theRequest, partitionRequestBundles);
    }

    /**
     * Calls the {@link Pointcut#STORAGE_TRANSACTION_PRE_PARTITION} hook and returns the bundles it produced.
     *
     * @throws IllegalArgumentException if the hook returned no response, or a null or empty bundle list
     */
    @Nonnull
    private List<IBaseBundle> invokePrePartitionHook(BUNDLE theRequest) {
        HookParams hookParams = new HookParams()
                .add(RequestDetails.class, myRequestDetails)
                .addIfMatchesType(ServletRequestDetails.class, myRequestDetails)
                .add(IBaseBundle.class, theRequest);
        TransactionPrePartitionResponse partitionResponse =
                (TransactionPrePartitionResponse) myInterceptorBroadcaster.callHooksAndReturnObject(
                        Pointcut.STORAGE_TRANSACTION_PRE_PARTITION, hookParams);
        Validate.isTrue(
                partitionResponse != null
                        && partitionResponse.getSplitBundles() != null
                        && !partitionResponse.getSplitBundles().isEmpty(),
                "Hook for pointcut STORAGE_TRANSACTION_PRE_PARTITION did not return a value");
        return partitionResponse.getSplitBundles();
    }

    /**
     * Executes every non-empty partition as its own sub-transaction, carrying placeholder ID substitutions
     * forward from one partition to the next, and assembles a single response bundle whose entries sit at
     * the index of the corresponding entry in the original bundle.
     */
    @Nonnull
    private BUNDLE processPartitionedBundles(BUNDLE theOriginalBundle, List<IBaseBundle> thePartitionedRequests) {

        RuntimeResourceDefinition bundleDefinition = myFhirContext.getResourceDefinition("Bundle");
        BaseRuntimeChildDefinition bundleEntryChild = bundleDefinition.getChildByName("entry");
        FhirTerser terser = myFhirContext.newTerser();

        // Identity-based lookup from each original entry to its position in the original bundle
        IdentityHashMap<IBase, Integer> originalEntryToIndex = new IdentityHashMap<>();
        List<IBase> originalEntries = bundleEntryChild.getAccessor().getValues(theOriginalBundle);
        for (int i = 0; i < originalEntries.size(); i++) {
            originalEntryToIndex.put(originalEntries.get(i), i);
        }

        // Partitions without any entries are not executed
        List<IBaseBundle> partitionedRequests = thePartitionedRequests.stream()
                .filter(t -> !bundleEntryChild.getAccessor().getValues(t).isEmpty())
                .toList();

        verifyNoEntryInMultiplePartitions(
                bundleEntryChild, partitionedRequests, originalEntryToIndex, originalEntries.size());

        List<IBase> responseEntries = createListOfNulls(originalEntries.size());
        Map<String, IIdType> idSubstitutions = new HashMap<>();
        for (IBaseBundle singlePartitionRequest : partitionedRequests) {

            /*
             * We create a new TransactionDetails for each sub-transaction. It's tempting to
             * reuse this across sub-transactions because we can leverage pre-resolved IDs
             * and that kind of thing, but there is other state in there that shouldn't be
             * preserved, such as tag definitions and rollback items
             */
            TransactionDetails transactionDetails = new TransactionDetails();

            // Apply any placeholder ID substitutions from previous partition executions
            applyPlaceholderIdSubstitutions(terser, singlePartitionRequest, idSubstitutions);

            IBaseBundle singlePartitionResponse = myTransactionProcessor.processTransactionAsSubRequest(
                    myRequestDetails, transactionDetails, singlePartitionRequest, myActionName, myNestedMode);

            // Capture any placeholder ID substitutions from this partition
            capturePlaceholderIdSubstitutions(singlePartitionRequest, singlePartitionResponse, idSubstitutions);

            List<IBase> partitionRequestEntries =
                    bundleEntryChild.getAccessor().getValues(singlePartitionRequest);
            List<IBase> partitionResponseEntries =
                    bundleEntryChild.getAccessor().getValues(singlePartitionResponse);
            Validate.isTrue(
                    partitionRequestEntries.size() == partitionResponseEntries.size(),
                    "Partitioned request and response bundles have different number of entries");

            // Request and response entries are paired by position; each response entry is
            // stored at the index its request entry had in the original bundle
            for (int i = 0; i < partitionRequestEntries.size(); i++) {
                IBase partitionRequestEntry = partitionRequestEntries.get(i);
                IBase partitionResponseEntry = partitionResponseEntries.get(i);
                Integer originalIndex = originalEntryToIndex.get(partitionRequestEntry);
                if (originalIndex != null) {
                    responseEntries.set(originalIndex, partitionResponseEntry);
                }
            }
        }

        // The cast is unchecked because the Bundle instance is created reflectively from the
        // runtime resource definition
        @SuppressWarnings("unchecked")
        BUNDLE response = (BUNDLE) bundleDefinition.newInstance();
        BundleUtil.setBundleType(myFhirContext, response, BundleTypeEnum.TRANSACTION_RESPONSE.getCode());
        for (IBase responseEntry : responseEntries) {
            bundleEntryChild.getMutator().addValue(response, responseEntry);
        }
        return response;
    }

    /**
     * Ensures that no entry of the original bundle has been placed in more than one partition.
     *
     * @throws NullPointerException   if a partition contains an entry that has no index in the original bundle
     * @throws InternalErrorException if an original entry appears in more than one partition entry
     */
    private void verifyNoEntryInMultiplePartitions(
            BaseRuntimeChildDefinition theBundleEntryChild,
            List<IBaseBundle> thePartitionedRequests,
            IdentityHashMap<IBase, Integer> theOriginalEntryToIndex,
            int theOriginalEntryCount) {
        List<Boolean> entryFoundInPartitions = createListOfNulls(theOriginalEntryCount);
        for (IBaseBundle singlePartitionRequest : thePartitionedRequests) {
            for (IBase requestEntry : theBundleEntryChild.getAccessor().getValues(singlePartitionRequest)) {
                Integer originalEntryIndex = theOriginalEntryToIndex.get(requestEntry);

                // Fail with a descriptive message rather than a bare NPE from unboxing the index
                Validate.notNull(
                        originalEntryIndex,
                        "Interceptor for Pointcut %s returned a Bundle entry which was not present in the original Bundle",
                        Pointcut.STORAGE_TRANSACTION_PRE_PARTITION);

                Boolean previousValue = entryFoundInPartitions.set(originalEntryIndex, true);
                if (previousValue != null) {
                    throw new InternalErrorException(
                            Msg.code(2816) + "Interceptor for pointcut " + Pointcut.STORAGE_TRANSACTION_PRE_PARTITION
                                    + " produced multiple partitions for Bundle.entry[" + originalEntryIndex + "]");
                }
            }
        }
    }

    /**
     * Rewrites, in place, every resource reference within the given partition request whose value is a key
     * of {@code theIdSubstitutions}, replacing it with the value of the mapped ID.
     */
    private void applyPlaceholderIdSubstitutions(
            FhirTerser theTerser, IBaseBundle thePartitionRequest, Map<String, IIdType> theIdSubstitutions) {
        for (IBaseResource resource : theTerser.getAllEmbeddedResources(thePartitionRequest, true)) {
            List<ResourceReferenceInfo> allRefs = theTerser.getAllResourceReferences(resource);
            for (ResourceReferenceInfo reference : allRefs) {
                IIdType refElement = reference.getResourceReference().getReferenceElement();
                IIdType substitution = theIdSubstitutions.get(refElement.getValue());
                if (substitution != null) {
                    reference.getResourceReference().setReference(substitution.getValue());
                }
            }
        }
    }

    /**
     * Parses the response of a single partition and records, for every storage outcome whose source ID is a
     * UUID, a mapping from the source ID value to the target ID.
     */
    private void capturePlaceholderIdSubstitutions(
            IBaseBundle thePartitionRequest,
            IBaseBundle thePartitionResponse,
            Map<String, IIdType> theIdSubstitutions) {
        TransactionUtil.TransactionResponse parsedResponse =
                TransactionUtil.parseTransactionResponse(myFhirContext, thePartitionRequest, thePartitionResponse);
        for (TransactionUtil.StorageOutcome outcome : parsedResponse.getStorageOutcomes()) {
            if (outcome.getSourceId() != null && outcome.getSourceId().isUuid()) {
                theIdSubstitutions.put(outcome.getSourceId().getValue(), outcome.getTargetId());
            }
        }
    }

    /**
     * Creates a mutable list containing {@code theSize} null elements, so that positions can later be
     * populated with {@link List#set(int, Object)}.
     */
    @Nonnull
    private static <T> List<T> createListOfNulls(int theSize) {
        List<T> responseEntries = new ArrayList<>(theSize);
        while (responseEntries.size() < theSize) {
            responseEntries.add(null);
        }
        return responseEntries;
    }
}