diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 8e924f4139b1..b23432c1cbd0 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1644,7 +1644,8 @@ The Druid SQL server is configured through the following properties on the Broke |`druid.sql.enable`|Whether to enable SQL at all, including background metadata fetching. If false, this overrides all other SQL-related properties and disables SQL metadata, serving, and planning completely.|true| |`druid.sql.avatica.enable`|Whether to enable JDBC querying at `/druid/v2/sql/avatica/`.|true| |`druid.sql.avatica.maxConnections`|Maximum number of open connections for the Avatica server. These are not HTTP connections, but are logical client connections that may span multiple HTTP connections.|25| -|`druid.sql.avatica.maxRowsPerFrame`|Maximum number of rows to return in a single JDBC frame. Setting this property to -1 indicates that no row limit should be applied. Clients can optionally specify a row limit in their requests; if a client specifies a row limit, the lesser value of the client-provided limit and `maxRowsPerFrame` will be used.|5,000| +|`druid.sql.avatica.maxRowsPerFrame`|Maximum acceptable value for the JDBC client `Statement.setFetchSize` method. This setting determines the maximum number of rows that Druid will populate in a single 'fetch' for a JDBC `ResultSet`. Set this property to -1 to enforce no row limit on the server-side and potentially return the entire set of rows on the initial statement execution. If the JDBC client calls `Statement.setFetchSize` with a value other than -1, Druid uses the lesser value of the client-provided limit and `maxRowsPerFrame`. If `maxRowsPerFrame` is smaller than `minRowsPerFrame`, then the `ResultSet` size will be fixed. To handle queries that produce results with a large number of rows, you can increase value of `druid.sql.avatica.maxRowsPerFrame` to reduce the number of fetches required to completely transfer the result set.|5,000| +|`druid.sql.avatica.minRowsPerFrame`|Minimum acceptable value for the JDBC client `Statement.setFetchSize` method. The value for this property must greater than 0. If the JDBC client calls `Statement.setFetchSize` with a lesser value, Druid uses `minRowsPerFrame` instead. If `maxRowsPerFrame` is less than `minRowsPerFrame`, Druid uses the minimum value of the two. For handling queries which produce results with a large number of rows, you can increase this value to reduce the number of fetches required to completely transfer the result set.|100| |`druid.sql.avatica.maxStatementsPerConnection`|Maximum number of simultaneous open statements per Avatica client connection.|4| |`druid.sql.avatica.connectionIdleTimeout`|Avatica client connection idle timeout.|PT5M| |`druid.sql.http.enable`|Whether to enable JSON over HTTP querying at `/druid/v2/sql/`.|true| diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java b/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java index 5d921d0f57cc..e931c3a289ec 100644 --- a/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java +++ b/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java @@ -20,21 +20,31 @@ package org.apache.druid.sql.avatica; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; import org.joda.time.Period; class AvaticaServerConfig { + public static int DEFAULT_MAX_CONNECTIONS = 25; + public static int DEFAULT_MAX_STATEMENTS_PER_CONNECTION = 4; + public static Period DEFAULT_CONNECTION_IDLE_TIMEOUT = new Period("PT5M"); + public static int DEFAULT_MIN_ROWS_PER_FRAME = 100; + public static int DEFAULT_MAX_ROWS_PER_FRAME = 5000; + + @JsonProperty + public int maxConnections = DEFAULT_MAX_CONNECTIONS; + @JsonProperty - public int maxConnections = 25; + public int maxStatementsPerConnection = DEFAULT_MAX_STATEMENTS_PER_CONNECTION; @JsonProperty - public int maxStatementsPerConnection = 4; + public Period connectionIdleTimeout = DEFAULT_CONNECTION_IDLE_TIMEOUT; @JsonProperty - public Period connectionIdleTimeout = new Period("PT5M"); + public int minRowsPerFrame = DEFAULT_MIN_ROWS_PER_FRAME; @JsonProperty - public int maxRowsPerFrame = 5000; + public int maxRowsPerFrame = DEFAULT_MAX_ROWS_PER_FRAME; public int getMaxConnections() { @@ -55,4 +65,16 @@ public int getMaxRowsPerFrame() { return maxRowsPerFrame; } + + public int getMinRowsPerFrame() + { + Preconditions.checkArgument( + minRowsPerFrame > 0, + "'druid.sql.avatica.minRowsPerFrame' must be set to a value greater than 0" + ); + if (maxRowsPerFrame > 0) { + return Math.min(getMaxRowsPerFrame(), minRowsPerFrame); + } + return minRowsPerFrame; + } } diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java index 738bed4dba7d..5cfea96cfbd7 100644 --- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java +++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java @@ -627,17 +627,40 @@ private MetaResultSet sqlResultSet(final ConnectionHandle ch, final String sql) } } + /** + * Determine JDBC 'frame' size, that is the number of results which will be returned to a single + * {@link java.sql.ResultSet}. This value corresponds to {@link java.sql.Statement#setFetchSize(int)} (which is a user + * hint, we don't have to honor it), and this method modifies it, ensuring the actual chosen value falls within + * {@link AvaticaServerConfig#minRowsPerFrame} and {@link AvaticaServerConfig#maxRowsPerFrame}. + * + * A value of -1 supplied as input indicates that the client has no preference for fetch size, and can handle + * unlimited results (at our discretion). Similarly, a value of -1 for {@link AvaticaServerConfig#maxRowsPerFrame} + * also indicates that there is no upper limit on fetch size on the server side. + * + * {@link AvaticaServerConfig#minRowsPerFrame} must be configured to a value greater than 0, because it will be + * checked against if any additional frames are required (which means one of the input or maximum was set to a value + * other than -1). + */ private int getEffectiveMaxRowsPerFrame(int clientMaxRowsPerFrame) { // no configured row limit, use the client provided limit if (config.getMaxRowsPerFrame() < 0) { - return clientMaxRowsPerFrame; + return adjustForMinumumRowsPerFrame(clientMaxRowsPerFrame); } // client provided no row limit, use the configured row limit if (clientMaxRowsPerFrame < 0) { - return config.getMaxRowsPerFrame(); + return adjustForMinumumRowsPerFrame(config.getMaxRowsPerFrame()); } - return Math.min(clientMaxRowsPerFrame, config.getMaxRowsPerFrame()); + return adjustForMinumumRowsPerFrame(Math.min(clientMaxRowsPerFrame, config.getMaxRowsPerFrame())); + } + + /** + * coerce fetch size to be, at minimum, {@link AvaticaServerConfig#minRowsPerFrame} + */ + private int adjustForMinumumRowsPerFrame(int rowsPerFrame) + { + final int adjustedRowsPerFrame = Math.max(config.getMinRowsPerFrame(), rowsPerFrame); + return adjustedRowsPerFrame; } private static String withEscapeClause(String toEscape) diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/AvaticaModuleTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/AvaticaModuleTest.java index 937db099f380..adf084d7c503 100644 --- a/sql/src/test/java/org/apache/druid/sql/avatica/AvaticaModuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/avatica/AvaticaModuleTest.java @@ -24,6 +24,8 @@ import com.google.inject.Key; import com.google.inject.Scopes; import com.google.inject.TypeLiteral; +import org.apache.druid.guice.JsonConfigProvider; +import org.apache.druid.guice.JsonConfigurator; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.annotations.Self; import org.apache.druid.jackson.JacksonModule; @@ -35,11 +37,14 @@ import org.eclipse.jetty.server.Handler; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import javax.validation.Validation; import javax.validation.Validator; +import java.util.Properties; import java.util.Set; @RunWith(EasyMockRunner.class) @@ -52,6 +57,9 @@ public class AvaticaModuleTest @Mock private DruidMeta druidMeta; + @Rule + public ExpectedException expectedException = ExpectedException.none(); + private AvaticaModule target; private Injector injector; @@ -95,6 +103,76 @@ public void testAvaticaServerConfigIsInjectable() { AvaticaServerConfig config = injector.getInstance(AvaticaServerConfig.class); Assert.assertNotNull(config); + Assert.assertEquals(AvaticaServerConfig.DEFAULT_MAX_CONNECTIONS, config.getMaxConnections()); + Assert.assertEquals( + AvaticaServerConfig.DEFAULT_MAX_STATEMENTS_PER_CONNECTION, + config.getMaxStatementsPerConnection() + ); + Assert.assertEquals(AvaticaServerConfig.DEFAULT_CONNECTION_IDLE_TIMEOUT, config.getConnectionIdleTimeout()); + Assert.assertEquals(AvaticaServerConfig.DEFAULT_MIN_ROWS_PER_FRAME, config.getMinRowsPerFrame()); + Assert.assertEquals(AvaticaServerConfig.DEFAULT_MAX_ROWS_PER_FRAME, config.getMaxRowsPerFrame()); + } + + @Test + public void testAvaticaServerConfigProperties() + { + Properties properties = new Properties(); + final JsonConfigProvider provider = JsonConfigProvider.of( + "druid.sql.avatica", + AvaticaServerConfig.class + ); + properties.setProperty("druid.sql.avatica.maxRowsPerFrame", "50000"); + properties.setProperty("druid.sql.avatica.minRowsPerFrame", "10000"); + provider.inject(properties, injector.getInstance(JsonConfigurator.class)); + final AvaticaServerConfig config = provider.get().get(); + Assert.assertNotNull(config); + Assert.assertEquals(AvaticaServerConfig.DEFAULT_MAX_CONNECTIONS, config.getMaxConnections()); + Assert.assertEquals( + AvaticaServerConfig.DEFAULT_MAX_STATEMENTS_PER_CONNECTION, + config.getMaxStatementsPerConnection() + ); + Assert.assertEquals(AvaticaServerConfig.DEFAULT_CONNECTION_IDLE_TIMEOUT, config.getConnectionIdleTimeout()); + Assert.assertEquals(10_000, config.getMinRowsPerFrame()); + Assert.assertEquals(50_000, config.getMaxRowsPerFrame()); + } + + @Test + public void testAvaticaServerConfigPropertiesSmallerMaxIsAlsoMin() + { + Properties properties = new Properties(); + final JsonConfigProvider provider = JsonConfigProvider.of( + "druid.sql.avatica", + AvaticaServerConfig.class + ); + properties.setProperty("druid.sql.avatica.maxRowsPerFrame", "50"); + provider.inject(properties, injector.getInstance(JsonConfigurator.class)); + final AvaticaServerConfig config = provider.get().get(); + Assert.assertNotNull(config); + Assert.assertEquals(AvaticaServerConfig.DEFAULT_MAX_CONNECTIONS, config.getMaxConnections()); + Assert.assertEquals( + AvaticaServerConfig.DEFAULT_MAX_STATEMENTS_PER_CONNECTION, + config.getMaxStatementsPerConnection() + ); + Assert.assertEquals(AvaticaServerConfig.DEFAULT_CONNECTION_IDLE_TIMEOUT, config.getConnectionIdleTimeout()); + Assert.assertEquals(50, config.getMinRowsPerFrame()); + Assert.assertEquals(50, config.getMaxRowsPerFrame()); + } + + @Test + public void testAvaticaServerConfigPropertiesBadMinRowsPerFrame() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("'druid.sql.avatica.minRowsPerFrame' must be set to a value greater than 0"); + Properties properties = new Properties(); + final JsonConfigProvider provider = JsonConfigProvider.of( + "druid.sql.avatica", + AvaticaServerConfig.class + ); + properties.setProperty("druid.sql.avatica.minRowsPerFrame", "-1"); + provider.inject(properties, injector.getInstance(JsonConfigurator.class)); + final AvaticaServerConfig config = provider.get().get(); + Assert.assertNotNull(config); + config.getMinRowsPerFrame(); } @Test diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java index 99b8cd51fd36..909287e1d8e5 100644 --- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java +++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java @@ -918,6 +918,107 @@ public Frame fetch( ); } + + @Test + public void testMinRowsPerFrame() throws Exception + { + final int minFetchSize = 1000; + final AvaticaServerConfig smallFrameConfig = new AvaticaServerConfig() + { + @Override + public int getMaxConnections() + { + return 2; + } + + @Override + public int getMaxStatementsPerConnection() + { + return 4; + } + + @Override + public int getMinRowsPerFrame() + { + return minFetchSize; + } + }; + + final PlannerConfig plannerConfig = new PlannerConfig(); + final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); + final ExprMacroTable macroTable = CalciteTests.createExprMacroTable(); + final List frames = new ArrayList<>(); + SchemaPlus rootSchema = + CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER); + DruidMeta smallFrameDruidMeta = new DruidMeta( + CalciteTests.createSqlLifecycleFactory( + new PlannerFactory( + rootSchema, + CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), + operatorTable, + macroTable, + plannerConfig, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + CalciteTests.getJsonMapper(), + CalciteTests.DRUID_SCHEMA_NAME + ) + ), + smallFrameConfig, + injector + ) + { + @Override + public Frame fetch( + final StatementHandle statement, + final long offset, + final int fetchMaxRowCount + ) throws NoSuchStatementException, MissingResultsException + { + // overriding fetch allows us to track how many frames are processed after the first frame, and also fetch size + Assert.assertEquals(minFetchSize, fetchMaxRowCount); + Frame frame = super.fetch(statement, offset, fetchMaxRowCount); + frames.add(frame); + return frame; + } + }; + + final DruidAvaticaHandler handler = new DruidAvaticaHandler( + smallFrameDruidMeta, + new DruidNode("dummy", "dummy", false, 1, null, true, false), + new AvaticaMonitor() + ); + final int port = ThreadLocalRandom.current().nextInt(9999) + 20000; + Server smallFrameServer = new Server(new InetSocketAddress("127.0.0.1", port)); + smallFrameServer.setHandler(handler); + smallFrameServer.start(); + String smallFrameUrl = StringUtils.format( + "jdbc:avatica:remote:url=http://127.0.0.1:%d%s", + port, + DruidAvaticaHandler.AVATICA_PATH + ); + Connection smallFrameClient = DriverManager.getConnection(smallFrameUrl, "regularUser", "druid"); + + // use a prepared statement because avatica currently ignores fetchSize on the initial fetch of a Statement + PreparedStatement statement = smallFrameClient.prepareStatement("SELECT dim1 FROM druid.foo"); + // set a fetch size below the minimum configured threshold + statement.setFetchSize(2); + final ResultSet resultSet = statement.executeQuery(); + List> rows = getRows(resultSet); + // expect minimum threshold to be used, which should be enough to do this all in first fetch + Assert.assertEquals(0, frames.size()); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("dim1", ""), + ImmutableMap.of("dim1", "10.1"), + ImmutableMap.of("dim1", "2"), + ImmutableMap.of("dim1", "1"), + ImmutableMap.of("dim1", "def"), + ImmutableMap.of("dim1", "abc") + ), + rows + ); + } + @Test @SuppressWarnings("unchecked") public void testSqlRequestLog() throws Exception