Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1644,7 +1644,8 @@ The Druid SQL server is configured through the following properties on the Broke
|`druid.sql.enable`|Whether to enable SQL at all, including background metadata fetching. If false, this overrides all other SQL-related properties and disables SQL metadata, serving, and planning completely.|true|
|`druid.sql.avatica.enable`|Whether to enable JDBC querying at `/druid/v2/sql/avatica/`.|true|
|`druid.sql.avatica.maxConnections`|Maximum number of open connections for the Avatica server. These are not HTTP connections, but are logical client connections that may span multiple HTTP connections.|25|
|`druid.sql.avatica.maxRowsPerFrame`|Maximum number of rows to return in a single JDBC frame. Setting this property to -1 indicates that no row limit should be applied. Clients can optionally specify a row limit in their requests; if a client specifies a row limit, the lesser value of the client-provided limit and `maxRowsPerFrame` will be used.|5,000|
|`druid.sql.avatica.maxRowsPerFrame`|Maximum acceptable value for the JDBC client `Statement.setFetchSize` method. This setting determines the maximum number of rows that Druid will populate in a single 'fetch' for a JDBC `ResultSet`. Set this property to -1 to enforce no row limit on the server-side and potentially return the entire set of rows on the initial statement execution. If the JDBC client calls `Statement.setFetchSize` with a value other than -1, Druid uses the lesser value of the client-provided limit and `maxRowsPerFrame`. If `maxRowsPerFrame` is smaller than `minRowsPerFrame`, then the `ResultSet` size will be fixed. To handle queries that produce results with a large number of rows, you can increase value of `druid.sql.avatica.maxRowsPerFrame` to reduce the number of fetches required to completely transfer the result set.|5,000|
|`druid.sql.avatica.minRowsPerFrame`|Minimum acceptable value for the JDBC client `Statement.setFetchSize` method. The value for this property must greater than 0. If the JDBC client calls `Statement.setFetchSize` with a lesser value, Druid uses `minRowsPerFrame` instead. If `maxRowsPerFrame` is less than `minRowsPerFrame`, Druid uses the minimum value of the two. For handling queries which produce results with a large number of rows, you can increase this value to reduce the number of fetches required to completely transfer the result set.|100|
|`druid.sql.avatica.maxStatementsPerConnection`|Maximum number of simultaneous open statements per Avatica client connection.|4|
|`druid.sql.avatica.connectionIdleTimeout`|Avatica client connection idle timeout.|PT5M|
|`druid.sql.http.enable`|Whether to enable JSON over HTTP querying at `/druid/v2/sql/`.|true|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,31 @@
package org.apache.druid.sql.avatica;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.joda.time.Period;

class AvaticaServerConfig
{
public static int DEFAULT_MAX_CONNECTIONS = 25;
public static int DEFAULT_MAX_STATEMENTS_PER_CONNECTION = 4;
public static Period DEFAULT_CONNECTION_IDLE_TIMEOUT = new Period("PT5M");
public static int DEFAULT_MIN_ROWS_PER_FRAME = 100;
public static int DEFAULT_MAX_ROWS_PER_FRAME = 5000;

@JsonProperty
public int maxConnections = DEFAULT_MAX_CONNECTIONS;

@JsonProperty
public int maxConnections = 25;
public int maxStatementsPerConnection = DEFAULT_MAX_STATEMENTS_PER_CONNECTION;

@JsonProperty
public int maxStatementsPerConnection = 4;
public Period connectionIdleTimeout = DEFAULT_CONNECTION_IDLE_TIMEOUT;

@JsonProperty
public Period connectionIdleTimeout = new Period("PT5M");
public int minRowsPerFrame = DEFAULT_MIN_ROWS_PER_FRAME;

@JsonProperty
public int maxRowsPerFrame = 5000;
public int maxRowsPerFrame = DEFAULT_MAX_ROWS_PER_FRAME;

public int getMaxConnections()
{
Expand All @@ -55,4 +65,16 @@ public int getMaxRowsPerFrame()
{
return maxRowsPerFrame;
}

public int getMinRowsPerFrame()
{
Preconditions.checkArgument(
minRowsPerFrame > 0,
"'druid.sql.avatica.minRowsPerFrame' must be set to a value greater than 0"
);
if (maxRowsPerFrame > 0) {
return Math.min(getMaxRowsPerFrame(), minRowsPerFrame);
}
return minRowsPerFrame;
}
}
29 changes: 26 additions & 3 deletions sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -627,17 +627,40 @@ private MetaResultSet sqlResultSet(final ConnectionHandle ch, final String sql)
}
}

/**
* Determine JDBC 'frame' size, that is the number of results which will be returned to a single
* {@link java.sql.ResultSet}. This value corresponds to {@link java.sql.Statement#setFetchSize(int)} (which is a user
* hint, we don't have to honor it), and this method modifies it, ensuring the actual chosen value falls within
* {@link AvaticaServerConfig#minRowsPerFrame} and {@link AvaticaServerConfig#maxRowsPerFrame}.
*
* A value of -1 supplied as input indicates that the client has no preference for fetch size, and can handle
* unlimited results (at our discretion). Similarly, a value of -1 for {@link AvaticaServerConfig#maxRowsPerFrame}
* also indicates that there is no upper limit on fetch size on the server side.
*
* {@link AvaticaServerConfig#minRowsPerFrame} must be configured to a value greater than 0, because it will be
* checked against if any additional frames are required (which means one of the input or maximum was set to a value
* other than -1).
*/
private int getEffectiveMaxRowsPerFrame(int clientMaxRowsPerFrame)
{
// no configured row limit, use the client provided limit
if (config.getMaxRowsPerFrame() < 0) {
return clientMaxRowsPerFrame;
return adjustForMinumumRowsPerFrame(clientMaxRowsPerFrame);
}
// client provided no row limit, use the configured row limit
if (clientMaxRowsPerFrame < 0) {
return config.getMaxRowsPerFrame();
return adjustForMinumumRowsPerFrame(config.getMaxRowsPerFrame());
}
return Math.min(clientMaxRowsPerFrame, config.getMaxRowsPerFrame());
return adjustForMinumumRowsPerFrame(Math.min(clientMaxRowsPerFrame, config.getMaxRowsPerFrame()));
}

/**
* coerce fetch size to be, at minimum, {@link AvaticaServerConfig#minRowsPerFrame}
*/
private int adjustForMinumumRowsPerFrame(int rowsPerFrame)
{
final int adjustedRowsPerFrame = Math.max(config.getMinRowsPerFrame(), rowsPerFrame);
return adjustedRowsPerFrame;
}

private static String withEscapeClause(String toEscape)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.google.inject.Key;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.jackson.JacksonModule;
Expand All @@ -35,11 +37,14 @@
import org.eclipse.jetty.server.Handler;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;

import javax.validation.Validation;
import javax.validation.Validator;
import java.util.Properties;
import java.util.Set;

@RunWith(EasyMockRunner.class)
Expand All @@ -52,6 +57,9 @@ public class AvaticaModuleTest
@Mock
private DruidMeta druidMeta;

@Rule
public ExpectedException expectedException = ExpectedException.none();

private AvaticaModule target;
private Injector injector;

Expand Down Expand Up @@ -95,6 +103,76 @@ public void testAvaticaServerConfigIsInjectable()
{
AvaticaServerConfig config = injector.getInstance(AvaticaServerConfig.class);
Assert.assertNotNull(config);
Assert.assertEquals(AvaticaServerConfig.DEFAULT_MAX_CONNECTIONS, config.getMaxConnections());
Assert.assertEquals(
AvaticaServerConfig.DEFAULT_MAX_STATEMENTS_PER_CONNECTION,
config.getMaxStatementsPerConnection()
);
Assert.assertEquals(AvaticaServerConfig.DEFAULT_CONNECTION_IDLE_TIMEOUT, config.getConnectionIdleTimeout());
Assert.assertEquals(AvaticaServerConfig.DEFAULT_MIN_ROWS_PER_FRAME, config.getMinRowsPerFrame());
Assert.assertEquals(AvaticaServerConfig.DEFAULT_MAX_ROWS_PER_FRAME, config.getMaxRowsPerFrame());
}

@Test
public void testAvaticaServerConfigProperties()
{
Properties properties = new Properties();
final JsonConfigProvider<AvaticaServerConfig> provider = JsonConfigProvider.of(
"druid.sql.avatica",
AvaticaServerConfig.class
);
properties.setProperty("druid.sql.avatica.maxRowsPerFrame", "50000");
properties.setProperty("druid.sql.avatica.minRowsPerFrame", "10000");
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final AvaticaServerConfig config = provider.get().get();
Assert.assertNotNull(config);
Assert.assertEquals(AvaticaServerConfig.DEFAULT_MAX_CONNECTIONS, config.getMaxConnections());
Assert.assertEquals(
AvaticaServerConfig.DEFAULT_MAX_STATEMENTS_PER_CONNECTION,
config.getMaxStatementsPerConnection()
);
Assert.assertEquals(AvaticaServerConfig.DEFAULT_CONNECTION_IDLE_TIMEOUT, config.getConnectionIdleTimeout());
Assert.assertEquals(10_000, config.getMinRowsPerFrame());
Assert.assertEquals(50_000, config.getMaxRowsPerFrame());
}

@Test
public void testAvaticaServerConfigPropertiesSmallerMaxIsAlsoMin()
{
Properties properties = new Properties();
final JsonConfigProvider<AvaticaServerConfig> provider = JsonConfigProvider.of(
"druid.sql.avatica",
AvaticaServerConfig.class
);
properties.setProperty("druid.sql.avatica.maxRowsPerFrame", "50");
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final AvaticaServerConfig config = provider.get().get();
Assert.assertNotNull(config);
Assert.assertEquals(AvaticaServerConfig.DEFAULT_MAX_CONNECTIONS, config.getMaxConnections());
Assert.assertEquals(
AvaticaServerConfig.DEFAULT_MAX_STATEMENTS_PER_CONNECTION,
config.getMaxStatementsPerConnection()
);
Assert.assertEquals(AvaticaServerConfig.DEFAULT_CONNECTION_IDLE_TIMEOUT, config.getConnectionIdleTimeout());
Assert.assertEquals(50, config.getMinRowsPerFrame());
Assert.assertEquals(50, config.getMaxRowsPerFrame());
}

@Test
public void testAvaticaServerConfigPropertiesBadMinRowsPerFrame()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("'druid.sql.avatica.minRowsPerFrame' must be set to a value greater than 0");
Properties properties = new Properties();
final JsonConfigProvider<AvaticaServerConfig> provider = JsonConfigProvider.of(
"druid.sql.avatica",
AvaticaServerConfig.class
);
properties.setProperty("druid.sql.avatica.minRowsPerFrame", "-1");
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final AvaticaServerConfig config = provider.get().get();
Assert.assertNotNull(config);
config.getMinRowsPerFrame();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,107 @@ public Frame fetch(
);
}


@Test
public void testMinRowsPerFrame() throws Exception
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to fully understand this test. Is this testing that if a client supplied rows per frame is less than the configured minimum rows per frame, we stick with the minimum in the config?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, the statement sets a fetch size of 2, but the min is 1000, so prior to this patch this test would fail because it would take multiple frames (and it also explicitly checks that fetch is called with the adjusted minimum value here https://github.com/apache/druid/pull/10880/files#diff-5430fa325aa463d4b6c34cad0fa3816e30e189ad2e3d7dbf56567c60e9c6c785R978)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the clarification

{
final int minFetchSize = 1000;
final AvaticaServerConfig smallFrameConfig = new AvaticaServerConfig()
{
@Override
public int getMaxConnections()
{
return 2;
}

@Override
public int getMaxStatementsPerConnection()
{
return 4;
}

@Override
public int getMinRowsPerFrame()
{
return minFetchSize;
}
};

final PlannerConfig plannerConfig = new PlannerConfig();
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
final List<Meta.Frame> frames = new ArrayList<>();
SchemaPlus rootSchema =
CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
DruidMeta smallFrameDruidMeta = new DruidMeta(
CalciteTests.createSqlLifecycleFactory(
new PlannerFactory(
rootSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
operatorTable,
macroTable,
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME
)
),
smallFrameConfig,
injector
)
{
@Override
public Frame fetch(
final StatementHandle statement,
final long offset,
final int fetchMaxRowCount
) throws NoSuchStatementException, MissingResultsException
{
// overriding fetch allows us to track how many frames are processed after the first frame, and also fetch size
Assert.assertEquals(minFetchSize, fetchMaxRowCount);
Frame frame = super.fetch(statement, offset, fetchMaxRowCount);
frames.add(frame);
return frame;
}
};

final DruidAvaticaHandler handler = new DruidAvaticaHandler(
smallFrameDruidMeta,
new DruidNode("dummy", "dummy", false, 1, null, true, false),
new AvaticaMonitor()
);
final int port = ThreadLocalRandom.current().nextInt(9999) + 20000;
Server smallFrameServer = new Server(new InetSocketAddress("127.0.0.1", port));
smallFrameServer.setHandler(handler);
smallFrameServer.start();
String smallFrameUrl = StringUtils.format(
"jdbc:avatica:remote:url=http://127.0.0.1:%d%s",
port,
DruidAvaticaHandler.AVATICA_PATH
);
Connection smallFrameClient = DriverManager.getConnection(smallFrameUrl, "regularUser", "druid");

// use a prepared statement because avatica currently ignores fetchSize on the initial fetch of a Statement
PreparedStatement statement = smallFrameClient.prepareStatement("SELECT dim1 FROM druid.foo");
// set a fetch size below the minimum configured threshold
statement.setFetchSize(2);
final ResultSet resultSet = statement.executeQuery();
List<Map<String, Object>> rows = getRows(resultSet);
// expect minimum threshold to be used, which should be enough to do this all in first fetch
Assert.assertEquals(0, frames.size());
Assert.assertEquals(
ImmutableList.of(
ImmutableMap.of("dim1", ""),
ImmutableMap.of("dim1", "10.1"),
ImmutableMap.of("dim1", "2"),
ImmutableMap.of("dim1", "1"),
ImmutableMap.of("dim1", "def"),
ImmutableMap.of("dim1", "abc")
),
rows
);
}

@Test
@SuppressWarnings("unchecked")
public void testSqlRequestLog() throws Exception
Expand Down