From 2e8831c74058149a4baa6e5e62d32c09901bc6de Mon Sep 17 00:00:00 2001 From: zy-kkk Date: Thu, 26 Dec 2024 16:46:18 +0800 Subject: [PATCH] [opt](jdbc catalog) Compatible with higher ClickHouse JDBC Driver versions --- .../datasource/jdbc/JdbcExternalCatalog.java | 2 +- .../jdbc/client/JdbcClickHouseClient.java | 130 ++++++++++++++++++ .../datasource/jdbc/client/JdbcClient.java | 12 ++ .../jdbc/client/JdbcOceanBaseClient.java | 2 +- .../jdbc/client/JdbcClickHouseClientTest.java | 67 +++++++++ .../{ => client}/JdbcClientExceptionTest.java | 4 +- .../jdbc/test_clickhouse_jdbc_catalog.out | Bin 4381 -> 8056 bytes .../jdbc/test_clickhouse_jdbc_catalog.groovy | 73 ++++++---- 8 files changed, 261 insertions(+), 29 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClientTest.java rename fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/{ => client}/JdbcClientExceptionTest.java (97%) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java index fac322d21eb4da..03554dafbcb940 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java @@ -382,7 +382,7 @@ private void testFeToJdbcConnection() throws DdlException { jdbcClient.testConnection(); } catch (JdbcClientException e) { String errorMessage = "Test FE Connection to JDBC Failed: " + e.getMessage(); - LOG.error(errorMessage, e); + LOG.warn(errorMessage, e); throw new DdlException(errorMessage, e); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClient.java index bdf0cbbc934482..4f340bebed4732 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClient.java @@ -22,12 +22,103 @@ import org.apache.doris.catalog.Type; import org.apache.doris.datasource.jdbc.util.JdbcFieldSchema; +import com.google.common.collect.Lists; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; import java.util.Optional; +import java.util.function.Consumer; public class JdbcClickHouseClient extends JdbcClient { + private final Boolean databaseTermIsCatalog; + protected JdbcClickHouseClient(JdbcClientConfig jdbcClientConfig) { super(jdbcClientConfig); + try (Connection conn = getConnection()) { + String jdbcUrl = conn.getMetaData().getURL(); + if (!isNewClickHouseDriver(getJdbcDriverVersion())) { + this.databaseTermIsCatalog = false; + } else { + this.databaseTermIsCatalog = "catalog".equalsIgnoreCase(getDatabaseTermFromUrl(jdbcUrl)); + } + } catch (SQLException e) { + throw new JdbcClientException("Failed to initialize JdbcClickHouseClient: %s", e.getMessage()); + } + } + + @Override + public List getDatabaseNameList() { + Connection conn = null; + ResultSet rs = null; + List remoteDatabaseNames = Lists.newArrayList(); + try { + conn = getConnection(); + if (isOnlySpecifiedDatabase && includeDatabaseMap.isEmpty() && excludeDatabaseMap.isEmpty()) { + if (databaseTermIsCatalog) { + remoteDatabaseNames.add(conn.getCatalog()); + } else { + remoteDatabaseNames.add(conn.getSchema()); + } + } else { + if (databaseTermIsCatalog) { + rs = conn.getMetaData().getCatalogs(); + } else { + rs = conn.getMetaData().getSchemas(conn.getCatalog(), null); + } + while (rs.next()) { + remoteDatabaseNames.add(rs.getString(1)); + } + } + } catch (SQLException e) { + throw new JdbcClientException("failed to get database name list from jdbc", e); + } finally { + close(rs, conn); + } + return filterDatabaseNames(remoteDatabaseNames); + } + + @Override + protected void processTable(String remoteDbName, String remoteTableName, String[] tableTypes, + Consumer resultSetConsumer) { + Connection conn = null; + ResultSet rs = null; + try { + conn = super.getConnection(); + DatabaseMetaData databaseMetaData = conn.getMetaData(); + if (databaseTermIsCatalog) { + rs = databaseMetaData.getTables(remoteDbName, null, remoteTableName, tableTypes); + } else { + rs = databaseMetaData.getTables(null, remoteDbName, remoteTableName, tableTypes); + } + resultSetConsumer.accept(rs); + } catch (SQLException e) { + throw new JdbcClientException("Failed to process table", e); + } finally { + close(rs, conn); + } + } + + @Override + protected ResultSet getRemoteColumns(DatabaseMetaData databaseMetaData, String catalogName, String remoteDbName, + String remoteTableName) throws SQLException { + if (databaseTermIsCatalog) { + return databaseMetaData.getColumns(remoteDbName, null, remoteTableName, null); + } else { + return databaseMetaData.getColumns(catalogName, remoteDbName, remoteTableName, null); + } + } + + @Override + protected String getCatalogName(Connection conn) throws SQLException { + if (databaseTermIsCatalog) { + return null; + } else { + return conn.getCatalog(); + } } @Override @@ -121,4 +212,43 @@ protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) { return Type.UNSUPPORTED; } } + + /** + * Determine whether the driver version is greater than or equal to 0.5.0. + */ + private static boolean isNewClickHouseDriver(String driverVersion) { + if (driverVersion == null) { + throw new JdbcClientException("Driver version cannot be null"); + } + try { + String[] versionParts = driverVersion.split("\\."); + int majorVersion = Integer.parseInt(versionParts[0]); + int minorVersion = Integer.parseInt(versionParts[1]); + // Determine whether it is greater than or equal to 0.5.x + return (majorVersion > 0) || (majorVersion == 0 && minorVersion >= 5); + } catch (NumberFormatException | ArrayIndexOutOfBoundsException e) { + throw new JdbcClientException("Invalid clickhouse driver version format: " + driverVersion, e); + } + } + + /** + * Extract databaseterm parameters from the jdbc url. + */ + private String getDatabaseTermFromUrl(String jdbcUrl) { + if (jdbcUrl != null && jdbcUrl.toLowerCase().contains("databaseterm=schema")) { + return "schema"; + } + return "catalog"; + } + + /** + * Get the driver version. + */ + public String getJdbcDriverVersion() { + try (Connection conn = getConnection()) { + return conn.getMetaData().getDriverVersion(); + } catch (SQLException e) { + throw new JdbcClientException("Failed to get jdbc driver version", e); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java index a3dfdcda3193d4..121f7d6ba0499f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java @@ -480,4 +480,16 @@ public void testConnection() { public String getTestQuery() { return "select 1"; } + + public String getJdbcDriverVersion() { + Connection conn = null; + try { + conn = getConnection(); + return conn.getMetaData().getDriverVersion(); + } catch (SQLException e) { + throw new JdbcClientException("Failed to get jdbc driver version", e); + } finally { + close(conn); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOceanBaseClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOceanBaseClient.java index 0d3970c774b5bd..f43119875d63c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOceanBaseClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOceanBaseClient.java @@ -54,7 +54,7 @@ public JdbcClient createClient(JdbcClientConfig jdbcClientConfig) throws JdbcCli throw new JdbcClientException("Failed to determine OceanBase compatibility mode"); } } catch (SQLException e) { - throw new JdbcClientException("Failed to initialize JdbcOceanBaseClient", e.getMessage()); + throw new JdbcClientException("Failed to initialize JdbcOceanBaseClient: %s", e.getMessage()); } finally { close(rs, stmt, conn); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClientTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClientTest.java new file mode 100644 index 00000000000000..99e4aa62dd574d --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClientTest.java @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.jdbc.client; + +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Method; + +public class JdbcClickHouseClientTest { + + @Test + public void testIsNewClickHouseDriver() { + try { + Method method = JdbcClickHouseClient.class.getDeclaredMethod("isNewClickHouseDriver", String.class); + method.setAccessible(true); + + // Valid test cases + Assert.assertTrue((boolean) method.invoke(null, "0.5.0")); // Major version 0, Minor version 5 + Assert.assertTrue((boolean) method.invoke(null, "1.0.0")); // Major version 1 + Assert.assertTrue((boolean) method.invoke(null, "0.6.3 (revision: a6a8a22)")); // Major version 0, Minor version 6 + Assert.assertFalse((boolean) method.invoke(null, "0.4.2 (revision: 1513b27)")); // Major version 0, Minor version 4 + + // Invalid version formats + try { + method.invoke(null, "invalid.version"); // Invalid version format + Assert.fail("Expected JdbcClientException for invalid version 'invalid.version'"); + } catch (Exception e) { + Assert.assertTrue(e.getCause() instanceof JdbcClientException); + Assert.assertTrue(e.getCause().getMessage().contains("Invalid clickhouse driver version format")); + } + + try { + method.invoke(null, ""); // Empty version + Assert.fail("Expected JdbcClientException for empty version"); + } catch (Exception e) { + Assert.assertTrue(e.getCause() instanceof JdbcClientException); + Assert.assertTrue(e.getCause().getMessage().contains("Invalid clickhouse driver version format")); + } + + try { + method.invoke(null, (Object) null); // Null version + Assert.fail("Expected JdbcClientException for null version"); + } catch (Exception e) { + Assert.assertTrue(e.getCause() instanceof JdbcClientException); + Assert.assertTrue(e.getCause().getMessage().contains("Driver version cannot be null")); + } + } catch (Exception e) { + Assert.fail("Exception occurred while testing isNewClickHouseDriver: " + e.getMessage()); + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcClientExceptionTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/client/JdbcClientExceptionTest.java similarity index 97% rename from fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcClientExceptionTest.java rename to fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/client/JdbcClientExceptionTest.java index 1bbf54e9438512..c99f2bcfe26dbc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcClientExceptionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/client/JdbcClientExceptionTest.java @@ -15,9 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.datasource.jdbc; - -import org.apache.doris.datasource.jdbc.client.JdbcClientException; +package org.apache.doris.datasource.jdbc.client; import org.junit.Assert; import org.junit.Test; diff --git a/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out index 22f85579a83b532e1c615a93e2f79578b23981a9..910338e08622431116bc5636c9f6e8bd649c963e 100644 GIT binary patch delta 282 zcmbQM^uun0te}Lhu7YB6PG)j;Mt*5=YP@-TN@`kSX->(;Rcja}ut~(1l%;_rWpGKx zCl(cL*5^@WoGie_DuSXaIk6-$CqEt2Y^VfwvymiG%oav5yEr)`H8&Atv^W-Vn90&O K