001/*- 002 * #%L 003 * HAPI FHIR JPA Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.search.builder.sql; 021 022import ca.uhn.fhir.i18n.Msg; 023import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; 024import ca.uhn.fhir.jpa.search.builder.ISearchQueryExecutor; 025import ca.uhn.fhir.jpa.util.ScrollableResultsIterator; 026import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 027import ca.uhn.fhir.util.IoUtil; 028import jakarta.persistence.EntityManager; 029import jakarta.persistence.FlushModeType; 030import jakarta.persistence.PersistenceContext; 031import jakarta.persistence.PersistenceContextType; 032import jakarta.persistence.Query; 033import org.apache.commons.lang3.Validate; 034import org.hibernate.CacheMode; 035import org.hibernate.ScrollMode; 036import org.hibernate.ScrollableResults; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040import java.util.Arrays; 041import java.util.Objects; 042 043public class SearchQueryExecutor implements ISearchQueryExecutor { 044 045 private static final Long NO_MORE = -1L; 046 private static final SearchQueryExecutor NO_VALUE_EXECUTOR = new SearchQueryExecutor(); 047 private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0]; 048 private static final Logger ourLog = LoggerFactory.getLogger(SearchQueryExecutor.class); 049 private final GeneratedSql myGeneratedSql; 050 051 @PersistenceContext(type = PersistenceContextType.TRANSACTION) 052 private EntityManager myEntityManager; 053 054 private boolean myQueryInitialized; 055 private ScrollableResultsIterator<Object> myResultSet; 056 private Long myNext; 057 058 /** 059 * Constructor 060 */ 061 public SearchQueryExecutor(GeneratedSql theGeneratedSql, Integer theMaxResultsToFetch) { 062 Validate.notNull(theGeneratedSql, "theGeneratedSql must not be null"); 063 myGeneratedSql = theGeneratedSql; 064 myQueryInitialized = false; 065 } 066 067 /** 068 * Internal constructor for empty executor 069 */ 070 private SearchQueryExecutor() { 071 assert NO_MORE != null; 072 073 myGeneratedSql = null; 074 myNext = NO_MORE; 075 } 076 077 @Override 078 public void close() { 079 IoUtil.closeQuietly(myResultSet); 080 } 081 082 @Override 083 public boolean hasNext() { 084 fetchNext(); 085 return !NO_MORE.equals(myNext); 086 } 087 088 @Override 089 public Long next() { 090 fetchNext(); 091 Validate.isTrue(hasNext(), "Can not call next() right now, no data remains"); 092 Long next = myNext; 093 myNext = null; 094 return next; 095 } 096 097 private void fetchNext() { 098 if (myNext == null) { 099 String sql = myGeneratedSql.getSql(); 100 Object[] args = myGeneratedSql.getBindVariables().toArray(EMPTY_OBJECT_ARRAY); 101 102 try { 103 if (!myQueryInitialized) { 104 105 /* 106 * Note that we use the spring managed connection, and the expectation is that a transaction that 107 * is managed by Spring has been started before this method is called. 108 */ 109 HapiTransactionService.requireTransaction(); 110 ourLog.trace("About to execute SQL: {}. Parameters: {}", sql, Arrays.toString(args)); 111 112 Query nativeQuery = myEntityManager.createNativeQuery(sql); 113 org.hibernate.query.Query<?> hibernateQuery = (org.hibernate.query.Query<?>) nativeQuery; 114 for (int i = 1; i <= args.length; i++) { 115 hibernateQuery.setParameter(i, args[i - 1]); 116 } 117 118 /* 119 * These settings help to ensure that we use a search cursor 120 * as opposed to loading all search results into memory 121 */ 122 hibernateQuery.setFetchSize(500000); 123 hibernateQuery.setCacheable(false); 124 hibernateQuery.setCacheMode(CacheMode.IGNORE); 125 hibernateQuery.setReadOnly(true); 126 127 // This tells hibernate not to flush when we call scroll(), but rather to wait until the transaction 128 // commits and 129 // only flush then. We need to do this so that any exceptions that happen in the transaction happen 130 // when 131 // we try to commit the transaction, and not here. 132 // See the test called testTransaction_multiThreaded (in FhirResourceDaoR4ConcurrentWriteTest) which 133 // triggers 134 // the following exception if we don't set this flush mode: 135 // java.util.concurrent.ExecutionException: 136 // org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back 137 // because it has been marked as rollback-only 138 hibernateQuery.setFlushMode(FlushModeType.COMMIT); 139 ScrollableResults scrollableResults = hibernateQuery.scroll(ScrollMode.FORWARD_ONLY); 140 myResultSet = new ScrollableResultsIterator<>(scrollableResults); 141 myQueryInitialized = true; 142 } 143 144 if (myResultSet == null || !myResultSet.hasNext()) { 145 myNext = NO_MORE; 146 } else { 147 myNext = getNextPid(myResultSet); 148 } 149 150 } catch (Exception e) { 151 ourLog.error("Failed to create or execute SQL query", e); 152 close(); 153 throw new InternalErrorException(Msg.code(1262) + e, e); 154 } 155 } 156 } 157 158 private long getNextPid(ScrollableResultsIterator<Object> theResultSet) { 159 Object nextRow = Objects.requireNonNull(theResultSet.next()); 160 // We should typically get two columns back, the first is the partition ID and the second 161 // is the resource ID. But if we're doing a count query, we'll get a single column in an array 162 // or maybe even just a single non array value depending on how the platform handles it. 163 if (nextRow instanceof Number) { 164 return ((Number) nextRow).longValue(); 165 } else { 166 Object[] nextRowAsArray = (Object[]) nextRow; 167 if (nextRowAsArray.length == 1) { 168 return (Long) nextRowAsArray[0]; 169 } else { 170 int i; 171 // TODO MB add a strategy object to GeneratedSql to describe the result set. 172 // or make SQE generic 173 // Comment to reviewer: this will be cleaner with the next 174 // merge from ja_20240718_pk_schema_selector 175 176 // We have some cases to distinguish: 177 // - res_id 178 // - count 179 // - partition_id, res_id 180 // - res_id, coord-dist 181 // - partition_id, res_id, coord-dist 182 // Assume res_id is first Long in row, and is in first two columns 183 if (nextRowAsArray[0] instanceof Long) { 184 return (long) nextRowAsArray[0]; 185 } else { 186 return (long) nextRowAsArray[1]; 187 } 188 } 189 } 190 } 191 192 public static SearchQueryExecutor emptyExecutor() { 193 return NO_VALUE_EXECUTOR; 194 } 195}