diff --git a/sql/src/main/java/io/druid/sql/avatica/AvaticaServerConfig.java b/sql/src/main/java/io/druid/sql/avatica/AvaticaServerConfig.java index 40f990bafa76..fa22b0feff5c 100644 --- a/sql/src/main/java/io/druid/sql/avatica/AvaticaServerConfig.java +++ b/sql/src/main/java/io/druid/sql/avatica/AvaticaServerConfig.java @@ -31,7 +31,7 @@ public class AvaticaServerConfig public int maxStatementsPerConnection = 4; @JsonProperty - public Period connectionIdleTimeout = new Period("PT30M"); + public Period connectionIdleTimeout = new Period("PT5M"); public int getMaxConnections() { diff --git a/sql/src/main/java/io/druid/sql/avatica/DruidConnection.java b/sql/src/main/java/io/druid/sql/avatica/DruidConnection.java index 1b334d1d453e..8e78800f22fa 100644 --- a/sql/src/main/java/io/druid/sql/avatica/DruidConnection.java +++ b/sql/src/main/java/io/druid/sql/avatica/DruidConnection.java @@ -19,43 +19,123 @@ package io.druid.sql.avatica; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.logger.Logger; +import javax.annotation.concurrent.GuardedBy; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; /** - * Connection tracking for {@link DruidMeta}. Not thread-safe. + * Connection tracking for {@link DruidMeta}. Thread-safe. */ public class DruidConnection { - private final Map context; + private static final Logger log = new Logger(DruidConnection.class); + + private final String connectionId; + private final int maxStatements; + private final ImmutableMap context; + private final AtomicInteger statementCounter = new AtomicInteger(); + private final AtomicReference> timeoutFuture = new AtomicReference<>(); + + @GuardedBy("statements") private final Map statements; - private Future timeoutFuture; - public DruidConnection(final Map context) + @GuardedBy("statements") + private boolean open = true; + + public DruidConnection(final String connectionId, final int maxStatements, final Map context) { + this.connectionId = Preconditions.checkNotNull(connectionId); + this.maxStatements = maxStatements; this.context = ImmutableMap.copyOf(context); this.statements = new HashMap<>(); } - public Map context() + public DruidStatement createStatement() + { + final int statementId = statementCounter.incrementAndGet(); + + synchronized (statements) { + if (statements.containsKey(statementId)) { + // Will only happen if statementCounter rolls over before old statements are cleaned up. If this + // ever happens then something fishy is going on, because we shouldn't have billions of statements. + throw new ISE("Uh oh, too many statements"); + } + + if (statements.size() >= maxStatements) { + throw new ISE("Too many open statements, limit is[%,d]", maxStatements); + } + + final DruidStatement statement = new DruidStatement(connectionId, statementId, context, () -> { + // onClose function for the statement + synchronized (statements) { + log.debug("Connection[%s] closed statement[%s].", connectionId, statementId); + statements.remove(statementId); + } + }); + + statements.put(statementId, statement); + log.debug("Connection[%s] opened statement[%s].", connectionId, statementId); + return statement; + } + } + + public DruidStatement getStatement(final int statementId) { - return context; + synchronized (statements) { + return statements.get(statementId); + } } - public Map statements() + /** + * Closes this connection if it has no statements. + * + * @return true if closed + */ + public boolean closeIfEmpty() { - return statements; + synchronized (statements) { + if (statements.isEmpty()) { + close(); + return true; + } else { + return false; + } + } + } + + public void close() + { + synchronized (statements) { + // Copy statements before iterating because statement.close() modifies it. + for (DruidStatement statement : ImmutableList.copyOf(statements.values())) { + try { + statement.close(); + } + catch (Exception e) { + log.warn("Connection[%s] failed to close statement[%s]!", connectionId, statement.getStatementId()); + } + } + + log.debug("Connection[%s] closed.", connectionId); + open = false; + } } public DruidConnection sync(final Future newTimeoutFuture) { - if (timeoutFuture != null) { - timeoutFuture.cancel(false); + final Future oldFuture = timeoutFuture.getAndSet(newTimeoutFuture); + if (oldFuture != null) { + oldFuture.cancel(false); } - timeoutFuture = newTimeoutFuture; return this; } } diff --git a/sql/src/main/java/io/druid/sql/avatica/DruidMeta.java b/sql/src/main/java/io/druid/sql/avatica/DruidMeta.java index 1392bca5df58..79a168832a98 100644 --- a/sql/src/main/java/io/druid/sql/avatica/DruidMeta.java +++ b/sql/src/main/java/io/druid/sql/avatica/DruidMeta.java @@ -19,6 +19,7 @@ package io.druid.sql.avatica; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -28,25 +29,26 @@ import com.google.common.collect.Iterables; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; -import io.druid.java.util.common.io.Closer; import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.PlannerFactory; import org.apache.calcite.avatica.MetaImpl; import org.apache.calcite.avatica.MissingResultsException; +import org.apache.calcite.avatica.NoSuchConnectionException; import org.apache.calcite.avatica.NoSuchStatementException; import org.apache.calcite.avatica.QueryState; import org.apache.calcite.avatica.remote.TypedValue; import org.joda.time.DateTime; import org.joda.time.Interval; -import java.io.IOException; +import javax.annotation.Nonnull; import java.util.ArrayList; -import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -61,12 +63,12 @@ public class DruidMeta extends MetaImpl private final ScheduledExecutorService exec; private final AvaticaServerConfig config; - // Used to track statements for a connection. Connection id -> statement id -> statement. - // Not concurrent; synchronize on it when reading or writing. - private final Map connections = new HashMap<>(); + // Used to track logical connections. + private final Map connections = new ConcurrentHashMap<>(); - // Used to generate statement ids. - private final AtomicInteger statementCounter = new AtomicInteger(); + // Number of connections reserved in "connections". May be higher than the actual number of connections at times, + // such as when we're reserving space to open a new one. + private final AtomicInteger connectionCount = new AtomicInteger(); @Inject public DruidMeta(final PlannerFactory plannerFactory, final AvaticaServerConfig config) @@ -98,26 +100,10 @@ public void openConnection(final ConnectionHandle ch, final Map @Override public void closeConnection(final ConnectionHandle ch) { - final List statements = new ArrayList<>(); - - synchronized (connections) { - final DruidConnection connection = connections.remove(ch.id); - if (connection != null) { - connection.sync(null); - statements.addAll(connection.statements().values()); - log.debug("Connection[%s] closed, closing %,d statements.", ch.id, statements.size()); - } - } - - final Closer closer = Closer.create(); - for (final DruidStatement statement : statements) { - closer.register(statement); - } - try { - closer.close(); - } - catch (IOException e) { - throw Throwables.propagate(e); + final DruidConnection druidConnection = connections.remove(ch.id); + if (druidConnection != null) { + connectionCount.decrementAndGet(); + druidConnection.close(); } } @@ -132,24 +118,8 @@ public ConnectionProperties connectionSync(final ConnectionHandle ch, final Conn @Override public StatementHandle createStatement(final ConnectionHandle ch) { - synchronized (connections) { - final DruidConnection connection = getDruidConnection(ch.id); - final StatementHandle statement = new StatementHandle(ch.id, statementCounter.incrementAndGet(), null); - - if (connection.statements().containsKey(statement.id)) { - // Will only happen if statementCounter rolls over before old statements are cleaned up. If this - // ever happens then something fishy is going on, because we shouldn't have billions of statements. - throw new ISE("Uh oh, too many statements"); - } - - if (connection.statements().size() >= config.getMaxStatementsPerConnection()) { - throw new ISE("Too many open statements, limit is[%,d]", config.getMaxStatementsPerConnection()); - } - - connection.statements().put(statement.id, new DruidStatement(ch.id, statement.id, connection.context())); - log.debug("Connection[%s] opened statement[%s].", ch.id, statement.id); - return statement; - } + final DruidStatement druidStatement = getDruidConnection(ch.id).createStatement(); + return new StatementHandle(ch.id, druidStatement.getStatementId(), null); } @Override @@ -289,7 +259,14 @@ public Iterable createIterable( @Override public void closeStatement(final StatementHandle h) { - closeDruidStatement(getDruidStatement(h)); + // connections.get, not getDruidConnection, since we want to silently ignore nonexistent statements + final DruidConnection druidConnection = connections.get(h.connectionId); + if (druidConnection != null) { + final DruidStatement druidStatement = druidConnection.getStatement(h.id); + if (druidStatement != null) { + druidStatement.close(); + } + } } @Override @@ -493,64 +470,78 @@ public MetaResultSet getTableTypes(final ConnectionHandle ch) return sqlResultSet(ch, sql); } + @VisibleForTesting + void closeAllConnections() + { + for (String connectionId : ImmutableSet.copyOf(connections.keySet())) { + closeConnection(new ConnectionHandle(connectionId)); + } + } + private DruidConnection openDruidConnection(final String connectionId, final Map context) { - synchronized (connections) { - if (connections.containsKey(connectionId)) { - throw new ISE("Connection[%s] already open.", connectionId); + if (connectionCount.incrementAndGet() > config.getMaxConnections()) { + // O(connections) but we don't expect this to happen often (it's a last-ditch effort to clear out + // abandoned connections) or to have too many connections. + final Iterator> entryIterator = connections.entrySet().iterator(); + while (entryIterator.hasNext()) { + final Map.Entry entry = entryIterator.next(); + if (entry.getValue().closeIfEmpty()) { + entryIterator.remove(); + + // Removed a connection, decrement the counter. + connectionCount.decrementAndGet(); + break; + } } - if (connections.size() >= config.getMaxConnections()) { + if (connectionCount.get() > config.getMaxConnections()) { + // We aren't going to make a connection after all. + connectionCount.decrementAndGet(); throw new ISE("Too many connections, limit is[%,d]", config.getMaxConnections()); } + } - connections.put(connectionId, new DruidConnection(context)); - log.debug("Connection[%s] opened.", connectionId); + final DruidConnection putResult = connections.putIfAbsent( + connectionId, + new DruidConnection(connectionId, config.getMaxStatementsPerConnection(), context) + ); - // Call getDruidConnection to start the timeout timer. - return getDruidConnection(connectionId); + if (putResult != null) { + // Didn't actually insert the connection. + connectionCount.decrementAndGet(); + throw new ISE("Connection[%s] already open.", connectionId); } + + log.debug("Connection[%s] opened.", connectionId); + + // Call getDruidConnection to start the timeout timer. + return getDruidConnection(connectionId); } + /** + * Get a connection, or throw an exception if it doesn't exist. Also refreshes the timeout timer. + * + * @param connectionId connection id + * + * @return the connection + * + * @throws NoSuchConnectionException if the connection id doesn't exist + */ + @Nonnull private DruidConnection getDruidConnection(final String connectionId) { - final DruidConnection connection; + final DruidConnection connection = connections.get(connectionId); - synchronized (connections) { - connection = connections.get(connectionId); - - if (connection == null) { - throw new ISE("Connection[%s] not open", connectionId); - } + if (connection == null) { + throw new NoSuchConnectionException(connectionId); } return connection.sync( exec.schedule( - new Runnable() - { - @Override - public void run() - { - final List statements = new ArrayList<>(); - - synchronized (connections) { - if (connections.remove(connectionId) == connection) { - statements.addAll(connection.statements().values()); - log.debug("Connection[%s] timed out, closing %,d statements.", connectionId, statements.size()); - } - } - - final Closer closer = Closer.create(); - for (final DruidStatement statement : statements) { - closer.register(statement); - } - try { - closer.close(); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - } + () -> { + log.debug("Connection[%s] timed out.", connectionId); + closeConnection(new ConnectionHandle(connectionId)); }, new Interval(new DateTime(), config.getConnectionIdleTimeout()).toDurationMillis(), TimeUnit.MILLISECONDS @@ -558,30 +549,13 @@ public void run() ); } + @Nonnull private DruidStatement getDruidStatement(final StatementHandle statement) { - synchronized (connections) { - final DruidConnection connection = getDruidConnection(statement.connectionId); - final DruidStatement druidStatement = connection.statements().get(statement.id); - Preconditions.checkState(druidStatement != null, "Statement[%s] does not exist", statement.id); - return druidStatement; - } - } - - private void closeDruidStatement(final DruidStatement statement) - { - synchronized (connections) { - final DruidConnection connection = getDruidConnection(statement.getConnectionId()); - if (connection.statements().get(statement.getStatementId()) == statement) { - connection.statements().remove(statement.getStatementId()); - } else { - // "statement" is not actually in the set of open statements for this connection - throw new ISE("Statement[%s] not open", statement.getStatementId()); - } - } - - log.debug("Connection[%s] closed statement[%s].", statement.getConnectionId(), statement.getStatementId()); - statement.close(); + final DruidConnection connection = getDruidConnection(statement.connectionId); + final DruidStatement druidStatement = connection.getStatement(statement.id); + Preconditions.checkState(druidStatement != null, "Statement[%s] does not exist", statement.id); + return druidStatement; } private MetaResultSet sqlResultSet(final ConnectionHandle ch, final String sql) diff --git a/sql/src/main/java/io/druid/sql/avatica/DruidStatement.java b/sql/src/main/java/io/druid/sql/avatica/DruidStatement.java index ed6b6b0e5f87..c8ae0db62feb 100644 --- a/sql/src/main/java/io/druid/sql/avatica/DruidStatement.java +++ b/sql/src/main/java/io/druid/sql/avatica/DruidStatement.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; @@ -30,7 +31,6 @@ import io.druid.sql.calcite.planner.PlannerFactory; import io.druid.sql.calcite.planner.PlannerResult; import io.druid.sql.calcite.rel.QueryMaker; -import org.apache.calcite.avatica.AvaticaParameter; import org.apache.calcite.avatica.ColumnMetaData; import org.apache.calcite.avatica.Meta; import org.apache.calcite.rel.type.RelDataType; @@ -38,7 +38,6 @@ import javax.annotation.concurrent.GuardedBy; import java.io.Closeable; -import java.io.IOException; import java.sql.DatabaseMetaData; import java.util.ArrayList; import java.util.List; @@ -62,6 +61,7 @@ enum State private final String connectionId; private final int statementId; private final Map queryContext; + private final Runnable onClose; private final Object lock = new Object(); private State state = State.NEW; @@ -72,11 +72,17 @@ enum State private Yielder yielder; private int offset = 0; - public DruidStatement(final String connectionId, final int statementId, final Map queryContext) + public DruidStatement( + final String connectionId, + final int statementId, + final Map queryContext, + final Runnable onClose + ) { - this.connectionId = connectionId; + this.connectionId = Preconditions.checkNotNull(connectionId, "connectionId"); this.statementId = statementId; - this.queryContext = queryContext; + this.queryContext = queryContext == null ? ImmutableMap.of() : queryContext; + this.onClose = Preconditions.checkNotNull(onClose, "onClose"); } public static List createColumnMetaData(final RelDataType rowType) @@ -134,15 +140,21 @@ public DruidStatement prepare(final PlannerFactory plannerFactory, final String this.signature = Meta.Signature.create( createColumnMetaData(plannerResult.rowType()), query, - new ArrayList(), + new ArrayList<>(), Meta.CursorFactory.ARRAY, Meta.StatementType.SELECT // We only support SELECT ); this.state = State.PREPARED; } } - catch (Exception e) { - throw Throwables.propagate(e); + catch (Throwable t) { + try { + close(); + } + catch (Throwable t1) { + t.addSuppressed(t1); + } + throw Throwables.propagate(t); } return this; @@ -153,16 +165,27 @@ public DruidStatement execute() synchronized (lock) { ensure(State.PREPARED); - final Sequence baseSequence = plannerResult.run(); + try { + final Sequence baseSequence = plannerResult.run(); - // We can't apply limits greater than Integer.MAX_VALUE, ignore them. - final Sequence retSequence = - maxRowCount >= 0 && maxRowCount <= Integer.MAX_VALUE - ? Sequences.limit(baseSequence, (int) maxRowCount) - : baseSequence; + // We can't apply limits greater than Integer.MAX_VALUE, ignore them. + final Sequence retSequence = + maxRowCount >= 0 && maxRowCount <= Integer.MAX_VALUE + ? Sequences.limit(baseSequence, (int) maxRowCount) + : baseSequence; - yielder = Yielders.each(retSequence); - state = State.RUNNING; + yielder = Yielders.each(retSequence); + state = State.RUNNING; + } + catch (Throwable t) { + try { + close(); + } + catch (Throwable t1) { + t.addSuppressed(t1); + } + throw t; + } return this; } @@ -254,18 +277,39 @@ public Meta.Frame nextFrame(final long fetchOffset, final int fetchMaxRowCount) public void close() { synchronized (lock) { + final State oldState = state; state = State.DONE; - if (yielder != null) { - Yielder theYielder = this.yielder; - this.yielder = null; + try { + if (yielder != null) { + Yielder theYielder = this.yielder; + this.yielder = null; - // Put the close last, so any exceptions it throws are after we did the other cleanup above. - try { + // Put the close last, so any exceptions it throws are after we did the other cleanup above. theYielder.close(); } - catch (IOException e) { - throw Throwables.propagate(e); + } + catch (Throwable t) { + if (oldState != State.DONE) { + // First close. Run the onClose function. + try { + onClose.run(); + } + catch (Throwable t1) { + t.addSuppressed(t1); + } + } + + throw Throwables.propagate(t); + } + + if (oldState != State.DONE) { + // First close. Run the onClose function. + try { + onClose.run(); + } + catch (Throwable t) { + throw Throwables.propagate(t); } } } diff --git a/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java index f2e7432e4629..102ed1284466 100644 --- a/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java +++ b/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java @@ -102,6 +102,8 @@ public int getMaxStatementsPerConnection() private Server server; private Connection client; private Connection clientLosAngeles; + private DruidMeta druidMeta; + private String url; @Before public void setUp() throws Exception @@ -116,11 +118,12 @@ public void setUp() throws Exception ) ); final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); + druidMeta = new DruidMeta( + new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig, new ServerConfig()), + AVATICA_CONFIG + ); final DruidAvaticaHandler handler = new DruidAvaticaHandler( - new DruidMeta( - new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig, new ServerConfig()), - AVATICA_CONFIG - ), + druidMeta, new DruidNode("dummy", "dummy", 1), new AvaticaMonitor() ); @@ -128,7 +131,7 @@ public void setUp() throws Exception server = new Server(new InetSocketAddress("127.0.0.1", port)); server.setHandler(handler); server.start(); - final String url = String.format( + url = String.format( "jdbc:avatica:remote:url=http://127.0.0.1:%d%s", port, DruidAvaticaHandler.AVATICA_PATH @@ -426,6 +429,90 @@ public void testNotTooManyStatementsWhenYouCloseThem() throws Exception Assert.assertTrue(true); } + @Test + public void testNotTooManyStatementsWhenYouFullyIterateThem() throws Exception + { + for (int i = 0; i < 50; i++) { + final ResultSet resultSet = client.createStatement().executeQuery( + "SELECT COUNT(*) AS cnt FROM druid.foo" + ); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("cnt", 6L) + ), + getRows(resultSet) + ); + } + + Assert.assertTrue(true); + } + + @Test + public void testNotTooManyStatementsWhenTheyThrowErrors() throws Exception + { + for (int i = 0; i < 50; i++) { + Exception thrown = null; + try { + client.createStatement().executeQuery("SELECT SUM(nonexistent) FROM druid.foo"); + } + catch (Exception e) { + thrown = e; + } + + Assert.assertNotNull(thrown); + + final ResultSet resultSet = client.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo"); + Assert.assertEquals( + ImmutableList.of(ImmutableMap.of("cnt", 6L)), + getRows(resultSet) + ); + } + + Assert.assertTrue(true); + } + + @Test + public void testAutoReconnectOnNoSuchConnection() throws Exception + { + for (int i = 0; i < 50; i++) { + final ResultSet resultSet = client.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo"); + Assert.assertEquals( + ImmutableList.of(ImmutableMap.of("cnt", 6L)), + getRows(resultSet) + ); + druidMeta.closeAllConnections(); + } + + Assert.assertTrue(true); + } + + @Test + public void testTooManyConnections() throws Exception + { + final Connection connection1 = DriverManager.getConnection(url); + final Statement statement1 = connection1.createStatement(); + + final Connection connection2 = DriverManager.getConnection(url); + final Statement statement2 = connection2.createStatement(); + + expectedException.expect(AvaticaClientRuntimeException.class); + expectedException.expectMessage("Too many connections, limit is[2]"); + final Connection connection3 = DriverManager.getConnection(url); + } + + @Test + public void testNotTooManyConnectionsWhenTheyAreEmpty() throws Exception + { + final Connection connection1 = DriverManager.getConnection(url); + connection1.createStatement().close(); + + final Connection connection2 = DriverManager.getConnection(url); + connection2.createStatement().close(); + + final Connection connection3 = DriverManager.getConnection(url); + Assert.assertTrue(true); + } + private static List> getRows(final ResultSet resultSet) throws SQLException { return getRows(resultSet, null); diff --git a/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java b/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java index 6cea2ff2e5fe..790751e613d4 100644 --- a/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java +++ b/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java @@ -80,7 +80,7 @@ public void tearDown() throws Exception public void testSignature() throws Exception { final String sql = "SELECT * FROM druid.foo"; - final DruidStatement statement = new DruidStatement("", 0, null).prepare(plannerFactory, sql, -1); + final DruidStatement statement = new DruidStatement("", 0, null, () -> {}).prepare(plannerFactory, sql, -1); // Check signature. final Meta.Signature signature = statement.getSignature(); @@ -118,7 +118,7 @@ public List apply(final ColumnMetaData columnMetaData) public void testSelectAllInFirstFrame() throws Exception { final String sql = "SELECT __time, cnt, dim1, dim2, m1 FROM druid.foo"; - final DruidStatement statement = new DruidStatement("", 0, null).prepare(plannerFactory, sql, -1); + final DruidStatement statement = new DruidStatement("", 0, null, () -> {}).prepare(plannerFactory, sql, -1); // First frame, ask for all rows. Meta.Frame frame = statement.execute().nextFrame(DruidStatement.START_OFFSET, 6); @@ -144,7 +144,7 @@ public void testSelectAllInFirstFrame() throws Exception public void testSelectSplitOverTwoFrames() throws Exception { final String sql = "SELECT __time, cnt, dim1, dim2, m1 FROM druid.foo"; - final DruidStatement statement = new DruidStatement("", 0, null).prepare(plannerFactory, sql, -1); + final DruidStatement statement = new DruidStatement("", 0, null, () -> {}).prepare(plannerFactory, sql, -1); // First frame, ask for 2 rows. Meta.Frame frame = statement.execute().nextFrame(DruidStatement.START_OFFSET, 2);