From de576726c1c2c49da6bfa47903aeb03c7485b3c9 Mon Sep 17 00:00:00 2001 From: Zou Xinyi Date: Tue, 31 Dec 2024 17:53:06 +0800 Subject: [PATCH 1/3] 1 --- .../arrowflight/DorisFlightSqlProducer.java | 72 +++- .../arrowflight/FlightSqlSchemaHelper.java | 389 ++++++++++++++++++ .../arrowflight/auth2/FlightAuthUtils.java | 7 +- 3 files changed, 456 insertions(+), 12 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index b968ab04c57c83..59676639ad18b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -72,10 +72,12 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.WriteChannel; import org.apache.arrow.vector.ipc.message.MessageSerializer; import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Text; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -131,21 +133,19 @@ private void getStreamStatementResult(String handle, ServerStreamListener listen String[] handleParts = handle.split(":"); String executedPeerIdentity = handleParts[0]; String queryId = handleParts[1]; + // The tokens used for authentication between getStreamStatement and getFlightInfoStatement are different. ConnectContext connectContext = flightSessionsManager.getConnectContext(executedPeerIdentity); try { - // The tokens used for authentication between getStreamStatement and getFlightInfoStatement are different. final FlightSqlResultCacheEntry flightSqlResultCacheEntry = Objects.requireNonNull( connectContext.getFlightSqlChannel().getResult(queryId)); final VectorSchemaRoot vectorSchemaRoot = flightSqlResultCacheEntry.getVectorSchemaRoot(); listener.start(vectorSchemaRoot); listener.putNext(); } catch (Exception e) { - listener.error(e); String errMsg = "get stream statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: " + connectContext.getState().getErrorMessage(); - LOG.warn(errMsg, e); - throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException(); + handleStreamException(e, errMsg, listener); } finally { listener.completed(); // The result has been sent or sent failed, delete it. @@ -280,7 +280,7 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con String errMsg = "get flight info statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: " + connectContext.getState().getErrorMessage(); - LOG.warn(errMsg, e); + LOG.error(errMsg, e); throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException(); } finally { connectContext.setCommand(MysqlCommand.COM_SLEEP); @@ -361,7 +361,7 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r String errMsg = "create prepared statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage( e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: " + connectContext.getState().getErrorMessage(); - LOG.warn(errMsg, e); + LOG.error(errMsg, e); listener.onError(CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException()); return; } catch (final Throwable t) { @@ -407,7 +407,7 @@ public Runnable acceptPutPreparedStatementUpdate(CommandPreparedStatementUpdate } catch (Exception e) { String errMsg = "acceptPutPreparedStatementUpdate failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e); - LOG.warn(errMsg, e); + LOG.error(errMsg, e); throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException(); } }; @@ -451,7 +451,21 @@ public FlightInfo getFlightInfoCatalogs(final CommandGetCatalogs request, final @Override public void getStreamCatalogs(final CallContext context, final ServerStreamListener listener) { - throw CallStatus.UNIMPLEMENTED.withDescription("getStreamCatalogs unimplemented").toRuntimeException(); + try (final VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(Schemas.GET_CATALOGS_SCHEMA, + rootAllocator)) { + listener.start(vectorSchemaRoot); + vectorSchemaRoot.allocateNew(); + VarCharVector catalogNameVector = (VarCharVector) vectorSchemaRoot.getVector("catalog_name"); + // Only show Internal Catalog, which is consistent with `jdbc:mysql`. + // Otherwise, if the configured ExternalCatalog cannot be connected, + // `catalog.getAllDbs()` will be stuck and wait until the timeout period ends. + catalogNameVector.setSafe(0, new Text("internal")); + vectorSchemaRoot.setRowCount(1); + listener.putNext(); + listener.completed(); + } catch (final Exception ex) { + handleStreamException(ex, "", listener); + } } @Override @@ -463,7 +477,22 @@ public FlightInfo getFlightInfoSchemas(final CommandGetDbSchemas request, final @Override public void getStreamSchemas(final CommandGetDbSchemas command, final CallContext context, final ServerStreamListener listener) { - throw CallStatus.UNIMPLEMENTED.withDescription("getStreamSchemas unimplemented").toRuntimeException(); + try { + ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity()); + FlightSqlSchemaHelper flightSqlSchemaHelper = new FlightSqlSchemaHelper(connectContext); + flightSqlSchemaHelper.setParameterForGetDbSchemas(command); + final Schema schema = Schemas.GET_SCHEMAS_SCHEMA; + + try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator)) { + listener.start(vectorSchemaRoot); + vectorSchemaRoot.allocateNew(); + flightSqlSchemaHelper.getSchemas(vectorSchemaRoot); + listener.putNext(); + listener.completed(); + } + } catch (final Exception e) { + handleStreamException(e, "", listener); + } } @Override @@ -479,7 +508,23 @@ public FlightInfo getFlightInfoTables(final CommandGetTables request, final Call @Override public void getStreamTables(final CommandGetTables command, final CallContext context, final ServerStreamListener listener) { - throw CallStatus.UNIMPLEMENTED.withDescription("getStreamTables unimplemented").toRuntimeException(); + try { + ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity()); + FlightSqlSchemaHelper flightSqlSchemaHelper = new FlightSqlSchemaHelper(connectContext); + flightSqlSchemaHelper.setParameterForGetTables(command); + final Schema schema = command.getIncludeSchema() ? Schemas.GET_TABLES_SCHEMA + : Schemas.GET_TABLES_SCHEMA_NO_SCHEMA; + + try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator)) { + listener.start(vectorSchemaRoot); + vectorSchemaRoot.allocateNew(); + flightSqlSchemaHelper.getTables(vectorSchemaRoot); + listener.putNext(); + listener.completed(); + } + } catch (final Exception e) { + handleStreamException(e, "", listener); + } } @Override @@ -545,9 +590,14 @@ public void getStreamCrossReference(CommandGetCrossReference command, CallContex private FlightInfo getFlightInfoForSchema(final T request, final FlightDescriptor descriptor, final Schema schema) { final Ticket ticket = new Ticket(Any.pack(request).toByteArray()); - // TODO Support multiple endpoints. final List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); return new FlightInfo(schema, descriptor, endpoints, -1, -1); } + + private static void handleStreamException(Exception e, String errMsg, ServerStreamListener listener) { + LOG.error(errMsg, e); + listener.error(CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException()); + throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java new file mode 100644 index 00000000000000..eec1c47d9c21ca --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java @@ -0,0 +1,389 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.service.arrowflight; + +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.service.ExecuteEnv; +import org.apache.doris.service.FrontendServiceImpl; +import org.apache.doris.thrift.TColumnDef; +import org.apache.doris.thrift.TColumnDesc; +import org.apache.doris.thrift.TDescribeTablesParams; +import org.apache.doris.thrift.TDescribeTablesResult; +import org.apache.doris.thrift.TGetDbsParams; +import org.apache.doris.thrift.TGetDbsResult; +import org.apache.doris.thrift.TGetTablesParams; +import org.apache.doris.thrift.TListTableStatusResult; +import org.apache.doris.thrift.TTableStatus; + +import com.google.common.base.Preconditions; +import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils; +import org.apache.arrow.flight.sql.FlightSqlColumnMetadata; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetDbSchemas; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTables; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ZeroVector; +import org.apache.arrow.vector.complex.BaseRepeatedValueVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.ipc.WriteChannel; +import org.apache.arrow.vector.ipc.message.MessageSerializer; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Text; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.channels.Channels; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class FlightSqlSchemaHelper { + private static final Logger LOG = LogManager.getLogger(FlightSqlSchemaHelper.class); + private final ConnectContext ctx; + private final FrontendServiceImpl impl; + private boolean includeSchema; + private String catalogFilterPattern; + private String dbSchemaFilterPattern; + private String tableNameFilterPattern; + private List tableTypesList; + private static final byte[] EMPTY_SERIALIZED_SCHEMA = getSerializedSchema(Collections.emptyList()); + + public FlightSqlSchemaHelper(ConnectContext context) { + ctx = context; + impl = new FrontendServiceImpl(ExecuteEnv.getInstance()); + } + + /** + * Set in the Tables request object the parameter that user passed via CommandGetTables. + */ + public void setParameterForGetTables(CommandGetTables command) { + includeSchema = command.getIncludeSchema(); + // Only show Internal Catalog, which is consistent with `jdbc:mysql`. + // Otherwise, if the configured ExternalCatalog cannot be connected, + // `catalog.getAllDbs()` will be stuck and wait until the timeout period ends. + catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : "internal"; + dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null; + if (command.hasTableNameFilterPattern()) { + if (command.getTableNameFilterPattern().contains(".")) { + Preconditions.checkState(command.getTableNameFilterPattern().split("\\.", -1).length == 2); + dbSchemaFilterPattern = command.getTableNameFilterPattern().split("\\.", -1)[0]; + tableNameFilterPattern = command.getTableNameFilterPattern().split("\\.", -1)[1]; + } else { + tableNameFilterPattern = command.getTableNameFilterPattern(); + } + } else { + tableNameFilterPattern = null; + } + tableTypesList = command.getTableTypesList().isEmpty() ? null : command.getTableTypesList(); + } + + /** + * Set in the Schemas request object the parameter that user passed via CommandGetDbSchemas. + */ + public void setParameterForGetDbSchemas(CommandGetDbSchemas command) { + catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : "internal"; + dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null; + } + + /** + * Call FrontendServiceImpl->getDbNames. + */ + private TGetDbsResult getDbNames() throws TException { + TGetDbsParams getDbsParams = new TGetDbsParams(); + getDbsParams.setCatalog(catalogFilterPattern); + if (dbSchemaFilterPattern != null) { + getDbsParams.setPattern(dbSchemaFilterPattern); + } + getDbsParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + return impl.getDbNames(getDbsParams); + } + + /** + * Call FrontendServiceImpl->listTableStatus. + */ + private TListTableStatusResult listTableStatus(String dbName, String catalogName) throws TException { + TGetTablesParams getTablesParams = new TGetTablesParams(); + getTablesParams.setDb(dbName); + if (!catalogName.isEmpty()) { + getTablesParams.setCatalog(catalogName); + } + if (tableNameFilterPattern != null) { + getTablesParams.setPattern(tableNameFilterPattern); + } + if (tableTypesList != null) { + getTablesParams.setType(tableTypesList.get(0)); // currently only one type is supported. + } + getTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + return impl.listTableStatus(getTablesParams); + } + + /** + * Call FrontendServiceImpl->describeTables. + */ + private TDescribeTablesResult describeTables(String dbName, String catalogName, List tablesName) + throws TException { + TDescribeTablesParams describeTablesParams = new TDescribeTablesParams(); + describeTablesParams.setDb(dbName); + if (!catalogName.isEmpty()) { + describeTablesParams.setCatalog(catalogName); + } + describeTablesParams.setTablesName(tablesName); + describeTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + return impl.describeTables(describeTablesParams); + } + + /** + * Convert Doris data type to an arrowType. + *

+ * Ref: `convert_to_arrow_type` in be/src/util/arrow/row_batch.cpp. + * which is consistent with the type of Arrow data returned by Doris Arrow Flight Sql query. + */ + private static ArrowType getArrowType(PrimitiveType primitiveType, Integer precision, Integer scale, + String timeZone) { + switch (primitiveType) { + case BOOLEAN: + return new ArrowType.Bool(); + case TINYINT: + return new ArrowType.Int(8, true); + case SMALLINT: + return new ArrowType.Int(16, true); + case INT: + case IPV4: + return new ArrowType.Int(32, true); + case BIGINT: + return new ArrowType.Int(64, true); + case FLOAT: + return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE); + case DOUBLE: + return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE); + case LARGEINT: + case VARCHAR: + case STRING: + case CHAR: + case DATETIME: + case DATE: + case JSONB: + case IPV6: + case VARIANT: + return new ArrowType.Utf8(); + case DATEV2: + return new ArrowType.Date(DateUnit.MILLISECOND); + case DATETIMEV2: + if (scale > 3) { + return new ArrowType.Timestamp(TimeUnit.MICROSECOND, timeZone); + } else if (scale > 0) { + return new ArrowType.Timestamp(TimeUnit.MILLISECOND, timeZone); + } else { + return new ArrowType.Timestamp(TimeUnit.SECOND, timeZone); + } + case DECIMAL32: + case DECIMAL64: + case DECIMAL128: + return new ArrowType.Decimal(precision, scale, 128); + case DECIMAL256: + return new ArrowType.Decimal(precision, scale, 256); + case DECIMALV2: + return new ArrowType.Decimal(27, 9, 128); + case HLL: + case BITMAP: + case QUANTILE_STATE: + return new ArrowType.Binary(); + case MAP: + return new ArrowType.Map(false); + case ARRAY: + return new ArrowType.List(); + case STRUCT: + return new ArrowType.Struct(); + default: + return new ArrowType.Null(); + } + } + + private static ArrowType columnDescToArrowType(final TColumnDesc desc) { + PrimitiveType primitiveType = PrimitiveType.fromThrift(desc.getColumnType()); + Integer precision = desc.isSetColumnPrecision() ? desc.getColumnPrecision() : null; + Integer scale = desc.isSetColumnScale() ? desc.getColumnScale() : null; + // TODO there is no timezone in TColumnDesc, so use current timezone. + String timeZone = JdbcToArrowUtils.getUtcCalendar().getTimeZone().getID(); + return getArrowType(primitiveType, precision, scale, timeZone); + } + + private static Map createFlightSqlColumnMetadata(final String dbName, final String tableName, + final TColumnDesc desc) { + final FlightSqlColumnMetadata.Builder columnMetadataBuilder = new FlightSqlColumnMetadata.Builder().schemaName( + dbName).tableName(tableName).typeName(PrimitiveType.fromThrift(desc.getColumnType()).toString()) + .isAutoIncrement(false).isCaseSensitive(false).isReadOnly(true).isSearchable(true); + + if (desc.isSetColumnPrecision()) { + columnMetadataBuilder.precision(desc.getColumnPrecision()); + } + if (desc.isSetColumnScale()) { + columnMetadataBuilder.scale(desc.getColumnScale()); + } + return columnMetadataBuilder.build().getMetadataMap(); + } + + /** + * Construct > + */ + private Map> buildTableToFields(String dbName, TDescribeTablesResult describeTablesResult, + List tablesName) { + Map> tableToFields = new HashMap<>(); + int columnIndex = 0; + for (int tableIndex = 0; tableIndex < describeTablesResult.getTablesOffsetSize(); tableIndex++) { + String tableName = tablesName.get(tableIndex); + final List fields = new ArrayList<>(); + Integer tableOffset = describeTablesResult.getTablesOffset().get(tableIndex); + for (; columnIndex < tableOffset; columnIndex++) { + TColumnDef columnDef = describeTablesResult.getColumns().get(columnIndex); + TColumnDesc columnDesc = columnDef.getColumnDesc(); + final ArrowType columnArrowType = columnDescToArrowType(columnDesc); + + List columnArrowTypeChildren; + // Arrow complex types may require children fields for parsing the schema on C++ + switch (columnArrowType.getTypeID()) { + case List: + case LargeList: + case FixedSizeList: + columnArrowTypeChildren = Collections.singletonList( + Field.notNullable(BaseRepeatedValueVector.DATA_VECTOR_NAME, + ZeroVector.INSTANCE.getField().getType())); + break; + case Map: + columnArrowTypeChildren = Collections.singletonList( + Field.notNullable(MapVector.DATA_VECTOR_NAME, new ArrowType.List())); + break; + case Struct: + columnArrowTypeChildren = Collections.emptyList(); + break; + default: + columnArrowTypeChildren = null; + break; + } + + final Field field = new Field(columnDesc.getColumnName(), + new FieldType(columnDesc.isIsAllowNull(), columnArrowType, null, + createFlightSqlColumnMetadata(dbName, tableName, columnDesc)), columnArrowTypeChildren); + fields.add(field); + } + tableToFields.put(tableName, fields); + } + return tableToFields; + } + + protected static byte[] getSerializedSchema(List fields) { + if (EMPTY_SERIALIZED_SCHEMA == null && fields == null) { + fields = Collections.emptyList(); + } else if (fields == null) { + return Arrays.copyOf(EMPTY_SERIALIZED_SCHEMA, EMPTY_SERIALIZED_SCHEMA.length); + } + + final ByteArrayOutputStream columnOutputStream = new ByteArrayOutputStream(); + final Schema schema = new Schema(fields); + + try { + MessageSerializer.serialize(new WriteChannel(Channels.newChannel(columnOutputStream)), schema); + } catch (final IOException e) { + throw new RuntimeException("IO Error when serializing schema '" + schema + "'.", e); + } + + return columnOutputStream.toByteArray(); + } + + /** + * for FlightSqlProducer Schemas.GET_SCHEMAS_SCHEMA + */ + public void getSchemas(VectorSchemaRoot vectorSchemaRoot) throws TException { + VarCharVector catalogNameVector = (VarCharVector) vectorSchemaRoot.getVector("catalog_name"); + VarCharVector schemaNameVector = (VarCharVector) vectorSchemaRoot.getVector("db_schema_name"); + + TGetDbsResult getDbsResult = getDbNames(); + for (int dbIndex = 0; dbIndex < getDbsResult.getDbs().size(); dbIndex++) { + String dbName = getDbsResult.getDbs().get(dbIndex); + String catalogName = getDbsResult.isSetCatalogs() ? getDbsResult.getCatalogs().get(dbIndex) : ""; + catalogNameVector.setSafe(dbIndex, new Text(catalogName)); + schemaNameVector.setSafe(dbIndex, new Text(dbName)); + } + vectorSchemaRoot.setRowCount(getDbsResult.getDbs().size()); + } + + /** + * for FlightSqlProducer Schemas.GET_TABLES_SCHEMA_NO_SCHEMA and Schemas.GET_TABLES_SCHEMA + */ + public void getTables(VectorSchemaRoot vectorSchemaRoot) throws TException { + VarCharVector catalogNameVector = (VarCharVector) vectorSchemaRoot.getVector("catalog_name"); + VarCharVector schemaNameVector = (VarCharVector) vectorSchemaRoot.getVector("db_schema_name"); + VarCharVector tableNameVector = (VarCharVector) vectorSchemaRoot.getVector("table_name"); + VarCharVector tableTypeVector = (VarCharVector) vectorSchemaRoot.getVector("table_type"); + VarBinaryVector schemaVector = (VarBinaryVector) vectorSchemaRoot.getVector("table_schema"); + + int tablesCount = 0; + TGetDbsResult getDbsResult = getDbNames(); + for (int dbIndex = 0; dbIndex < getDbsResult.getDbs().size(); dbIndex++) { + String dbName = getDbsResult.getDbs().get(dbIndex); + String catalogName = getDbsResult.isSetCatalogs() ? getDbsResult.getCatalogs().get(dbIndex) : ""; + TListTableStatusResult listTableStatusResult = listTableStatus(dbName, catalogName); + + Map> tableToFields; + if (includeSchema) { + List tablesName = new ArrayList<>(); + for (TTableStatus tableStatus : listTableStatusResult.getTables()) { + tablesName.add(tableStatus.getName()); + } + TDescribeTablesResult describeTablesResult = describeTables(dbName, catalogName, tablesName); + tableToFields = buildTableToFields(dbName, describeTablesResult, tablesName); + } else { + tableToFields = null; + } + + for (TTableStatus tableStatus : listTableStatusResult.getTables()) { + catalogNameVector.setSafe(tablesCount, new Text(catalogName)); + schemaNameVector.setSafe(tablesCount, new Text(dbName)); + // DBeaver uses `Arrow Flight SQL JDBC Driver Core [16.1.0]` driver to connect to Doris. + // The metadata only shows one layer of `Tables`. All tables will be displayed together, + // so the database name and table name are spelled together for distinction. + // When DBeaver uses `MySQL Connector/J [mysql-connector-j-8.2.0` driver to connect to Doris, + // the metadata will show two layers of `Databases - Tables`. + // + // TODO, show two layers of original data `Databases - Tables` in DBeaver. + tableNameVector.setSafe(tablesCount, new Text(dbName + "." + tableStatus.getName())); + tableTypeVector.setSafe(tablesCount, new Text(tableStatus.getType())); + if (includeSchema) { + List fields = tableToFields.get(tableStatus.getName()); + schemaVector.setSafe(tablesCount, getSerializedSchema(fields)); + } + tablesCount++; + } + } + vectorSchemaRoot.setRowCount(tablesCount); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java index b605dff66b6a21..c93e0b5a309442 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightAuthUtils.java @@ -49,7 +49,12 @@ public static FlightAuthResult authenticateCredentials(String username, String p Logger logger) { try { List currentUserIdentity = Lists.newArrayList(); - + // If the password is empty, DBeaver will pass "null" string for authentication. + // This behavior of DBeaver is strange, but we have to be compatible with it, of course, + // it may be a problem with Arrow Flight Jdbc driver. + // Here, "null" is converted to null, if user's password is really the string "null", + // authentication will fail. Usually, the user's password will not be "null", let's hope so. + password = (password.equals("null")) ? null : password; Env.getCurrentEnv().getAuth().checkPlainPassword(username, remoteIp, password, currentUserIdentity); Preconditions.checkState(currentUserIdentity.size() == 1); return FlightAuthResult.of(username, currentUserIdentity.get(0), remoteIp); From 9e219a911705a8cc1127bf6a2fd11ec3fb8c2aee Mon Sep 17 00:00:00 2001 From: Zou Xinyi Date: Thu, 2 Jan 2025 00:12:40 +0800 Subject: [PATCH 2/3] 3 --- .../arrowflight/DorisFlightSqlProducer.java | 31 ++- .../arrowflight/FlightSqlSchemaHelper.java | 215 +++++++++--------- 2 files changed, 123 insertions(+), 123 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index 59676639ad18b2..154fd9f0b6b83c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -72,12 +72,10 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.util.AutoCloseables; -import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.WriteChannel; import org.apache.arrow.vector.ipc.message.MessageSerializer; import org.apache.arrow.vector.types.pojo.Schema; -import org.apache.arrow.vector.util.Text; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -451,20 +449,20 @@ public FlightInfo getFlightInfoCatalogs(final CommandGetCatalogs request, final @Override public void getStreamCatalogs(final CallContext context, final ServerStreamListener listener) { - try (final VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(Schemas.GET_CATALOGS_SCHEMA, - rootAllocator)) { - listener.start(vectorSchemaRoot); - vectorSchemaRoot.allocateNew(); - VarCharVector catalogNameVector = (VarCharVector) vectorSchemaRoot.getVector("catalog_name"); - // Only show Internal Catalog, which is consistent with `jdbc:mysql`. - // Otherwise, if the configured ExternalCatalog cannot be connected, - // `catalog.getAllDbs()` will be stuck and wait until the timeout period ends. - catalogNameVector.setSafe(0, new Text("internal")); - vectorSchemaRoot.setRowCount(1); - listener.putNext(); - listener.completed(); - } catch (final Exception ex) { - handleStreamException(ex, "", listener); + try { + ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity()); + FlightSqlSchemaHelper flightSqlSchemaHelper = new FlightSqlSchemaHelper(connectContext); + final Schema schema = Schemas.GET_CATALOGS_SCHEMA; + + try (final VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator)) { + listener.start(vectorSchemaRoot); + vectorSchemaRoot.allocateNew(); + flightSqlSchemaHelper.getCatalogs(vectorSchemaRoot); + listener.putNext(); + listener.completed(); + } + } catch (final Exception e) { + handleStreamException(e, "", listener); } } @@ -547,7 +545,6 @@ public FlightInfo getFlightInfoPrimaryKeys(final CommandGetPrimaryKeys request, @Override public void getStreamPrimaryKeys(final CommandGetPrimaryKeys command, final CallContext context, final ServerStreamListener listener) { - throw CallStatus.UNIMPLEMENTED.withDescription("getStreamPrimaryKeys unimplemented").toRuntimeException(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java index eec1c47d9c21ca..577d46d9073ab0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java @@ -31,7 +31,6 @@ import org.apache.doris.thrift.TListTableStatusResult; import org.apache.doris.thrift.TTableStatus; -import com.google.common.base.Preconditions; import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils; import org.apache.arrow.flight.sql.FlightSqlColumnMetadata; import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetDbSchemas; @@ -63,103 +62,27 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; public class FlightSqlSchemaHelper { private static final Logger LOG = LogManager.getLogger(FlightSqlSchemaHelper.class); private final ConnectContext ctx; private final FrontendServiceImpl impl; private boolean includeSchema; - private String catalogFilterPattern; - private String dbSchemaFilterPattern; - private String tableNameFilterPattern; - private List tableTypesList; - private static final byte[] EMPTY_SERIALIZED_SCHEMA = getSerializedSchema(Collections.emptyList()); + private String catalogFilterPattern = null; + private String dbSchemaFilterPattern = null; + private String tableNameFilterPattern = null; + private List tableTypesList = null; public FlightSqlSchemaHelper(ConnectContext context) { ctx = context; impl = new FrontendServiceImpl(ExecuteEnv.getInstance()); } - /** - * Set in the Tables request object the parameter that user passed via CommandGetTables. - */ - public void setParameterForGetTables(CommandGetTables command) { - includeSchema = command.getIncludeSchema(); - // Only show Internal Catalog, which is consistent with `jdbc:mysql`. - // Otherwise, if the configured ExternalCatalog cannot be connected, - // `catalog.getAllDbs()` will be stuck and wait until the timeout period ends. - catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : "internal"; - dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null; - if (command.hasTableNameFilterPattern()) { - if (command.getTableNameFilterPattern().contains(".")) { - Preconditions.checkState(command.getTableNameFilterPattern().split("\\.", -1).length == 2); - dbSchemaFilterPattern = command.getTableNameFilterPattern().split("\\.", -1)[0]; - tableNameFilterPattern = command.getTableNameFilterPattern().split("\\.", -1)[1]; - } else { - tableNameFilterPattern = command.getTableNameFilterPattern(); - } - } else { - tableNameFilterPattern = null; - } - tableTypesList = command.getTableTypesList().isEmpty() ? null : command.getTableTypesList(); - } - - /** - * Set in the Schemas request object the parameter that user passed via CommandGetDbSchemas. - */ - public void setParameterForGetDbSchemas(CommandGetDbSchemas command) { - catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : "internal"; - dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null; - } - - /** - * Call FrontendServiceImpl->getDbNames. - */ - private TGetDbsResult getDbNames() throws TException { - TGetDbsParams getDbsParams = new TGetDbsParams(); - getDbsParams.setCatalog(catalogFilterPattern); - if (dbSchemaFilterPattern != null) { - getDbsParams.setPattern(dbSchemaFilterPattern); - } - getDbsParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); - return impl.getDbNames(getDbsParams); - } - - /** - * Call FrontendServiceImpl->listTableStatus. - */ - private TListTableStatusResult listTableStatus(String dbName, String catalogName) throws TException { - TGetTablesParams getTablesParams = new TGetTablesParams(); - getTablesParams.setDb(dbName); - if (!catalogName.isEmpty()) { - getTablesParams.setCatalog(catalogName); - } - if (tableNameFilterPattern != null) { - getTablesParams.setPattern(tableNameFilterPattern); - } - if (tableTypesList != null) { - getTablesParams.setType(tableTypesList.get(0)); // currently only one type is supported. - } - getTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); - return impl.listTableStatus(getTablesParams); - } - - /** - * Call FrontendServiceImpl->describeTables. - */ - private TDescribeTablesResult describeTables(String dbName, String catalogName, List tablesName) - throws TException { - TDescribeTablesParams describeTablesParams = new TDescribeTablesParams(); - describeTablesParams.setDb(dbName); - if (!catalogName.isEmpty()) { - describeTablesParams.setCatalog(catalogName); - } - describeTablesParams.setTablesName(tablesName); - describeTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); - return impl.describeTables(describeTablesParams); - } + private static final byte[] EMPTY_SERIALIZED_SCHEMA = getSerializedSchema(Collections.emptyList()); /** * Convert Doris data type to an arrowType. @@ -252,6 +175,93 @@ private static Map createFlightSqlColumnMetadata(final String db return columnMetadataBuilder.build().getMetadataMap(); } + protected static byte[] getSerializedSchema(List fields) { + if (EMPTY_SERIALIZED_SCHEMA == null && fields == null) { + fields = Collections.emptyList(); + } else if (fields == null) { + return Arrays.copyOf(EMPTY_SERIALIZED_SCHEMA, EMPTY_SERIALIZED_SCHEMA.length); + } + + final ByteArrayOutputStream columnOutputStream = new ByteArrayOutputStream(); + final Schema schema = new Schema(fields); + + try { + MessageSerializer.serialize(new WriteChannel(Channels.newChannel(columnOutputStream)), schema); + } catch (final IOException e) { + throw new RuntimeException("IO Error when serializing schema '" + schema + "'.", e); + } + + return columnOutputStream.toByteArray(); + } + + /** + * Set in the Tables request object the parameter that user passed via CommandGetTables. + */ + public void setParameterForGetTables(CommandGetTables command) { + includeSchema = command.getIncludeSchema(); + catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : "internal"; + dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null; + tableNameFilterPattern = command.hasTableNameFilterPattern() ? command.getTableNameFilterPattern() : null; + tableTypesList = command.getTableTypesList().isEmpty() ? null : command.getTableTypesList(); + } + + /** + * Set in the Schemas request object the parameter that user passed via CommandGetDbSchemas. + */ + public void setParameterForGetDbSchemas(CommandGetDbSchemas command) { + catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : "internal"; + dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null; + } + + /** + * Call FrontendServiceImpl->getDbNames. + */ + private TGetDbsResult getDbNames() throws TException { + TGetDbsParams getDbsParams = new TGetDbsParams(); + if (catalogFilterPattern != null) { + getDbsParams.setCatalog(catalogFilterPattern); + } + if (dbSchemaFilterPattern != null) { + getDbsParams.setPattern(dbSchemaFilterPattern); + } + getDbsParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + return impl.getDbNames(getDbsParams); + } + + /** + * Call FrontendServiceImpl->listTableStatus. + */ + private TListTableStatusResult listTableStatus(String dbName, String catalogName) throws TException { + TGetTablesParams getTablesParams = new TGetTablesParams(); + getTablesParams.setDb(dbName); + if (!catalogName.isEmpty()) { + getTablesParams.setCatalog(catalogName); + } + if (tableNameFilterPattern != null) { + getTablesParams.setPattern(tableNameFilterPattern); + } + if (tableTypesList != null) { + getTablesParams.setType(tableTypesList.get(0)); // currently only one type is supported. + } + getTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + return impl.listTableStatus(getTablesParams); + } + + /** + * Call FrontendServiceImpl->describeTables. + */ + private TDescribeTablesResult describeTables(String dbName, String catalogName, List tablesName) + throws TException { + TDescribeTablesParams describeTablesParams = new TDescribeTablesParams(); + describeTablesParams.setDb(dbName); + if (!catalogName.isEmpty()) { + describeTablesParams.setCatalog(catalogName); + } + describeTablesParams.setTablesName(tablesName); + describeTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + return impl.describeTables(describeTablesParams); + } + /** * Construct > */ @@ -300,23 +310,23 @@ private Map> buildTableToFields(String dbName, TDescribeTabl return tableToFields; } - protected static byte[] getSerializedSchema(List fields) { - if (EMPTY_SERIALIZED_SCHEMA == null && fields == null) { - fields = Collections.emptyList(); - } else if (fields == null) { - return Arrays.copyOf(EMPTY_SERIALIZED_SCHEMA, EMPTY_SERIALIZED_SCHEMA.length); - } + /** + * for FlightSqlProducer Schemas.GET_CATALOGS_SCHEMA + */ + public void getCatalogs(VectorSchemaRoot vectorSchemaRoot) throws TException { + VarCharVector catalogNameVector = (VarCharVector) vectorSchemaRoot.getVector("catalog_name"); - final ByteArrayOutputStream columnOutputStream = new ByteArrayOutputStream(); - final Schema schema = new Schema(fields); + Set catalogsSet = new LinkedHashSet<>(); + catalogsSet.add("internal"); // An ordered Set with "internal" first. + TGetDbsResult getDbsResult = getDbNames(); + catalogsSet.addAll(getDbsResult.getCatalogs()); - try { - MessageSerializer.serialize(new WriteChannel(Channels.newChannel(columnOutputStream)), schema); - } catch (final IOException e) { - throw new RuntimeException("IO Error when serializing schema '" + schema + "'.", e); + int catalogIndex = 0; + for (String catalog : catalogsSet) { + catalogNameVector.setSafe(catalogIndex, new Text(catalog)); + catalogIndex++; } - - return columnOutputStream.toByteArray(); + vectorSchemaRoot.setRowCount(catalogIndex); } /** @@ -368,14 +378,7 @@ public void getTables(VectorSchemaRoot vectorSchemaRoot) throws TException { for (TTableStatus tableStatus : listTableStatusResult.getTables()) { catalogNameVector.setSafe(tablesCount, new Text(catalogName)); schemaNameVector.setSafe(tablesCount, new Text(dbName)); - // DBeaver uses `Arrow Flight SQL JDBC Driver Core [16.1.0]` driver to connect to Doris. - // The metadata only shows one layer of `Tables`. All tables will be displayed together, - // so the database name and table name are spelled together for distinction. - // When DBeaver uses `MySQL Connector/J [mysql-connector-j-8.2.0` driver to connect to Doris, - // the metadata will show two layers of `Databases - Tables`. - // - // TODO, show two layers of original data `Databases - Tables` in DBeaver. - tableNameVector.setSafe(tablesCount, new Text(dbName + "." + tableStatus.getName())); + tableNameVector.setSafe(tablesCount, new Text(tableStatus.getName())); tableTypeVector.setSafe(tablesCount, new Text(tableStatus.getType())); if (includeSchema) { List fields = tableToFields.get(tableStatus.getName()); From 59404d65811c7f9bd027dcb9ecaeb4e94a225fd3 Mon Sep 17 00:00:00 2001 From: Zou Xinyi Date: Thu, 2 Jan 2025 00:29:33 +0800 Subject: [PATCH 3/3] 4 --- .../doris/service/arrowflight/FlightSqlSchemaHelper.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java index 577d46d9073ab0..4b314a16c5fe94 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java @@ -17,7 +17,9 @@ package org.apache.doris.service.arrowflight; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.datasource.CatalogIf; import org.apache.doris.qe.ConnectContext; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendServiceImpl; @@ -318,8 +320,9 @@ public void getCatalogs(VectorSchemaRoot vectorSchemaRoot) throws TException { Set catalogsSet = new LinkedHashSet<>(); catalogsSet.add("internal"); // An ordered Set with "internal" first. - TGetDbsResult getDbsResult = getDbNames(); - catalogsSet.addAll(getDbsResult.getCatalogs()); + for (CatalogIf catalog : Env.getCurrentEnv().getCatalogMgr().listCatalogs()) { + catalogsSet.add(catalog.getName()); + } int catalogIndex = 0; for (String catalog : catalogsSet) {