diff --git a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java index 562ca66de089..7acf915954cf 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java @@ -48,7 +48,7 @@ public class ITBasicAuthConfigurationTest extends AbstractAuthConfigurationTest private static final String BASIC_AUTHORIZER = "basic"; private static final String EXPECTED_AVATICA_AUTH_ERROR = "Error while executing SQL \"SELECT * FROM INFORMATION_SCHEMA.COLUMNS\": Remote driver error: QueryInterruptedException: User metadata store authentication failed. -> BasicSecurityAuthenticationException: User metadata store authentication failed."; - private static final String EXPECTED_AVATICA_AUTHZ_ERROR = "Error while executing SQL \"SELECT * FROM INFORMATION_SCHEMA.COLUMNS\": Remote driver error: RuntimeException: org.apache.druid.server.security.ForbiddenException: Allowed:false, Message: -> ForbiddenException: Allowed:false, Message:"; + private static final String EXPECTED_AVATICA_AUTHZ_ERROR = "Error while executing SQL \"SELECT * FROM INFORMATION_SCHEMA.COLUMNS\": Remote driver error: ForbiddenException: Allowed:false, Message:"; private HttpClient druid99; diff --git a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java index a3e7291bb247..f174cdf8fa64 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java @@ -54,7 +54,7 @@ public class ITBasicAuthLdapConfigurationTest extends AbstractAuthConfigurationT private static final String LDAP_AUTHORIZER = "ldapauth"; private static final String EXPECTED_AVATICA_AUTH_ERROR = "Error while executing SQL \"SELECT * FROM INFORMATION_SCHEMA.COLUMNS\": Remote driver error: QueryInterruptedException: User LDAP authentication failed. -> BasicSecurityAuthenticationException: User LDAP authentication failed."; - private static final String EXPECTED_AVATICA_AUTHZ_ERROR = "Error while executing SQL \"SELECT * FROM INFORMATION_SCHEMA.COLUMNS\": Remote driver error: RuntimeException: org.apache.druid.server.security.ForbiddenException: Allowed:false, Message: -> ForbiddenException: Allowed:false, Message:"; + private static final String EXPECTED_AVATICA_AUTHZ_ERROR = "Error while executing SQL \"SELECT * FROM INFORMATION_SCHEMA.COLUMNS\": Remote driver error: ForbiddenException: Allowed:false, Message:"; @Inject IntegrationTestingConfig config; diff --git a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java index 9faca5c600f2..002fd3045ac8 100644 --- a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java +++ b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java @@ -170,7 +170,7 @@ private String sqlQueryId() } /** - * Assign dynamic parameters to be used to substitute values during query exection. This can be performed at any + * Assign dynamic parameters to be used to substitute values during query execution. This can be performed at any * part of the lifecycle. */ public void setParameters(List parameters) diff --git a/sql/src/main/java/org/apache/druid/sql/SqlQueryPlus.java b/sql/src/main/java/org/apache/druid/sql/SqlQueryPlus.java new file mode 100644 index 000000000000..bebf74b1a34b --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/SqlQueryPlus.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql; + +import com.google.common.base.Preconditions; +import org.apache.calcite.avatica.remote.TypedValue; +import org.apache.druid.query.QueryContext; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.sql.http.SqlParameter; +import org.apache.druid.sql.http.SqlQuery; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * Captures the inputs to a SQL execution request: the statement, + * the context, parameters, and the authorization result. Pass this + * around rather than the quad of items. The request can evolve: + * items can be filled in later as needed (except for the SQL + * and auth result, which are required.) + */ +public class SqlQueryPlus +{ + private final String sql; + private final QueryContext queryContext; + private final List parameters; + private final AuthenticationResult authResult; + + public SqlQueryPlus( + String sql, + QueryContext queryContext, + List parameters, + AuthenticationResult authResult + ) + { + this.sql = Preconditions.checkNotNull(sql); + this.queryContext = queryContext == null + ? new QueryContext() + : queryContext; + this.parameters = parameters == null + ? Collections.emptyList() + : parameters; + this.authResult = Preconditions.checkNotNull(authResult); + } + + public SqlQueryPlus(final String sql, final AuthenticationResult authResult) + { + this(sql, (QueryContext) null, null, authResult); + } + + public static SqlQueryPlus fromSqlParameters( + String sql, + Map queryContext, + List parameters, + AuthenticationResult authResult + ) + { + return new SqlQueryPlus( + sql, + queryContext == null ? null : new QueryContext(queryContext), + parameters == null ? null : SqlQuery.getParameterList(parameters), + authResult + ); + } + + public static SqlQueryPlus from( + String sql, + Map queryContext, + List parameters, + AuthenticationResult authResult + ) + { + return new SqlQueryPlus( + sql, + queryContext == null ? null : new QueryContext(queryContext), + parameters, + authResult + ); + } + + public String sql() + { + return sql; + } + + public QueryContext context() + { + return queryContext; + } + + public List parameters() + { + return parameters; + } + + public AuthenticationResult authResult() + { + return authResult; + } + + public SqlQueryPlus withContext(QueryContext context) + { + return new SqlQueryPlus(sql, context, parameters, authResult); + } + + public SqlQueryPlus withParameters(List parameters) + { + return new SqlQueryPlus(sql, queryContext, parameters, authResult); + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java b/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java new file mode 100644 index 000000000000..399ecf673fb1 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.avatica; + +import com.google.common.base.Preconditions; +import org.apache.calcite.avatica.AvaticaParameter; +import org.apache.calcite.avatica.ColumnMetaData; +import org.apache.calcite.avatica.Meta; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.sql.calcite.planner.Calcites; +import org.apache.druid.sql.calcite.planner.PrepareResult; + +import java.io.Closeable; +import java.sql.Array; +import java.sql.DatabaseMetaData; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; + +/** + * Common implementation for the JDBC {@code Statement} and + * {@code PreparedStatement} implementations in Druid. Statement use + * {@link DruidJdbcResultSet} objects to iterate through rows: zero + * or one may be open at any time, and a single statement supports + * multiple result sets concurrently. Druid closes the result set after + * the last batch in compliance with this note on page 137 of the + * + * JDBC 4.1 specification: + *

+ * Some JDBC driver implementations may also implicitly close the + * ResultSet when the ResultSet type is TYPE_FORWARD_ONLY and the next + * method of ResultSet returns false. + */ +public abstract class AbstractDruidJdbcStatement implements Closeable +{ + public static final long START_OFFSET = 0; + + protected final DruidConnection connection; + protected final int statementId; + protected DruidJdbcResultSet resultSet; + + public AbstractDruidJdbcStatement( + final DruidConnection connection, + final int statementId + ) + { + this.connection = Preconditions.checkNotNull(connection, "connection"); + this.statementId = statementId; + } + + protected static Meta.Signature createSignature(PrepareResult prepareResult, String sql) + { + List params = new ArrayList<>(); + final RelDataType parameterRowType = prepareResult.getParameterRowType(); + for (RelDataTypeField field : parameterRowType.getFieldList()) { + RelDataType type = field.getType(); + params.add(createParameter(field, type)); + } + return Meta.Signature.create( + createColumnMetaData(prepareResult.getRowType()), + sql, + params, + Meta.CursorFactory.ARRAY, + Meta.StatementType.SELECT // We only support SELECT + ); + } + + private static AvaticaParameter createParameter(RelDataTypeField field, RelDataType type) + { + // signed is always false because no way to extract from RelDataType, and the only usage of this AvaticaParameter + // constructor I can find, in CalcitePrepareImpl, does it this way with hard coded false + return new AvaticaParameter( + false, + type.getPrecision(), + type.getScale(), + type.getSqlTypeName().getJdbcOrdinal(), + type.getSqlTypeName().getName(), + Calcites.sqlTypeNameJdbcToJavaClass(type.getSqlTypeName()).getName(), + field.getName() + ); + } + + public static List createColumnMetaData(final RelDataType rowType) + { + final List columns = new ArrayList<>(); + List fieldList = rowType.getFieldList(); + + for (int i = 0; i < fieldList.size(); i++) { + RelDataTypeField field = fieldList.get(i); + + final ColumnMetaData.AvaticaType columnType; + if (field.getType().getSqlTypeName() == SqlTypeName.ARRAY) { + final ColumnMetaData.Rep elementRep = rep(field.getType().getComponentType().getSqlTypeName()); + final ColumnMetaData.ScalarType elementType = ColumnMetaData.scalar( + field.getType().getComponentType().getSqlTypeName().getJdbcOrdinal(), + field.getType().getComponentType().getSqlTypeName().getName(), + elementRep + ); + final ColumnMetaData.Rep arrayRep = rep(field.getType().getSqlTypeName()); + columnType = ColumnMetaData.array( + elementType, + field.getType().getSqlTypeName().getName(), + arrayRep + ); + } else { + final ColumnMetaData.Rep rep = rep(field.getType().getSqlTypeName()); + columnType = ColumnMetaData.scalar( + field.getType().getSqlTypeName().getJdbcOrdinal(), + field.getType().getSqlTypeName().getName(), + rep + ); + } + columns.add( + new ColumnMetaData( + i, // ordinal + false, // auto increment + true, // case sensitive + false, // searchable + false, // currency + field.getType().isNullable() + ? DatabaseMetaData.columnNullable + : DatabaseMetaData.columnNoNulls, // nullable + true, // signed + field.getType().getPrecision(), // display size + field.getName(), // label + null, // column name + null, // schema name + field.getType().getPrecision(), // precision + field.getType().getScale(), // scale + null, // table name + null, // catalog name + columnType, // avatica type + true, // read only + false, // writable + false, // definitely writable + columnType.columnClassName() // column class name + ) + ); + } + + return columns; + } + + private static ColumnMetaData.Rep rep(final SqlTypeName sqlType) + { + if (SqlTypeName.CHAR_TYPES.contains(sqlType)) { + return ColumnMetaData.Rep.of(String.class); + } else if (sqlType == SqlTypeName.TIMESTAMP) { + return ColumnMetaData.Rep.of(Long.class); + } else if (sqlType == SqlTypeName.DATE) { + return ColumnMetaData.Rep.of(Integer.class); + } else if (sqlType == SqlTypeName.INTEGER) { + // use Number.class for exact numeric types since JSON transport might switch longs to integers + return ColumnMetaData.Rep.of(Number.class); + } else if (sqlType == SqlTypeName.BIGINT) { + // use Number.class for exact numeric types since JSON transport might switch longs to integers + return ColumnMetaData.Rep.of(Number.class); + } else if (sqlType == SqlTypeName.FLOAT) { + return ColumnMetaData.Rep.of(Float.class); + } else if (sqlType == SqlTypeName.DOUBLE || sqlType == SqlTypeName.DECIMAL) { + return ColumnMetaData.Rep.of(Double.class); + } else if (sqlType == SqlTypeName.BOOLEAN) { + return ColumnMetaData.Rep.of(Boolean.class); + } else if (sqlType == SqlTypeName.OTHER) { + return ColumnMetaData.Rep.of(Object.class); + } else if (sqlType == SqlTypeName.ARRAY) { + return ColumnMetaData.Rep.of(Array.class); + } else { + throw new ISE("No rep for SQL type [%s]", sqlType); + } + } + + public Meta.Frame nextFrame(final long fetchOffset, final int fetchMaxRowCount) + { + Meta.Frame frame = requireResultSet().nextFrame(fetchOffset, fetchMaxRowCount); + + // Implicitly close after the last result frame. + if (frame.done) { + closeResultSet(); + } + return frame; + } + + public abstract Meta.Signature getSignature(); + + public void closeResultSet() + { + // Lock held only to get the result set, not during cleanup. + DruidJdbcResultSet currentResultSet; + synchronized (this) { + currentResultSet = resultSet; + resultSet = null; + } + if (currentResultSet != null) { + currentResultSet.close(); + } + } + + protected synchronized DruidJdbcResultSet requireResultSet() + { + if (resultSet == null) { + throw new ISE("No result set open for statement [%d]", statementId); + } + return resultSet; + } + + public long getCurrentOffset() + { + return requireResultSet().getCurrentOffset(); + } + + public synchronized boolean isDone() + { + return resultSet == null ? true : resultSet.isDone(); + } + + @Override + public synchronized void close() + { + closeResultSet(); + } + + public String getConnectionId() + { + return connection.getConnectionId(); + } + + public int getStatementId() + { + return statementId; + } + + public ExecutorService executor() + { + return connection.executor(); + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java index 2cd277f1c580..ab6ae65a98ce 100644 --- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java +++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java @@ -20,17 +20,22 @@ package org.apache.druid.sql.avatica; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.calcite.tools.RelConversionException; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.QueryContext; import org.apache.druid.sql.SqlLifecycleFactory; +import org.apache.druid.sql.SqlQueryPlus; +import org.apache.druid.sql.calcite.planner.PlannerContext; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -44,15 +49,14 @@ public class DruidConnection private final String connectionId; private final int maxStatements; - private final ImmutableMap userSecret; - private final QueryContext context; + private final Map userSecret; + private final Map context; private final AtomicInteger statementCounter = new AtomicInteger(); private final AtomicReference> timeoutFuture = new AtomicReference<>(); + private final ExecutorService yielderOpenCloseExecutor; - // Typically synchronized by connectionLock, except in one case: the onClose function passed - // into DruidStatements contained by the map. @GuardedBy("connectionLock") - private final ConcurrentMap statements; + private final ConcurrentMap statements = new ConcurrentHashMap<>(); private final Object connectionLock = new Object(); @GuardedBy("connectionLock") @@ -62,14 +66,19 @@ public DruidConnection( final String connectionId, final int maxStatements, final Map userSecret, - final QueryContext context + final Map context ) { this.connectionId = Preconditions.checkNotNull(connectionId); this.maxStatements = maxStatements; this.userSecret = ImmutableMap.copyOf(userSecret); - this.context = context; - this.statements = new ConcurrentHashMap<>(); + this.context = Preconditions.checkNotNull(context); + this.yielderOpenCloseExecutor = Execs.singleThreaded( + StringUtils.format( + "JDBCYielderOpenCloseExecutor-connection-%s", + StringUtils.encodeForFormat(connectionId) + ) + ); } public String getConnectionId() @@ -77,7 +86,16 @@ public String getConnectionId() return connectionId; } - public DruidStatement createStatement(SqlLifecycleFactory sqlLifecycleFactory) + public QueryContext makeContext() + { + // QueryContext constructor copies the context parameters. + // we don't want to stringify arrays for JDBC ever because Avatica needs to handle this + final QueryContext queryContext = new QueryContext(context); + queryContext.addSystemParam(PlannerContext.CTX_SQL_STRINGIFY_ARRAYS, false); + return queryContext; + } + + public DruidJdbcStatement createStatement(SqlLifecycleFactory sqlLifecycleFactory) { final int statementId = statementCounter.incrementAndGet(); @@ -89,36 +107,85 @@ public DruidStatement createStatement(SqlLifecycleFactory sqlLifecycleFactory) } if (statements.size() >= maxStatements) { - throw DruidMeta.logFailure(new ISE("Too many open statements, limit is[%,d]", maxStatements)); + throw DruidMeta.logFailure(new ISE("Too many open statements, limit is [%,d]", maxStatements)); } @SuppressWarnings("GuardedBy") - final DruidStatement statement = new DruidStatement( - connectionId, + final DruidJdbcStatement statement = new DruidJdbcStatement( + this, statementId, - context, - sqlLifecycleFactory.factorize(), - () -> { - // onClose function for the statement - LOG.debug("Connection[%s] closed statement[%s].", connectionId, statementId); - // statements will be accessed unsynchronized to avoid deadlock - statements.remove(statementId); - } + sqlLifecycleFactory ); statements.put(statementId, statement); - LOG.debug("Connection[%s] opened statement[%s].", connectionId, statementId); + LOG.debug("Connection [%s] opened statement [%s].", connectionId, statementId); return statement; } } - public DruidStatement getStatement(final int statementId) + public DruidJdbcPreparedStatement createPreparedStatement( + SqlLifecycleFactory sqlLifecycleFactory, + SqlQueryPlus queryPlus, + final long maxRowCount) + { + final int statementId = statementCounter.incrementAndGet(); + + synchronized (connectionLock) { + if (statements.containsKey(statementId)) { + // Will only happen if statementCounter rolls over before old statements are cleaned up. If this + // ever happens then something fishy is going on, because we shouldn't have billions of statements. + throw DruidMeta.logFailure(new ISE("Uh oh, too many statements")); + } + + if (statements.size() >= maxStatements) { + throw DruidMeta.logFailure(new ISE("Too many open statements, limit is [%,d]", maxStatements)); + } + + @SuppressWarnings("GuardedBy") + final DruidJdbcPreparedStatement jdbcStmt = new DruidJdbcPreparedStatement( + this, + statementId, + queryPlus, + sqlLifecycleFactory, + maxRowCount + ); + jdbcStmt.prepare(); + + statements.put(statementId, jdbcStmt); + LOG.debug("Connection [%s] opened prepared statement [%s].", connectionId, statementId); + return jdbcStmt; + } + } + + public void prepareAndExecute( + final DruidJdbcStatement druidStatement, + final SqlQueryPlus queryPlus, + final long maxRowCount + ) throws RelConversionException + { + Preconditions.checkNotNull(context, "JDBC connection context is null!"); + druidStatement.execute(queryPlus.withContext(makeContext()), maxRowCount); + } + + public AbstractDruidJdbcStatement getStatement(final int statementId) { synchronized (connectionLock) { return statements.get(statementId); } } + public void closeStatement(int statementId) + { + AbstractDruidJdbcStatement stmt; + synchronized (connectionLock) { + stmt = statements.remove(statementId); + } + if (stmt != null) { + stmt.close(); + LOG.debug("Connection [%s] closed statement [%s].", connectionId, statementId); + } + } + /** * Closes this connection if it has no statements. * @@ -139,18 +206,18 @@ public boolean closeIfEmpty() public void close() { synchronized (connectionLock) { - // Copy statements before iterating because statement.close() modifies it. - for (DruidStatement statement : ImmutableList.copyOf(statements.values())) { + open = false; + for (AbstractDruidJdbcStatement statement : statements.values()) { try { statement.close(); } catch (Exception e) { - LOG.warn("Connection[%s] failed to close statement[%s]!", connectionId, statement.getStatementId()); + LOG.warn("Connection [%s] failed to close statement [%s]!", connectionId, statement.getStatementId()); } } - - LOG.debug("Connection[%s] closed.", connectionId); - open = false; + statements.clear(); + yielderOpenCloseExecutor.shutdownNow(); + LOG.debug("Connection [%s] closed.", connectionId); } } @@ -167,4 +234,9 @@ public Map userSecret() { return userSecret; } + + public ExecutorService executor() + { + return yielderOpenCloseExecutor; + } } diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java new file mode 100644 index 000000000000..5f23b8ee4d2b --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.avatica; + +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.calcite.avatica.Meta; +import org.apache.calcite.avatica.remote.TypedValue; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.server.security.ForbiddenException; +import org.apache.druid.sql.SqlLifecycle; +import org.apache.druid.sql.SqlLifecycleFactory; +import org.apache.druid.sql.SqlQueryPlus; +import org.apache.druid.sql.calcite.planner.PrepareResult; + +import java.util.List; + +/** + * The Druid implementation of the server-side representation of the + * JDBC {@code PreparedStatement} class. A prepared statement can be prepared + * once with a query, then executed any number of times, typically with + * parameter values. No parameters are provided during prepare, though we do + * learn the parameter definitions passed back to the client and used by + * Avatica for serialization. Each execution produces a + * {@link DruidJdbcResultSet}. Only one execution is active at a time. + */ +public class DruidJdbcPreparedStatement extends AbstractDruidJdbcStatement +{ + private final SqlLifecycle sqlStatement; + private final SqlQueryPlus queryPlus; + private final SqlLifecycleFactory lifecycleFactory; + private final long maxRowCount; + private Meta.Signature signature; + private State state = State.NEW; + + public DruidJdbcPreparedStatement( + final DruidConnection connection, + final int statementId, + final SqlQueryPlus queryPlus, + final SqlLifecycleFactory lifecycleFactory, + final long maxRowCount + ) + { + super(connection, statementId); + this.lifecycleFactory = lifecycleFactory; + this.queryPlus = queryPlus; + this.maxRowCount = maxRowCount; + this.sqlStatement = lifecycleFactory.factorize(); + sqlStatement.initialize(queryPlus.sql(), connection.makeContext()); + } + + public synchronized void prepare() + { + try { + ensure(State.NEW); + sqlStatement.validateAndAuthorize(queryPlus.authResult()); + PrepareResult prepareResult = sqlStatement.prepare(); + signature = createSignature( + prepareResult, + queryPlus.sql() + ); + state = State.PREPARED; + } + catch (ForbiddenException e) { + // Can't finalize statement in in this case. Call will fail with an + // assertion error. + DruidMeta.logFailure(e); + state = State.CLOSED; + throw e; + } + catch (RuntimeException e) { + failed(e); + throw e; + } + catch (Throwable t) { + failed(t); + throw new RuntimeException(t); + } + } + + @Override + public synchronized Meta.Signature getSignature() + { + ensure(State.PREPARED); + return signature; + } + + public synchronized void execute(List parameters) + { + ensure(State.PREPARED); + closeResultSet(); + try { + SqlLifecycle directStmt = lifecycleFactory.factorize(); + directStmt.initialize(queryPlus.sql(), connection.makeContext()); + directStmt.setParameters(parameters); + resultSet = new DruidJdbcResultSet(this, queryPlus, directStmt, maxRowCount); + resultSet.execute(); + } + // Failure to execute does not close the prepared statement. + catch (RuntimeException e) { + failed(e); + throw e; + } + catch (Throwable t) { + failed(t); + throw new RuntimeException(t); + } + } + + @GuardedBy("this") + private void ensure(final State... desiredStates) + { + for (State desiredState : desiredStates) { + if (state == desiredState) { + return; + } + } + throw new ISE("Invalid action for state [%s]", state); + } + + private void failed(Throwable t) + { + super.close(); + sqlStatement.finalizeStateAndEmitLogsAndMetrics(t, null, -1); + state = State.CLOSED; + } + + @Override + public synchronized void close() + { + if (state != State.CLOSED) { + super.close(); + sqlStatement.finalizeStateAndEmitLogsAndMetrics(null, null, -1); + } + state = State.CLOSED; + } + + enum State + { + NEW, + PREPARED, + CLOSED + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java new file mode 100644 index 000000000000..d4c3eba1d018 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.avatica; + +import com.google.common.base.Preconditions; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.calcite.avatica.Meta; +import org.apache.calcite.tools.RelConversionException; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.sql.SqlLifecycle; +import org.apache.druid.sql.SqlQueryPlus; +import org.apache.druid.sql.calcite.planner.PrepareResult; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Druid's server-side representation of a JDBC result set. At most one + * can be open per statement (standard or prepared). The implementation + * is based on Druid's {@link SqlLifecycle} class. Even if result + * set is for a {@code PreparedStatement}, the result set itself uses + * a Druid {@code SqlLifecycle} which includes the parameter values + * given for the execution. This allows Druid's planner to use the "query + * optimized" form of parameter substitution: we replan the query for + * each execution with the parameter values. + *

+ * Avatica returns results in {@link Meta.Frame} objects as batches of + * rows. The result set uses the {@code TYPE_FORWARD_ONLY} execution model: + * the application can only read results sequentially, the application + * can't jump around or read backwards. As a result, the enclosing + * statement closes the result set at EOF to release resources early. + */ +public class DruidJdbcResultSet implements Closeable +{ + /** + * Query metrics can only be used within a single thread. Because results can + * be paginated into multiple JDBC frames (each frame being processed by a + * potentially different thread), the thread that closes the yielder + * (resulting in a QueryMetrics emit() call) may not be the same thread that + * created the yielder (which initializes DefaultQueryMetrics with the current + * thread as the owner). Create and close the yielder with this single-thread + * executor to prevent this from happening. + *

+ * The thread owner check in DefaultQueryMetrics is more aggressive than + * needed for this specific JDBC case, since the JDBC frames are processed + * sequentially. If the thread owner check is changed/loosened to permit this + * use case, we would not need to use this executor. + *

+ * See discussion at: + * https://github.com/apache/druid/pull/4288 + * https://github.com/apache/druid/pull/4415 + */ + private final AbstractDruidJdbcStatement jdbcStatement; + private final SqlQueryPlus sqlRequest; + private final SqlLifecycle stmt; + private final long maxRowCount; + private State state = State.NEW; + private Meta.Signature signature; + private Yielder yielder; + private int offset; + + public DruidJdbcResultSet( + final AbstractDruidJdbcStatement jdbcStatement, + final SqlQueryPlus sqlRequest, + final SqlLifecycle stmt, + final long maxRowCount + ) + { + this.jdbcStatement = jdbcStatement; + this.stmt = stmt; + this.sqlRequest = sqlRequest; + this.maxRowCount = maxRowCount; + } + + public synchronized void execute() throws RelConversionException + { + ensure(State.NEW); + stmt.validateAndAuthorize(sqlRequest.authResult()); + PrepareResult prepareResult = stmt.prepare(); + stmt.plan(); + signature = AbstractDruidJdbcStatement.createSignature( + prepareResult, + sqlRequest.sql() + ); + try { + state = State.RUNNING; + final Sequence baseSequence = jdbcStatement.executor().submit(stmt::execute).get(); + + // We can't apply limits greater than Integer.MAX_VALUE, ignore them. + final Sequence retSequence = + maxRowCount >= 0 && maxRowCount <= Integer.MAX_VALUE + ? baseSequence.limit((int) maxRowCount) + : baseSequence; + + yielder = Yielders.each(retSequence); + } + catch (Throwable t) { + throw closeAndPropagateThrowable(t); + } + } + + public synchronized boolean isDone() + { + return state == State.DONE; + } + + public synchronized Meta.Signature getSignature() + { + ensure(State.RUNNING, State.DONE); + return signature; + } + + public synchronized Meta.Frame nextFrame(final long fetchOffset, final int fetchMaxRowCount) + { + ensure(State.RUNNING, State.DONE); + Preconditions.checkState(fetchOffset == offset, "fetchOffset [%,d] != offset [%,d]", fetchOffset, offset); + if (state == State.DONE) { + return new Meta.Frame(fetchOffset, true, Collections.emptyList()); + } + + try { + final List rows = new ArrayList<>(); + while (!yielder.isDone() && (fetchMaxRowCount < 0 || offset < fetchOffset + fetchMaxRowCount)) { + rows.add(yielder.get()); + yielder = yielder.next(null); + offset++; + } + + if (yielder.isDone()) { + state = State.DONE; + } + + return new Meta.Frame(fetchOffset, state == State.DONE, rows); + } + catch (Throwable t) { + throw closeAndPropagateThrowable(t); + } + } + + public synchronized long getCurrentOffset() + { + ensure(State.RUNNING, State.DONE); + return offset; + } + + @GuardedBy("this") + private void ensure(final State... desiredStates) + { + for (State desiredState : desiredStates) { + if (state == desiredState) { + return; + } + } + throw new ISE("Invalid action for state [%s]", state); + } + + private RuntimeException closeAndPropagateThrowable(Throwable t) + { + DruidMeta.logFailure(t); + // Report a failure so that the failure is logged. + try { + close(t); + } + catch (Throwable t1) { + t.addSuppressed(t1); + } + finally { + state = State.FAILED; + } + + // Avoid unnecessary wrapping. + if (t instanceof RuntimeException) { + return (RuntimeException) t; + } + return new RuntimeException(t); + } + + @Override + public synchronized void close() + { + close(null); + } + + private void close(Throwable error) + { + if (state == State.NEW) { + state = State.CLOSED; + } + if (state == State.CLOSED || state == State.FAILED) { + return; + } + state = State.CLOSED; + try { + if (yielder != null) { + Yielder theYielder = this.yielder; + this.yielder = null; + + // Put the close last, so any exceptions it throws are after we did the other cleanup above. + jdbcStatement.executor().submit( + () -> { + theYielder.close(); + // makes this a Callable instead of Runnable so we don't need to catch exceptions inside the lambda + return null; + } + ).get(); + + } + } + catch (RuntimeException e) { + throw e; + } + catch (Throwable t) { + throw new RuntimeException(t); + } + finally { + stmt.finalizeStateAndEmitLogsAndMetrics(error, null, -1); + } + } + + private enum State + { + NEW, + RUNNING, + DONE, + FAILED, + CLOSED + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java new file mode 100644 index 000000000000..ebe64f5bdec8 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.avatica; + +import com.google.common.base.Preconditions; +import org.apache.calcite.avatica.Meta; +import org.apache.calcite.tools.RelConversionException; +import org.apache.druid.server.security.ForbiddenException; +import org.apache.druid.sql.SqlLifecycle; +import org.apache.druid.sql.SqlLifecycleFactory; +import org.apache.druid.sql.SqlQueryPlus; + +/** + * Represents Druid's version of the JDBC {@code Statement} class: + * can be executed multiple times, one after another, producing a + * {@link DruidJdbcResultSet} for each execution. + */ +public class DruidJdbcStatement extends AbstractDruidJdbcStatement +{ + private final SqlLifecycleFactory lifecycleFactory; + protected boolean closed; + + public DruidJdbcStatement( + final DruidConnection connection, + final int statementId, + final SqlLifecycleFactory lifecycleFactory + ) + { + super(connection, statementId); + this.lifecycleFactory = Preconditions.checkNotNull(lifecycleFactory, "lifecycleFactory"); + } + + public synchronized void execute(SqlQueryPlus sqlRequest, long maxRowCount) throws RelConversionException + { + closeResultSet(); + SqlLifecycle stmt = lifecycleFactory.factorize(); + stmt.initialize(sqlRequest.sql(), connection.makeContext()); + try { + stmt.validateAndAuthorize(sqlRequest.authResult()); + resultSet = new DruidJdbcResultSet(this, sqlRequest, stmt, Long.MAX_VALUE); + resultSet.execute(); + } + catch (ForbiddenException e) { + // Can't finalize statement in in this case. Call will fail with an + // assertion error. + resultSet = null; + DruidMeta.logFailure(e); + throw e; + } + catch (Throwable t) { + stmt.finalizeStateAndEmitLogsAndMetrics(t, null, -1); + resultSet = null; + throw t; + } + } + + @Override + public Meta.Signature getSignature() + { + return requireResultSet().getSignature(); + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java index 781cae53da67..d318bd4bf388 100644 --- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java +++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java @@ -40,20 +40,19 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.query.QueryContext; import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.Authenticator; import org.apache.druid.server.security.AuthenticatorMapper; import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.sql.SqlLifecycleFactory; +import org.apache.druid.sql.SqlQueryPlus; import org.apache.druid.sql.calcite.planner.Calcites; -import org.apache.druid.sql.calcite.planner.PlannerContext; import org.joda.time.Interval; import javax.annotation.Nonnull; import javax.annotation.Nullable; + import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -108,7 +107,7 @@ public static T logFailure(T error) private final ErrorHandler errorHandler; /** - * Used to track logical connections. + * Tracks logical connections. */ private final ConcurrentMap connections = new ConcurrentHashMap<>(); @@ -157,16 +156,13 @@ public void openConnection(final ConnectionHandle ch, final Map } } } - // we don't want to stringify arrays for JDBC ever because avatica needs to handle this - final QueryContext context = new QueryContext(contextMap); - context.addSystemParam(PlannerContext.CTX_SQL_STRINGIFY_ARRAYS, false); - openDruidConnection(ch.id, secret, context); + openDruidConnection(ch.id, secret, contextMap); } catch (NoSuchConnectionException e) { throw e; } catch (Throwable t) { - // we want to avoid sanitizing avatica specific exceptions as the avatica code can rely on them to handle issues + // we want to avoid sanitizing Avatica specific exceptions as the Avatica code can rely on them to handle issues // differently throw errorHandler.sanitize(t); } @@ -206,11 +202,16 @@ public ConnectionProperties connectionSync(final ConnectionHandle ch, final Conn } } + /** + * Creates a new implementation of the one-pass JDBC {@code Statement} + * class. Corresponds to the JDBC {@code Connection.createStatement()} + * method. + */ @Override public StatementHandle createStatement(final ConnectionHandle ch) { try { - final DruidStatement druidStatement = getDruidConnection(ch.id).createStatement(sqlLifecycleFactory); + final DruidJdbcStatement druidStatement = getDruidConnection(ch.id).createStatement(sqlLifecycleFactory); return new StatementHandle(ch.id, druidStatement.getStatementId(), null); } catch (NoSuchConnectionException e) { @@ -221,6 +222,11 @@ public StatementHandle createStatement(final ConnectionHandle ch) } } + /** + * Creates a new implementation of the JDBC {@code PreparedStatement} + * class which allows preparing once, executing many times. Corresponds to + * the JDBC {@code Connection.prepareStatement()} call. + */ @Override public StatementHandle prepare( final ConnectionHandle ch, @@ -229,26 +235,19 @@ public StatementHandle prepare( ) { try { - final StatementHandle statement = createStatement(ch); - final DruidStatement druidStatement; - try { - druidStatement = getDruidStatement(statement); - } - catch (NoSuchStatementException e) { - throw logFailure(new ISE(e, e.getMessage())); - } - final DruidConnection druidConnection = getDruidConnection(statement.connectionId); - AuthenticationResult authenticationResult = authenticateConnection(druidConnection); - if (authenticationResult == null) { - throw logFailure( - new ForbiddenException("Authentication failed."), - "Authentication failed for statement[%s]", - druidStatement.getStatementId() - ); - } - statement.signature = druidStatement.prepare(sql, maxRowCount, authenticationResult).getSignature(); - LOG.debug("Successfully prepared statement[%s] for execution", druidStatement.getStatementId()); - return statement; + final DruidConnection druidConnection = getDruidConnection(ch.id); + SqlQueryPlus sqlReq = new SqlQueryPlus( + sql, + null, // Context provided by connection + null, // No parameters in this path + doAuthenticate(druidConnection) + ); + DruidJdbcPreparedStatement stmt = druidConnection.createPreparedStatement( + sqlLifecycleFactory, + sqlReq, + maxRowCount); + LOG.debug("Successfully prepared statement [%s] for execution", stmt.getStatementId()); + return new StatementHandle(ch.id, stmt.getStatementId(), stmt.getSignature()); } catch (NoSuchConnectionException e) { throw e; @@ -258,6 +257,18 @@ public StatementHandle prepare( } } + private AuthenticationResult doAuthenticate(final DruidConnection druidConnection) + { + AuthenticationResult authenticationResult = authenticateConnection(druidConnection); + if (authenticationResult == null) { + throw logFailure( + new ForbiddenException("Authentication failed."), + "Authentication failed for prepare" + ); + } + return authenticationResult; + } + @Deprecated @Override public ExecuteResult prepareAndExecute( @@ -271,6 +282,9 @@ public ExecuteResult prepareAndExecute( throw errorHandler.sanitize(new UOE("Deprecated")); } + /** + * Prepares and executes a JDBC {@code Statement} + */ @Override public ExecuteResult prepareAndExecute( final StatementHandle statement, @@ -280,39 +294,19 @@ public ExecuteResult prepareAndExecute( final PrepareCallback callback ) throws NoSuchStatementException { + try { // Ignore "callback", this class is designed for use with LocalService which doesn't use it. - final DruidStatement druidStatement = getDruidStatement(statement); + final DruidJdbcStatement druidStatement = getDruidStatement(statement, DruidJdbcStatement.class); final DruidConnection druidConnection = getDruidConnection(statement.connectionId); - AuthenticationResult authenticationResult = authenticateConnection(druidConnection); - if (authenticationResult == null) { - throw logFailure( - new ForbiddenException("Authentication failed."), - "Authentication failed for statement[%s]", - druidStatement.getStatementId() - ); - } - druidStatement.prepare(sql, maxRowCount, authenticationResult); - final Frame firstFrame = druidStatement.execute(Collections.emptyList()) - .nextFrame( - DruidStatement.START_OFFSET, - getEffectiveMaxRowsPerFrame(maxRowsInFirstFrame) - ); - final Signature signature = druidStatement.getSignature(); - LOG.debug("Successfully prepared statement[%s] and started execution", druidStatement.getStatementId()); - return new ExecuteResult( - ImmutableList.of( - MetaResultSet.create( - statement.connectionId, - statement.id, - false, - signature, - firstFrame - ) - ) - ); - } - // cannot affect these exceptions as avatica handles them + // No parameters for a "regular" JDBC statement. + SqlQueryPlus sqlRequest = new SqlQueryPlus(sql, null, null, doAuthenticate(druidConnection)); + druidConnection.prepareAndExecute(druidStatement, sqlRequest, maxRowCount); + ExecuteResult result = doFetch(druidStatement, maxRowsInFirstFrame); + LOG.debug("Successfully prepared statement [%s] and started execution", druidStatement.getStatementId()); + return result; + } + // Cannot affect these exceptions as Avatica handles them. catch (NoSuchConnectionException | NoSuchStatementException e) { throw e; } @@ -321,6 +315,27 @@ public ExecuteResult prepareAndExecute( } } + private ExecuteResult doFetch(AbstractDruidJdbcStatement druidStatement, int maxRows) + { + final Signature signature = druidStatement.getSignature(); + final Frame firstFrame = druidStatement.nextFrame( + AbstractDruidJdbcStatement.START_OFFSET, + getEffectiveMaxRowsPerFrame(maxRows) + ); + + return new ExecuteResult( + ImmutableList.of( + MetaResultSet.create( + druidStatement.getConnectionId(), + druidStatement.statementId, + false, + signature, + firstFrame + ) + ) + ); + } + @Override public ExecuteBatchResult prepareAndExecuteBatch( final StatementHandle statement, @@ -351,7 +366,7 @@ public Frame fetch( try { final int maxRows = getEffectiveMaxRowsPerFrame(fetchMaxRowCount); LOG.debug("Fetching next frame from offset[%s] with [%s] rows for statement[%s]", offset, maxRows, statement.id); - return getDruidStatement(statement).nextFrame(offset, maxRows); + return getDruidStatement(statement, AbstractDruidJdbcStatement.class).nextFrame(offset, maxRows); } catch (NoSuchConnectionException e) { throw e; @@ -381,26 +396,14 @@ public ExecuteResult execute( ) throws NoSuchStatementException { try { - final DruidStatement druidStatement = getDruidStatement(statement); - final Frame firstFrame = druidStatement.execute(parameterValues) - .nextFrame( - DruidStatement.START_OFFSET, - getEffectiveMaxRowsPerFrame(maxRowsInFirstFrame) - ); - - final Signature signature = druidStatement.getSignature(); - LOG.debug("Successfully started execution of statement[%s]", druidStatement.getStatementId()); - return new ExecuteResult( - ImmutableList.of( - MetaResultSet.create( - statement.connectionId, - statement.id, - false, - signature, - firstFrame - ) - ) - ); + final DruidJdbcPreparedStatement druidStatement = + getDruidStatement(statement, DruidJdbcPreparedStatement.class); + druidStatement.execute(parameterValues); + ExecuteResult result = doFetch(druidStatement, maxRowsInFirstFrame); + LOG.debug( + "Successfully started execution of statement[%s]", + druidStatement.getStatementId()); + return result; } catch (NoSuchStatementException | NoSuchConnectionException e) { throw e; @@ -430,10 +433,7 @@ public void closeStatement(final StatementHandle h) // connections.get, not getDruidConnection, since we want to silently ignore nonexistent statements final DruidConnection druidConnection = connections.get(h.connectionId); if (druidConnection != null) { - final DruidStatement druidStatement = druidConnection.getStatement(h.id); - if (druidStatement != null) { - druidStatement.close(); - } + druidConnection.closeStatement(h.id); } } catch (NoSuchConnectionException e) { @@ -452,7 +452,7 @@ public boolean syncResults( ) throws NoSuchStatementException { try { - final DruidStatement druidStatement = getDruidStatement(sh); + final AbstractDruidJdbcStatement druidStatement = getDruidStatement(sh, AbstractDruidJdbcStatement.class); final boolean isDone = druidStatement.isDone(); final long currentOffset = druidStatement.getCurrentOffset(); if (currentOffset != offset) { @@ -729,7 +729,7 @@ private AuthenticationResult authenticateConnection(final DruidConnection connec private DruidConnection openDruidConnection( final String connectionId, final Map userSecret, - final QueryContext context + final Map context ) { if (connectionCount.incrementAndGet() > config.getMaxConnections()) { @@ -804,14 +804,22 @@ private DruidConnection getDruidConnection(final String connectionId) } @Nonnull - private DruidStatement getDruidStatement(final StatementHandle statement) throws NoSuchStatementException + private T getDruidStatement( + final StatementHandle statement, + final Class stmtClass + ) throws NoSuchStatementException { final DruidConnection connection = getDruidConnection(statement.connectionId); - final DruidStatement druidStatement = connection.getStatement(statement.id); + final AbstractDruidJdbcStatement druidStatement = connection.getStatement(statement.id); if (druidStatement == null) { throw logFailure(new NoSuchStatementException(statement)); } - return druidStatement; + try { + return stmtClass.cast(druidStatement); + } + catch (ClassCastException e) { + throw logFailure(new NoSuchStatementException(statement)); + } } private MetaResultSet sqlResultSet(final ConnectionHandle ch, final String sql) diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java deleted file mode 100644 index b3c7e41284e0..000000000000 --- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java +++ /dev/null @@ -1,449 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.sql.avatica; - -import com.google.common.base.Preconditions; -import com.google.errorprone.annotations.concurrent.GuardedBy; -import org.apache.calcite.avatica.AvaticaParameter; -import org.apache.calcite.avatica.ColumnMetaData; -import org.apache.calcite.avatica.Meta; -import org.apache.calcite.avatica.remote.TypedValue; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.guava.Sequence; -import org.apache.druid.java.util.common.guava.Yielder; -import org.apache.druid.java.util.common.guava.Yielders; -import org.apache.druid.query.QueryContext; -import org.apache.druid.server.security.AuthenticationResult; -import org.apache.druid.server.security.ForbiddenException; -import org.apache.druid.sql.SqlLifecycle; -import org.apache.druid.sql.calcite.planner.Calcites; -import org.apache.druid.sql.calcite.planner.PrepareResult; - -import java.io.Closeable; -import java.sql.Array; -import java.sql.DatabaseMetaData; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; - -/** - * Statement handle for {@link DruidMeta}. Thread-safe. - */ -public class DruidStatement implements Closeable -{ - public static final long START_OFFSET = 0; - private final String connectionId; - private final int statementId; - private final QueryContext queryContext; - @GuardedBy("lock") - private final SqlLifecycle sqlLifecycle; - private final Runnable onClose; - private final Object lock = new Object(); - /** - * Query metrics can only be used within a single thread. Because results can be paginated into multiple - * JDBC frames (each frame being processed by a potentially different thread), the thread that closes the yielder - * (resulting in a QueryMetrics emit() call) may not be the same thread that created the yielder (which initializes - * DefaultQueryMetrics with the current thread as the owner). Create and close the yielder with this - * single-thread executor to prevent this from happening. - *

- * The thread owner check in DefaultQueryMetrics is more aggressive than needed for this specific JDBC case, since - * the JDBC frames are processed sequentially. If the thread owner check is changed/loosened to permit this use case, - * we would not need to use this executor. - *

- * See discussion at: - * https://github.com/apache/druid/pull/4288 - * https://github.com/apache/druid/pull/4415 - */ - private final ExecutorService yielderOpenCloseExecutor; - private State state = State.NEW; - private String query; - private long maxRowCount; - private Meta.Signature signature; - private Yielder yielder; - private int offset = 0; - private Throwable throwable; - private AuthenticationResult authenticationResult; - - public DruidStatement( - final String connectionId, - final int statementId, - final QueryContext queryContext, - final SqlLifecycle sqlLifecycle, - final Runnable onClose - ) - { - this.connectionId = Preconditions.checkNotNull(connectionId, "connectionId"); - this.statementId = statementId; - this.queryContext = queryContext; - this.sqlLifecycle = Preconditions.checkNotNull(sqlLifecycle, "sqlLifecycle"); - this.onClose = Preconditions.checkNotNull(onClose, "onClose"); - this.yielderOpenCloseExecutor = Execs.singleThreaded( - StringUtils.format( - "JDBCYielderOpenCloseExecutor-connection-%s-statement-%d", - StringUtils.encodeForFormat(connectionId), - statementId - ) - ); - } - - public static List createColumnMetaData(final RelDataType rowType) - { - final List columns = new ArrayList<>(); - List fieldList = rowType.getFieldList(); - - for (int i = 0; i < fieldList.size(); i++) { - RelDataTypeField field = fieldList.get(i); - - final ColumnMetaData.AvaticaType columnType; - if (field.getType().getSqlTypeName() == SqlTypeName.ARRAY) { - final ColumnMetaData.Rep elementRep = rep(field.getType().getComponentType().getSqlTypeName()); - final ColumnMetaData.ScalarType elementType = ColumnMetaData.scalar( - field.getType().getComponentType().getSqlTypeName().getJdbcOrdinal(), - field.getType().getComponentType().getSqlTypeName().getName(), - elementRep - ); - final ColumnMetaData.Rep arrayRep = rep(field.getType().getSqlTypeName()); - columnType = ColumnMetaData.array( - elementType, - field.getType().getSqlTypeName().getName(), - arrayRep - ); - } else { - final ColumnMetaData.Rep rep = rep(field.getType().getSqlTypeName()); - columnType = ColumnMetaData.scalar( - field.getType().getSqlTypeName().getJdbcOrdinal(), - field.getType().getSqlTypeName().getName(), - rep - ); - } - columns.add( - new ColumnMetaData( - i, // ordinal - false, // auto increment - true, // case sensitive - false, // searchable - false, // currency - field.getType().isNullable() - ? DatabaseMetaData.columnNullable - : DatabaseMetaData.columnNoNulls, // nullable - true, // signed - field.getType().getPrecision(), // display size - field.getName(), // label - null, // column name - null, // schema name - field.getType().getPrecision(), // precision - field.getType().getScale(), // scale - null, // table name - null, // catalog name - columnType, // avatica type - true, // read only - false, // writable - false, // definitely writable - columnType.columnClassName() // column class name - ) - ); - } - - return columns; - } - - public DruidStatement prepare( - final String query, - final long maxRowCount, - final AuthenticationResult authenticationResult - ) - { - synchronized (lock) { - try { - ensure(State.NEW); - sqlLifecycle.initialize(query, queryContext); - sqlLifecycle.validateAndAuthorize(authenticationResult); - this.authenticationResult = authenticationResult; - PrepareResult prepareResult = sqlLifecycle.prepare(); - this.maxRowCount = maxRowCount; - this.query = query; - List params = new ArrayList<>(); - final RelDataType parameterRowType = prepareResult.getParameterRowType(); - for (RelDataTypeField field : parameterRowType.getFieldList()) { - RelDataType type = field.getType(); - params.add(createParameter(field, type)); - } - this.signature = Meta.Signature.create( - createColumnMetaData(prepareResult.getRowType()), - query, - params, - Meta.CursorFactory.ARRAY, - Meta.StatementType.SELECT // We only support SELECT - ); - this.state = State.PREPARED; - } - catch (Throwable t) { - return closeAndPropagateThrowable(t); - } - - return this; - } - } - - - public DruidStatement execute(List parameters) - { - synchronized (lock) { - ensure(State.PREPARED); - try { - sqlLifecycle.setParameters(parameters); - sqlLifecycle.validateAndAuthorize(authenticationResult); - sqlLifecycle.plan(); - final Sequence baseSequence = yielderOpenCloseExecutor.submit(sqlLifecycle::execute).get(); - - // We can't apply limits greater than Integer.MAX_VALUE, ignore them. - final Sequence retSequence = - maxRowCount >= 0 && maxRowCount <= Integer.MAX_VALUE - ? baseSequence.limit((int) maxRowCount) - : baseSequence; - - yielder = Yielders.each(retSequence); - state = State.RUNNING; - } - catch (Throwable t) { - closeAndPropagateThrowable(t); - } - - return this; - } - } - - public String getConnectionId() - { - return connectionId; - } - - public int getStatementId() - { - return statementId; - } - - public String getQuery() - { - synchronized (lock) { - ensure(State.PREPARED, State.RUNNING, State.DONE); - return query; - } - } - - public Meta.Signature getSignature() - { - synchronized (lock) { - ensure(State.PREPARED, State.RUNNING, State.DONE); - return signature; - } - } - - public long getCurrentOffset() - { - synchronized (lock) { - ensure(State.RUNNING, State.DONE); - return offset; - } - } - - public boolean isDone() - { - synchronized (lock) { - return state == State.DONE; - } - } - - public Meta.Frame nextFrame(final long fetchOffset, final int fetchMaxRowCount) - { - synchronized (lock) { - ensure(State.RUNNING); - Preconditions.checkState(fetchOffset == offset, "fetchOffset[%,d] != offset[%,d]", fetchOffset, offset); - - try { - final List rows = new ArrayList<>(); - while (!yielder.isDone() && (fetchMaxRowCount < 0 || offset < fetchOffset + fetchMaxRowCount)) { - rows.add(yielder.get()); - yielder = yielder.next(null); - offset++; - } - - final boolean done = yielder.isDone(); - if (done) { - close(); - } - - return new Meta.Frame(fetchOffset, done, rows); - } - catch (Throwable t) { - this.throwable = t; - try { - close(); - } - catch (Throwable t1) { - t.addSuppressed(t1); - } - throw t; - } - } - } - - @Override - public void close() - { - State oldState = null; - try { - synchronized (lock) { - oldState = state; - state = State.DONE; - if (yielder != null) { - Yielder theYielder = this.yielder; - this.yielder = null; - - // Put the close last, so any exceptions it throws are after we did the other cleanup above. - yielderOpenCloseExecutor.submit( - () -> { - theYielder.close(); - // makes this a Callable instead of Runnable so we don't need to catch exceptions inside the lambda - return null; - } - ).get(); - - yielderOpenCloseExecutor.shutdownNow(); - } - } - } - catch (Throwable t) { - if (oldState != State.DONE) { - // First close. Run the onClose function. - try { - onClose.run(); - synchronized (lock) { - sqlLifecycle.finalizeStateAndEmitLogsAndMetrics(t, null, -1); - } - } - catch (Throwable t1) { - t.addSuppressed(t1); - } - } - - throw new RuntimeException(t); - } - - if (oldState != State.DONE) { - // First close. Run the onClose function. - try { - if (!(this.throwable instanceof ForbiddenException)) { - synchronized (lock) { - sqlLifecycle.finalizeStateAndEmitLogsAndMetrics(this.throwable, null, -1); - } - } else { - DruidMeta.logFailure(this.throwable); - } - onClose.run(); - } - catch (Throwable t) { - throw new RuntimeException(t); - } - } - } - - private AvaticaParameter createParameter(RelDataTypeField field, RelDataType type) - { - // signed is always false because no way to extract from RelDataType, and the only usage of this AvaticaParameter - // constructor I can find, in CalcitePrepareImpl, does it this way with hard coded false - return new AvaticaParameter( - false, - type.getPrecision(), - type.getScale(), - type.getSqlTypeName().getJdbcOrdinal(), - type.getSqlTypeName().getName(), - Calcites.sqlTypeNameJdbcToJavaClass(type.getSqlTypeName()).getName(), - field.getName() - ); - } - - - private DruidStatement closeAndPropagateThrowable(Throwable t) - { - this.throwable = t; - DruidMeta.logFailure(t); - try { - close(); - } - catch (Throwable t1) { - t.addSuppressed(t1); - } - throw new RuntimeException(t); - } - - @GuardedBy("lock") - private void ensure(final State... desiredStates) - { - for (State desiredState : desiredStates) { - if (state == desiredState) { - return; - } - } - throw new ISE("Invalid action for state[%s]", state); - } - - private static ColumnMetaData.Rep rep(final SqlTypeName sqlType) - { - if (SqlTypeName.CHAR_TYPES.contains(sqlType)) { - return ColumnMetaData.Rep.of(String.class); - } else if (sqlType == SqlTypeName.TIMESTAMP) { - return ColumnMetaData.Rep.of(Long.class); - } else if (sqlType == SqlTypeName.DATE) { - return ColumnMetaData.Rep.of(Integer.class); - } else if (sqlType == SqlTypeName.INTEGER) { - // use Number.class for exact numeric types since JSON transport might switch longs to integers - return ColumnMetaData.Rep.of(Number.class); - } else if (sqlType == SqlTypeName.BIGINT) { - // use Number.class for exact numeric types since JSON transport might switch longs to integers - return ColumnMetaData.Rep.of(Number.class); - } else if (sqlType == SqlTypeName.FLOAT) { - return ColumnMetaData.Rep.of(Float.class); - } else if (sqlType == SqlTypeName.DOUBLE || sqlType == SqlTypeName.DECIMAL) { - return ColumnMetaData.Rep.of(Double.class); - } else if (sqlType == SqlTypeName.BOOLEAN) { - return ColumnMetaData.Rep.of(Boolean.class); - } else if (sqlType == SqlTypeName.OTHER) { - return ColumnMetaData.Rep.of(Object.class); - } else if (sqlType == SqlTypeName.ARRAY) { - return ColumnMetaData.Rep.of(Array.class); - } else { - throw new ISE("No rep for SQL type[%s]", sqlType); - } - } - - enum State - { - NEW, - PREPARED, - RUNNING, - DONE - } -} diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java index 0e7128b0690b..81167d919f29 100644 --- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java +++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java @@ -54,6 +54,7 @@ import org.apache.druid.query.BaseQuery; import org.apache.druid.query.DefaultQueryConfig; import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.server.DruidNode; import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.QuerySchedulerProvider; @@ -109,22 +110,29 @@ import java.sql.Timestamp; import java.sql.Types; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; -public abstract class DruidAvaticaHandlerTest extends CalciteTestBase +/** + * Tests the Avatica-based JDBC implementation using JSON serialization. See + * {@link DruidAvaticaProtobufHandlerTest} for a subclass which runs + * this same set of tests using Protobuf serialization. + */ +public class DruidAvaticaHandlerTest extends CalciteTestBase { private static final AvaticaServerConfig AVATICA_CONFIG = new AvaticaServerConfig() { @Override public int getMaxConnections() { - // This must match the number of Connection objects created in setUp() + // This must match the number of Connection objects created in testTooManyStatements() return 4; } @@ -146,6 +154,7 @@ public static void setUpClass() { resourceCloser = Closer.create(); conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser); + System.setProperty("user.timezone", "UTC"); } @AfterClass @@ -260,139 +269,155 @@ public void tearDown() throws Exception } @Test - public void testSelectCount() throws Exception + public void testSelectCount() throws SQLException { - final ResultSet resultSet = client.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo"); - final List> rows = getRows(resultSet); - Assert.assertEquals( - ImmutableList.of( - ImmutableMap.of("cnt", 6L) - ), - rows - ); + try (Statement stmt = client.createStatement()) { + final ResultSet resultSet = stmt.executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo"); + final List> rows = getRows(resultSet); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("cnt", 6L) + ), + rows + ); + } } @Test - public void testSelectCountNoTrailingSlash() throws Exception + public void testSelectCountNoTrailingSlash() throws SQLException { - final ResultSet resultSet = clientNoTrailingSlash.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo"); - final List> rows = getRows(resultSet); - Assert.assertEquals( - ImmutableList.of( - ImmutableMap.of("cnt", 6L) - ), - rows - ); + try (Statement stmt = clientNoTrailingSlash.createStatement()) { + final ResultSet resultSet = stmt.executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo"); + final List> rows = getRows(resultSet); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("cnt", 6L) + ), + rows + ); + } } @Test - public void testSelectCountAlternateStyle() throws Exception + public void testSelectCountAlternateStyle() throws SQLException { - final ResultSet resultSet = client.prepareStatement("SELECT COUNT(*) AS cnt FROM druid.foo").executeQuery(); - final List> rows = getRows(resultSet); - Assert.assertEquals( - ImmutableList.of( - ImmutableMap.of("cnt", 6L) - ), - rows - ); + try (PreparedStatement stmt = client.prepareStatement("SELECT COUNT(*) AS cnt FROM druid.foo")) { + final ResultSet resultSet = stmt.executeQuery(); + final List> rows = getRows(resultSet); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("cnt", 6L) + ), + rows + ); + } } @Test - public void testTimestampsInResponse() throws Exception + public void testTimestampsInResponse() throws SQLException { - final ResultSet resultSet = client.createStatement().executeQuery( - "SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1" - ); + try (Statement stmt = client.createStatement()) { + final ResultSet resultSet = stmt.executeQuery( + "SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1" + ); - Assert.assertEquals( - ImmutableList.of( - ImmutableMap.of( - "__time", new Timestamp(DateTimes.of("2000-01-01T00:00:00.000Z").getMillis()), - "t2", new Date(DateTimes.of("2000-01-01").getMillis()) - ) - ), - getRows(resultSet) - ); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of( + "__time", new Timestamp(DateTimes.of("2000-01-01T00:00:00.000Z").getMillis()), + "t2", new Date(DateTimes.of("2000-01-01").getMillis()) + ) + ), + getRows(resultSet) + ); + } } @Test - public void testTimestampsInResponseLosAngelesTimeZone() throws Exception + public void testTimestampsInResponseLosAngelesTimeZone() throws SQLException { - final ResultSet resultSet = clientLosAngeles.createStatement().executeQuery( - "SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1" - ); + try (Statement stmt = clientLosAngeles.createStatement()) { + final ResultSet resultSet = stmt.executeQuery( + "SELECT __time, CAST(__time AS DATE) AS t2 FROM druid.foo LIMIT 1" + ); - final DateTimeZone timeZone = DateTimes.inferTzFromString("America/Los_Angeles"); - final DateTime localDateTime = new DateTime("2000-01-01T00Z", timeZone); + final DateTimeZone timeZone = DateTimes.inferTzFromString("America/Los_Angeles"); + final DateTime localDateTime = new DateTime("2000-01-01T00Z", timeZone); - final List> resultRows = getRows(resultSet); + final List> resultRows = getRows(resultSet); - Assert.assertEquals( - ImmutableList.of( - ImmutableMap.of( - "__time", new Timestamp(Calcites.jodaToCalciteTimestamp(localDateTime, timeZone)), - "t2", new Date(Calcites.jodaToCalciteTimestamp(localDateTime.dayOfMonth().roundFloorCopy(), timeZone)) - ) - ), - resultRows - ); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of( + "__time", new Timestamp(Calcites.jodaToCalciteTimestamp(localDateTime, timeZone)), + "t2", new Date(Calcites.jodaToCalciteTimestamp(localDateTime.dayOfMonth().roundFloorCopy(), timeZone)) + ) + ), + resultRows + ); + } } @Test - public void testFieldAliasingSelect() throws Exception + public void testFieldAliasingSelect() throws SQLException { - final ResultSet resultSet = client.createStatement().executeQuery( - "SELECT dim2 AS \"x\", dim2 AS \"y\" FROM druid.foo LIMIT 1" - ); + try (Statement stmt = client.createStatement()) { + final ResultSet resultSet = stmt.executeQuery( + "SELECT dim2 AS \"x\", dim2 AS \"y\" FROM druid.foo LIMIT 1" + ); - Assert.assertEquals( - ImmutableList.of( - ImmutableMap.of("x", "a", "y", "a") - ), - getRows(resultSet) - ); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("x", "a", "y", "a") + ), + getRows(resultSet) + ); + } } @Test - public void testSelectBoolean() throws Exception + public void testSelectBoolean() throws SQLException { - final ResultSet resultSet = client.createStatement().executeQuery( - "SELECT dim2, dim2 IS NULL AS isnull FROM druid.foo LIMIT 1" - ); + try (Statement stmt = client.createStatement()) { + final ResultSet resultSet = stmt.executeQuery( + "SELECT dim2, dim2 IS NULL AS isnull FROM druid.foo LIMIT 1" + ); - Assert.assertEquals( - ImmutableList.of( - ImmutableMap.of("dim2", "a", "isnull", false) - ), - getRows(resultSet) - ); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("dim2", "a", "isnull", false) + ), + getRows(resultSet) + ); + } } @Test - public void testExplainSelectCount() throws Exception + public void testExplainSelectCount() throws SQLException { - final ResultSet resultSet = clientLosAngeles.createStatement().executeQuery( - "EXPLAIN PLAN FOR SELECT COUNT(*) AS cnt FROM druid.foo" - ); + try (Statement stmt = clientLosAngeles.createStatement()) { + final ResultSet resultSet = stmt.executeQuery( + "EXPLAIN PLAN FOR SELECT COUNT(*) AS cnt FROM druid.foo" + ); - Assert.assertEquals( - ImmutableList.of( - ImmutableMap.of( - "PLAN", - StringUtils.format("DruidQueryRel(query=[{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"granularity\":{\"type\":\"all\"},\"aggregations\":[{\"type\":\"count\",\"name\":\"a0\"}],\"context\":{\"sqlQueryId\":\"%s\",\"sqlStringifyArrays\":false,\"sqlTimeZone\":\"America/Los_Angeles\"}}], signature=[{a0:LONG}])\n", - DUMMY_SQL_QUERY_ID - ), - "RESOURCES", - "[{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]" - ) - ), - getRows(resultSet) - ); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of( + "PLAN", + StringUtils.format("DruidQueryRel(query=[{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"granularity\":{\"type\":\"all\"},\"aggregations\":[{\"type\":\"count\",\"name\":\"a0\"}],\"context\":{\"sqlQueryId\":\"%s\",\"sqlStringifyArrays\":false,\"sqlTimeZone\":\"America/Los_Angeles\"}}], signature=[{a0:LONG}])\n", + DUMMY_SQL_QUERY_ID + ), + "RESOURCES", + "[{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]" + ) + ), + getRows(resultSet) + ); + } } @Test - public void testDatabaseMetaDataCatalogs() throws Exception + public void testDatabaseMetaDataCatalogs() throws SQLException { final DatabaseMetaData metaData = client.getMetaData(); Assert.assertEquals( @@ -404,7 +429,7 @@ public void testDatabaseMetaDataCatalogs() throws Exception } @Test - public void testDatabaseMetaDataSchemas() throws Exception + public void testDatabaseMetaDataSchemas() throws SQLException { final DatabaseMetaData metaData = client.getMetaData(); Assert.assertEquals( @@ -416,7 +441,7 @@ public void testDatabaseMetaDataSchemas() throws Exception } @Test - public void testDatabaseMetaDataTables() throws Exception + public void testDatabaseMetaDataTables() throws SQLException { final DatabaseMetaData metaData = client.getMetaData(); Assert.assertEquals( @@ -485,7 +510,7 @@ public void testDatabaseMetaDataTables() throws Exception } @Test - public void testDatabaseMetaDataTablesAsSuperuser() throws Exception + public void testDatabaseMetaDataTablesAsSuperuser() throws SQLException { final DatabaseMetaData metaData = superuserClient.getMetaData(); Assert.assertEquals( @@ -559,7 +584,7 @@ public void testDatabaseMetaDataTablesAsSuperuser() throws Exception } @Test - public void testDatabaseMetaDataColumns() throws Exception + public void testDatabaseMetaDataColumns() throws SQLException { final DatabaseMetaData metaData = client.getMetaData(); Assert.assertEquals( @@ -637,7 +662,7 @@ public void testDatabaseMetaDataColumns() throws Exception } @Test - public void testDatabaseMetaDataColumnsOnForbiddenDatasource() throws Exception + public void testDatabaseMetaDataColumnsOnForbiddenDatasource() throws SQLException { final DatabaseMetaData metaData = client.getMetaData(); Assert.assertEquals( @@ -650,7 +675,7 @@ public void testDatabaseMetaDataColumnsOnForbiddenDatasource() throws Exception } @Test - public void testDatabaseMetaDataColumnsWithSuperuser() throws Exception + public void testDatabaseMetaDataColumnsWithSuperuser() throws SQLException { final DatabaseMetaData metaData = superuserClient.getMetaData(); Assert.assertEquals( @@ -719,9 +744,8 @@ public void testDatabaseMetaDataColumnsWithSuperuser() throws Exception ); } - @Test(timeout = 90_000L) - public void testConcurrentQueries() throws Exception + public void testConcurrentQueries() throws InterruptedException, ExecutionException { final List> futures = new ArrayList<>(); final ListeningExecutorService exec = MoreExecutors.listeningDecorator( @@ -749,23 +773,24 @@ public void testConcurrentQueries() throws Exception for (int i = 0; i < 2000; i++) { Assert.assertEquals(i + 6, (int) integers.get(i)); } + exec.shutdown(); } @Test - public void testTooManyStatements() throws Exception + public void testTooManyStatements() throws SQLException { - final Statement statement1 = client.createStatement(); - final Statement statement2 = client.createStatement(); - final Statement statement3 = client.createStatement(); - final Statement statement4 = client.createStatement(); + client.createStatement(); + client.createStatement(); + client.createStatement(); + client.createStatement(); expectedException.expect(AvaticaClientRuntimeException.class); - expectedException.expectMessage("Too many open statements, limit is[4]"); - final Statement statement5 = client.createStatement(); + expectedException.expectMessage("Too many open statements, limit is [4]"); + client.createStatement(); } @Test - public void testNotTooManyStatementsWhenYouCloseThem() throws Exception + public void testNotTooManyStatementsWhenYouCloseThem() throws SQLException { client.createStatement().close(); client.createStatement().close(); @@ -777,54 +802,75 @@ public void testNotTooManyStatementsWhenYouCloseThem() throws Exception client.createStatement().close(); client.createStatement().close(); client.createStatement().close(); - - Assert.assertTrue(true); } + /** + * JDBC allows sequential reuse of statements. A statement is not closed until + * the application closes it (or the connection), but the statement's result set + * is closed on each EOF. + */ @Test - public void testNotTooManyStatementsWhenYouFullyIterateThem() throws Exception + public void testManyUsesOfTheSameStatement() throws SQLException { - for (int i = 0; i < 50; i++) { - final ResultSet resultSet = client.createStatement().executeQuery( - "SELECT COUNT(*) AS cnt FROM druid.foo" - ); - Assert.assertEquals( - ImmutableList.of( - ImmutableMap.of("cnt", 6L) - ), - getRows(resultSet) - ); + try (Statement statement = client.createStatement()) { + for (int i = 0; i < 50; i++) { + final ResultSet resultSet = statement.executeQuery( + "SELECT COUNT(*) AS cnt FROM druid.foo" + ); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("cnt", 6L) + ), + getRows(resultSet) + ); + } } - - Assert.assertTrue(true); } + /** + * Statements should not be closed if then encounter an error. The {@code ResultSet} + * can be closed, but not the statement. + */ @Test - public void testNotTooManyStatementsWhenTheyThrowErrors() throws Exception + public void tesErrorsDoNotCloseStatements() throws SQLException { - for (int i = 0; i < 50; i++) { - Exception thrown = null; + try (Statement statement = client.createStatement()) { try { - client.createStatement().executeQuery("SELECT SUM(nonexistent) FROM druid.foo"); + statement.executeQuery("SELECT SUM(nonexistent) FROM druid.foo"); + Assert.fail(); } catch (Exception e) { - thrown = e; + // Expected } - Assert.assertNotNull(thrown); - - final ResultSet resultSet = client.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo"); + final ResultSet resultSet = statement.executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo"); Assert.assertEquals( ImmutableList.of(ImmutableMap.of("cnt", 6L)), getRows(resultSet) ); } + } - Assert.assertTrue(true); + /** + * Since errors do not close statements, they must be closed by the application, + * preferably in a try-with-resources block. + */ + @Test + public void testNotTooManyStatementsWhenClosed() + { + for (int i = 0; i < 50; i++) { + try (Statement statement = client.createStatement()) { + statement.executeQuery("SELECT SUM(nonexistent) FROM druid.foo"); + Assert.fail(); + } + catch (Exception e) { + // Expected + } + } } @Test - public void testAutoReconnectOnNoSuchConnection() throws Exception + public void testAutoReconnectOnNoSuchConnection() throws SQLException { for (int i = 0; i < 50; i++) { final ResultSet resultSet = client.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo"); @@ -834,12 +880,10 @@ public void testAutoReconnectOnNoSuchConnection() throws Exception ); druidMeta.closeAllConnections(); } - - Assert.assertTrue(true); } @Test - public void testTooManyConnections() throws Exception + public void testTooManyConnections() throws SQLException { client.createStatement(); clientLosAngeles.createStatement(); @@ -849,23 +893,16 @@ public void testTooManyConnections() throws Exception expectedException.expect(AvaticaClientRuntimeException.class); expectedException.expectMessage("Too many connections"); - final Connection connection5 = DriverManager.getConnection(url); + DriverManager.getConnection(url); } @Test - public void testNotTooManyConnectionsWhenTheyAreEmpty() throws Exception + public void testNotTooManyConnectionsWhenTheyAreEmpty() throws SQLException { - final Connection connection1 = DriverManager.getConnection(url); - connection1.createStatement().close(); - - final Connection connection2 = DriverManager.getConnection(url); - connection2.createStatement().close(); - - final Connection connection3 = DriverManager.getConnection(url); - connection3.createStatement().close(); - - final Connection connection4 = DriverManager.getConnection(url); - Assert.assertTrue(true); + for (int i = 0; i < 4; i++) { + try (Connection connection = DriverManager.getConnection(url)) { + } + } } @Test @@ -957,7 +994,6 @@ public Frame fetch( ); } - @Test public void testMinRowsPerFrame() throws Exception { @@ -1031,7 +1067,7 @@ public Frame fetch( String smallFrameUrl = this.getJdbcConnectionString(port); Connection smallFrameClient = DriverManager.getConnection(smallFrameUrl, "regularUser", "druid"); - // use a prepared statement because avatica currently ignores fetchSize on the initial fetch of a Statement + // use a prepared statement because Avatica currently ignores fetchSize on the initial fetch of a Statement PreparedStatement statement = smallFrameClient.prepareStatement("SELECT dim1 FROM druid.foo"); // set a fetch size below the minimum configured threshold statement.setFetchSize(2); @@ -1053,12 +1089,14 @@ public Frame fetch( } @Test - @SuppressWarnings("unchecked") - public void testSqlRequestLog() throws Exception + public void testSqlRequestLog() throws SQLException { // valid sql + testRequestLogger.clear(); for (int i = 0; i < 3; i++) { - client.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo"); + try (Statement stmt = client.createStatement()) { + stmt.executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo"); + } } Assert.assertEquals(3, testRequestLogger.getSqlQueryLogs().size()); for (RequestLogLine logLine : testRequestLogger.getSqlQueryLogs()) { @@ -1071,107 +1109,188 @@ public void testSqlRequestLog() throws Exception // invalid sql testRequestLogger.clear(); - try { - client.createStatement().executeQuery("SELECT notexist FROM druid.foo"); + try (Statement stmt = client.createStatement()) { + stmt.executeQuery("SELECT notexist FROM druid.foo"); Assert.fail("invalid SQL should throw SQLException"); } catch (SQLException e) { + // Expected } Assert.assertEquals(1, testRequestLogger.getSqlQueryLogs().size()); - final Map stats = testRequestLogger.getSqlQueryLogs().get(0).getQueryStats().getStats(); - Assert.assertEquals(false, stats.get("success")); - Assert.assertEquals("regularUser", stats.get("identity")); - Assert.assertTrue(stats.containsKey("exception")); + { + final Map stats = testRequestLogger.getSqlQueryLogs().get(0).getQueryStats().getStats(); + Assert.assertEquals(false, stats.get("success")); + Assert.assertEquals("regularUser", stats.get("identity")); + Assert.assertTrue(stats.containsKey("exception")); + } // unauthorized sql testRequestLogger.clear(); - try { - client.createStatement().executeQuery("SELECT count(*) FROM druid.forbiddenDatasource"); + try (Statement stmt = client.createStatement()) { + stmt.executeQuery("SELECT count(*) FROM druid.forbiddenDatasource"); Assert.fail("unauthorzed SQL should throw SQLException"); } catch (SQLException e) { + // Expected } + // SqlLifecycle does not allow logging for security failures. Assert.assertEquals(0, testRequestLogger.getSqlQueryLogs().size()); } @Test - public void testParameterBinding() throws Exception + public void testSqlRequestLogPrepared() throws SQLException { - PreparedStatement statement = client.prepareStatement("SELECT COUNT(*) AS cnt FROM druid.foo WHERE dim1 = ? OR dim1 = ?"); - statement.setString(1, "abc"); - statement.setString(2, "def"); - final ResultSet resultSet = statement.executeQuery(); - final List> rows = getRows(resultSet); - Assert.assertEquals( - ImmutableList.of( - ImmutableMap.of("cnt", 2L) - ), - rows - ); + // valid sql + testRequestLogger.clear(); + for (int i = 0; i < 3; i++) { + try (PreparedStatement stmt = client.prepareStatement("SELECT COUNT(*) AS cnt FROM druid.foo")) { + stmt.execute(); + } + } + Assert.assertEquals(6, testRequestLogger.getSqlQueryLogs().size()); + for (RequestLogLine logLine : testRequestLogger.getSqlQueryLogs()) { + final Map stats = logLine.getQueryStats().getStats(); + Assert.assertEquals(true, stats.get("success")); + Assert.assertEquals("regularUser", stats.get("identity")); + Assert.assertTrue(stats.containsKey("sqlQuery/time")); + Assert.assertTrue(stats.containsKey("sqlQuery/bytes")); + } + + // invalid sql + testRequestLogger.clear(); + try (PreparedStatement stmt = client.prepareStatement("SELECT notexist FROM druid.foo")) { + Assert.fail("invalid SQL should throw SQLException"); + } + catch (SQLException e) { + // Expected + } + Assert.assertEquals(1, testRequestLogger.getSqlQueryLogs().size()); + { + final Map stats = testRequestLogger.getSqlQueryLogs().get(0).getQueryStats().getStats(); + Assert.assertEquals(false, stats.get("success")); + Assert.assertEquals("regularUser", stats.get("identity")); + Assert.assertTrue(stats.containsKey("exception")); + } + + // unauthorized sql + testRequestLogger.clear(); + try (PreparedStatement stmt = client.prepareStatement("SELECT count(*) FROM druid.forbiddenDatasource")) { + Assert.fail("unauthorzed SQL should throw SQLException"); + } + catch (SQLException e) { + // Expected + } + // SqlLifecycle does not allow logging for security failures. + Assert.assertEquals(0, testRequestLogger.getSqlQueryLogs().size()); } @Test - public void testSysTableParameterBindingRegularUser() throws Exception + public void testParameterBinding() throws SQLException { - PreparedStatement statement = - client.prepareStatement("SELECT COUNT(*) AS cnt FROM sys.servers WHERE servers.host = ?"); - statement.setString(1, "dummy"); - - Assert.assertThrows( - "Insufficient permission to view servers", - AvaticaSqlException.class, - statement::executeQuery - ); + try (PreparedStatement statement = client.prepareStatement("SELECT COUNT(*) AS cnt FROM druid.foo WHERE dim1 = ? OR dim1 = ?")) { + statement.setString(1, "abc"); + statement.setString(2, "def"); + final ResultSet resultSet = statement.executeQuery(); + final List> rows = getRows(resultSet); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("cnt", 2L) + ), + rows + ); + } } @Test - public void testSysTableParameterBindingSuperUser() throws Exception + public void testSysTableParameterBindingRegularUser() throws SQLException { - PreparedStatement statement = - superuserClient.prepareStatement("SELECT COUNT(*) AS cnt FROM sys.servers WHERE servers.host = ?"); - statement.setString(1, "dummy"); - final ResultSet resultSet = statement.executeQuery(); - final List> rows = getRows(resultSet); - Assert.assertEquals( - ImmutableList.of( - ImmutableMap.of("cnt", 1L) - ), - rows - ); + try (PreparedStatement statement = + client.prepareStatement("SELECT COUNT(*) AS cnt FROM sys.servers WHERE servers.host = ?")) { + statement.setString(1, "dummy"); + + Assert.assertThrows( + "Insufficient permission to view servers", + AvaticaSqlException.class, + statement::executeQuery + ); + } } @Test - public void testExtendedCharacters() throws Exception + public void testSysTableParameterBindingSuperUser() throws SQLException { - final ResultSet resultSet = client.createStatement().executeQuery( - "SELECT COUNT(*) AS cnt FROM druid.lotsocolumns WHERE dimMultivalEnumerated = 'ㅑ ㅓ ㅕ ㅗ ㅛ ㅜ ㅠ ㅡ ㅣ'" - ); - final List> rows = getRows(resultSet); - Assert.assertEquals( - ImmutableList.of( - ImmutableMap.of("cnt", 1L) - ), - rows - ); + try (PreparedStatement statement = + superuserClient.prepareStatement("SELECT COUNT(*) AS cnt FROM sys.servers WHERE servers.host = ?")) { + statement.setString(1, "dummy"); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("cnt", 1L) + ), + getRows(statement.executeQuery()) + ); + } + } + @Test + public void testExecuteMany() throws SQLException + { + try (PreparedStatement statement = + superuserClient.prepareStatement("SELECT COUNT(*) AS cnt FROM sys.servers WHERE servers.host = ?")) { + statement.setString(1, "dummy"); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("cnt", 1L) + ), + getRows(statement.executeQuery()) + ); + statement.setString(1, "foo"); + Assert.assertEquals( + Collections.emptyList(), + getRows(statement.executeQuery()) + ); + statement.setString(1, "dummy"); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("cnt", 1L) + ), + getRows(statement.executeQuery()) + ); + } + } - PreparedStatement statement = client.prepareStatement( - "SELECT COUNT(*) AS cnt FROM druid.lotsocolumns WHERE dimMultivalEnumerated = ?" - ); - statement.setString(1, "ㅑ ㅓ ㅕ ㅗ ㅛ ㅜ ㅠ ㅡ ㅣ"); - final ResultSet resultSet2 = statement.executeQuery(); - final List> rows2 = getRows(resultSet2); - Assert.assertEquals( - ImmutableList.of( - ImmutableMap.of("cnt", 1L) - ), - rows - ); - Assert.assertEquals(rows, rows2); + @Test + public void testExtendedCharacters() throws SQLException + { + try (Statement stmt = client.createStatement()) { + final ResultSet resultSet = stmt.executeQuery( + "SELECT COUNT(*) AS cnt FROM druid.lotsocolumns WHERE dimMultivalEnumerated = 'ㅑ ㅓ ㅕ ㅗ ㅛ ㅜ ㅠ ㅡ ㅣ'" + ); + final List> rows = getRows(resultSet); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("cnt", 1L) + ), + rows + ); + } + + try (PreparedStatement statement = client.prepareStatement( + "SELECT COUNT(*) AS cnt FROM druid.lotsocolumns WHERE dimMultivalEnumerated = ?")) { + statement.setString(1, "ㅑ ㅓ ㅕ ㅗ ㅛ ㅜ ㅠ ㅡ ㅣ"); + final ResultSet resultSet2 = statement.executeQuery(); + final List> rows = getRows(resultSet2); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("cnt", 1L) + ), + rows + ); + Assert.assertEquals(rows, rows); + } } @Test - public void testEscapingForGetColumns() throws Exception + public void testEscapingForGetColumns() throws SQLException { final DatabaseMetaData metaData = client.getMetaData(); @@ -1325,7 +1444,7 @@ public void testEscapingForGetColumns() throws Exception } @Test - public void testEscapingForGetTables() throws Exception + public void testEscapingForGetTables() throws SQLException { final DatabaseMetaData metaData = client.getMetaData(); @@ -1374,36 +1493,51 @@ public void testEscapingForGetTables() throws Exception ); } - @Test - public void testArrayStuffs() throws Exception + public void testArrayStuff() throws SQLException { - PreparedStatement statement = client.prepareStatement( - "SELECT ARRAY_AGG(dim2) AS arr1, ARRAY_AGG(l1) AS arr2, ARRAY_AGG(d1) AS arr3, ARRAY_AGG(f1) AS arr4 FROM druid.numfoo" - ); - final ResultSet resultSet = statement.executeQuery(); - final List> rows = getRows(resultSet); - Assert.assertEquals(1, rows.size()); - Assert.assertTrue(rows.get(0).containsKey("arr1")); - Assert.assertTrue(rows.get(0).containsKey("arr2")); - Assert.assertTrue(rows.get(0).containsKey("arr3")); - Assert.assertTrue(rows.get(0).containsKey("arr4")); - if (NullHandling.sqlCompatible()) { - Assert.assertArrayEquals(new Object[]{"a", null, "", "a", "abc", null}, (Object[]) rows.get(0).get("arr1")); - Assert.assertArrayEquals(new Object[]{7L, 325323L, 0L, null, null, null}, (Object[]) rows.get(0).get("arr2")); - Assert.assertArrayEquals(new Object[]{1.0, 1.7, 0.0, null, null, null}, (Object[]) rows.get(0).get("arr3")); - Assert.assertArrayEquals(new Object[]{1.0f, 0.1f, 0.0f, null, null, null}, (Object[]) rows.get(0).get("arr4")); - } else { - Assert.assertArrayEquals(new Object[]{"a", null, null, "a", "abc", null}, (Object[]) rows.get(0).get("arr1")); - Assert.assertArrayEquals(new Object[]{7L, 325323L, 0L, 0L, 0L, 0L}, (Object[]) rows.get(0).get("arr2")); - Assert.assertArrayEquals(new Object[]{1.0, 1.7, 0.0, 0.0, 0.0, 0.0}, (Object[]) rows.get(0).get("arr3")); - Assert.assertArrayEquals(new Object[]{1.0f, 0.1f, 0.0f, 0.0f, 0.0f, 0.0f}, (Object[]) rows.get(0).get("arr4")); + try (PreparedStatement statement = client.prepareStatement( + "SELECT ARRAY_AGG(dim2) AS arr1, ARRAY_AGG(l1) AS arr2, ARRAY_AGG(d1) AS arr3, ARRAY_AGG(f1) AS arr4 FROM druid.numfoo")) { + final ResultSet resultSet = statement.executeQuery(); + final List> rows = getRows(resultSet); + Assert.assertEquals(1, rows.size()); + Assert.assertTrue(rows.get(0).containsKey("arr1")); + Assert.assertTrue(rows.get(0).containsKey("arr2")); + Assert.assertTrue(rows.get(0).containsKey("arr3")); + Assert.assertTrue(rows.get(0).containsKey("arr4")); + if (NullHandling.sqlCompatible()) { + Assert.assertArrayEquals(new Object[]{"a", null, "", "a", "abc", null}, (Object[]) rows.get(0).get("arr1")); + Assert.assertArrayEquals(new Object[]{7L, 325323L, 0L, null, null, null}, (Object[]) rows.get(0).get("arr2")); + Assert.assertArrayEquals(new Object[]{1.0, 1.7, 0.0, null, null, null}, (Object[]) rows.get(0).get("arr3")); + Assert.assertArrayEquals(new Object[]{1.0f, 0.1f, 0.0f, null, null, null}, (Object[]) rows.get(0).get("arr4")); + } else { + Assert.assertArrayEquals(new Object[]{"a", null, null, "a", "abc", null}, (Object[]) rows.get(0).get("arr1")); + Assert.assertArrayEquals(new Object[]{7L, 325323L, 0L, 0L, 0L, 0L}, (Object[]) rows.get(0).get("arr2")); + Assert.assertArrayEquals(new Object[]{1.0, 1.7, 0.0, 0.0, 0.0, 0.0}, (Object[]) rows.get(0).get("arr3")); + Assert.assertArrayEquals(new Object[]{1.0f, 0.1f, 0.0f, 0.0f, 0.0f, 0.0f}, (Object[]) rows.get(0).get("arr4")); + } } } - protected abstract String getJdbcConnectionString(int port); + // Default implementation is for JSON to allow debugging of tests. + protected String getJdbcConnectionString(final int port) + { + return StringUtils.format( + "jdbc:avatica:remote:url=http://127.0.0.1:%d%s", + port, + DruidAvaticaJsonHandler.AVATICA_PATH + ); + } - protected abstract AbstractAvaticaHandler getAvaticaHandler(DruidMeta druidMeta); + // Default implementation is for JSON to allow debugging of tests. + protected AbstractAvaticaHandler getAvaticaHandler(final DruidMeta druidMeta) + { + return new DruidAvaticaJsonHandler( + druidMeta, + new DruidNode("dummy", "dummy", false, 1, null, true, false), + new AvaticaMonitor() + ); + } private static List> getRows(final ResultSet resultSet) throws SQLException { @@ -1437,6 +1571,7 @@ private static List> getRows(final ResultSet resultSet, fina } } + @SafeVarargs private static Map row(final Pair... entries) { final Map m = new HashMap<>(); diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandlerTest.java deleted file mode 100644 index 1e60905bfb1a..000000000000 --- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandlerTest.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.sql.avatica; - -import org.apache.calcite.avatica.server.AbstractAvaticaHandler; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.server.DruidNode; - -public class DruidAvaticaJsonHandlerTest extends DruidAvaticaHandlerTest -{ - @Override - protected String getJdbcConnectionString(final int port) - { - return StringUtils.format( - "jdbc:avatica:remote:url=http://127.0.0.1:%d%s", - port, - DruidAvaticaJsonHandler.AVATICA_PATH - ); - } - - @Override - protected AbstractAvaticaHandler getAvaticaHandler(final DruidMeta druidMeta) - { - return new DruidAvaticaJsonHandler( - druidMeta, - new DruidNode("dummy", "dummy", false, 1, null, true, false), - new AvaticaMonitor() - ); - } -} diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java index c6eeb8339138..fba413d99095 100644 --- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java +++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java @@ -20,20 +20,23 @@ package org.apache.druid.sql.avatica; import com.google.common.base.Function; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import org.apache.calcite.avatica.ColumnMetaData; import org.apache.calcite.avatica.Meta; +import org.apache.calcite.avatica.remote.TypedValue; +import org.apache.calcite.tools.RelConversionException; import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.math.expr.ExprMacroTable; -import org.apache.druid.query.QueryContext; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.security.AllowAllAuthenticator; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.sql.SqlLifecycleFactory; +import org.apache.druid.sql.SqlQueryPlus; import org.apache.druid.sql.calcite.planner.CalciteRulesManager; import org.apache.druid.sql.calcite.planner.DruidOperatorTable; import org.apache.druid.sql.calcite.planner.PlannerConfig; @@ -58,6 +61,15 @@ public class DruidStatementTest extends CalciteTestBase { + private static String SUB_QUERY_WITH_ORDER_BY = + "select T20.F13 as F22\n" + + "from (SELECT DISTINCT dim1 as F13 FROM druid.foo T10) T20\n" + + "order by T20.F13 ASC"; + private static String SELECT_FROM_FOO = + "SELECT __time, cnt, dim1, dim2, m1 FROM druid.foo"; + private static String SELECT_STAR_FROM_FOO = + "SELECT * FROM druid.foo"; + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -82,6 +94,7 @@ public static void tearDownClass() throws IOException private SpecificSegmentsQuerySegmentWalker walker; private SqlLifecycleFactory sqlLifecycleFactory; + private DruidConnection conn; @Before public void setUp() throws Exception @@ -103,89 +116,200 @@ public void setUp() throws Exception CalciteTests.DRUID_SCHEMA_NAME, new CalciteRulesManager(ImmutableSet.of()) ); - this.sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(plannerFactory); + sqlLifecycleFactory = CalciteTests.createSqlLifecycleFactory(plannerFactory); + conn = new DruidConnection("dummy", 4, ImmutableMap.of(), ImmutableMap.of()); } @After public void tearDown() throws Exception { + conn.close(); walker.close(); walker = null; } + //----------------------------------------------------------------- + // Druid JDBC Statement + // + // The JDBC Statement class starts "empty", then allows executing + // one statement at a time. Executing a second automatically closes + // the result set from the first. Each statement takes a new query. + // Parameters are not generally used in this pattern. + + private DruidJdbcStatement jdbcStatement() + { + return new DruidJdbcStatement( + conn, + 0, + sqlLifecycleFactory + ); + } + @Test - public void testSignature() + public void testSubQueryWithOrderByDirect() throws RelConversionException { - final String sql = "SELECT * FROM druid.foo"; - try (final DruidStatement statement = statement(sql)) { - // Check signature. - final Meta.Signature signature = statement.getSignature(); - Assert.assertEquals(Meta.CursorFactory.ARRAY, signature.cursorFactory); - Assert.assertEquals(Meta.StatementType.SELECT, signature.statementType); - Assert.assertEquals(sql, signature.sql); + SqlQueryPlus queryPlus = new SqlQueryPlus( + SUB_QUERY_WITH_ORDER_BY, + null, + null, + AllowAllAuthenticator.ALLOW_ALL_RESULT + ); + try (final DruidJdbcStatement statement = jdbcStatement()) { + // First frame, ask for all rows. + statement.execute(queryPlus, -1); + Meta.Frame frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6); Assert.assertEquals( - Lists.newArrayList( - Lists.newArrayList("__time", "TIMESTAMP", "java.lang.Long"), - Lists.newArrayList("dim1", "VARCHAR", "java.lang.String"), - Lists.newArrayList("dim2", "VARCHAR", "java.lang.String"), - Lists.newArrayList("dim3", "VARCHAR", "java.lang.String"), - Lists.newArrayList("cnt", "BIGINT", "java.lang.Number"), - Lists.newArrayList("m1", "FLOAT", "java.lang.Float"), - Lists.newArrayList("m2", "DOUBLE", "java.lang.Double"), - Lists.newArrayList("unique_dim1", "OTHER", "java.lang.Object") - ), - Lists.transform( - signature.columns, - new Function>() - { - @Override - public List apply(final ColumnMetaData columnMetaData) - { - return Lists.newArrayList( - columnMetaData.label, - columnMetaData.type.name, - columnMetaData.type.rep.clazz.getName() - ); - } - } - ) + subQueryWithOrderByResults(), + frame ); + Assert.assertTrue(statement.isDone()); } } @Test - public void testSubQueryWithOrderBy() + public void testFetchPastEOFDirect() throws RelConversionException { - final String sql = "select T20.F13 as F22 from (SELECT DISTINCT dim1 as F13 FROM druid.foo T10) T20 order by T20.F13 ASC"; - try (final DruidStatement statement = statement(sql)) { + SqlQueryPlus queryPlus = new SqlQueryPlus( + SUB_QUERY_WITH_ORDER_BY, + null, + null, + AllowAllAuthenticator.ALLOW_ALL_RESULT + ); + try (final DruidJdbcStatement statement = jdbcStatement()) { // First frame, ask for all rows. - Meta.Frame frame = statement.execute(Collections.emptyList()).nextFrame(DruidStatement.START_OFFSET, 6); + statement.execute(queryPlus, -1); + Meta.Frame frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6); Assert.assertEquals( - Meta.Frame.create( - 0, - true, - Lists.newArrayList( - new Object[]{""}, - new Object[]{"1"}, - new Object[]{"10.1"}, - new Object[]{"2"}, - new Object[]{"abc"}, - new Object[]{"def"} - ) - ), + subQueryWithOrderByResults(), frame ); Assert.assertTrue(statement.isDone()); + try { + statement.nextFrame(6, 6); + Assert.fail(); + } + catch (Exception e) { + // Expected: can't work with an auto-closed result set. + } } } + /** + * Ensure an error is thrown if the execution step is skipped. + */ @Test - public void testSelectAllInFirstFrame() + public void testSkipExecuteDirect() { - final String sql = "SELECT __time, cnt, dim1, dim2, m1 FROM druid.foo"; - try (final DruidStatement statement = statement(sql)) { + try (final DruidJdbcStatement statement = jdbcStatement()) { + // Error: no call to execute; + statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6); + Assert.fail(); + } + catch (Exception e) { + // Expected + } + } + + @Test + public void testSignatureDirect() throws RelConversionException + { + SqlQueryPlus queryPlus = new SqlQueryPlus( + SELECT_STAR_FROM_FOO, + null, + null, + AllowAllAuthenticator.ALLOW_ALL_RESULT + ); + try (final DruidJdbcStatement statement = jdbcStatement()) { + // Check signature. + statement.execute(queryPlus, -1); + verifySignature(statement.getSignature()); + } + } + + /** + * Ensure an error is thrown if the client attempts to fetch from a + * statement after its result set is closed. + */ + @Test + public void testFetchAfterResultCloseDirect() + { + SqlQueryPlus queryPlus = new SqlQueryPlus( + SUB_QUERY_WITH_ORDER_BY, + null, + null, + AllowAllAuthenticator.ALLOW_ALL_RESULT + ); + try (final DruidJdbcStatement statement = jdbcStatement()) { // First frame, ask for all rows. - Meta.Frame frame = statement.execute(Collections.emptyList()).nextFrame(DruidStatement.START_OFFSET, 6); + statement.execute(queryPlus, -1); + statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6); + statement.closeResultSet(); + statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6); + Assert.fail(); + } + catch (Exception e) { + // Expected + } + } + + @Test + public void testSubQueryWithOrderByDirectTwice() throws RelConversionException + { + SqlQueryPlus queryPlus = new SqlQueryPlus( + SUB_QUERY_WITH_ORDER_BY, + null, + null, + AllowAllAuthenticator.ALLOW_ALL_RESULT + ); + try (final DruidJdbcStatement statement = jdbcStatement()) { + statement.execute(queryPlus, -1); + Meta.Frame frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6); + Assert.assertEquals( + subQueryWithOrderByResults(), + frame + ); + + // Do it again. JDBC says we can reuse statements sequentially. + Assert.assertTrue(statement.isDone()); + statement.execute(queryPlus, -1); + frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6); + Assert.assertEquals( + subQueryWithOrderByResults(), + frame + ); + Assert.assertTrue(statement.isDone()); + } + } + + private Meta.Frame subQueryWithOrderByResults() + { + return Meta.Frame.create( + 0, + true, + Lists.newArrayList( + new Object[]{""}, + new Object[]{"1"}, + new Object[]{"10.1"}, + new Object[]{"2"}, + new Object[]{"abc"}, + new Object[]{"def"} + ) + ); + } + + @Test + public void testSelectAllInFirstFrameDirect() throws RelConversionException + { + SqlQueryPlus queryPlus = new SqlQueryPlus( + SELECT_FROM_FOO, + null, + null, + AllowAllAuthenticator.ALLOW_ALL_RESULT + ); + try (final DruidJdbcStatement statement = jdbcStatement()) { + // First frame, ask for all rows. + statement.execute(queryPlus, -1); + Meta.Frame frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6); Assert.assertEquals( Meta.Frame.create( 0, @@ -211,59 +335,309 @@ public void testSelectAllInFirstFrame() } } + /** + * Test results spread over two frames. Also checks various state-related + * methods. + * @throws RelConversionException + */ @Test - public void testSelectSplitOverTwoFrames() + public void testSelectSplitOverTwoFramesDirect() throws RelConversionException { - final String sql = "SELECT __time, cnt, dim1, dim2, m1 FROM druid.foo"; - try (final DruidStatement statement = statement(sql)) { + SqlQueryPlus queryPlus = new SqlQueryPlus( + SELECT_FROM_FOO, + null, + null, + AllowAllAuthenticator.ALLOW_ALL_RESULT + ); + try (final DruidJdbcStatement statement = jdbcStatement()) { + // First frame, ask for 2 rows. - Meta.Frame frame = statement.execute(Collections.emptyList()).nextFrame(DruidStatement.START_OFFSET, 2); + statement.execute(queryPlus, -1); + Assert.assertEquals(0, statement.getCurrentOffset()); + Assert.assertFalse(statement.isDone()); + Meta.Frame frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 2); Assert.assertEquals( - Meta.Frame.create( - 0, - false, - Lists.newArrayList( - new Object[]{DateTimes.of("2000-01-01").getMillis(), 1L, "", "a", 1.0f}, - new Object[]{ - DateTimes.of("2000-01-02").getMillis(), - 1L, - "10.1", - NullHandling.defaultStringValue(), - 2.0f - } - ) - ), + firstFrameResults(), frame ); Assert.assertFalse(statement.isDone()); + Assert.assertEquals(2, statement.getCurrentOffset()); // Last frame, ask for all remaining rows. frame = statement.nextFrame(2, 10); Assert.assertEquals( - Meta.Frame.create( - 2, - true, - Lists.newArrayList( - new Object[]{DateTimes.of("2000-01-03").getMillis(), 1L, "2", "", 3.0f}, - new Object[]{DateTimes.of("2001-01-01").getMillis(), 1L, "1", "a", 4.0f}, - new Object[]{DateTimes.of("2001-01-02").getMillis(), 1L, "def", "abc", 5.0f}, - new Object[]{DateTimes.of("2001-01-03").getMillis(), 1L, "abc", NullHandling.defaultStringValue(), 6.0f} - ) - ), + secondFrameResults(), + frame + ); + Assert.assertTrue(statement.isDone()); + } + } + + /** + * Verify that JDBC automatically closes the first result set when we + * open a second for the same statement. + * @throws RelConversionException + */ + @Test + public void testTwoFramesAutoCloseDirect() throws RelConversionException + { + SqlQueryPlus queryPlus = new SqlQueryPlus( + SELECT_FROM_FOO, + null, + null, + AllowAllAuthenticator.ALLOW_ALL_RESULT + ); + try (final DruidJdbcStatement statement = jdbcStatement()) { + // First frame, ask for 2 rows. + statement.execute(queryPlus, -1); + Meta.Frame frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 2); + Assert.assertEquals( + firstFrameResults(), + frame + ); + Assert.assertFalse(statement.isDone()); + + // Do it again. Closes the prior result set. + statement.execute(queryPlus, -1); + frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 2); + Assert.assertEquals( + firstFrameResults(), + frame + ); + Assert.assertFalse(statement.isDone()); + + // Last frame, ask for all remaining rows. + frame = statement.nextFrame(2, 10); + Assert.assertEquals( + secondFrameResults(), frame ); Assert.assertTrue(statement.isDone()); } } - private DruidStatement statement(String sql) + /** + * Test that closing a statement with pending results automatically + * closes the underlying result set. + * @throws RelConversionException + */ + @Test + public void testTwoFramesCloseWithResultSetDirect() throws RelConversionException { - return new DruidStatement( - "", + SqlQueryPlus queryPlus = new SqlQueryPlus( + SELECT_FROM_FOO, + null, + null, + AllowAllAuthenticator.ALLOW_ALL_RESULT + ); + try (final DruidJdbcStatement statement = jdbcStatement()) { + // First frame, ask for 2 rows. + statement.execute(queryPlus, -1); + Meta.Frame frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 2); + Assert.assertEquals( + firstFrameResults(), + frame + ); + Assert.assertFalse(statement.isDone()); + + // Leave result set open; close statement. + } + } + + private Meta.Frame firstFrameResults() + { + return Meta.Frame.create( + 0, + false, + Lists.newArrayList( + new Object[]{DateTimes.of("2000-01-01").getMillis(), 1L, "", "a", 1.0f}, + new Object[]{ + DateTimes.of("2000-01-02").getMillis(), + 1L, + "10.1", + NullHandling.defaultStringValue(), + 2.0f + } + ) + ); + } + + private Meta.Frame secondFrameResults() + { + return Meta.Frame.create( + 2, + true, + Lists.newArrayList( + new Object[]{DateTimes.of("2000-01-03").getMillis(), 1L, "2", "", 3.0f}, + new Object[]{DateTimes.of("2001-01-01").getMillis(), 1L, "1", "a", 4.0f}, + new Object[]{DateTimes.of("2001-01-02").getMillis(), 1L, "def", "abc", 5.0f}, + new Object[]{DateTimes.of("2001-01-03").getMillis(), 1L, "abc", NullHandling.defaultStringValue(), 6.0f} + ) + ); + } + + @SuppressWarnings("unchecked") + private void verifySignature(Meta.Signature signature) + { + Assert.assertEquals(Meta.CursorFactory.ARRAY, signature.cursorFactory); + Assert.assertEquals(Meta.StatementType.SELECT, signature.statementType); + Assert.assertEquals(SELECT_STAR_FROM_FOO, signature.sql); + Assert.assertEquals( + Lists.newArrayList( + Lists.newArrayList("__time", "TIMESTAMP", "java.lang.Long"), + Lists.newArrayList("dim1", "VARCHAR", "java.lang.String"), + Lists.newArrayList("dim2", "VARCHAR", "java.lang.String"), + Lists.newArrayList("dim3", "VARCHAR", "java.lang.String"), + Lists.newArrayList("cnt", "BIGINT", "java.lang.Number"), + Lists.newArrayList("m1", "FLOAT", "java.lang.Float"), + Lists.newArrayList("m2", "DOUBLE", "java.lang.Double"), + Lists.newArrayList("unique_dim1", "OTHER", "java.lang.Object") + ), + Lists.transform( + signature.columns, + new Function>() + { + @Override + public List apply(final ColumnMetaData columnMetaData) + { + return Lists.newArrayList( + columnMetaData.label, + columnMetaData.type.name, + columnMetaData.type.rep.clazz.getName() + ); + } + } + ) + ); + } + + //----------------------------------------------------------------- + // Druid JDBC Prepared Statement + // + // The JDBC PreparedStatement class starts with, then allows executing + // the statement sequentially, typically with a set of parameters. + + private DruidJdbcPreparedStatement jdbcPreparedStatement(SqlQueryPlus queryPlus) + { + return new DruidJdbcPreparedStatement( + conn, 0, - new QueryContext(), - sqlLifecycleFactory.factorize(), - () -> {} - ).prepare(sql, -1, AllowAllAuthenticator.ALLOW_ALL_RESULT); + queryPlus, + sqlLifecycleFactory, + Long.MAX_VALUE + ); + } + + @Test + public void testSubQueryWithOrderByPrepared() + { + final String sql = "select T20.F13 as F22 from (SELECT DISTINCT dim1 as F13 FROM druid.foo T10) T20 order by T20.F13 ASC"; + SqlQueryPlus queryPlus = new SqlQueryPlus( + sql, + null, + null, + AllowAllAuthenticator.ALLOW_ALL_RESULT + ); + try (final DruidJdbcPreparedStatement statement = jdbcPreparedStatement(queryPlus)) { + statement.prepare(); + // First frame, ask for all rows. + statement.execute(Collections.emptyList()); + Meta.Frame frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6); + Assert.assertEquals( + subQueryWithOrderByResults(), + frame + ); + Assert.assertTrue(statement.isDone()); + } + } + + @Test + public void testSubQueryWithOrderByPreparedTwice() + { + final String sql = "select T20.F13 as F22 from (SELECT DISTINCT dim1 as F13 FROM druid.foo T10) T20 order by T20.F13 ASC"; + SqlQueryPlus queryPlus = new SqlQueryPlus( + sql, + null, + null, + AllowAllAuthenticator.ALLOW_ALL_RESULT + ); + try (final DruidJdbcPreparedStatement statement = jdbcPreparedStatement(queryPlus)) { + statement.prepare(); + statement.execute(Collections.emptyList()); + Meta.Frame frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6); + Assert.assertEquals( + subQueryWithOrderByResults(), + frame + ); + + // Do it again. JDBC says we can reuse prepared statements sequentially. + Assert.assertTrue(statement.isDone()); + statement.execute(Collections.emptyList()); + frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6); + Assert.assertEquals( + subQueryWithOrderByResults(), + frame + ); + Assert.assertTrue(statement.isDone()); + } + } + + @Test + public void testSignaturePrepared() + { + SqlQueryPlus queryPlus = new SqlQueryPlus( + SELECT_STAR_FROM_FOO, + null, + null, + AllowAllAuthenticator.ALLOW_ALL_RESULT + ); + try (final DruidJdbcPreparedStatement statement = jdbcPreparedStatement(queryPlus)) { + statement.prepare(); + verifySignature(statement.getSignature()); + } + } + + @Test + public void testParameters() + { + SqlQueryPlus queryPlus = new SqlQueryPlus( + "SELECT COUNT(*) AS cnt FROM sys.servers WHERE servers.host = ?", + null, + null, + AllowAllAuthenticator.ALLOW_ALL_RESULT + ); + Meta.Frame expected = Meta.Frame.create(0, true, Collections.singletonList(new Object[] {1L})); + List matchingParams = Collections.singletonList(TypedValue.ofLocal(ColumnMetaData.Rep.STRING, "dummy")); + try (final DruidJdbcPreparedStatement statement = jdbcPreparedStatement(queryPlus)) { + + // PreparedStatement protocol: prepare once... + statement.prepare(); + + // Execute many times. First time. + statement.execute(matchingParams); + Meta.Frame frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6); + Assert.assertEquals( + expected, + frame + ); + + // Again, same value. + statement.execute(matchingParams); + frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6); + Assert.assertEquals( + expected, + frame + ); + + // Again, no matches. + statement.execute( + Collections.singletonList( + TypedValue.ofLocal(ColumnMetaData.Rep.STRING, "foo"))); + frame = statement.nextFrame(AbstractDruidJdbcStatement.START_OFFSET, 6); + Assert.assertEquals( + Meta.Frame.create(0, true, Collections.emptyList()), + frame + ); + } } }