001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.search.builder.sql;
021
022import ca.uhn.fhir.i18n.Msg;
023import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
024import ca.uhn.fhir.jpa.model.dao.JpaPid;
025import ca.uhn.fhir.jpa.search.builder.ISearchQueryExecutor;
026import ca.uhn.fhir.jpa.util.ScrollableResultsIterator;
027import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
028import ca.uhn.fhir.util.IoUtil;
029import jakarta.persistence.EntityManager;
030import jakarta.persistence.FlushModeType;
031import jakarta.persistence.PersistenceContext;
032import jakarta.persistence.PersistenceContextType;
033import jakarta.persistence.Query;
034import org.apache.commons.lang3.Validate;
035import org.hibernate.CacheMode;
036import org.hibernate.ScrollMode;
037import org.hibernate.ScrollableResults;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import java.util.Arrays;
042import java.util.Objects;
043
044public class SearchQueryExecutor implements ISearchQueryExecutor {
045
046        private static final JpaPid NO_MORE = JpaPid.fromId(-1L);
047        private static final SearchQueryExecutor NO_VALUE_EXECUTOR = new SearchQueryExecutor();
048        private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
049        private static final Logger ourLog = LoggerFactory.getLogger(SearchQueryExecutor.class);
050        private final GeneratedSql myGeneratedSql;
051
052        @PersistenceContext(type = PersistenceContextType.TRANSACTION)
053        private EntityManager myEntityManager;
054
055        private boolean myQueryInitialized;
056        private ScrollableResultsIterator<Object> myResultSet;
057        private JpaPid myNext;
058
059        /**
060         * Constructor
061         */
062        public SearchQueryExecutor(GeneratedSql theGeneratedSql, Integer theMaxResultsToFetch) {
063                Validate.notNull(theGeneratedSql, "theGeneratedSql must not be null");
064                myGeneratedSql = theGeneratedSql;
065                myQueryInitialized = false;
066        }
067
068        /**
069         * Internal constructor for empty executor
070         */
071        private SearchQueryExecutor() {
072                assert NO_MORE != null;
073
074                myGeneratedSql = null;
075                myNext = NO_MORE;
076        }
077
078        @Override
079        public void close() {
080                IoUtil.closeQuietly(myResultSet);
081        }
082
083        @Override
084        public boolean hasNext() {
085                fetchNext();
086                return !NO_MORE.equals(myNext);
087        }
088
089        @Override
090        public JpaPid next() {
091                fetchNext();
092                Validate.isTrue(hasNext(), "Can not call next() right now, no data remains");
093                JpaPid next = myNext;
094                myNext = null;
095                return next;
096        }
097
098        private void fetchNext() {
099                if (myNext == null) {
100                        String sql = myGeneratedSql.getSql();
101                        Object[] args = myGeneratedSql.getBindVariables().toArray(EMPTY_OBJECT_ARRAY);
102
103                        try {
104                                if (!myQueryInitialized) {
105
106                                        /*
107                                         * Note that we use the spring managed connection, and the expectation is that a transaction that
108                                         * is managed by Spring has been started before this method is called.
109                                         */
110                                        HapiTransactionService.requireTransaction();
111                                        ourLog.trace("About to execute SQL: {}. Parameters: {}", sql, Arrays.toString(args));
112
113                                        Query nativeQuery = myEntityManager.createNativeQuery(sql);
114                                        org.hibernate.query.Query<?> hibernateQuery = (org.hibernate.query.Query<?>) nativeQuery;
115                                        for (int i = 1; i <= args.length; i++) {
116                                                hibernateQuery.setParameter(i, args[i - 1]);
117                                        }
118
119                                        /*
120                                         * These settings help to ensure that we use a search cursor
121                                         * as opposed to loading all search results into memory
122                                         */
123                                        hibernateQuery.setFetchSize(500000);
124                                        hibernateQuery.setCacheable(false);
125                                        hibernateQuery.setCacheMode(CacheMode.IGNORE);
126                                        hibernateQuery.setReadOnly(true);
127
128                                        // This tells hibernate not to flush when we call scroll(), but rather to wait until the transaction
129                                        // commits and
130                                        // only flush then.  We need to do this so that any exceptions that happen in the transaction happen
131                                        // when
132                                        // we try to commit the transaction, and not here.
133                                        // See the test called testTransaction_multiThreaded (in FhirResourceDaoR4ConcurrentWriteTest) which
134                                        // triggers
135                                        // the following exception if we don't set this flush mode:
136                                        // java.util.concurrent.ExecutionException:
137                                        // org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back
138                                        // because it has been marked as rollback-only
139                                        hibernateQuery.setFlushMode(FlushModeType.COMMIT);
140                                        ScrollableResults scrollableResults = hibernateQuery.scroll(ScrollMode.FORWARD_ONLY);
141                                        myResultSet = new ScrollableResultsIterator<>(scrollableResults);
142                                        myQueryInitialized = true;
143                                }
144
145                                if (myResultSet == null || !myResultSet.hasNext()) {
146                                        myNext = NO_MORE;
147                                } else {
148                                        myNext = getNextPid(myResultSet);
149                                }
150
151                        } catch (Exception e) {
152                                ourLog.error("Failed to create or execute SQL query", e);
153                                close();
154                                throw new InternalErrorException(Msg.code(1262) + e, e);
155                        }
156                }
157        }
158
159        private JpaPid getNextPid(ScrollableResultsIterator<Object> theResultSet) {
160                Object nextRow = Objects.requireNonNull(theResultSet.next());
161                // We should typically get two columns back, the first is the partition ID and the second
162                // is the resource ID. But if we're doing a count query, we'll get a single column in an array
163                // or maybe even just a single non array value depending on how the platform handles it.
164                if (nextRow instanceof Number) {
165                        return JpaPid.fromId(((Number) nextRow).longValue());
166                } else {
167                        Object[] nextRowAsArray = (Object[]) nextRow;
168                        if (nextRowAsArray.length == 1) {
169                                return JpaPid.fromId((Long) nextRowAsArray[0]);
170                        } else {
171                                int i;
172                                // TODO MB add a strategy object to GeneratedSql to describe the result set.
173                                // or make SQE generic
174                                // Comment to reviewer: this will be cleaner with the next
175                                // merge from ja_20240718_pk_schema_selector
176
177                                // We have some cases to distinguish:
178                                // - res_id
179                                // - count
180                                // - partition_id, res_id
181                                // - res_id, coord-dist
182                                // - partition_id, res_id, coord-dist
183                                // Assume res_id is first Long in row, and is in first two columns
184                                if (nextRowAsArray[0] instanceof Long) {
185                                        return JpaPid.fromId((Long) nextRowAsArray[0]);
186                                } else {
187                                        Integer partitionId = (Integer) nextRowAsArray[0];
188                                        Long pid = (Long) nextRowAsArray[1];
189                                        return JpaPid.fromId(pid, partitionId);
190                                }
191                        }
192                }
193        }
194
195        public static SearchQueryExecutor emptyExecutor() {
196                return NO_VALUE_EXECUTOR;
197        }
198}