diff --git a/format/FlightSql.proto b/format/FlightSql.proto new file mode 100644 index 00000000000..4032ff0548a --- /dev/null +++ b/format/FlightSql.proto @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +option java_package = "org.apache.arrow.flight.sql.impl"; +package arrow.flight.protocol.sql; + +/* + * Represents a metadata request. Used in the command member of FlightDescriptor + * for the following RPC calls: + * - GetSchema: return the schema of the query. + * - GetFlightInfo: execute the metadata request. + * + * The returned schema will be: + * < + * info_name: utf8, + * value: dense_union + * > + * where there is one row per requested piece of metadata information. + */ +message CommandGetSqlInfo { + /* + * Values are modelled after ODBC's SQLGetInfo() function. This information is intended to provide + * Flight SQL clients with basic, SQL syntax and SQL functions related information. + * More information types can be added in future releases. + * E.g. more SQL syntax support types, scalar functions support, type conversion support etc. + * + * // TODO: Flesh out the available set of metadata below. + * + * Initially, Flight SQL will support the following information types: + * - Server Information + * - Syntax Information + * + * 1. Server Information: Provides basic information about the Flight SQL Server. + * + * The name of the Flight SQL Server. + * FLIGHT_SQL_SERVER_NAME + * + * The native version of the Flight SQL Server. + * FLIGHT_SQL_SERVER_VERSION + * + * The Arrow format version of the Flight SQL Server. + * FLIGHT_SQL_SERVER_ARROW_VERSION + * + * Indicates whether the Flight SQL Server is read only. + * FLIGHT_SQL_SERVER_READ_ONLY + * + * 2. SQL Syntax Information: provides information about SQL syntax supported by the Flight SQL Server. 
+ * + * Indicates whether the Flight SQL Server supports CREATE and DROP of catalogs. + * In a SQL environment, a catalog is a collection of schemas. + * SQL_DDL_CATALOG + * + * Indicates whether the Flight SQL Server supports CREATE and DROP of schemas. + * In a SQL environment, a schema is a collection of tables, views, indexes etc. + * SQL_DDL_SCHEMA + * + * Indicates whether the Flight SQL Server supports CREATE and DROP of tables. + * In a SQL environment, a table is a collection of rows of information. Each row of information + * may have one or more columns of data. + * SQL_DDL_TABLE + * + * Indicates the case sensitivity of catalog, table and schema names. + * SQL_IDENTIFIER_CASE + * + * Indicates the supported character(s) used to surround a delimited identifier. + * SQL_IDENTIFIER_QUOTE_CHAR + * + * Indicates case sensitivity of quoted identifiers. + * SQL_QUOTED_IDENTIFIER_CASE + * + * If omitted, then all metadata will be retrieved. + * Flight SQL Servers may choose to include additional metadata above and beyond the specified set, however they must + * at least return the specified set. If additional metadata is included, the metadata names should be prefixed + * with "CUSTOM_" to identify them as such. + */ + repeated string info = 1; +} + +/* + * Represents a request to retrieve the list of catalogs on a Flight SQL enabled backend. + * Used in the command member of FlightDescriptor for the following RPC calls: + * - GetSchema: return the schema of the query. + * - GetFlightInfo: execute the catalog metadata request. + * + * The returned schema will be: + * < + * catalog_name: utf8 + * > + * The returned data should be ordered by catalog_name. + */ +message CommandGetCatalogs { +} + +/* + * Represents a request to retrieve the list of schemas on a Flight SQL enabled backend. + * Used in the command member of FlightDescriptor for the following RPC calls: + * - GetSchema: return the schema of the query. 
+ * - GetFlightInfo: execute the catalog metadata request. + * + * The returned schema will be: + * < + * catalog_name: utf8, + * schema_name: utf8 + * > + * The returned data should be ordered by catalog_name, then schema_name. + */ +message CommandGetSchemas { + /* + * Specifies the Catalog to search for schemas. + * If omitted, then all catalogs are searched. + */ + string catalog = 1; + + /* + * Specifies a filter pattern for schemas to search for. + * When no schema_filter_pattern is provided, the pattern will not be used to narrow the search. + * In the pattern string, two special characters can be used to denote matching rules: + * - "%" means to match any substring with 0 or more characters. + * - "_" means to match any one character. + */ + string schema_filter_pattern = 2; +} + +/* + * Represents a request to retrieve the list of tables, and optionally their schemas, on a Flight SQL enabled backend. + * Used in the command member of FlightDescriptor for the following RPC calls: + * - GetSchema: return the schema of the query. + * - GetFlightInfo: execute the catalog metadata request. + * + * The returned schema will be: + * < + * catalog_name: utf8, + * schema_name: utf8, + * table_name: utf8, + * table_type: utf8, + * table_schema: bytes + * > + * The returned data should be ordered by catalog_name, schema_name, table_name, then table_type. + */ +message CommandGetTables { + /* + * Specifies the Catalog to search for the tables. + * If omitted, then all catalogs are searched. + */ + string catalog = 1; + + /* + * Specifies a filter pattern for schemas to search for. + * When no schema_filter_pattern is provided, all schemas matching other filters are searched. + * In the pattern string, two special characters can be used to denote matching rules: + * - "%" means to match any substring with 0 or more characters. + * - "_" means to match any one character. 
+ */ + string schema_filter_pattern = 2; + + /* + * Specifies a filter pattern for tables to search for. + * When no table_name_filter_pattern is provided, all tables matching other filters are searched. + * In the pattern string, two special characters can be used to denote matching rules: + * - "%" means to match any substring with 0 or more characters. + * - "_" means to match any one character. + */ + string table_name_filter_pattern = 3; + + // Specifies a filter of table types which must match. + repeated string table_types = 4; + + // Specifies if the schema should be returned for found tables. + bool include_schema = 5; +} + +/* + * Represents a request to retrieve the list of table types on a Flight SQL enabled backend. + * Used in the command member of FlightDescriptor for the following RPC calls: + * - GetSchema: return the schema of the query. + * - GetFlightInfo: execute the catalog metadata request. + * + * The returned schema will be: + * < + * table_type: utf8 + * > + * The returned data should be ordered by table_type. + */ +message CommandGetTableTypes { +} + +/* + * Represents a request to retrieve the primary keys of a table on a Flight SQL enabled backend. + * Used in the command member of FlightDescriptor for the following RPC calls: + * - GetSchema: return the schema of the query. + * - GetFlightInfo: execute the catalog metadata request. + * + * The returned schema will be: + * < + * catalog_name: utf8, + * schema_name: utf8, + * table_name: utf8, + * column_name: utf8, + * key_sequence: int, + * key_name: utf8 + * > + * The returned data should be ordered by catalog_name, schema_name, table_name, key_name, then key_sequence. + */ +message CommandGetPrimaryKeys { + // Specifies the catalog to search for the table. + string catalog = 1; + + // Specifies the schema to search for the table. + string schema = 2; + + // Specifies the table to get the primary keys for. 
+ string table = 3; +} + +/* + * Represents a request to retrieve the foreign keys of a table on a Flight SQL enabled backend. + * Used in the command member of FlightDescriptor for the following RPC calls: + * - GetSchema: return the schema of the query. + * - GetFlightInfo: execute the catalog metadata request. + * + * The returned schema will be: + * < + * pk_catalog_name: utf8, + * pk_schema_name: utf8, + * pk_table_name: utf8, + * pk_column_name: utf8, + * fk_catalog_name: utf8, + * fk_schema_name: utf8, + * fk_table_name: utf8, + * fk_column_name: utf8, + * key_sequence: int, + * fk_key_name: utf8, + * pk_key_name: utf8, + * update_rule: int, + * delete_rule: int + * > + * The returned data should be ordered by catalog_name, schema_name, table_name, key_name, then key_sequence. + */ +message CommandGetForeignKeys { + // Specifies the catalog to search for the primary key table. + string pk_catalog = 1; + + // Specifies the schema to search for the primary key table. + string pk_schema = 2; + + // Specifies the primary key table to get the foreign keys for. + string pk_table = 3; + + // Specifies the catalog to search for the foreign key table. + string fk_catalog = 4; + + // Specifies the schema to search for the foreign key table. + string fk_schema = 5; + + // Specifies the foreign key table to get the foreign keys for. + string fk_table = 6; +} + +// SQL Execution Action Messages + +/* + * Request message for the "GetPreparedStatement" action on a Flight SQL enabled backend. + */ +message ActionCreatePreparedStatementRequest { + // The valid SQL string to create a prepared statement for. + string query = 1; +} + +/* + * Wrap the result of a "GetPreparedStatement" action. + */ +message ActionCreatePreparedStatementResult { + // Opaque handle for the prepared statement on the server. 
+ bytes prepared_statement_handle = 1; + + // If a result set generating query was provided, dataset_schema contains the + // schema of the dataset as described in Schema.fbs::Schema; it is serialized as an IPC message. + bytes dataset_schema = 2; + + // If the query provided contained parameters, parameter_schema contains the + // Schema of the expected parameters as described in Schema.fbs::Schema. + bytes parameter_schema = 3; +} + +/* + * Request message for the "ClosePreparedStatement" action on a Flight SQL enabled backend. + * Closes server resources associated with the prepared statement handle. + */ +message ActionClosePreparedStatementRequest { + // Opaque handle for the prepared statement on the server. + // NOTE(review): declared as string although every other prepared_statement_handle in this file is bytes - confirm this is intended. + string prepared_statement_handle = 1; +} + + +// SQL Execution Messages. + +/* + * Represents a SQL query. Used in the command member of FlightDescriptor + * for the following RPC calls: + * - GetSchema: return the schema of the query. + * - GetFlightInfo: execute the query. + */ +message CommandStatementQuery { + // The SQL syntax. + string query = 1; +} + +/* + * Represents an instance of executing a prepared statement. Used in the command member of FlightDescriptor for + * the following RPC calls: + * - DoPut: bind parameter values. All of the bound parameter sets will be executed as a single atomic execution. + * - GetFlightInfo: execute the prepared statement instance. + */ +message CommandPreparedStatementQuery { + // Unique identifier for the instance of the prepared statement to execute. + bytes client_execution_handle = 1; + + // Opaque handle for the prepared statement on the server. + bytes prepared_statement_handle = 2; +} + +/* + * Represents a SQL update query. Used in the command member of FlightDescriptor + * for the RPC call DoPut to cause the server to execute the included SQL update. + */ +message CommandStatementUpdate { + // The SQL syntax. + string query = 1; +} + +/* + * Represents a SQL update query. 
Used in the command member of FlightDescriptor + * for the RPC call DoPut to cause the server to execute the included + * prepared statement handle as an update. + */ +message CommandPreparedStatementUpdate { + // Unique identifier for the instance of the prepared statement to execute. + bytes client_execution_handle = 1; + + // Opaque handle for the prepared statement on the server. + bytes prepared_statement_handle = 2; +} + +/* + * Returned from the RPC call DoPut when a CommandStatementUpdate + * or CommandPreparedStatementUpdate was in the request, containing + * results from the update. + */ +message DoPutUpdateResult { + // The number of records updated. A return value of -1 represents + // an unknown updated record count. + int64 record_count = 1; +} diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfig.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfig.java index 9a587e7e44b..b1a41dc4cd8 100644 --- a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfig.java +++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfig.java @@ -127,6 +127,8 @@ public final class JdbcToArrowConfig { *

  • TIMESTAMP --> ArrowType.Timestamp(TimeUnit.MILLISECOND, calendar timezone)
  • *
  • CLOB --> ArrowType.Utf8
  • *
  • BLOB --> ArrowType.Binary
  • + *
  • ARRAY --> ArrowType.List
  • + *
  • STRUCT --> ArrowType.Struct
  • *
  • NULL --> ArrowType.Null
  • * */ @@ -149,13 +151,6 @@ public final class JdbcToArrowConfig { // set up type converter this.jdbcToArrowTypeConverter = jdbcToArrowTypeConverter != null ? jdbcToArrowTypeConverter : fieldInfo -> { - final String timezone; - if (calendar != null) { - timezone = calendar.getTimeZone().getID(); - } else { - timezone = null; - } - switch (fieldInfo.getJdbcType()) { case Types.BOOLEAN: case Types.BIT: @@ -191,6 +186,12 @@ public final class JdbcToArrowConfig { case Types.TIME: return new ArrowType.Time(TimeUnit.MILLISECOND, 32); case Types.TIMESTAMP: + final String timezone; + if (calendar != null) { + timezone = calendar.getTimeZone().getID(); + } else { + timezone = null; + } return new ArrowType.Timestamp(TimeUnit.MILLISECOND, timezone); case Types.BINARY: case Types.VARBINARY: @@ -201,6 +202,8 @@ public final class JdbcToArrowConfig { return new ArrowType.List(); case Types.NULL: return new ArrowType.Null(); + case Types.STRUCT: + return new ArrowType.Struct(); default: // no-op, shouldn't get here return null; diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightRuntimeException.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightRuntimeException.java index 76d3349a2c3..3abcce7b163 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightRuntimeException.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightRuntimeException.java @@ -29,7 +29,7 @@ public class FlightRuntimeException extends RuntimeException { /** * Create a new exception from the given status. 
*/ - FlightRuntimeException(CallStatus status) { + public FlightRuntimeException(CallStatus status) { super(status.description(), status.cause()); this.status = status; } diff --git a/java/flight/flight-sql/pom.xml b/java/flight/flight-sql/pom.xml new file mode 100644 index 00000000000..c08b7acf066 --- /dev/null +++ b/java/flight/flight-sql/pom.xml @@ -0,0 +1,171 @@ + + + + 4.0.0 + + arrow-java-root + org.apache.arrow + 4.0.0-SNAPSHOT + ../../pom.xml + + + flight-sql + Arrow Flight SQL + (Experimental)Contains utility classes to expose Flight SQL semantics for clients and servers over Arrow Flight + jar + + + 1.30.2 + 3.7.1 + 1 + + + + + org.apache.arrow + flight-core + ${project.version} + + + io.netty + netty-transport-native-unix-common + + + io.netty + netty-transport-native-kqueue + + + io.netty + netty-transport-native-epoll + + + + + org.apache.arrow + arrow-memory-core + ${project.version} + + + org.apache.arrow + arrow-memory-netty + ${project.version} + runtime + + + io.grpc + grpc-protobuf + ${dep.grpc.version} + + + com.google.guava + guava + + + io.grpc + grpc-stub + ${dep.grpc.version} + + + com.google.protobuf + protobuf-java + ${dep.protobuf.version} + + + io.grpc + grpc-api + ${dep.grpc.version} + + + org.apache.arrow + arrow-vector + ${project.version} + ${arrow.vector.classifier} + + + org.slf4j + slf4j-api + + + org.apache.derby + derby + 10.14.2.0 + test + + + org.apache.commons + commons-dbcp2 + 2.7.0 + test + + + commons-logging + commons-logging + + + + + org.apache.commons + commons-pool2 + 2.8.1 + test + + + + + + + + kr.motd.maven + os-maven-plugin + 1.5.0.Final + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.5.0 + + com.google.protobuf:protoc:${dep.protobuf.version}:exe:${os.detected.classifier} + false + grpc-java + io.grpc:protoc-gen-grpc-java:${dep.grpc.version}:exe:${os.detected.classifier} + + + + src + + ${basedir}/../../../format/ + ${project.build.directory}/generated-sources/protobuf + + + compile + 
compile-custom + + + + test + + ${basedir}/src/test/protobuf + ${project.build.directory}/generated-test-sources/protobuf + + + compile + compile-custom + + + + + + + + diff --git a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java new file mode 100644 index 00000000000..de0485c9b34 --- /dev/null +++ b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.flight.sql; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.arrow.flight.Action; +import org.apache.arrow.flight.CallOption; +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.Result; +import org.apache.arrow.flight.SchemaResult; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.flight.sql.impl.FlightSql; +import org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementResult; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementQuery; +import org.apache.arrow.vector.types.pojo.Schema; + +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; + +import io.grpc.Status; + +/** + * Flight client with Flight SQL semantics. + */ +public class FlightSqlClient { + private FlightClient client; + + public FlightSqlClient(FlightClient client) { + this.client = client; + } + + /** + * Execute a query on the server. + * + * @param query The query to execute. + * @return a FlightInfo object representing the stream(s) to fetch. + */ + public FlightInfo execute(String query) { + final FlightSql.CommandStatementQuery.Builder builder = FlightSql.CommandStatementQuery.newBuilder(); + builder.setQuery(query); + final FlightDescriptor descriptor = FlightDescriptor.command(Any.pack(builder.build()).toByteArray()); + return client.getInfo(descriptor); + } + + /** + * Execute an update query on the server. + * + * @param query The query to execute. + * @return a FlightInfo object representing the stream(s) to fetch. 
+ */ + public long executeUpdate(String query) { + final FlightSql.CommandStatementUpdate.Builder builder = FlightSql.CommandStatementUpdate.newBuilder(); + builder.setQuery(query); + return 0; // TODO + } + + /** + * Request a list of catalogs. + * + * @return a FlightInfo object representing the stream(s) to fetch. + */ + public FlightInfo getCatalogs() { + final FlightSql.CommandGetCatalogs.Builder builder = FlightSql.CommandGetCatalogs.newBuilder(); + final FlightDescriptor descriptor = FlightDescriptor.command(Any.pack(builder.build()).toByteArray()); + return client.getInfo(descriptor); + } + + /** + * Request a list of schemas. + * + * @param catalog The catalog. + * @param schemaFilterPattern The schema filter pattern. + * @return a FlightInfo object representing the stream(s) to fetch. + */ + public FlightInfo getSchemas(String catalog, String schemaFilterPattern) { + final FlightSql.CommandGetSchemas.Builder builder = FlightSql.CommandGetSchemas.newBuilder(); + + if (catalog != null) { + builder.setCatalog(catalog); + } + + if (schemaFilterPattern != null) { + builder.setSchemaFilterPattern(schemaFilterPattern); + } + + final FlightDescriptor descriptor = FlightDescriptor.command(Any.pack(builder.build()).toByteArray()); + return client.getInfo(descriptor); + } + + /** + * Get schema for a stream. + * @param descriptor The descriptor for the stream. + * @param options RPC-layer hints for this call. + */ + public SchemaResult getSchema(FlightDescriptor descriptor, CallOption... options) { + return this.client.getSchema(descriptor, options); + } + + /** + * Retrieve a stream from the server. + * @param ticket The ticket granting access to the data stream. + * @param options RPC-layer hints for this call. + */ + public FlightStream getStream(Ticket ticket, CallOption... options) { + return this.client.getStream(ticket, options); + } + + /** + * Request a set of Flight SQL metadata. + * + * @param info The set of metadata to retrieve. 
None to retrieve all metadata. + * @return a FlightInfo object representing the stream(s) to fetch. + */ + public FlightInfo getSqlInfo(String... info) { + final FlightSql.CommandGetSqlInfo.Builder builder = FlightSql.CommandGetSqlInfo.newBuilder(); + + if (info != null && 0 != info.length) { + builder.addAllInfo(Arrays.asList(info)); + } + + final FlightDescriptor descriptor = FlightDescriptor.command(Any.pack(builder.build()).toByteArray()); + return client.getInfo(descriptor); + } + + /** + * Request a list of tables. + * + * @param catalog The catalog. + * @param schemaFilterPattern The schema filter pattern. + * @param tableFilterPattern The table filter pattern. + * @param tableTypes The table types to include. + * @param includeSchema True to include the schema upon return, false to not include the schema. + * @return a FlightInfo object representing the stream(s) to fetch. + */ + public FlightInfo getTables(String catalog, String schemaFilterPattern, + String tableFilterPattern, List tableTypes, boolean includeSchema) { + final FlightSql.CommandGetTables.Builder builder = FlightSql.CommandGetTables.newBuilder(); + + if (catalog != null) { + builder.setCatalog(catalog); + } + + if (schemaFilterPattern != null) { + builder.setSchemaFilterPattern(schemaFilterPattern); + } + + if (tableFilterPattern != null) { + builder.setTableNameFilterPattern(tableFilterPattern); + } + + if (tableTypes != null) { + builder.addAllTableTypes(tableTypes); + } + builder.setIncludeSchema(includeSchema); + + final FlightDescriptor descriptor = FlightDescriptor.command(Any.pack(builder.build()).toByteArray()); + return client.getInfo(descriptor); + } + + /** + * Request the primary keys for a table. + * + * @param catalog The catalog. + * @param schema The schema. + * @param table The table. + * @return a FlightInfo object representing the stream(s) to fetch. 
+ */ + public FlightInfo getPrimaryKeys(String catalog, String schema, String table) { + final FlightSql.CommandGetPrimaryKeys.Builder builder = FlightSql.CommandGetPrimaryKeys.newBuilder(); + + if (catalog != null) { + builder.setCatalog(catalog); + } + + if (schema != null) { + builder.setSchema(schema); + } + + builder.setTable(table); + final FlightDescriptor descriptor = FlightDescriptor.command(Any.pack(builder.build()).toByteArray()); + return client.getInfo(descriptor); + } + + /** + * Request the foreign keys for a table. + * + * One of pkTable or fkTable must be specified, both cannot be null. + * + * @param pkCatalog The primary key table catalog. + * @param pkSchema The primary key table schema. + * @param pkTable The primary key table. + * @param fkCatalog The foreign key table catalog. + * @param fkSchema The foreign key table schema. + * @param fkTable The foreign key table. + * @return a FlightInfo object representing the stream(s) to fetch. + */ + public FlightInfo getForeignKeys(String pkCatalog, String pkSchema, String pkTable, + String fkCatalog, String fkSchema, String fkTable) { + if (null == pkTable && null == fkTable) { + throw Status.INVALID_ARGUMENT.asRuntimeException(); + } + + final FlightSql.CommandGetForeignKeys.Builder builder = FlightSql.CommandGetForeignKeys.newBuilder(); + + if (pkCatalog != null) { + builder.setPkCatalog(pkCatalog); + } + + if (pkSchema != null) { + builder.setPkSchema(pkSchema); + } + + if (pkTable != null) { + builder.setPkTable(pkTable); + } + + if (fkCatalog != null) { + builder.setFkCatalog(fkCatalog); + } + + if (fkSchema != null) { + builder.setFkSchema(fkSchema); + } + + if (fkTable != null) { + builder.setFkTable(fkTable); + } + + final FlightDescriptor descriptor = FlightDescriptor.command(Any.pack(builder.build()).toByteArray()); + return client.getInfo(descriptor); + } + + /** + * Request a list of table types. + * + * @return a FlightInfo object representing the stream(s) to fetch. 
+ */ + public FlightInfo getTableTypes() { + final FlightSql.CommandGetTableTypes.Builder builder = FlightSql.CommandGetTableTypes.newBuilder(); + final FlightDescriptor descriptor = FlightDescriptor.command(Any.pack(builder.build()).toByteArray()); + return client.getInfo(descriptor); + } + + /** + * Create a prepared statement on the server. + * + * @param query The query to prepare. + * @return The representation of the prepared statement which exists on the server. + */ + public PreparedStatement prepare(String query) { + return new PreparedStatement(client, query); + } + + /** + * Helper class to encapsulate Flight SQL prepared statement logic. + */ + public static class PreparedStatement implements Closeable { + private final FlightClient client; + private final ActionCreatePreparedStatementResult preparedStatementResult; + private AtomicLong invocationCount; + private boolean isClosed; + private Schema resultSetSchema = null; + private Schema parameterSchema = null; + + /** + * Constructor. + * + * @param client The client. FlightSqlPreparedStatement does not maintain this resource. + * @param sql The query. + */ + public PreparedStatement(FlightClient client, String sql) { + this.client = client; + + final Iterator preparedStatementResults = client.doAction(new Action( + FlightSqlUtils.FLIGHT_SQL_CREATEPREPAREDSTATEMENT.getType(), + Any.pack(FlightSql.ActionCreatePreparedStatementRequest + .newBuilder() + .setQuery(sql) + .build()) + .toByteArray())); + + preparedStatementResult = FlightSqlUtils.unpackAndParseOrThrow( + preparedStatementResults.next().getBody(), + ActionCreatePreparedStatementResult.class); + + invocationCount = new AtomicLong(0); + isClosed = false; + } + + /** + * Returns the Schema of the resultset. + * + * @return the Schema of the resultset. 
+ */ + public Schema getResultSetSchema() { + if (resultSetSchema == null && preparedStatementResult.getDatasetSchema() != null) { + resultSetSchema = Schema.deserialize(preparedStatementResult.getDatasetSchema().asReadOnlyByteBuffer()); + } + return resultSetSchema; + } + + /** + * Returns the Schema of the parameters. + * + * @return the Schema of the parameters. + */ + public Schema getParameterSchema() { + if (parameterSchema == null && preparedStatementResult.getParameterSchema() != null) { + parameterSchema = Schema.deserialize(preparedStatementResult.getParameterSchema().asReadOnlyByteBuffer()); + } + return parameterSchema; + } + + /** + * Executes the prepared statement query on the server. + * + * @return a FlightInfo object representing the stream(s) to fetch. + * @throws IOException if the PreparedStatement is closed. + */ + public FlightInfo execute() throws IOException { + if (isClosed) { + throw new IllegalStateException("Prepared statement has already been closed on the server."); + } + + final FlightDescriptor descriptor = FlightDescriptor + .command(Any.pack(CommandPreparedStatementQuery.newBuilder() + .setClientExecutionHandle( + ByteString.copyFrom(ByteBuffer.allocate(Long.BYTES).putLong(invocationCount.getAndIncrement()))) + .setPreparedStatementHandle(preparedStatementResult.getPreparedStatementHandle()) + .build()) + .toByteArray()); + + return client.getInfo(descriptor); + } + + /** + * Executes the prepared statement update on the server. + * + * @return the number of rows updated. 
+ */ + public long executeUpdate() { + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + // TODO: Set parameter values + + @Override + public void close() { + isClosed = true; + final Iterator closePreparedStatementResults = client.doAction(new Action( + FlightSqlUtils.FLIGHT_SQL_CLOSEPREPAREDSTATEMENT.getType(), + Any.pack(FlightSql.ActionClosePreparedStatementRequest + .newBuilder() + .setPreparedStatementHandleBytes(preparedStatementResult.getPreparedStatementHandle()) + .build()) + .toByteArray())); + closePreparedStatementResults.forEachRemaining(result -> { + }); + } + + /** + * Returns if the prepared statement is already closed. + * + * @return true if the prepared statement is already closed. + */ + public boolean isClosed() { + return isClosed; + } + } +} diff --git a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java new file mode 100644 index 00000000000..ed5cdb4d2c2 --- /dev/null +++ b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java @@ -0,0 +1,646 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.flight.sql; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.arrow.flight.Action; +import org.apache.arrow.flight.ActionType; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightProducer; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.PutResult; +import org.apache.arrow.flight.Result; +import org.apache.arrow.flight.SchemaResult; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.flight.sql.impl.FlightSql.ActionClosePreparedStatementRequest; +import org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementRequest; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetCatalogs; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetForeignKeys; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetPrimaryKeys; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetSchemas; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetSqlInfo; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTableTypes; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTables; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementQuery; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementUpdate; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementQuery; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementUpdate; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.UnionMode; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; + +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; + +import io.grpc.Status; + +/** + * API to Implement an Arrow 
Flight SQL producer. + */ +public abstract class FlightSqlProducer implements FlightProducer, AutoCloseable { + /** + * Depending on the provided command, method either: + * 1. Return information about a SQL query, or + * 2. Return information about a prepared statement. In this case, parameters binding is allowed. + * + * @param context Per-call context. + * @param descriptor The descriptor identifying the data stream. + * @return information about the given SQL query, or the given prepared statement. + */ + @Override + public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) { + final Any command = FlightSqlUtils.parseOrThrow(descriptor.getCommand()); + + if (command.is(CommandStatementQuery.class)) { + return getFlightInfoStatement( + FlightSqlUtils.unpackOrThrow(command, CommandStatementQuery.class), context, descriptor); + } else if (command.is(CommandPreparedStatementQuery.class)) { + return getFlightInfoPreparedStatement( + FlightSqlUtils.unpackOrThrow(command, CommandPreparedStatementQuery.class), context, descriptor); + } else if (command.is(CommandGetCatalogs.class)) { + return getFlightInfoCatalogs( + FlightSqlUtils.unpackOrThrow(command, CommandGetCatalogs.class), context, descriptor); + } else if (command.is(CommandGetSchemas.class)) { + return getFlightInfoSchemas( + FlightSqlUtils.unpackOrThrow(command, CommandGetSchemas.class), context, descriptor); + } else if (command.is(CommandGetTables.class)) { + return getFlightInfoTables( + FlightSqlUtils.unpackOrThrow(command, CommandGetTables.class), context, descriptor); + } else if (command.is(CommandGetTableTypes.class)) { + return getFlightInfoTableTypes(context, descriptor); + } else if (command.is(CommandGetSqlInfo.class)) { + return getFlightInfoSqlInfo( + FlightSqlUtils.unpackOrThrow(command, CommandGetSqlInfo.class), context, descriptor); + } else if (command.is(CommandPreparedStatementQuery.class)) { + return getFlightInfoPrimaryKeys( + 
FlightSqlUtils.unpackOrThrow(command, CommandGetPrimaryKeys.class), context, descriptor); + } else if (command.is(CommandGetForeignKeys.class)) { + return getFlightInfoForeignKeys( + FlightSqlUtils.unpackOrThrow(command, CommandGetForeignKeys.class), context, descriptor); + } + + throw Status.INVALID_ARGUMENT.asRuntimeException(); + } + + /** + * Returns the schema of the result produced by the SQL query. + * + * @param context Per-call context. + * @param descriptor The descriptor identifying the data stream. + * @return the result set schema. + */ + @Override + public SchemaResult getSchema(CallContext context, FlightDescriptor descriptor) { + final Any command = FlightSqlUtils.parseOrThrow(descriptor.getCommand()); + + if (command.is(CommandStatementQuery.class)) { + return getSchemaStatement( + FlightSqlUtils.unpackOrThrow(command, CommandStatementQuery.class), context, descriptor); + } else if (command.is(CommandGetCatalogs.class)) { + return getSchemaCatalogs(); + } else if (command.is(CommandGetSchemas.class)) { + return getSchemaSchemas(); + } else if (command.is(CommandGetTables.class)) { + return getSchemaTables(); + } else if (command.is(CommandGetTableTypes.class)) { + return getSchemaTableTypes(); + } else if (command.is(CommandGetSqlInfo.class)) { + return getSchemaSqlInfo(); + } else if (command.is(CommandGetPrimaryKeys.class)) { + return getSchemaPrimaryKeys(); + } else if (command.is(CommandGetForeignKeys.class)) { + return getSchemaForeignKeys(); + } + + throw Status.INVALID_ARGUMENT.asRuntimeException(); + } + + /** + * Depending on the provided command, method either: + * 1. Return data for a stream produced by executing the provided SQL query, or + * 2. Return data for a prepared statement. In this case, parameters binding is allowed. + * + * @param context Per-call context. + * @param ticket The application-defined ticket identifying this stream. + * @param listener An interface for sending data back to the client. 
+ */ + @Override + public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) { + final Any command; + + try { + command = Any.parseFrom(ticket.getBytes()); + } catch (InvalidProtocolBufferException e) { + listener.error(e); + return; + } + + if (command.is(CommandStatementQuery.class)) { + getStreamStatement( + FlightSqlUtils.unpackOrThrow(command, CommandStatementQuery.class), context, ticket, listener); + } else if (command.is(CommandPreparedStatementQuery.class)) { + getStreamPreparedStatement( + FlightSqlUtils.unpackOrThrow(command, CommandPreparedStatementQuery.class), context, ticket, listener); + } else if (command.is(CommandGetCatalogs.class)) { + getStreamCatalogs(context, ticket, listener); + } else if (command.is(CommandGetSchemas.class)) { + getStreamSchemas(FlightSqlUtils.unpackOrThrow(command, CommandGetSchemas.class), context, ticket, listener); + } else if (command.is(CommandGetTables.class)) { + getStreamTables(FlightSqlUtils.unpackOrThrow(command, CommandGetTables.class), context, ticket, listener); + } else if (command.is(CommandGetTableTypes.class)) { + getStreamTableTypes(context, ticket, listener); + } else if (command.is(CommandGetSqlInfo.class)) { + getStreamSqlInfo(FlightSqlUtils.unpackOrThrow(command, CommandGetSqlInfo.class), context, ticket, listener); + } else if (command.is(CommandGetPrimaryKeys.class)) { + getStreamPrimaryKeys(FlightSqlUtils.unpackOrThrow(command, CommandGetSqlInfo.class), context, ticket, listener); + } else if (command.is(CommandGetForeignKeys.class)) { + getStreamForeignKeys(FlightSqlUtils.unpackOrThrow(command, CommandGetSqlInfo.class), context, ticket, listener); + } else { + throw Status.INVALID_ARGUMENT.asRuntimeException(); + } + } + + /** + * Depending on the provided command, method either: + * 1. Execute provided SQL query as an update statement, or + * 2. Execute provided update SQL query prepared statement. In this case, parameters binding + * is allowed, or + * 3. 
Binds parameters to the provided prepared statement. + * + * @param context Per-call context. + * @param flightStream The data stream being uploaded. + * @param ackStream The data stream listener for update result acknowledgement. + * @return a Runnable to process the stream. + */ + @Override + public Runnable acceptPut(CallContext context, FlightStream flightStream, StreamListener ackStream) { + final Any command = FlightSqlUtils.parseOrThrow(flightStream.getDescriptor().getCommand()); + + if (command.is(CommandStatementUpdate.class)) { + return acceptPutStatement( + FlightSqlUtils.unpackOrThrow(command, CommandStatementUpdate.class), + context, flightStream, ackStream); + } else if (command.is(CommandPreparedStatementUpdate.class)) { + return acceptPutPreparedStatementUpdate( + FlightSqlUtils.unpackOrThrow(command, CommandPreparedStatementUpdate.class), + context, flightStream, ackStream); + } else if (command.is(CommandPreparedStatementQuery.class)) { + return acceptPutPreparedStatementQuery( + FlightSqlUtils.unpackOrThrow(command, CommandPreparedStatementQuery.class), + context, flightStream, ackStream); + } + + throw Status.INVALID_ARGUMENT.asRuntimeException(); + } + + /** + * Lists all available Flight SQL actions. + * + * @param context Per-call context. + * @param listener An interface for sending data back to the client. + */ + @Override + public void listActions(CallContext context, StreamListener listener) { + FlightSqlUtils.FLIGHT_SQL_ACTIONS.forEach(listener::onNext); + listener.onCompleted(); + } + + /** + * Performs the requested Flight SQL action. + * + * @param context Per-call context. + * @param action Client-supplied parameters. + * @param listener A stream of responses. 
+ */ + @Override + public void doAction(CallContext context, Action action, StreamListener listener) { + if (action.getType().equals(FlightSqlUtils.FLIGHT_SQL_CREATEPREPAREDSTATEMENT.getType())) { + final ActionCreatePreparedStatementRequest request = FlightSqlUtils.unpackAndParseOrThrow(action.getBody(), + ActionCreatePreparedStatementRequest.class); + createPreparedStatement(request, context, listener); + } else if (action.getType().equals(FlightSqlUtils.FLIGHT_SQL_CLOSEPREPAREDSTATEMENT.getType())) { + final ActionClosePreparedStatementRequest request = FlightSqlUtils.unpackAndParseOrThrow(action.getBody(), + ActionClosePreparedStatementRequest.class); + closePreparedStatement(request, context, listener); + } + + throw Status.INVALID_ARGUMENT.asRuntimeException(); + } + + /** + * Creates a prepared statement on the server and returns a handle and metadata for in a + * {@link org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementResult} object in a {@link Result} + * object. + * + * @param request The sql command to generate the prepared statement. + * @param context Per-call context. + * @param listener A stream of responses. + */ + public abstract void createPreparedStatement(ActionCreatePreparedStatementRequest request, CallContext context, + StreamListener listener); + + /** + * Closes a prepared statement on the server. No result is expected. + * + * @param request The sql command to generate the prepared statement. + * @param context Per-call context. + * @param listener A stream of responses. + */ + public abstract void closePreparedStatement(ActionClosePreparedStatementRequest request, CallContext context, + StreamListener listener); + + /** + * Gets information about a particular SQL query based data stream. + * + * @param command The sql command to generate the data stream. + * @param context Per-call context. + * @param descriptor The descriptor identifying the data stream. + * @return Metadata about the stream. 
+ */ + public abstract FlightInfo getFlightInfoStatement(CommandStatementQuery command, CallContext context, + FlightDescriptor descriptor); + + /** + * Gets information about a particular prepared statement data stream. + * + * @param command The prepared statement to generate the data stream. + * @param context Per-call context. + * @param descriptor The descriptor identifying the data stream. + * @return Metadata about the stream. + */ + public abstract FlightInfo getFlightInfoPreparedStatement(CommandPreparedStatementQuery command, + CallContext context, FlightDescriptor descriptor); + + /** + * Gets schema about a particular SQL query based data stream. + * + * @param command The sql command to generate the data stream. + * @param context Per-call context. + * @param descriptor The descriptor identifying the data stream. + * @return Schema for the stream. + */ + public abstract SchemaResult getSchemaStatement(CommandStatementQuery command, CallContext context, + FlightDescriptor descriptor); + + /** + * Returns data for a SQL query based data stream. + * + * @param command The sql command to generate the data stream. + * @param context Per-call context. + * @param ticket The application-defined ticket identifying this stream. + * @param listener An interface for sending data back to the client. + */ + public abstract void getStreamStatement(CommandStatementQuery command, CallContext context, Ticket ticket, + ServerStreamListener listener); + + /** + * Returns data for a particular prepared statement query instance. + * + * @param command The prepared statement to generate the data stream. + * @param context Per-call context. + * @param ticket The application-defined ticket identifying this stream. + * @param listener An interface for sending data back to the client. 
+ */ + public abstract void getStreamPreparedStatement(CommandPreparedStatementQuery command, CallContext context, + Ticket ticket, ServerStreamListener listener); + + /** + * Accepts uploaded data for a particular SQL query based data stream. PutResults must be in the form of a + * {@link org.apache.arrow.flight.sql.impl.FlightSql.DoPutUpdateResult}. + * + * @param command The sql command to generate the data stream. + * @param context Per-call context. + * @param flightStream The data stream being uploaded. + * @param ackStream The result data stream. + * @return A runnable to process the stream. + */ + public abstract Runnable acceptPutStatement(CommandStatementUpdate command, CallContext context, + FlightStream flightStream, StreamListener ackStream); + + /** + * Accepts uploaded data for a particular prepared statement data stream. PutResults must be in the form of a + * {@link org.apache.arrow.flight.sql.impl.FlightSql.DoPutUpdateResult}. + * + * @param command The prepared statement to generate the data stream. + * @param context Per-call context. + * @param flightStream The data stream being uploaded. + * @param ackStream The result data stream. + * @return A runnable to process the stream. + */ + public abstract Runnable acceptPutPreparedStatementUpdate(CommandPreparedStatementUpdate command, + CallContext context, FlightStream flightStream, StreamListener ackStream); + + /** + * Accepts uploaded parameter values for a particular prepared statement query. + * + * @param command The prepared statement the parameter values will bind to. + * @param context Per-call context. + * @param flightStream The data stream being uploaded. + * @param ackStream The result data stream. + * @return A runnable to process the stream. 
+ */ + public abstract Runnable acceptPutPreparedStatementQuery(CommandPreparedStatementQuery command, + CallContext context, FlightStream flightStream, StreamListener ackStream); + + /** + * Returns the SQL Info of the server by returning a + * {@link org.apache.arrow.flight.sql.impl.FlightSql.CommandGetSqlInfo} in a {@link Result}. + * + * @param request request filter parameters. + * @param context Per-call context. + * @param descriptor The descriptor identifying the data stream. + * @return Metadata about the stream. + */ + public abstract FlightInfo getFlightInfoSqlInfo(CommandGetSqlInfo request, CallContext context, + FlightDescriptor descriptor); + + /** + * Gets schema about the get SQL info data stream. + * + * @return Schema for the stream. + */ + public SchemaResult getSchemaSqlInfo() { + final List fields = new ArrayList<>(); + + fields.add(new Field("info_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + + // dense_union + final List children = new ArrayList<>(); + children.add(new Field("string_value", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + children.add(new Field("int_value", FieldType.nullable(Types.MinorType.INT.getType()), null)); + children.add(new Field("bigint_value", FieldType.nullable(Types.MinorType.BIGINT.getType()), null)); + children.add(new Field("int32_bitmask", FieldType.nullable(Types.MinorType.INT.getType()), null)); + + fields.add(new Field( + "value", + new FieldType(false, new ArrowType.Union(UnionMode.Dense, new int[] {0, 1, 2, 3}), /*dictionary=*/null), + children)); + + return new SchemaResult(new Schema(fields)); + } + + /** + * Returns data for SQL info based data stream. + * + * @param command The command to generate the data stream. + * @param context Per-call context. + * @param ticket The application-defined ticket identifying this stream. + * @param listener An interface for sending data back to the client. 
+ */ + public abstract void getStreamSqlInfo(CommandGetSqlInfo command, CallContext context, Ticket ticket, + ServerStreamListener listener); + + /** + * Returns the available catalogs by returning a stream of + * {@link org.apache.arrow.flight.sql.impl.FlightSql.CommandGetCatalogs} objects in {@link Result} objects. + * + * @param request request filter parameters. + * @param context Per-call context. + * @param descriptor The descriptor identifying the data stream. + * @return Metadata about the stream. + */ + public abstract FlightInfo getFlightInfoCatalogs(CommandGetCatalogs request, CallContext context, + FlightDescriptor descriptor); + + /** + * Gets schema about the get catalogs data stream. + * + * @return Schema for the stream. + */ + public SchemaResult getSchemaCatalogs() { + final List fields = new ArrayList<>(); + + fields.add(new Field("catalog_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + + return new SchemaResult(new Schema(fields)); + } + + /** + * Returns data for catalogs based data stream. + * + * @param context Per-call context. + * @param ticket The application-defined ticket identifying this stream. + * @param listener An interface for sending data back to the client. + */ + public abstract void getStreamCatalogs(CallContext context, Ticket ticket, + ServerStreamListener listener); + + /** + * Returns the available schemas by returning a stream of + * {@link org.apache.arrow.flight.sql.impl.FlightSql.CommandGetSchemas} objects in {@link Result} objects. + * + * @param request request filter parameters. + * @param context Per-call context. + * @param descriptor The descriptor identifying the data stream. + * @return Metadata about the stream. + */ + public abstract FlightInfo getFlightInfoSchemas(CommandGetSchemas request, CallContext context, + FlightDescriptor descriptor); + + /** + * Gets schema about the get schemas data stream. + * + * @return Schema for the stream. 
+ */ + public SchemaResult getSchemaSchemas() { + final List fields = new ArrayList<>(); + + fields.add(new Field("catalog_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + fields.add(new Field("schema_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + + return new SchemaResult(new Schema(fields)); + } + + /** + * Returns data for schemas based data stream. + * + * @param command The command to generate the data stream. + * @param context Per-call context. + * @param ticket The application-defined ticket identifying this stream. + * @param listener An interface for sending data back to the client. + */ + public abstract void getStreamSchemas(CommandGetSchemas command, CallContext context, Ticket ticket, + ServerStreamListener listener); + + /** + * Returns the available tables by returning a stream of + * {@link org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTables} objects in {@link Result} objects. + * + * @param request request filter parameters. + * @param context Per-call context. + * @param descriptor The descriptor identifying the data stream. + * @return Metadata about the stream. + */ + public abstract FlightInfo getFlightInfoTables(CommandGetTables request, CallContext context, + FlightDescriptor descriptor); + + /** + * Gets schema about the get tables data stream. + * + * @return Schema for the stream. 
+ */ + public SchemaResult getSchemaTables() { + final List fields = new ArrayList<>(); + + fields.add(new Field("catalog_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + fields.add(new Field("schema_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + fields.add(new Field("table_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + fields.add(new Field("table_type", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + fields.add(new Field("table_schema", FieldType.nullable(Types.MinorType.VARBINARY.getType()), null)); + + return new SchemaResult(new Schema(fields)); + } + + /** + * Returns data for tables based data stream. + * + * @param command The command to generate the data stream. + * @param context Per-call context. + * @param ticket The application-defined ticket identifying this stream. + * @param listener An interface for sending data back to the client. + */ + public abstract void getStreamTables(CommandGetTables command, CallContext context, Ticket ticket, + ServerStreamListener listener); + + /** + * Returns the available table types by returning a stream of + * {@link org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTableTypes} objects in {@link Result} objects. + * + * @param context Per-call context. + * @param descriptor The descriptor identifying the data stream. + * @return Metadata about the stream. + */ + public abstract FlightInfo getFlightInfoTableTypes(CallContext context, FlightDescriptor descriptor); + + /** + * Gets schema about the get table types data stream. + * + * @return Schema for the stream. + */ + public SchemaResult getSchemaTableTypes() { + final List fields = new ArrayList<>(); + + fields.add(new Field("table_type", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + + return new SchemaResult(new Schema(fields)); + } + + /** + * Returns data for table types based data stream. + * + * @param context Per-call context. 
+ * @param ticket The application-defined ticket identifying this stream. + * @param listener An interface for sending data back to the client. + */ + public abstract void getStreamTableTypes(CallContext context, Ticket ticket, ServerStreamListener listener); + + /** + * Returns the available primary keys by returning a stream of + * {@link org.apache.arrow.flight.sql.impl.FlightSql.CommandGetPrimaryKeys} objects in {@link Result} objects. + * + * @param request request filter parameters. + * @param context Per-call context. + * @param descriptor The descriptor identifying the data stream. + * @return Metadata about the stream. + */ + public abstract FlightInfo getFlightInfoPrimaryKeys(CommandGetPrimaryKeys request, CallContext context, + FlightDescriptor descriptor); + + /** + * Gets schema about the get primary keys data stream. + * + * @return Schema for the stream. + */ + public SchemaResult getSchemaPrimaryKeys() { + final List fields = new ArrayList<>(); + + fields.add(new Field("catalog_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + fields.add(new Field("schema_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + fields.add(new Field("table_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + fields.add(new Field("column_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + fields.add(new Field("key_sequence", FieldType.nullable(Types.MinorType.INT.getType()), null)); + fields.add(new Field("key_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + + return new SchemaResult(new Schema(fields)); + } + + /** + * Returns data for primary keys based data stream. + * + * @param command The command to generate the data stream. + * @param context Per-call context. + * @param ticket The application-defined ticket identifying this stream. + * @param listener An interface for sending data back to the client. 
+ */ + public abstract void getStreamPrimaryKeys(CommandGetPrimaryKeys command, CallContext context, Ticket ticket, + ServerStreamListener listener); + + /** + * Returns the available primary keys by returning a stream of + * {@link org.apache.arrow.flight.sql.impl.FlightSql.CommandGetForeignKeys} objects in {@link Result} objects. + * + * @param request request filter parameters. + * @param context Per-call context. + * @param descriptor The descriptor identifying the data stream. + * @return Metadata about the stream. + */ + public abstract FlightInfo getFlightInfoForeignKeys(CommandGetForeignKeys request, CallContext context, + FlightDescriptor descriptor); + + /** + * Gets schema about the get foreign keys data stream. + * + * @return Schema for the stream. + */ + public SchemaResult getSchemaForeignKeys() { + final List fields = new ArrayList<>(); + + fields.add(new Field("pk_catalog_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + fields.add(new Field("pk_schema_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + fields.add(new Field("pk_table_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + fields.add(new Field("pk_column_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + fields.add(new Field("fk_catalog_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + fields.add(new Field("fk_schema_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + fields.add(new Field("fk_table_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + fields.add(new Field("fk_column_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + fields.add(new Field("key_sequence", FieldType.nullable(Types.MinorType.INT.getType()), null)); + fields.add(new Field("fk_key_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); + fields.add(new Field("pk_key_name", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null)); 
+ fields.add(new Field("update_rule", FieldType.nullable(Types.MinorType.INT.getType()), null)); + fields.add(new Field("delete_rule", FieldType.nullable(Types.MinorType.INT.getType()), null)); + + return new SchemaResult(new Schema(fields)); + } + + /** + * Returns data for foreign keys based data stream. + * + * @param command The command to generate the data stream. + * @param context Per-call context. + * @param ticket The application-defined ticket identifying this stream. + * @param listener An interface for sending data back to the client. + */ + public abstract void getStreamForeignKeys(CommandGetForeignKeys command, CallContext context, Ticket ticket, + ServerStreamListener listener); +} diff --git a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlUtils.java b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlUtils.java new file mode 100644 index 00000000000..9bafca17d8b --- /dev/null +++ b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlUtils.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.flight.sql; + +import java.util.List; + +import org.apache.arrow.flight.ActionType; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; + +/** + * Utilities to work with Flight SQL semantics. + */ +public final class FlightSqlUtils { + public static final ActionType FLIGHT_SQL_CREATEPREPAREDSTATEMENT = new ActionType("CreatePreparedStatement", + "Creates a reusable prepared statement resource on the server. \n" + + "Request Message: ActionCreatePreparedStatementRequest\n" + + "Response Message: ActionCreatePreparedStatementResult"); + + public static final ActionType FLIGHT_SQL_CLOSEPREPAREDSTATEMENT = new ActionType("ClosePreparedStatement", + "Closes a reusable prepared statement resource on the server. \n" + + "Request Message: ActionClosePreparedStatementRequest\n" + + "Response Message: N/A"); + + public static final List FLIGHT_SQL_ACTIONS = ImmutableList.of( + FLIGHT_SQL_CREATEPREPAREDSTATEMENT, + FLIGHT_SQL_CLOSEPREPAREDSTATEMENT + ); + + /** + * Helper to parse {@link com.google.protobuf.Any} objects to the specific protobuf object. + * + * @param source the raw bytes source value. + * @return the materialized protobuf object. + */ + public static Any parseOrThrow(byte[] source) { + try { + return Any.parseFrom(source); + } catch (InvalidProtocolBufferException e) { + throw new AssertionError(e.getMessage()); + } + } + + /** + * Helper to unpack {@link com.google.protobuf.Any} objects to the specific protobuf object. + * + * @param source the parsed Source value. + * @param as the class to unpack as. + * @param the class to unpack as. + * @return the materialized protobuf object. 
+ */ + public static T unpackOrThrow(Any source, Class as) { + try { + return source.unpack(as); + } catch (InvalidProtocolBufferException e) { + throw new AssertionError(e.getMessage()); + } + } + + /** + * Helper to parse and unpack {@link com.google.protobuf.Any} objects to the specific protobuf object. + * + * @param source the raw bytes source value. + * @param as the class to unpack as. + * @param the class to unpack as. + * @return the materialized protobuf object. + */ + public static T unpackAndParseOrThrow(byte[] source, Class as) { + return unpackOrThrow(parseOrThrow(source), as); + } +} diff --git a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/TestFlightSql.java b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/TestFlightSql.java new file mode 100644 index 00000000000..69300ceab89 --- /dev/null +++ b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/TestFlightSql.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.flight; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.arrow.flight.sql.FlightSqlClient; +import org.apache.arrow.flight.sql.FlightSqlExample; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +/** + * Test direct usage of Flight SQL workflows. 
+ */
public class TestFlightSql {
  private static BufferAllocator allocator;
  private static FlightServer server;
  // Kept so that the producer's JDBC resources can be released in tearDown().
  private static FlightSqlExample producer;

  private static FlightClient client;
  private static FlightSqlClient sqlClient;

  protected static final Schema SCHEMA_INT_TABLE = new Schema(Arrays.asList(
      new Field("KEYNAME", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null),
      new Field("VALUE", FieldType.nullable(Types.MinorType.INT.getType()), null)));

  /**
   * Starts an in-process Flight server backed by {@link FlightSqlExample} on an ephemeral port and connects
   * a Flight SQL client to it.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    allocator = new RootAllocator(Integer.MAX_VALUE);

    // Port 0 lets the server pick a free port; the client then connects to whichever port was bound.
    final Location serverLocation = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, 0);
    producer = new FlightSqlExample(serverLocation);
    server = FlightServer.builder(allocator, serverLocation, producer).build();
    server.start();

    final Location clientLocation = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, server.getPort());
    client = FlightClient.builder(allocator, clientLocation).build();
    sqlClient = new FlightSqlClient(client);
  }

  /**
   * Closes the client, the server, the producer and finally the allocator.
   */
  @AfterClass
  public static void tearDown() throws Exception {
    AutoCloseables.close(client, server, producer, allocator);
  }

  @Test
  public void testGetTables() throws Exception {
    final FlightInfo info = sqlClient.getTables(null, null, null, null, false);
    try (final FlightStream stream = sqlClient.getStream(info.getEndpoints().get(0).getTicket())) {
      final List<List<String>> results = getResults(stream);
      Assertions.assertEquals(1, results.size());
      Assertions.assertEquals(
          Arrays.asList(null, "APP", "INTTABLE", "TABLE", SCHEMA_INT_TABLE.toJson()), results.get(0));
    }
  }

  @Test
  public void testSimplePrepStmt() throws Exception {
    final FlightSqlClient.PreparedStatement preparedStatement = sqlClient.prepare("Select * from intTable");

    final Schema actualSchema = preparedStatement.getResultSetSchema();
    Assertions.assertEquals(SCHEMA_INT_TABLE, actualSchema);

    final FlightInfo info = preparedStatement.execute();
    Assertions.assertEquals(SCHEMA_INT_TABLE, info.getSchema());

    try (final FlightStream stream = sqlClient.getStream(info.getEndpoints().get(0).getTicket())) {
      Assertions.assertEquals(SCHEMA_INT_TABLE, stream.getSchema());

      final List<List<String>> results = getResults(stream);
      Assertions.assertEquals(3, results.size());
      Assertions.assertEquals(Arrays.asList("one", "1"), results.get(0));
      Assertions.assertEquals(Arrays.asList("zero", "0"), results.get(1));
      Assertions.assertEquals(Arrays.asList("negative one", "-1"), results.get(2));
    }

    AutoCloseables.close(preparedStatement);
    Assertions.assertTrue(preparedStatement.isClosed());
  }

  /**
   * Drains the stream into a row-major list of string cells.
   *
   * <p>VARCHAR cells are rendered with {@code toString()}, INT cells with {@link String#valueOf(int)}, and
   * VARBINARY cells are decoded as a serialized {@link Schema} and rendered as JSON. Null cells are returned
   * as {@code null}.
   *
   * @param stream the stream to read until exhaustion.
   * @return one list per row, each holding one entry per column in schema order.
   * @throws UnsupportedOperationException if a column uses any other vector type.
   */
  List<List<String>> getResults(FlightStream stream) {
    final List<List<String>> results = new ArrayList<>();
    while (stream.next()) {
      final VectorSchemaRoot root = stream.getRoot();
      final int rowCount = root.getRowCount();
      // Rows of this batch start after the rows collected from earlier batches.
      final int firstRow = results.size();
      for (int i = 0; i < rowCount; ++i) {
        results.add(new ArrayList<>());
      }

      for (Field field : root.getSchema().getFields()) {
        final FieldVector fieldVector = root.getVector(field.getName());

        if (fieldVector instanceof VarCharVector) {
          final VarCharVector varcharVector = (VarCharVector) fieldVector;
          for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
            results.get(firstRow + rowIndex).add(
                varcharVector.isNull(rowIndex) ? null : varcharVector.getObject(rowIndex).toString());
          }
        } else if (fieldVector instanceof IntVector) {
          final IntVector intVector = (IntVector) fieldVector;
          for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
            results.get(firstRow + rowIndex).add(
                intVector.isNull(rowIndex) ? null : String.valueOf(intVector.get(rowIndex)));
          }
        } else if (fieldVector instanceof VarBinaryVector) {
          final VarBinaryVector varbinaryVector = (VarBinaryVector) fieldVector;
          for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
            results.get(firstRow + rowIndex).add(
                varbinaryVector.isNull(rowIndex) ? null :
                    Schema.deserialize(ByteBuffer.wrap(varbinaryVector.get(rowIndex))).toJson());
          }
        } else {
          throw new UnsupportedOperationException("Not yet implemented");
        }
      }
    }
    stream.getRoot().clear();
    return results;
  }
}
diff --git a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/FlightSqlExample.java
b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/FlightSqlExample.java new file mode 100644 index 00000000000..bb0d727db61 --- /dev/null +++ b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/FlightSqlExample.java @@ -0,0 +1,676 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.flight.sql; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ParameterMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.stream.Stream; + +import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.Criteria; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.FlightStatusCode; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.PutResult; +import org.apache.arrow.flight.Result; +import org.apache.arrow.flight.SchemaResult; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.flight.sql.impl.FlightSql; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementQuery; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementUpdate; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementQuery; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementUpdate; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; 
+import org.apache.arrow.vector.dictionary.DictionaryProvider; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.commons.dbcp2.ConnectionFactory; +import org.apache.commons.dbcp2.DriverManagerConnectionFactory; +import org.apache.commons.dbcp2.PoolableConnection; +import org.apache.commons.dbcp2.PoolableConnectionFactory; +import org.apache.commons.dbcp2.PoolingDataSource; +import org.apache.commons.pool2.ObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPool; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; + +import io.grpc.Status; + +/** + * Proof of concept {@link FlightSqlProducer} implementation showing an Apache Derby backed Flight SQL server capable + * of the following workflows: + * - returning a list of tables from the action "GetTables". + * - creation of a prepared statement from the action "GetPreparedStatement". + * - execution of a prepared statement by using a {@link CommandPreparedStatementQuery} with getFlightInfo and + * getStream. 
+ */ +public class FlightSqlExample extends FlightSqlProducer implements AutoCloseable { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlightSqlExample.class); + + private static final int BIT_WIDTH_8 = 8; + private static final int BIT_WIDTH_16 = 16; + private static final int BIT_WIDTH_32 = 32; + private static final int BIT_WIDTH_64 = 64; + private static final boolean IS_SIGNED_TRUE = true; + + private static final int BATCH_ROW_SIZE = 1000; + + private final Location location; + private final PoolingDataSource dataSource; + + private final LoadingCache commandExecutePreparedStatementLoadingCache; + private final LoadingCache preparedStatementLoadingCache; + + public FlightSqlExample(Location location) { + removeDerbyDatabaseIfExists(); + populateDerbyDatabase(); + + final ConnectionFactory connectionFactory = + new DriverManagerConnectionFactory("jdbc:derby:target/derbyDB", null); + final PoolableConnectionFactory poolableConnectionFactory = new PoolableConnectionFactory(connectionFactory, null); + final ObjectPool connectionPool = new GenericObjectPool<>(poolableConnectionFactory); + poolableConnectionFactory.setPool(connectionPool); + + // PoolingDataSource takes ownership of connectionPool. 
+ dataSource = new PoolingDataSource<>(connectionPool); + + preparedStatementLoadingCache = + CacheBuilder.newBuilder() + .maximumSize(100) + .expireAfterWrite(10, java.util.concurrent.TimeUnit.MINUTES) + .removalListener(new PreparedStatementRemovalListener()) + .build(new PreparedStatementCacheLoader(dataSource)); + + commandExecutePreparedStatementLoadingCache = + CacheBuilder.newBuilder() + .maximumSize(100) + .expireAfterWrite(10, java.util.concurrent.TimeUnit.MINUTES) + .removalListener(new CommandExecutePreparedStatementRemovalListener()) + .build(new CommandExecutePreparedStatementCacheLoader(preparedStatementLoadingCache)); + + this.location = location; + } + + @Override + public void getTables(FlightSql.ActionGetTablesRequest request, CallContext context, + StreamListener listener) { + try { + final String catalog = (request.getCatalog().isEmpty() ? null : request.getCatalog()); + + final String schemaFilterPattern = + (request.getSchemaFilterPattern().isEmpty() ? null : request.getSchemaFilterPattern()); + + final String tableFilterPattern = + (request.getTableNameFilterPattern().isEmpty() ? null : request.getTableNameFilterPattern()); + + final String[] tableTypes = request.getTableTypesList().size() == 0 ? 
null : + request.getTableTypesList().toArray(new String[request.getTableTypesList().size()]); + + try (final Connection connection = dataSource.getConnection(); + final ResultSet tables = connection.getMetaData().getTables( + catalog, + schemaFilterPattern, + tableFilterPattern, + tableTypes)) { + while (tables.next()) { + listener.onNext(getTableResult(tables, request.getIncludeSchema())); + } + } + } catch (SQLException e) { + listener.onError(e); + } finally { + listener.onCompleted(); + } + } + + private Result getTableResult(final ResultSet tables, boolean includeSchema) throws SQLException { + + final String catalog = tables.getString("TABLE_CAT"); + final String schema = tables.getString("TABLE_SCHEM"); + final String table = tables.getString("TABLE_NAME"); + final String tableType = tables.getString("TABLE_TYPE"); + + final ActionGetTablesResult.Builder builder = ActionGetTablesResult.newBuilder() + .setCatalog(catalog) + .setSchema(schema) + .setTable(table) + .setTableType(tableType); + + if (includeSchema) { + final Schema pojoSchema = buildSchema(catalog, schema, table); + builder.setArrowMetadata(ByteString.copyFrom(pojoSchema.toByteArray())); + } + + return new Result(Any.pack(builder.build()).toByteArray()); + } + + @Override + public void getPreparedStatement(FlightSql.ActionGetPreparedStatementRequest request, CallContext context, + StreamListener listener) { + final PreparedStatementCacheKey handle = new PreparedStatementCacheKey( + UUID.randomUUID().toString(), request.getQuery()); + + try { + final PreparedStatementContext preparedStatementContext = preparedStatementLoadingCache.get(handle); + final PreparedStatement preparedStatement = preparedStatementContext.getPreparedStatement(); + + // todo + final Schema pojoParameterMetaDataSchema = buildSchema(preparedStatement.getParameterMetaData()); + final Schema pojoResultSetSchema = buildSchema(preparedStatement.getMetaData()); + + listener.onNext(new Result( + 
Any.pack(ActionGetPreparedStatementResult.newBuilder() + .setDatasetSchema(ByteString.copyFrom(pojoResultSetSchema.toByteArray())) + .setParameterSchema(ByteString.copyFrom(pojoParameterMetaDataSchema.toByteArray())) + .setPreparedStatementHandle(handle.toProtocol()) + .build()) + .toByteArray())); + + } catch (Throwable e) { + listener.onError(e); + } finally { + listener.onCompleted(); + } + } + + @Override + public FlightInfo getFlightInfoPreparedStatement(CommandPreparedStatementQuery command, FlightDescriptor descriptor, + CallContext context) { + try { + final ResultSet resultSet = commandExecutePreparedStatementLoadingCache.get(command); + final Schema schema = buildSchema(resultSet.getMetaData()); + + final List endpoints = ImmutableList + .of(new FlightEndpoint(new Ticket(Any.pack(command).toByteArray()), location)); + + return new FlightInfo(schema, descriptor, endpoints, -1, -1); + } catch (Throwable e) { + logger.error("There was a problem executing the prepared statement", e); + throw new FlightRuntimeException(new CallStatus(FlightStatusCode.INTERNAL, e, e.getMessage(), null)); + } + } + + private Schema buildSchema(String catalog, String schema, String table) throws SQLException { + final List fields = new ArrayList<>(); + + try (final Connection connection = dataSource.getConnection(); + final ResultSet columns = connection.getMetaData().getColumns( + catalog, + schema, + table, + null);) { + + while (columns.next()) { + final String columnName = columns.getString("COLUMN_NAME"); + final int jdbcDataType = columns.getInt("DATA_TYPE"); + final String jdbcDataTypeName = columns.getString("TYPE_NAME"); + final String jdbcIsNullable = columns.getString("IS_NULLABLE"); + final boolean arrowIsNullable = jdbcIsNullable.equals("YES"); + + final int precision = columns.getInt("DECIMAL_DIGITS"); + final int scale = columns.getInt("COLUMN_SIZE"); + final ArrowType arrowType = FlightSqlUtils.getArrowTypeFromJDBCType(jdbcDataType, precision, scale); + + final 
FieldType fieldType = new FieldType(arrowIsNullable, arrowType, /*dictionary=*/null); + fields.add(new Field(columnName, fieldType, null)); + } + } + + return new Schema(fields); + } + + @Override + public void getStreamPreparedStatement(CommandPreparedStatementQuery command, CallContext context, Ticket ticket, + ServerStreamListener listener) { + try { + final ResultSet resultSet = commandExecutePreparedStatementLoadingCache.get(command); + final ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); + final Schema schema = buildSchema(resultSetMetaData); + final DictionaryProvider dictionaryProvider = new DictionaryProvider.MapDictionaryProvider(); + + try (final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + + listener.start(root, dictionaryProvider); + final int columnCount = resultSetMetaData.getColumnCount(); + + while (resultSet.next()) { + final int rowCounter = readBatch(resultSet, resultSetMetaData, root, columnCount); + + for (int resultSetColumnCounter = 1; resultSetColumnCounter <= columnCount; resultSetColumnCounter++) { + final String columnName = resultSetMetaData.getColumnName(resultSetColumnCounter); + root.getVector(columnName).setValueCount(rowCounter); + } + + root.setRowCount(rowCounter); + listener.putNext(); + } + } + } catch (Throwable e) { + listener.error(e); + } finally { + listener.completed(); + commandExecutePreparedStatementLoadingCache.invalidate(command); + } + } + + private int readBatch(ResultSet resultSet, ResultSetMetaData resultSetMetaData, VectorSchemaRoot root, + int columnCount) throws SQLException { + int rowCounter = 0; + do { + for (int resultSetColumnCounter = 1; resultSetColumnCounter <= columnCount; resultSetColumnCounter++) { + final String columnName = resultSetMetaData.getColumnName(resultSetColumnCounter); + + final FieldVector fieldVector = root.getVector(columnName); + + if (fieldVector instanceof 
VarCharVector) { + final String value = resultSet.getString(resultSetColumnCounter); + if (resultSet.wasNull()) { + // TODO handle null + } else { + ((VarCharVector) fieldVector).setSafe(rowCounter, value.getBytes(), 0, value.length()); + } + } else if (fieldVector instanceof IntVector) { + final int value = resultSet.getInt(resultSetColumnCounter); + + if (resultSet.wasNull()) { + // TODO handle null + } else { + ((IntVector) fieldVector).setSafe(rowCounter, value); + } + } else { + throw new UnsupportedOperationException(); + } + } + rowCounter++; + } + while (rowCounter < BATCH_ROW_SIZE && resultSet.next()); + + return rowCounter; + } + + + @Override + public void closePreparedStatement(FlightSql.ActionClosePreparedStatementRequest request, CallContext context, + StreamListener listener) { + try { + preparedStatementLoadingCache.invalidate( + PreparedStatementCacheKey.fromProtocol(request.getPreparedStatementHandleBytes())); + } catch (InvalidProtocolBufferException e) { + listener.onError(e); + } finally { + listener.onCompleted(); + } + } + + private Schema buildSchema(ResultSetMetaData resultSetMetaData) throws SQLException { + Preconditions.checkNotNull(resultSetMetaData, "ResultSetMetaData object can't be null"); + final List resultSetFields = new ArrayList<>(); + + for (int resultSetCounter = 1; resultSetCounter <= resultSetMetaData.getColumnCount(); resultSetCounter++) { + final String name = resultSetMetaData.getColumnName(resultSetCounter); + + final int jdbcDataType = resultSetMetaData.getColumnType(resultSetCounter); + + final int jdbcIsNullable = resultSetMetaData.isNullable(resultSetCounter); + final boolean arrowIsNullable = jdbcIsNullable == ResultSetMetaData.columnNullable; + + final int precision = resultSetMetaData.getPrecision(resultSetCounter); + final int scale = resultSetMetaData.getScale(resultSetCounter); + + final ArrowType arrowType = getArrowTypeFromJDBCType(jdbcDataType, precision, scale); + + final FieldType fieldType = new 
FieldType(arrowIsNullable, arrowType, /*dictionary=*/null); + resultSetFields.add(new Field(name, fieldType, null)); + } + final Schema pojoResultSetSchema = new Schema(resultSetFields); + return pojoResultSetSchema; + } + + private Schema buildSchema(ParameterMetaData parameterMetaData) throws SQLException { + Preconditions.checkNotNull(parameterMetaData, "ParameterMetaData object can't be null"); + final List parameterFields = new ArrayList<>(); + + for (int parameterCounter = 1; parameterCounter <= parameterMetaData.getParameterCount(); parameterCounter++) { + final int jdbcDataType = parameterMetaData.getParameterType(parameterCounter); + + final int jdbcIsNullable = parameterMetaData.isNullable(parameterCounter); + final boolean arrowIsNullable = jdbcIsNullable == ParameterMetaData.parameterNullable; + + final int precision = parameterMetaData.getPrecision(parameterCounter); + final int scale = parameterMetaData.getScale(parameterCounter); + + final ArrowType arrowType = getArrowTypeFromJDBCType(jdbcDataType, precision, scale); + + final FieldType fieldType = new FieldType(arrowIsNullable, arrowType, /*dictionary=*/null); + parameterFields.add(new Field(null, fieldType, null)); + } + final Schema pojoParameterMetaDataSchema = new Schema(parameterFields); + return pojoParameterMetaDataSchema; + } + + @Override + public void close() throws Exception { + try { + commandExecutePreparedStatementLoadingCache.cleanUp(); + } catch (Throwable e) { + // Swallow + } + + try { + preparedStatementLoadingCache.cleanUp(); + } catch (Throwable e) { + // Swallow + } + + AutoCloseables.close(dataSource); + } + + private static class CommandExecutePreparedStatementRemovalListener + implements RemovalListener { + @Override + public void onRemoval(RemovalNotification notification) { + try { + AutoCloseables.close(notification.getValue()); + } catch (Throwable e) { + // Swallow + } + } + } + + private static class CommandExecutePreparedStatementCacheLoader + extends CacheLoader { + 
+ private final LoadingCache preparedStatementLoadingCache; + + private CommandExecutePreparedStatementCacheLoader(LoadingCache preparedStatementLoadingCache) { + this.preparedStatementLoadingCache = preparedStatementLoadingCache; + } + + @Override + public ResultSet load(CommandPreparedStatementQuery commandExecutePreparedStatement) + throws SQLException, InvalidProtocolBufferException, ExecutionException { + final PreparedStatementCacheKey preparedStatementCacheKey = + PreparedStatementCacheKey.fromProtocol(commandExecutePreparedStatement.getPreparedStatementHandle()); + final PreparedStatementContext preparedStatementContext = preparedStatementLoadingCache + .get(preparedStatementCacheKey); + return preparedStatementContext.getPreparedStatement().executeQuery(); + } + } + + + private static class PreparedStatementRemovalListener implements RemovalListener { + @Override + public void onRemoval(RemovalNotification notification) { + try { + AutoCloseables.close(notification.getValue()); + } catch (Throwable e) { + // swallow + } + } + } + + private static class PreparedStatementCacheLoader extends CacheLoader { + + // Owned by parent class. + private final PoolingDataSource dataSource; + + private PreparedStatementCacheLoader(PoolingDataSource dataSource) { + this.dataSource = dataSource; + } + + @Override + public PreparedStatementContext load(PreparedStatementCacheKey key) throws SQLException { + + // Ownership of the connection will be passed to the context. 
+ final Connection connection = dataSource.getConnection(); + try { + final PreparedStatement preparedStatement = connection.prepareStatement(key.getSql()); + return new PreparedStatementContext(connection, preparedStatement); + } catch (SQLException e) { + connection.close(); + throw e; + } + } + } + + private static void removeDerbyDatabaseIfExists() { + final Path path = Paths.get("target" + File.separator + "derbyDB"); + + try (final Stream walk = Files.walk(path)) { + walk.sorted(Comparator.reverseOrder()) + .map(Path::toFile) + .forEach(File::delete); + } catch (NoSuchFileException e) { + // Ignore as there was no data directory to clean up. + } catch (IOException e) { + throw new RuntimeException("Failed to remove derby data directory.", e); + } + } + + private static void populateDerbyDatabase() { + try (final Connection conn = DriverManager.getConnection("jdbc:derby:target/derbyDB;create=true")) { + conn.createStatement().execute("CREATE TABLE intTable (keyName varchar(100), value int)"); + conn.createStatement().execute("INSERT INTO intTable (keyName, value) VALUES ('one', 1)"); + conn.createStatement().execute("INSERT INTO intTable (keyName, value) VALUES ('zero', 0)"); + conn.createStatement().execute("INSERT INTO intTable (keyName, value) VALUES ('negative one', -1)"); + } catch (SQLException e) { + throw new RuntimeException("Failed to create derby database.", e); + } + } + + + @Override + public void listFlights(CallContext context, Criteria criteria, StreamListener listener) { + // TODO - build example implementation + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoStatement(CommandStatementQuery command, FlightDescriptor descriptor, + CallContext context) { + // TODO - build example implementation + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + @Override + public SchemaResult getSchema(CallContext context, FlightDescriptor descriptor) { + // TODO - build example implementation + throw 
Status.UNIMPLEMENTED.asRuntimeException(); + } + + @Override + public void doExchange(CallContext context, FlightStream reader, ServerStreamListener writer) { + // TODO - build example implementation + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + @Override + public void getSqlInfo(CallContext context, StreamListener listener) { + // TODO - build example implementation + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + @Override + public void getCatalogs(FlightSql.ActionGetCatalogsRequest request, CallContext context, + StreamListener listener) { + // TODO - build example implementation + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + @Override + public void getSchemas(FlightSql.ActionGetSchemasRequest request, CallContext context, + StreamListener listener) { + // TODO - build example implementation + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + @Override + public void getTableTypes(CallContext context, StreamListener listener) { + // TODO - build example implementation + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + @Override + public SchemaResult getSchemaStatement(CommandStatementQuery command, FlightDescriptor descriptor, + CallContext context) { + // TODO - build example implementation + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + @Override + public Runnable acceptPutStatement(CommandStatementUpdate command, + CallContext context, FlightStream flightStream, StreamListener ackStream) { + // TODO - build example implementation + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + @Override + public Runnable acceptPutPreparedStatementUpdate(CommandPreparedStatementUpdate command, CallContext context, + FlightStream flightStream, StreamListener ackStream) { + // TODO - build example implementation + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + @Override + public Runnable acceptPutPreparedStatementQuery(CommandPreparedStatementQuery command, CallContext context, + FlightStream 
flightStream, StreamListener ackStream) { + // TODO - build example implementation + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + @Override + public void getStreamStatement(CommandStatementQuery command, CallContext context, Ticket ticket, + ServerStreamListener listener) { + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + + /** + * Converts {@link java.sql.Types} values returned from JDBC Apis to Arrow types. + * + * @param jdbcDataType {@link java.sql.Types} value. + * @param precision Precision of the type. + * @param scale Scale of the type. + * @return The Arrow equivalent type. + */ + static ArrowType getArrowTypeFromJDBCType(int jdbcDataType, int precision, int scale) { + switch (jdbcDataType) { + case Types.BIT: + case Types.BOOLEAN: + return ArrowType.Bool.INSTANCE; + case Types.TINYINT: + return new ArrowType.Int(BIT_WIDTH_8, IS_SIGNED_TRUE); + case Types.SMALLINT: + return new ArrowType.Int(BIT_WIDTH_16, IS_SIGNED_TRUE); + case Types.INTEGER: + return new ArrowType.Int(BIT_WIDTH_32, IS_SIGNED_TRUE); + case Types.BIGINT: + return new ArrowType.Int(BIT_WIDTH_64, IS_SIGNED_TRUE); + case Types.FLOAT: + case Types.REAL: + return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE); + case Types.DOUBLE: + return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE); + case Types.NUMERIC: + case Types.DECIMAL: + return new ArrowType.Decimal(precision, scale); + case Types.DATE: + return new ArrowType.Date(DateUnit.DAY); + case Types.TIME: + return new ArrowType.Time(TimeUnit.MILLISECOND, BIT_WIDTH_32); + case Types.TIMESTAMP: + return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null); + case Types.BINARY: + case Types.VARBINARY: + case Types.LONGVARBINARY: + return ArrowType.Binary.INSTANCE; + case Types.NULL: + return ArrowType.Null.INSTANCE; + + case Types.CHAR: + case Types.VARCHAR: + case Types.LONGVARCHAR: + case Types.CLOB: + case Types.NCHAR: + case Types.NVARCHAR: + case Types.LONGNVARCHAR: + case Types.NCLOB: + + case 
Types.OTHER: + case Types.JAVA_OBJECT: + case Types.DISTINCT: + case Types.STRUCT: + case Types.ARRAY: + case Types.BLOB: + case Types.REF: + case Types.DATALINK: + case Types.ROWID: + case Types.SQLXML: + case Types.REF_CURSOR: + case Types.TIME_WITH_TIMEZONE: + case Types.TIMESTAMP_WITH_TIMEZONE: + default: + return ArrowType.Utf8.INSTANCE; + } + } +} diff --git a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/PreparedStatementCacheKey.java b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/PreparedStatementCacheKey.java new file mode 100644 index 00000000000..cc8db427b55 --- /dev/null +++ b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/PreparedStatementCacheKey.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.arrow.flight.sql;

import java.util.Objects;

import org.apache.arrow.flight.sql.impl.FlightSqlExample.PreparedStatementHandle;
import org.apache.arrow.util.Preconditions;

import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

/**
 * Cache key for a prepared statement: a UUID string paired with the SQL text. It can be converted to and
 * from the serialized {@link PreparedStatementHandle} form.
 */
class PreparedStatementCacheKey {

  private final String uuid;
  private final String sql;

  PreparedStatementCacheKey(final String uuid, final String sql) {
    this.sql = sql;
    this.uuid = uuid;
  }

  String getUuid() {
    return uuid;
  }

  String getSql() {
    return sql;
  }

  /**
   * Serializes this key as a {@link PreparedStatementHandle} wrapped in an {@link Any}.
   */
  ByteString toProtocol() {
    final PreparedStatementHandle handle = PreparedStatementHandle.newBuilder()
        .setSql(getSql())
        .setUuid(getUuid())
        .build();
    return Any.pack(handle).toByteString();
  }

  /**
   * Parses bytes produced by {@link #toProtocol()} back into a key.
   *
   * @throws InvalidProtocolBufferException if the bytes cannot be parsed or unpacked.
   * @throws IllegalArgumentException       if the {@link Any} does not hold a {@link PreparedStatementHandle}.
   */
  static PreparedStatementCacheKey fromProtocol(ByteString byteString) throws InvalidProtocolBufferException {
    final Any envelope = Any.parseFrom(byteString);
    Preconditions.checkArgument(envelope.is(PreparedStatementHandle.class));

    final PreparedStatementHandle handle = envelope.unpack(PreparedStatementHandle.class);
    final String handleUuid = handle.getUuid();
    final String handleSql = handle.getSql();
    return new PreparedStatementCacheKey(handleUuid, handleSql);
  }

  @Override
  public boolean equals(Object o) {
    if (o == this) {
      return true;
    }
    if (o instanceof PreparedStatementCacheKey) {
      final PreparedStatementCacheKey other = (PreparedStatementCacheKey) o;
      return Objects.equals(uuid, other.uuid) && Objects.equals(sql, other.sql);
    }
    return false;
  }

  @Override
  public int hashCode() {
    return Objects.hash(uuid, sql);
  }
}
diff --git a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/PreparedStatementContext.java b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/PreparedStatementContext.java new file mode 100644 index 00000000000..cd38255fd03 --- /dev/null +++ 
b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/PreparedStatementContext.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.arrow.flight.sql;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Objects;

import org.apache.arrow.util.AutoCloseables;

/**
 * Pairs a prepared statement with the connection it was created on so that both are closed together.
 */
class PreparedStatementContext implements AutoCloseable {

  private final Connection connection;
  private final PreparedStatement preparedStatement;

  PreparedStatementContext(Connection connection, PreparedStatement preparedStatement) {
    this.connection = connection;
    this.preparedStatement = preparedStatement;
  }

  PreparedStatement getPreparedStatement() {
    return preparedStatement;
  }

  @Override
  public boolean equals(Object o) {
    if (o == this) {
      return true;
    }
    if (o instanceof PreparedStatementContext) {
      final PreparedStatementContext other = (PreparedStatementContext) o;
      return Objects.equals(connection, other.connection) &&
          Objects.equals(preparedStatement, other.preparedStatement);
    }
    return false;
  }

  @Override
  public int hashCode() {
    return Objects.hash(connection, preparedStatement);
  }

  /**
   * Closes the prepared statement first and then its connection.
   */
  @Override
  public void close() throws Exception {
    AutoCloseables.close(preparedStatement, connection);
  }
}
diff --git a/java/flight/flight-sql/src/test/protobuf/flightSqlExample.proto b/java/flight/flight-sql/src/test/protobuf/flightSqlExample.proto new file mode 100644 index 00000000000..c6ebfcabaf8 --- /dev/null +++ b/java/flight/flight-sql/src/test/protobuf/flightSqlExample.proto @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

    + * http://www.apache.org/licenses/LICENSE-2.0 + *

    + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

syntax = "proto3";

option java_package = "org.apache.arrow.flight.sql.impl";

// Serialized form of the example server's prepared statement cache key.
// PreparedStatementCacheKey packs this message into a google.protobuf.Any
// to produce the prepared statement handle bytes.
message PreparedStatementHandle {
  // UUID string that distinguishes this handle from others with the same SQL.
  string uuid = 1;

  // SQL text of the prepared statement.
  string sql = 2;
}
diff --git a/java/pom.xml b/java/pom.xml index c8f79e77743..71376ee9935 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -673,6 +673,7 @@ plasma flight/flight-core flight/flight-grpc + flight/flight-sql performance algorithm adapter/avro