/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.search.builder.sql;

import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.search.builder.ISearchQueryExecutor;
import ca.uhn.fhir.jpa.util.ScrollableResultsIterator;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.IoUtil;
import jakarta.persistence.EntityManager;
import jakarta.persistence.FlushModeType;
import jakarta.persistence.PersistenceContext;
import jakarta.persistence.PersistenceContextType;
import jakarta.persistence.Query;
import org.apache.commons.lang3.Validate;
import org.hibernate.CacheMode;
import org.hibernate.ScrollMode;
import org.hibernate.ScrollableResults;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Objects;

/**
 * Runs the native SQL held by a {@link GeneratedSql} through a forward-only Hibernate
 * scroll and exposes the returned rows one at a time as {@link JpaPid} instances.
 * <p>
 * The query is created lazily on the first call to {@link #hasNext()} or {@link #next()}.
 * A single row is buffered in {@code myNext}; the {@code NO_MORE} sentinel marks exhaustion.
 * </p>
 */
public class SearchQueryExecutor implements ISearchQueryExecutor {

	/** Sentinel stored in {@code myNext} once the result set is exhausted. Must be declared before NO_VALUE_EXECUTOR. */
	private static final JpaPid NO_MORE = JpaPid.fromId(-1L);

	/** Shared instance returned by {@link #emptyExecutor()}; it never yields a value. */
	private static final SearchQueryExecutor NO_VALUE_EXECUTOR = new SearchQueryExecutor();

	private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
	private static final Logger ourLog = LoggerFactory.getLogger(SearchQueryExecutor.class);

	/** SQL and bind variables to execute; {@code null} only for the empty executor. */
	private final GeneratedSql myGeneratedSql;

	@PersistenceContext(type = PersistenceContextType.TRANSACTION)
	private EntityManager myEntityManager;

	/** Set to {@code true} once the scroll has been opened successfully. */
	private boolean myQueryInitialized;

	private ScrollableResultsIterator<Object> myResultSet;

	/** Buffered next value: {@code null} means "not fetched yet", {@code NO_MORE} means "exhausted". */
	private JpaPid myNext;

	/**
	 * Constructor
	 *
	 * @param theGeneratedSql      the SQL and bind variables to execute, must not be {@code null}
	 * @param theMaxResultsToFetch not read by this class; retained so existing callers keep compiling
	 */
	public SearchQueryExecutor(GeneratedSql theGeneratedSql, Integer theMaxResultsToFetch) {
		Validate.notNull(theGeneratedSql, "theGeneratedSql must not be null");
		myGeneratedSql = theGeneratedSql;
	}

	/**
	 * Internal constructor for empty executor
	 */
	private SearchQueryExecutor() {
		// Guards against the static fields being reordered so that NO_MORE is not yet initialized
		assert NO_MORE != null;

		myGeneratedSql = null;
		myNext = NO_MORE;
	}

	/**
	 * Closes the underlying result set, if one was opened. Failures while closing are suppressed.
	 */
	@Override
	public void close() {
		IoUtil.closeQuietly(myResultSet);
	}

	/**
	 * @return {@code true} if another value is available; may trigger execution of the query
	 */
	@Override
	public boolean hasNext() {
		fetchNext();
		return !NO_MORE.equals(myNext);
	}

	/**
	 * Returns the buffered value and clears the buffer so that the following call fetches a new row.
	 *
	 * @return the next {@link JpaPid}
	 * @throws IllegalArgumentException if no data remains
	 */
	@Override
	public JpaPid next() {
		// hasNext() performs the fetch, so no separate fetchNext() call is needed here
		Validate.isTrue(hasNext(), "Can not call next() right now, no data remains");
		JpaPid next = myNext;
		myNext = null;
		return next;
	}

	/**
	 * Ensures {@code myNext} holds either the next value or the {@code NO_MORE} sentinel.
	 * Opens the query on first use. On any failure the result set is closed and the
	 * exception is rethrown wrapped in an {@link InternalErrorException}.
	 */
	private void fetchNext() {
		if (myNext != null) {
			// A value (or the NO_MORE sentinel) is already buffered
			return;
		}

		try {
			if (!myQueryInitialized) {
				myResultSet = openResultSet();
				myQueryInitialized = true;
			}

			if (myResultSet == null || !myResultSet.hasNext()) {
				myNext = NO_MORE;
			} else {
				myNext = getNextPid(myResultSet);
			}

		} catch (Exception e) {
			ourLog.error("Failed to create or execute SQL query", e);
			close();
			throw new InternalErrorException(Msg.code(1262) + e, e);
		}
	}

	/**
	 * Creates the native query, binds its positional parameters and opens a forward-only scroll.
	 * The SQL and bind variables are read here, once, rather than on every row fetch.
	 */
	private ScrollableResultsIterator<Object> openResultSet() {
		String sql = myGeneratedSql.getSql();
		Object[] args = myGeneratedSql.getBindVariables().toArray(EMPTY_OBJECT_ARRAY);

		/*
		 * Note that we use the spring managed connection, and the expectation is that a transaction that
		 * is managed by Spring has been started before this method is called.
		 */
		HapiTransactionService.requireTransaction();
		if (ourLog.isTraceEnabled()) {
			// Guarded so the parameter array is only rendered when it will actually be logged
			ourLog.trace("About to execute SQL: {}. Parameters: {}", sql, Arrays.toString(args));
		}

		Query nativeQuery = myEntityManager.createNativeQuery(sql);
		org.hibernate.query.Query<?> hibernateQuery = (org.hibernate.query.Query<?>) nativeQuery;
		// Positional parameters are 1-based
		for (int i = 1; i <= args.length; i++) {
			hibernateQuery.setParameter(i, args[i - 1]);
		}

		/*
		 * These settings help to ensure that we use a search cursor
		 * as opposed to loading all search results into memory
		 */
		hibernateQuery.setFetchSize(500000);
		hibernateQuery.setCacheable(false);
		hibernateQuery.setCacheMode(CacheMode.IGNORE);
		hibernateQuery.setReadOnly(true);

		// This tells hibernate not to flush when we call scroll(), but rather to wait until the
		// transaction commits and only flush then. We need to do this so that any exceptions that
		// happen in the transaction happen when we try to commit the transaction, and not here.
		// See the test called testTransaction_multiThreaded (in FhirResourceDaoR4ConcurrentWriteTest)
		// which triggers the following exception if we don't set this flush mode:
		// java.util.concurrent.ExecutionException:
		// org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back
		// because it has been marked as rollback-only
		hibernateQuery.setFlushMode(FlushModeType.COMMIT);
		ScrollableResults scrollableResults = hibernateQuery.scroll(ScrollMode.FORWARD_ONLY);
		return new ScrollableResultsIterator<>(scrollableResults);
	}

	/**
	 * Reads the next row from the result set and converts it to a {@link JpaPid}.
	 *
	 * @param theResultSet the open result set; its next row must not be {@code null}
	 * @return the resource id (with partition id when the row carries one)
	 */
	private JpaPid getNextPid(ScrollableResultsIterator<Object> theResultSet) {
		Object nextRow = Objects.requireNonNull(theResultSet.next());
		// We should typically get two columns back, the first is the partition ID and the second
		// is the resource ID. But if we're doing a count query, we'll get a single column in an array
		// or maybe even just a single non array value depending on how the platform handles it.
		if (nextRow instanceof Number) {
			return JpaPid.fromId(((Number) nextRow).longValue());
		}

		Object[] nextRowAsArray = (Object[]) nextRow;
		if (nextRowAsArray.length == 1) {
			// Accept any numeric type here, exactly as the scalar branch above does
			return JpaPid.fromId(toLong(nextRowAsArray[0]));
		}

		// TODO MB add a strategy object to GeneratedSql to describe the result set.
		// or make SQE generic
		// Comment to reviewer: this will be cleaner with the next
		// merge from ja_20240718_pk_schema_selector

		// We have some cases to distinguish:
		// - res_id
		// - count
		// - partition_id, res_id
		// - res_id, coord-dist
		// - partition_id, res_id, coord-dist
		// Assume res_id is first Long in row, and is in first two columns
		if (nextRowAsArray[0] instanceof Long) {
			return JpaPid.fromId((Long) nextRowAsArray[0]);
		}

		Integer partitionId = (Integer) nextRowAsArray[0];
		Long pid = toLong(nextRowAsArray[1]);
		return JpaPid.fromId(pid, partitionId);
	}

	/**
	 * Converts a numeric column value to {@link Long} without assuming the concrete
	 * {@link Number} subtype the JDBC driver chose. A {@code null} value stays {@code null}.
	 */
	private static Long toLong(Object theValue) {
		if (theValue == null) {
			return null;
		}
		return ((Number) theValue).longValue();
	}

	/**
	 * @return a shared executor that never returns any values
	 */
	public static SearchQueryExecutor emptyExecutor() {
		return NO_VALUE_EXECUTOR;
	}
}