001/*-
002 * #%L
003 * HAPI FHIR Storage api
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.dao;
021
022import ca.uhn.fhir.context.BaseRuntimeChildDefinition;
023import ca.uhn.fhir.context.FhirContext;
024import ca.uhn.fhir.context.RuntimeResourceDefinition;
025import ca.uhn.fhir.i18n.Msg;
026import ca.uhn.fhir.interceptor.api.HookParams;
027import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
028import ca.uhn.fhir.interceptor.api.Pointcut;
029import ca.uhn.fhir.model.valueset.BundleTypeEnum;
030import ca.uhn.fhir.rest.api.server.RequestDetails;
031import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
032import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
033import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
034import ca.uhn.fhir.util.BundleUtil;
035import ca.uhn.fhir.util.FhirTerser;
036import ca.uhn.fhir.util.ResourceReferenceInfo;
037import jakarta.annotation.Nonnull;
038import org.apache.commons.lang3.Validate;
039import org.hl7.fhir.instance.model.api.IBase;
040import org.hl7.fhir.instance.model.api.IBaseBundle;
041import org.hl7.fhir.instance.model.api.IBaseResource;
042import org.hl7.fhir.instance.model.api.IIdType;
043
044import java.util.ArrayList;
045import java.util.HashMap;
046import java.util.IdentityHashMap;
047import java.util.List;
048import java.util.Map;
049
050/**
051 * This class invokes the {@link Pointcut#STORAGE_TRANSACTION_PRE_PARTITION} hook to slice a transaction request
052 * bundle into partitions. It then executes each partition as a separate transaction and aggregates the results
053 * into a single cohesive response bundle. The response bundle will have entries which correspond to the original
054 * request bundle entries.
055 * <p>
056 * Note that this does break the FHIR transaction semantics, since it is possible for one partition to succeed
057 * and then a later partition to fail. This should therefore only be used in cases where this is understood
058 * and desired.
059 */
060@SuppressWarnings("ClassCanBeRecord")
061public class TransactionPartitionProcessor<BUNDLE extends IBaseBundle> {
062
063        public static final Object EMPTY_OBJECT = new Object();
064        private final RequestDetails myRequestDetails;
065        private final boolean myNestedMode;
066        private final IInterceptorBroadcaster myInterceptorBroadcaster;
067        private final String myActionName;
068        private final FhirContext myFhirContext;
069        private final BaseTransactionProcessor myTransactionProcessor;
070
071        /**
072         * Constructor
073         */
074        public TransactionPartitionProcessor(
075                        BaseTransactionProcessor theTransactionProcessor,
076                        FhirContext theFhirContext,
077                        RequestDetails theRequestDetails,
078                        boolean theNestedMode,
079                        IInterceptorBroadcaster theCompositeBroadcaster,
080                        String theActionName) {
081                myTransactionProcessor = theTransactionProcessor;
082                myFhirContext = theFhirContext;
083                myRequestDetails = theRequestDetails;
084                myNestedMode = theNestedMode;
085                myInterceptorBroadcaster = theCompositeBroadcaster;
086                myActionName = theActionName;
087        }
088
089        /**
090         * Invoke the {@link Pointcut#STORAGE_TRANSACTION_PRE_PARTITION} hook to slice the transaction request
091         * bundle into partitions, execute each slice, and aggregate the results.
092         */
093        public BUNDLE execute(BUNDLE theRequest) {
094
095                // Stash a copy of the entries in the bundle before we call the hook, so we
096                // can check later that the hook didn't make any illegal changes
097                IdentityHashMap<IBase, Object> originalEntrySet = new IdentityHashMap<>();
098                FhirTerser terser = myFhirContext.newTerser();
099                List<IBase> originalEntries = terser.getValues(theRequest, "entry");
100                for (IBase theOriginalEntry : originalEntries) {
101                        originalEntrySet.put(theOriginalEntry, EMPTY_OBJECT);
102                }
103
104                // Invoke the hook method
105                HookParams hookParams = new HookParams()
106                                .add(RequestDetails.class, myRequestDetails)
107                                .addIfMatchesType(ServletRequestDetails.class, myRequestDetails)
108                                .add(IBaseBundle.class, theRequest);
109                TransactionPrePartitionResponse partitionResponse =
110                                (TransactionPrePartitionResponse) myInterceptorBroadcaster.callHooksAndReturnObject(
111                                                Pointcut.STORAGE_TRANSACTION_PRE_PARTITION, hookParams);
112                Validate.isTrue(
113                                partitionResponse != null
114                                                && partitionResponse.getSplitBundles() != null
115                                                && !partitionResponse.getSplitBundles().isEmpty(),
116                                "Hook for pointcut STORAGE_TRANSACTION_PRE_PARTITION did not return a value");
117                List<IBaseBundle> partitionRequestBundles = partitionResponse.getSplitBundles();
118
119                // Verify that the hook didn't modify the original bundle
120                List<IBase> originalEntriesAfter = terser.getValues(theRequest, "entry");
121                Validate.isTrue(
122                                originalEntries.equals(originalEntriesAfter),
123                                "Interceptor for Pointcut %s must not make changes to the original bundle",
124                                Pointcut.STORAGE_TRANSACTION_PRE_PARTITION);
125
126                // Verify that the output has only entries appropriate to the input
127                for (IBaseBundle outputBundle : partitionRequestBundles) {
128                        for (IBase outputBundleEntry : terser.getValues(outputBundle, "entry")) {
129                                Validate.isTrue(
130                                                originalEntrySet.remove(outputBundleEntry) != null,
131                                                "Interceptor for Pointcut %s must not return Bundles containing duplicates or entries which were not present in the original Bundle",
132                                                Pointcut.STORAGE_TRANSACTION_PRE_PARTITION);
133                        }
134                }
135                Validate.isTrue(
136                                originalEntrySet.isEmpty(),
137                                "Interceptor for Pointcut %s must include all entries from the original Bundle in the partitioned Bundles",
138                                Pointcut.STORAGE_TRANSACTION_PRE_PARTITION);
139
140                return processPartitionedBundles(theRequest, partitionRequestBundles);
141        }
142
143        @Nonnull
144        @SuppressWarnings("unchecked")
145        private BUNDLE processPartitionedBundles(BUNDLE theOriginalBundle, List<IBaseBundle> thePartitionedRequests) {
146
147                RuntimeResourceDefinition bundleDefinition = myFhirContext.getResourceDefinition("Bundle");
148                BaseRuntimeChildDefinition bundleEntryChild = bundleDefinition.getChildByName("entry");
149                FhirTerser terser = myFhirContext.newTerser();
150
151                IdentityHashMap<IBase, Integer> originalEntryToIndex = new IdentityHashMap<>();
152                List<IBase> originalEntries = bundleEntryChild.getAccessor().getValues(theOriginalBundle);
153                for (int i = 0; i < originalEntries.size(); i++) {
154                        originalEntryToIndex.put(originalEntries.get(i), i);
155                }
156
157                List<Boolean> entryFoundInPartitions = createListOfNulls(originalEntries.size());
158
159                List<IBase> responseEntries = createListOfNulls(originalEntries.size());
160
161                List<IBaseBundle> partitionedRequests = thePartitionedRequests.stream()
162                                .filter(t -> !bundleEntryChild.getAccessor().getValues(t).isEmpty())
163                                .toList();
164                for (IBaseBundle singlePartitionRequest : partitionedRequests) {
165                        for (IBase requestEntry : bundleEntryChild.getAccessor().getValues(singlePartitionRequest)) {
166                                Integer originalEntryIndex = originalEntryToIndex.get(requestEntry);
167                                Boolean previousValue = entryFoundInPartitions.set(originalEntryIndex, true);
168                                if (previousValue != null) {
169                                        throw new InternalErrorException(
170                                                        Msg.code(2816) + "Interceptor for pointcut " + Pointcut.STORAGE_TRANSACTION_PRE_PARTITION
171                                                                        + " produced multiple partitions for Bundle.entry[" + originalEntryIndex + "]");
172                                }
173                        }
174                }
175
176                TransactionDetails transactionDetails = new TransactionDetails();
177                Map<String, IIdType> idSubstitutions = new HashMap<>();
178                for (IBaseBundle singlePartitionRequest : partitionedRequests) {
179
180                        // Apply any placeholder ID substitutions from previous partition executions
181                        for (IBaseResource resource : terser.getAllEmbeddedResources(singlePartitionRequest, true)) {
182                                List<ResourceReferenceInfo> allRefs = terser.getAllResourceReferences(resource);
183                                for (ResourceReferenceInfo reference : allRefs) {
184                                        IIdType refElement = reference.getResourceReference().getReferenceElement();
185                                        IIdType substitution = idSubstitutions.get(refElement.getValue());
186                                        if (substitution != null) {
187                                                reference.getResourceReference().setReference(substitution.getValue());
188                                        }
189                                }
190                        }
191
192                        IBaseBundle singlePartitionResponse = myTransactionProcessor.processTransactionAsSubRequest(
193                                        myRequestDetails, transactionDetails, singlePartitionRequest, myActionName, myNestedMode);
194
195                        // Capture any placeholder ID substitutions from this partition
196                        TransactionUtil.TransactionResponse singlePartitionResponseParsed =
197                                        TransactionUtil.parseTransactionResponse(
198                                                        myFhirContext, singlePartitionRequest, singlePartitionResponse);
199                        for (TransactionUtil.StorageOutcome outcome : singlePartitionResponseParsed.getStorageOutcomes()) {
200                                if (outcome.getSourceId() != null && outcome.getSourceId().isUuid()) {
201                                        idSubstitutions.put(outcome.getSourceId().getValue(), outcome.getTargetId());
202                                }
203                        }
204
205                        List<IBase> partitionRequestEntries = bundleEntryChild.getAccessor().getValues(singlePartitionRequest);
206                        List<IBase> partitionResponseEntries =
207                                        bundleEntryChild.getAccessor().getValues(singlePartitionResponse);
208                        Validate.isTrue(
209                                        partitionRequestEntries.size() == partitionResponseEntries.size(),
210                                        "Partitioned request and response bundles have different number of entries");
211
212                        for (int i = 0; i < partitionRequestEntries.size(); i++) {
213                                IBase partitionRequestEntry = partitionRequestEntries.get(i);
214                                IBase partitionResponseEntry = partitionResponseEntries.get(i);
215                                Integer originalIndex = originalEntryToIndex.get(partitionRequestEntry);
216                                if (originalIndex != null) {
217                                        responseEntries.set(originalIndex, partitionResponseEntry);
218                                }
219                        }
220                }
221
222                BUNDLE response = (BUNDLE) bundleDefinition.newInstance();
223                BundleUtil.setBundleType(myFhirContext, response, BundleTypeEnum.TRANSACTION_RESPONSE.getCode());
224                for (IBase responseEntry : responseEntries) {
225                        bundleEntryChild.getMutator().addValue(response, responseEntry);
226                }
227                return response;
228        }
229
230        @Nonnull
231        private static <T> List<T> createListOfNulls(int theSize) {
232                List<T> responseEntries = new ArrayList<>(theSize);
233                while (responseEntries.size() < theSize) {
234                        responseEntries.add(null);
235                }
236                return responseEntries;
237        }
238}