Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@

package org.apache.arrow.driver.jdbc;

import static org.apache.arrow.driver.jdbc.utils.ConvertUtils.convertArrowFieldsToColumnMetaDataList;

import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Properties;
import java.util.TimeZone;

import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaFactory;
import org.apache.calcite.avatica.AvaticaResultSetMetaData;
Expand Down Expand Up @@ -89,12 +86,6 @@ public ArrowFlightPreparedStatement newPreparedStatement(
ArrowFlightSqlClientHandler.PreparedStatement preparedStatement =
flightConnection.getMeta().getPreparedStatement(statementHandle);

if (preparedStatement == null) {
preparedStatement = flightConnection.getClientHandler().prepare(signature.sql);
}
final Schema resultSetSchema = preparedStatement.getDataSetSchema();
signature.columns.addAll(convertArrowFieldsToColumnMetaDataList(resultSetSchema.getFields()));

return ArrowFlightPreparedStatement.newPreparedStatement(
flightConnection, preparedStatement, statementHandle,
signature, resultType, resultSetConcurrency, resultSetHoldability);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ static ArrowFlightJdbcFlightStreamResultSet fromFlightInfo(
final TimeZone timeZone = TimeZone.getDefault();
final QueryState state = new QueryState();

final Meta.Signature signature = ArrowFlightMetaImpl.newSignature(null);
final Meta.Signature signature = ArrowFlightMetaImpl.newSignature(null, null, null);

final AvaticaResultSetMetaData resultSetMetaData =
new AvaticaResultSetMetaData(null, null, signature);
Expand Down Expand Up @@ -154,11 +154,7 @@ private void populateDataForCurrentFlightStream() throws SQLException {
currentVectorSchemaRoot = originalRoot;
}

if (schema != null) {
populateData(currentVectorSchemaRoot, schema);
} else {
populateData(currentVectorSchemaRoot);
}
populateData(currentVectorSchemaRoot, schema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@

package org.apache.arrow.driver.jdbc;

import static java.util.Objects.isNull;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TimeZone;

import org.apache.arrow.driver.jdbc.utils.ConvertUtils;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.calcite.avatica.AvaticaResultSet;
import org.apache.calcite.avatica.AvaticaResultSetMetaData;
Expand Down Expand Up @@ -74,7 +72,7 @@ public static ArrowFlightJdbcVectorSchemaRootResultSet fromVectorSchemaRoot(
final TimeZone timeZone = TimeZone.getDefault();
final QueryState state = new QueryState();

final Meta.Signature signature = ArrowFlightMetaImpl.newSignature(null);
final Meta.Signature signature = ArrowFlightMetaImpl.newSignature(null, null, null);

final AvaticaResultSetMetaData resultSetMetaData =
new AvaticaResultSetMetaData(null, null, signature);
Expand All @@ -93,17 +91,12 @@ protected AvaticaResultSet execute() throws SQLException {
}

void populateData(final VectorSchemaRoot vectorSchemaRoot) {
final List<Field> fields = vectorSchemaRoot.getSchema().getFields();
final List<ColumnMetaData> columns = ConvertUtils.convertArrowFieldsToColumnMetaDataList(fields);
signature.columns.clear();
signature.columns.addAll(columns);

this.vectorSchemaRoot = vectorSchemaRoot;
execute2(new ArrowFlightJdbcCursor(vectorSchemaRoot), this.signature.columns);
populateData(vectorSchemaRoot, null);
}

void populateData(final VectorSchemaRoot vectorSchemaRoot, final Schema schema) {
final List<ColumnMetaData> columns = ConvertUtils.convertArrowFieldsToColumnMetaDataList(schema.getFields());
Schema currentSchema = schema == null ? vectorSchemaRoot.getSchema() : schema;
final List<ColumnMetaData> columns = ConvertUtils.convertArrowFieldsToColumnMetaDataList(currentSchema.getFields());
signature.columns.clear();
signature.columns.addAll(columns);

Expand Down Expand Up @@ -137,7 +130,7 @@ public void close() {
} catch (final Exception e) {
exceptions.add(e);
}
if (!isNull(statement)) {
if (!Objects.isNull(statement)) {
try {
super.close();
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.arrow.driver.jdbc;

import static java.lang.String.format;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLTimeoutException;
Expand All @@ -29,7 +27,10 @@
import java.util.concurrent.ConcurrentHashMap;

import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.PreparedStatement;
import org.apache.arrow.driver.jdbc.utils.AvaticaParameterBinder;
import org.apache.arrow.driver.jdbc.utils.ConvertUtils;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaParameter;
import org.apache.calcite.avatica.ColumnMetaData;
Expand All @@ -54,12 +55,20 @@ public ArrowFlightMetaImpl(final AvaticaConnection connection) {
setDefaultConnectionProperties();
}

static Signature newSignature(final String sql) {
/**
* Construct a signature.
*/
static Signature newSignature(final String sql, Schema resultSetSchema, Schema parameterSchema) {
List<ColumnMetaData> columnMetaData = resultSetSchema == null ?
new ArrayList<>() : ConvertUtils.convertArrowFieldsToColumnMetaDataList(resultSetSchema.getFields());
List<AvaticaParameter> parameters = parameterSchema == null ?
new ArrayList<>() : ConvertUtils.convertArrowFieldsToAvaticaParameters(parameterSchema.getFields());

return new Signature(
new ArrayList<ColumnMetaData>(),
columnMetaData,
sql,
Collections.<AvaticaParameter>emptyList(),
Collections.<String, Object>emptyMap(),
parameters,
Collections.emptyMap(),
null, // unnecessary, as SQL requests use ArrowFlightJdbcCursor
StatementType.SELECT
);
Expand All @@ -84,23 +93,28 @@ public void commit(final ConnectionHandle connectionHandle) {
public ExecuteResult execute(final StatementHandle statementHandle,
final List<TypedValue> typedValues, final long maxRowCount) {
Preconditions.checkArgument(connection.id.equals(statementHandle.connectionId),
"Connection IDs are not consistent");
"Connection IDs are not consistent");
PreparedStatement preparedStatement = getPreparedStatement(statementHandle);

if (preparedStatement == null) {
throw new IllegalStateException("Prepared statement not found: " + statementHandle);
}


new AvaticaParameterBinder(preparedStatement, ((ArrowFlightConnection) connection).getBufferAllocator())
.bind(typedValues);

if (statementHandle.signature == null) {
// Update query
final StatementHandleKey key = new StatementHandleKey(statementHandle);
PreparedStatement preparedStatement = statementHandlePreparedStatementMap.get(key);
if (preparedStatement == null) {
throw new IllegalStateException("Prepared statement not found: " + statementHandle);
}
long updatedCount = preparedStatement.executeUpdate();
return new ExecuteResult(Collections.singletonList(MetaResultSet.count(statementHandle.connectionId,
statementHandle.id, updatedCount)));
statementHandle.id, updatedCount)));
} else {
// TODO Why is maxRowCount ignored?
return new ExecuteResult(
Collections.singletonList(MetaResultSet.create(
statementHandle.connectionId, statementHandle.id,
true, statementHandle.signature, null)));
Collections.singletonList(MetaResultSet.create(
statementHandle.connectionId, statementHandle.id,
true, statementHandle.signature, null)));
}
}

Expand All @@ -114,7 +128,23 @@ public ExecuteResult execute(final StatementHandle statementHandle,
public ExecuteBatchResult executeBatch(final StatementHandle statementHandle,
final List<List<TypedValue>> parameterValuesList)
throws IllegalStateException {
throw new IllegalStateException("executeBatch not implemented.");
Preconditions.checkArgument(connection.id.equals(statementHandle.connectionId),
"Connection IDs are not consistent");
PreparedStatement preparedStatement = getPreparedStatement(statementHandle);

if (preparedStatement == null) {
throw new IllegalStateException("Prepared statement not found: " + statementHandle);
}

final AvaticaParameterBinder binder = new AvaticaParameterBinder(preparedStatement,
((ArrowFlightConnection) connection).getBufferAllocator());
for (int i = 0; i < parameterValuesList.size(); i++) {
binder.bind(parameterValuesList.get(i), i);
}

// Update query
long[] updatedCounts = {preparedStatement.executeUpdate()};
return new ExecuteBatchResult(updatedCounts);
}

@Override
Expand All @@ -126,18 +156,24 @@ public Frame fetch(final StatementHandle statementHandle, final long offset,
* the results.
*/
throw AvaticaConnection.HELPER.wrap(
format("%s does not use frames.", this),
String.format("%s does not use frames.", this),
AvaticaConnection.HELPER.unsupported());
}

private PreparedStatement prepareForHandle(final String query, StatementHandle handle) {
final PreparedStatement preparedStatement =
((ArrowFlightConnection) connection).getClientHandler().prepare(query);
handle.signature = newSignature(query, preparedStatement.getDataSetSchema(),
preparedStatement.getParameterSchema());
statementHandlePreparedStatementMap.put(new StatementHandleKey(handle), preparedStatement);
return preparedStatement;
}

@Override
public StatementHandle prepare(final ConnectionHandle connectionHandle,
final String query, final long maxRowCount) {
final StatementHandle handle = super.createStatement(connectionHandle);
handle.signature = newSignature(query);
final PreparedStatement preparedStatement =
((ArrowFlightConnection) connection).getClientHandler().prepare(query);
statementHandlePreparedStatementMap.put(new StatementHandleKey(handle), preparedStatement);
prepareForHandle(query, handle);
return handle;
}

Expand All @@ -157,20 +193,18 @@ public ExecuteResult prepareAndExecute(final StatementHandle handle,
final PrepareCallback callback)
throws NoSuchStatementException {
try {
final PreparedStatement preparedStatement =
((ArrowFlightConnection) connection).getClientHandler().prepare(query);
PreparedStatement preparedStatement = prepareForHandle(query, handle);
final StatementType statementType = preparedStatement.getType();
statementHandlePreparedStatementMap.put(new StatementHandleKey(handle), preparedStatement);
final Signature signature = newSignature(query);

final long updateCount =
statementType.equals(StatementType.UPDATE) ? preparedStatement.executeUpdate() : -1;
synchronized (callback.getMonitor()) {
callback.clear();
callback.assign(signature, null, updateCount);
callback.assign(handle.signature, null, updateCount);
}
callback.execute();
final MetaResultSet metaResultSet = MetaResultSet.create(handle.connectionId, handle.id,
false, signature, null);
false, handle.signature, null);
return new ExecuteResult(Collections.singletonList(metaResultSet));
} catch (SQLTimeoutException e) {
// So far AvaticaStatement(executeInternal) only handles NoSuchStatement and Runtime Exceptions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@

package org.apache.arrow.driver.jdbc;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler;
import org.apache.arrow.driver.jdbc.utils.ConvertUtils;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.calcite.avatica.AvaticaPreparedStatement;
import org.apache.calcite.avatica.Meta.Signature;
import org.apache.calcite.avatica.Meta.StatementHandle;
Expand All @@ -50,36 +47,6 @@ private ArrowFlightPreparedStatement(final ArrowFlightConnection connection,
this.preparedStatement = Preconditions.checkNotNull(preparedStatement);
}

/**
* Creates a new {@link ArrowFlightPreparedStatement} from the provided information.
*
* @param connection the {@link Connection} to use.
* @param statementHandle the {@link StatementHandle} to use.
* @param signature the {@link Signature} to use.
* @param resultSetType the ResultSet type.
* @param resultSetConcurrency the ResultSet concurrency.
* @param resultSetHoldability the ResultSet holdability.
* @return a new {@link PreparedStatement}.
* @throws SQLException on error.
*/
static ArrowFlightPreparedStatement createNewPreparedStatement(
final ArrowFlightConnection connection,
final StatementHandle statementHandle,
final Signature signature,
final int resultSetType,
final int resultSetConcurrency,
final int resultSetHoldability) throws SQLException {

final ArrowFlightSqlClientHandler.PreparedStatement prepare = connection.getClientHandler().prepare(signature.sql);
final Schema resultSetSchema = prepare.getDataSetSchema();

signature.columns.addAll(ConvertUtils.convertArrowFieldsToColumnMetaDataList(resultSetSchema.getFields()));

return new ArrowFlightPreparedStatement(
connection, prepare, statementHandle,
signature, resultSetType, resultSetConcurrency, resultSetHoldability);
}

static ArrowFlightPreparedStatement newPreparedStatement(final ArrowFlightConnection connection,
final ArrowFlightSqlClientHandler.PreparedStatement preparedStmt,
final StatementHandle statementHandle,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.calcite.avatica.Meta.StatementType;
import org.slf4j.Logger;
Expand Down Expand Up @@ -206,6 +207,15 @@ public interface PreparedStatement extends AutoCloseable {
*/
Schema getDataSetSchema();

/**
* Gets the {@link Schema} of the parameters for this {@link PreparedStatement}.
*
* @return {@link Schema}.
*/
Schema getParameterSchema();

void setParameters(VectorSchemaRoot parameters);

@Override
void close();
}
Expand Down Expand Up @@ -241,6 +251,16 @@ public Schema getDataSetSchema() {
return preparedStatement.getResultSetSchema();
}

@Override
public Schema getParameterSchema() {
return preparedStatement.getParameterSchema();
}

@Override
public void setParameters(VectorSchemaRoot parameters) {
preparedStatement.setParameters(parameters);
}

@Override
public void close() {
try {
Expand Down
Loading