001/*-
002 * #%L
003 * HAPI FHIR Storage api
004 * %%
005 * Copyright (C) 2014 - 2025 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.dao;
021
022import ca.uhn.fhir.context.BaseRuntimeChildDefinition;
023import ca.uhn.fhir.context.FhirContext;
024import ca.uhn.fhir.context.RuntimeResourceDefinition;
025import ca.uhn.fhir.i18n.Msg;
026import ca.uhn.fhir.interceptor.api.HookParams;
027import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
028import ca.uhn.fhir.interceptor.api.Pointcut;
029import ca.uhn.fhir.model.valueset.BundleTypeEnum;
030import ca.uhn.fhir.rest.api.server.RequestDetails;
031import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
032import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
033import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
034import ca.uhn.fhir.util.BundleUtil;
035import ca.uhn.fhir.util.FhirTerser;
036import ca.uhn.fhir.util.ResourceReferenceInfo;
037import jakarta.annotation.Nonnull;
038import org.apache.commons.lang3.Validate;
039import org.hl7.fhir.instance.model.api.IBase;
040import org.hl7.fhir.instance.model.api.IBaseBundle;
041import org.hl7.fhir.instance.model.api.IBaseResource;
042import org.hl7.fhir.instance.model.api.IIdType;
043
044import java.util.ArrayList;
045import java.util.HashMap;
046import java.util.IdentityHashMap;
047import java.util.List;
048import java.util.Map;
049
050/**
051 * This class invokes the {@link Pointcut#STORAGE_TRANSACTION_PRE_PARTITION} hook to slice a transaction request
052 * bundle into partitions. It then executes each partition as a separate transaction and aggregates the results
053 * into a single cohesive response bundle. The response bundle will have entries which correspond to the original
054 * request bundle entries.
055 * <p>
056 * Note that this does break the FHIR transaction semantics, since it is possible for one partition to succeed
057 * and then a later partition to fail. This should therefore only be used in cases where this is understood
058 * and desired.
059 */
060@SuppressWarnings("ClassCanBeRecord")
061public class TransactionPartitionProcessor<BUNDLE extends IBaseBundle> {
062
063        public static final Object EMPTY_OBJECT = new Object();
064        private final RequestDetails myRequestDetails;
065        private final boolean myNestedMode;
066        private final IInterceptorBroadcaster myInterceptorBroadcaster;
067        private final String myActionName;
068        private final FhirContext myFhirContext;
069        private final BaseTransactionProcessor myTransactionProcessor;
070
071        /**
072         * Constructor
073         */
074        public TransactionPartitionProcessor(
075                        BaseTransactionProcessor theTransactionProcessor,
076                        FhirContext theFhirContext,
077                        RequestDetails theRequestDetails,
078                        boolean theNestedMode,
079                        IInterceptorBroadcaster theCompositeBroadcaster,
080                        String theActionName) {
081                myTransactionProcessor = theTransactionProcessor;
082                myFhirContext = theFhirContext;
083                myRequestDetails = theRequestDetails;
084                myNestedMode = theNestedMode;
085                myInterceptorBroadcaster = theCompositeBroadcaster;
086                myActionName = theActionName;
087        }
088
089        /**
090         * Invoke the {@link Pointcut#STORAGE_TRANSACTION_PRE_PARTITION} hook to slice the transaction request
091         * bundle into partitions, execute each slice, and aggregate the results.
092         */
093        public BUNDLE execute(BUNDLE theRequest) {
094
095                // Stash a copy of the entries in the bundle before we call the hook, so we
096                // can check later that the hook didn't make any illegal changes
097                IdentityHashMap<IBase, Object> originalEntrySet = new IdentityHashMap<>();
098                FhirTerser terser = myFhirContext.newTerser();
099                List<IBase> originalEntries = terser.getValues(theRequest, "entry");
100                for (IBase theOriginalEntry : originalEntries) {
101                        originalEntrySet.put(theOriginalEntry, EMPTY_OBJECT);
102                }
103
104                // Invoke the hook method
105                HookParams hookParams = new HookParams()
106                                .add(RequestDetails.class, myRequestDetails)
107                                .addIfMatchesType(ServletRequestDetails.class, myRequestDetails)
108                                .add(IBaseBundle.class, theRequest);
109                TransactionPrePartitionResponse partitionResponse =
110                                (TransactionPrePartitionResponse) myInterceptorBroadcaster.callHooksAndReturnObject(
111                                                Pointcut.STORAGE_TRANSACTION_PRE_PARTITION, hookParams);
112                Validate.isTrue(
113                                partitionResponse != null
114                                                && partitionResponse.getSplitBundles() != null
115                                                && !partitionResponse.getSplitBundles().isEmpty(),
116                                "Hook for pointcut STORAGE_TRANSACTION_PRE_PARTITION did not return a value");
117                List<IBaseBundle> partitionRequestBundles = partitionResponse.getSplitBundles();
118
119                // Verify that the hook didn't modify the original bundle
120                List<IBase> originalEntriesAfter = terser.getValues(theRequest, "entry");
121                Validate.isTrue(
122                                originalEntries.equals(originalEntriesAfter),
123                                "Interceptor for Pointcut %s must not make changes to the original bundle",
124                                Pointcut.STORAGE_TRANSACTION_PRE_PARTITION);
125
126                // Verify that the output has only entries appropriate to the input
127                for (IBaseBundle outputBundle : partitionRequestBundles) {
128                        for (IBase outputBundleEntry : terser.getValues(outputBundle, "entry")) {
129                                Validate.isTrue(
130                                                originalEntrySet.remove(outputBundleEntry) != null,
131                                                "Interceptor for Pointcut %s must not return Bundles containing duplicates or entries which were not present in the original Bundle",
132                                                Pointcut.STORAGE_TRANSACTION_PRE_PARTITION);
133                        }
134                }
135                Validate.isTrue(
136                                originalEntrySet.isEmpty(),
137                                "Interceptor for Pointcut %s must include all entries from the original Bundle in the partitioned Bundles",
138                                Pointcut.STORAGE_TRANSACTION_PRE_PARTITION);
139
140                return processPartitionedBundles(theRequest, partitionRequestBundles);
141        }
142
143        @Nonnull
144        @SuppressWarnings("unchecked")
145        private BUNDLE processPartitionedBundles(BUNDLE theOriginalBundle, List<IBaseBundle> thePartitionedRequests) {
146
147                RuntimeResourceDefinition bundleDefinition = myFhirContext.getResourceDefinition("Bundle");
148                BaseRuntimeChildDefinition bundleEntryChild = bundleDefinition.getChildByName("entry");
149                FhirTerser terser = myFhirContext.newTerser();
150
151                IdentityHashMap<IBase, Integer> originalEntryToIndex = new IdentityHashMap<>();
152                List<IBase> originalEntries = bundleEntryChild.getAccessor().getValues(theOriginalBundle);
153                for (int i = 0; i < originalEntries.size(); i++) {
154                        originalEntryToIndex.put(originalEntries.get(i), i);
155                }
156
157                List<Boolean> entryFoundInPartitions = createListOfNulls(originalEntries.size());
158
159                List<IBase> responseEntries = createListOfNulls(originalEntries.size());
160
161                List<IBaseBundle> partitionedRequests = thePartitionedRequests.stream()
162                                .filter(t -> !bundleEntryChild.getAccessor().getValues(t).isEmpty())
163                                .toList();
164                for (IBaseBundle singlePartitionRequest : partitionedRequests) {
165                        for (IBase requestEntry : bundleEntryChild.getAccessor().getValues(singlePartitionRequest)) {
166                                Integer originalEntryIndex = originalEntryToIndex.get(requestEntry);
167                                Boolean previousValue = entryFoundInPartitions.set(originalEntryIndex, true);
168                                if (previousValue != null) {
169                                        throw new InternalErrorException(
170                                                        Msg.code(2816) + "Interceptor for pointcut " + Pointcut.STORAGE_TRANSACTION_PRE_PARTITION
171                                                                        + " produced multiple partitions for Bundle.entry[" + originalEntryIndex + "]");
172                                }
173                        }
174                }
175
176                Map<String, IIdType> idSubstitutions = new HashMap<>();
177                for (IBaseBundle singlePartitionRequest : partitionedRequests) {
178
179                        /*
180                         * We create a new TransactionDetails for each sub-transaction. It's tempting to
181                         * reuse this across sub-transactions because we can leverage pre-resolved IDs
182                         * and that kind of thing, but there is other state in there that shouldn't be
183                         * preserved, such as tag definitions and rollback items
184                         */
185                        TransactionDetails transactionDetails = new TransactionDetails();
186
187                        // Apply any placeholder ID substitutions from previous partition executions
188                        for (IBaseResource resource : terser.getAllEmbeddedResources(singlePartitionRequest, true)) {
189                                List<ResourceReferenceInfo> allRefs = terser.getAllResourceReferences(resource);
190                                for (ResourceReferenceInfo reference : allRefs) {
191                                        IIdType refElement = reference.getResourceReference().getReferenceElement();
192                                        IIdType substitution = idSubstitutions.get(refElement.getValue());
193                                        if (substitution != null) {
194                                                reference.getResourceReference().setReference(substitution.getValue());
195                                        }
196                                }
197                        }
198
199                        IBaseBundle singlePartitionResponse = myTransactionProcessor.processTransactionAsSubRequest(
200                                        myRequestDetails, transactionDetails, singlePartitionRequest, myActionName, myNestedMode);
201
202                        // Capture any placeholder ID substitutions from this partition
203                        TransactionUtil.TransactionResponse singlePartitionResponseParsed =
204                                        TransactionUtil.parseTransactionResponse(
205                                                        myFhirContext, singlePartitionRequest, singlePartitionResponse);
206                        for (TransactionUtil.StorageOutcome outcome : singlePartitionResponseParsed.getStorageOutcomes()) {
207                                if (outcome.getSourceId() != null && outcome.getSourceId().isUuid()) {
208                                        idSubstitutions.put(outcome.getSourceId().getValue(), outcome.getTargetId());
209                                }
210                        }
211
212                        List<IBase> partitionRequestEntries = bundleEntryChild.getAccessor().getValues(singlePartitionRequest);
213                        List<IBase> partitionResponseEntries =
214                                        bundleEntryChild.getAccessor().getValues(singlePartitionResponse);
215                        Validate.isTrue(
216                                        partitionRequestEntries.size() == partitionResponseEntries.size(),
217                                        "Partitioned request and response bundles have different number of entries");
218
219                        for (int i = 0; i < partitionRequestEntries.size(); i++) {
220                                IBase partitionRequestEntry = partitionRequestEntries.get(i);
221                                IBase partitionResponseEntry = partitionResponseEntries.get(i);
222                                Integer originalIndex = originalEntryToIndex.get(partitionRequestEntry);
223                                if (originalIndex != null) {
224                                        responseEntries.set(originalIndex, partitionResponseEntry);
225                                }
226                        }
227                }
228
229                BUNDLE response = (BUNDLE) bundleDefinition.newInstance();
230                BundleUtil.setBundleType(myFhirContext, response, BundleTypeEnum.TRANSACTION_RESPONSE.getCode());
231                for (IBase responseEntry : responseEntries) {
232                        bundleEntryChild.getMutator().addValue(response, responseEntry);
233                }
234                return response;
235        }
236
237        @Nonnull
238        private static <T> List<T> createListOfNulls(int theSize) {
239                List<T> responseEntries = new ArrayList<>(theSize);
240                while (responseEntries.size() < theSize) {
241                        responseEntries.add(null);
242                }
243                return responseEntries;
244        }
245}