/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.search.builder.sql;

import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.search.builder.ISearchQueryExecutor;
import ca.uhn.fhir.jpa.util.ScrollableResultsIterator;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.IoUtil;
import jakarta.persistence.EntityManager;
import jakarta.persistence.FlushModeType;
import jakarta.persistence.PersistenceContext;
import jakarta.persistence.PersistenceContextType;
import jakarta.persistence.Query;
import org.apache.commons.lang3.Validate;
import org.hibernate.CacheMode;
import org.hibernate.ScrollMode;
import org.hibernate.ScrollableResults;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Objects;

/**
 * {@link ISearchQueryExecutor} that runs a {@link GeneratedSql} statement as a native query and
 * exposes the first column of each result row as a {@link Long}.
 * <p>
 * The query is created lazily on the first call to {@link #hasNext()} or {@link #next()} and is
 * read through a forward-only scrollable result. Call {@link #close()} once iteration is finished
 * to release the underlying result set.
 */
public class SearchQueryExecutor implements ISearchQueryExecutor {

    /** Sentinel stored in {@link #myNext} once the result set has been exhausted. */
    private static final Long NO_MORE = -1L;

    // Must be declared after NO_MORE: the private constructor reads NO_MORE during static init.
    private static final SearchQueryExecutor NO_VALUE_EXECUTOR = new SearchQueryExecutor();
    private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
    private static final Logger ourLog = LoggerFactory.getLogger(SearchQueryExecutor.class);

    /** SQL and bind variables to execute; {@code null} only for the shared empty executor. */
    private final GeneratedSql myGeneratedSql;

    /** Entity manager used to create the native query; declared as a transaction-scoped persistence context. */
    @PersistenceContext(type = PersistenceContextType.TRANSACTION)
    private EntityManager myEntityManager;

    /** {@code true} once the native query has been created and its scrollable result opened. */
    private boolean myQueryInitialized;

    private ScrollableResultsIterator<Object> myResultSet;

    /**
     * Look-ahead value: {@code null} means "not fetched yet", {@link #NO_MORE} means "exhausted",
     * anything else is the value the next call to {@link #next()} will return.
     */
    private Long myNext;

    /**
     * Constructor
     *
     * @param theGeneratedSql      the SQL and bind variables to execute, must not be {@code null}
     * @param theMaxResultsToFetch accepted for signature compatibility; not read by this constructor
     * @throws NullPointerException if {@code theGeneratedSql} is {@code null}
     */
    public SearchQueryExecutor(GeneratedSql theGeneratedSql, Integer theMaxResultsToFetch) {
        Validate.notNull(theGeneratedSql, "theGeneratedSql must not be null");
        myGeneratedSql = theGeneratedSql;
        myQueryInitialized = false;
    }

    /**
     * Internal constructor for empty executor
     */
    private SearchQueryExecutor() {
        assert NO_MORE != null;

        myGeneratedSql = null;
        // Pre-marking the executor as exhausted means fetchNext() never touches myGeneratedSql.
        myNext = NO_MORE;
    }

    /**
     * Quietly closes the underlying result set, if one has been opened.
     */
    @Override
    public void close() {
        IoUtil.closeQuietly(myResultSet);
    }

    /**
     * Fetches the next value if necessary and reports whether one is available.
     *
     * @return {@code true} if a subsequent call to {@link #next()} will return a value
     * @throws InternalErrorException if the query could not be created or executed
     */
    @Override
    public boolean hasNext() {
        fetchNext();
        return !NO_MORE.equals(myNext);
    }

    /**
     * Returns the next value and clears the look-ahead so the following call fetches a new row.
     *
     * @return the next value produced by the query
     * @throws IllegalArgumentException if no data remains
     * @throws InternalErrorException   if the query could not be created or executed
     */
    @Override
    public Long next() {
        // hasNext() performs the fetch, so no separate fetchNext() call is needed here.
        Validate.isTrue(hasNext(), "Can not call next() right now, no data remains");
        Long next = myNext;
        myNext = null;
        return next;
    }

    /**
     * Populates {@link #myNext} when it is empty, creating the query on first use.
     * Any failure is logged, the result set is closed, and the failure is rethrown wrapped in an
     * {@link InternalErrorException}.
     */
    private void fetchNext() {
        if (myNext != null) {
            return;
        }

        try {
            if (!myQueryInitialized) {
                initializeQuery();
            }
            myNext = readNextPid();
        } catch (Exception e) {
            ourLog.error("Failed to create or execute SQL query", e);
            close();
            throw new InternalErrorException(Msg.code(1262) + e, e);
        }
    }

    /**
     * Creates the native query, binds its parameters and opens the forward-only scrollable result.
     * The SQL text and bind variables are only read here, so they are materialized once per
     * executor rather than once per fetched row.
     */
    private void initializeQuery() {
        String sql = myGeneratedSql.getSql();
        Object[] args = myGeneratedSql.getBindVariables().toArray(EMPTY_OBJECT_ARRAY);

        /*
         * Note that we use the spring managed connection, and the expectation is that a transaction that
         * is managed by Spring has been started before this method is called.
         */
        HapiTransactionService.requireTransaction();

        Query nativeQuery = myEntityManager.createNativeQuery(sql);
        org.hibernate.query.Query<?> hibernateQuery = (org.hibernate.query.Query<?>) nativeQuery;
        // Positional parameters are 1-based, the bind variable array is 0-based.
        for (int i = 1; i <= args.length; i++) {
            hibernateQuery.setParameter(i, args[i - 1]);
        }

        // Guarded so the parameter array is only rendered to a string when trace logging is on.
        if (ourLog.isTraceEnabled()) {
            ourLog.trace("About to execute SQL: {}. Parameters: {}", sql, Arrays.toString(args));
        }

        /*
         * These settings help to ensure that we use a search cursor
         * as opposed to loading all search results into memory
         */
        hibernateQuery.setFetchSize(500000);
        hibernateQuery.setCacheable(false);
        hibernateQuery.setCacheMode(CacheMode.IGNORE);
        hibernateQuery.setReadOnly(true);

        /*
         * This tells hibernate not to flush when we call scroll(), but rather to wait until the
         * transaction commits and only flush then. We need to do this so that any exceptions that
         * happen in the transaction happen when we try to commit the transaction, and not here.
         *
         * See the test called testTransaction_multiThreaded (in FhirResourceDaoR4ConcurrentWriteTest)
         * which triggers the following exception if we don't set this flush mode:
         *
         * java.util.concurrent.ExecutionException:
         * org.springframework.transaction.UnexpectedRollbackException: Transaction silently rolled back
         * because it has been marked as rollback-only
         */
        hibernateQuery.setFlushMode(FlushModeType.COMMIT);

        ScrollableResults scrollableResults = hibernateQuery.scroll(ScrollMode.FORWARD_ONLY);
        myResultSet = new ScrollableResultsIterator<>(scrollableResults);
        myQueryInitialized = true;
    }

    /**
     * Reads the next row from the result set and converts it to a {@link Long}.
     *
     * @return the row value, or {@link #NO_MORE} if there is no result set or it is exhausted
     */
    private Long readNextPid() {
        if (myResultSet == null || !myResultSet.hasNext()) {
            return NO_MORE;
        }

        Object nextRow = Objects.requireNonNull(myResultSet.next());
        Number next;
        if (nextRow instanceof Number) {
            // Single-column result: the row is the value itself.
            next = (Number) nextRow;
        } else {
            // Multi-column result: the row is an array and the value is its first element.
            next = (Number) ((Object[]) nextRow)[0];
        }
        return next.longValue();
    }

    /**
     * Returns the shared executor that never yields any values.
     *
     * @return an executor whose {@link #hasNext()} is always {@code false}
     */
    public static SearchQueryExecutor emptyExecutor() {
        return NO_VALUE_EXECUTOR;
    }
}