From 8fbff95162296d779a57f3d457d78d9458fe0e30 Mon Sep 17 00:00:00 2001 From: zy-kkk Date: Wed, 22 Jan 2025 22:54:09 +0800 Subject: [PATCH] [opt](jdbc catalog) Compatible with higher ClickHouse JDBC Driver versions (#46026) ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: 1. Since clickhouse changes the database level in jdbc metadata from schema to catalog in JDBC Driver 0.5.0 and later, we need to be compatible with this change 2. Since clickhouse JDBC Driver supports getting metadata from prepared statements only in Driver version 0.6.2 and later, if you use query tvf to query clickhouse catalog, you need to use a driver later than this version 3. Delete some tests and add them again later --- .../datasource/jdbc/JdbcExternalCatalog.java | 2 +- .../jdbc/client/JdbcClickHouseClient.java | 130 ++++++++++++++++++ .../datasource/jdbc/client/JdbcClient.java | 12 ++ .../jdbc/client/JdbcOceanBaseClient.java | 2 +- .../jdbc/client/JdbcClickHouseClientTest.java | 67 +++++++++ .../{ => client}/JdbcClientExceptionTest.java | 4 +- .../jdbc/test_clickhouse_jdbc_catalog.out | Bin 4381 -> 8056 bytes .../jdbc/test_clickhouse_jdbc_catalog.groovy | 73 ++++++---- 8 files changed, 261 insertions(+), 29 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClientTest.java rename fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/{ => client}/JdbcClientExceptionTest.java (97%) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java index fac322d21eb4da..03554dafbcb940 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java @@ -382,7 +382,7 @@ private void testFeToJdbcConnection() throws DdlException { jdbcClient.testConnection(); } catch (JdbcClientException e) { String errorMessage = "Test FE Connection to JDBC Failed: " + e.getMessage(); - LOG.error(errorMessage, e); + LOG.warn(errorMessage, e); throw new DdlException(errorMessage, e); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClient.java index bdf0cbbc934482..4f340bebed4732 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClient.java @@ -22,12 +22,103 @@ import org.apache.doris.catalog.Type; import org.apache.doris.datasource.jdbc.util.JdbcFieldSchema; +import com.google.common.collect.Lists; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; import java.util.Optional; +import java.util.function.Consumer; public class JdbcClickHouseClient extends JdbcClient { + private final Boolean databaseTermIsCatalog; + protected JdbcClickHouseClient(JdbcClientConfig jdbcClientConfig) { super(jdbcClientConfig); + try (Connection conn = getConnection()) { + String jdbcUrl = conn.getMetaData().getURL(); + if (!isNewClickHouseDriver(getJdbcDriverVersion())) { + this.databaseTermIsCatalog = false; + } else { + this.databaseTermIsCatalog = "catalog".equalsIgnoreCase(getDatabaseTermFromUrl(jdbcUrl)); + } + } catch (SQLException e) { + throw new JdbcClientException("Failed to initialize JdbcClickHouseClient: %s", e.getMessage()); + } + } + + @Override + public List getDatabaseNameList() { + Connection conn = null; + ResultSet rs = null; + List remoteDatabaseNames = Lists.newArrayList(); + try { + conn = getConnection(); + if (isOnlySpecifiedDatabase && includeDatabaseMap.isEmpty() && excludeDatabaseMap.isEmpty()) { + if (databaseTermIsCatalog) { + remoteDatabaseNames.add(conn.getCatalog()); + } else { + remoteDatabaseNames.add(conn.getSchema()); + } + } else { + if (databaseTermIsCatalog) { + rs = conn.getMetaData().getCatalogs(); + } else { + rs = conn.getMetaData().getSchemas(conn.getCatalog(), null); + } + while (rs.next()) { + remoteDatabaseNames.add(rs.getString(1)); + } + } + } catch (SQLException e) { + throw new JdbcClientException("failed to get database name list from jdbc", e); + } finally { + close(rs, conn); + } + return filterDatabaseNames(remoteDatabaseNames); + } + + @Override + protected void processTable(String remoteDbName, String remoteTableName, String[] tableTypes, + Consumer resultSetConsumer) { + Connection conn = null; + ResultSet rs = null; + try { + conn = super.getConnection(); + DatabaseMetaData databaseMetaData = conn.getMetaData(); + if (databaseTermIsCatalog) { + rs = databaseMetaData.getTables(remoteDbName, null, remoteTableName, tableTypes); + } else { + rs = databaseMetaData.getTables(null, remoteDbName, remoteTableName, tableTypes); + } + resultSetConsumer.accept(rs); + } catch (SQLException e) { + throw new JdbcClientException("Failed to process table", e); + } finally { + close(rs, conn); + } + } + + @Override + protected ResultSet getRemoteColumns(DatabaseMetaData databaseMetaData, String catalogName, String remoteDbName, + String remoteTableName) throws SQLException { + if (databaseTermIsCatalog) { + return databaseMetaData.getColumns(remoteDbName, null, remoteTableName, null); + } else { + return databaseMetaData.getColumns(catalogName, remoteDbName, remoteTableName, null); + } + } + + @Override + protected String getCatalogName(Connection conn) throws SQLException { + if (databaseTermIsCatalog) { + return null; + } else { + return conn.getCatalog(); + } } @Override @@ -121,4 +212,43 @@ protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) { return Type.UNSUPPORTED; } } + + /** + * Determine whether the driver version is greater than or equal to 0.5.0. + */ + private static boolean isNewClickHouseDriver(String driverVersion) { + if (driverVersion == null) { + throw new JdbcClientException("Driver version cannot be null"); + } + try { + String[] versionParts = driverVersion.split("\\."); + int majorVersion = Integer.parseInt(versionParts[0]); + int minorVersion = Integer.parseInt(versionParts[1]); + // Determine whether it is greater than or equal to 0.5.x + return (majorVersion > 0) || (majorVersion == 0 && minorVersion >= 5); + } catch (NumberFormatException | ArrayIndexOutOfBoundsException e) { + throw new JdbcClientException("Invalid clickhouse driver version format: " + driverVersion, e); + } + } + + /** + * Extract databaseterm parameters from the jdbc url. + */ + private String getDatabaseTermFromUrl(String jdbcUrl) { + if (jdbcUrl != null && jdbcUrl.toLowerCase().contains("databaseterm=schema")) { + return "schema"; + } + return "catalog"; + } + + /** + * Get the driver version. + */ + public String getJdbcDriverVersion() { + try (Connection conn = getConnection()) { + return conn.getMetaData().getDriverVersion(); + } catch (SQLException e) { + throw new JdbcClientException("Failed to get jdbc driver version", e); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java index a3dfdcda3193d4..121f7d6ba0499f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java @@ -480,4 +480,16 @@ public void testConnection() { public String getTestQuery() { return "select 1"; } + + public String getJdbcDriverVersion() { + Connection conn = null; + try { + conn = getConnection(); + return conn.getMetaData().getDriverVersion(); + } catch (SQLException e) { + throw new JdbcClientException("Failed to get jdbc driver version", e); + } finally { + close(conn); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOceanBaseClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOceanBaseClient.java index 0d3970c774b5bd..f43119875d63c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOceanBaseClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOceanBaseClient.java @@ -54,7 +54,7 @@ public JdbcClient createClient(JdbcClientConfig jdbcClientConfig) throws JdbcCli throw new JdbcClientException("Failed to determine OceanBase compatibility mode"); } } catch (SQLException e) { - throw new JdbcClientException("Failed to initialize JdbcOceanBaseClient", e.getMessage()); + throw new JdbcClientException("Failed to initialize JdbcOceanBaseClient: %s", e.getMessage()); } finally { close(rs, stmt, conn); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClientTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClientTest.java new file mode 100644 index 00000000000000..99e4aa62dd574d --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/client/JdbcClickHouseClientTest.java @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.jdbc.client; + +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Method; + +public class JdbcClickHouseClientTest { + + @Test + public void testIsNewClickHouseDriver() { + try { + Method method = JdbcClickHouseClient.class.getDeclaredMethod("isNewClickHouseDriver", String.class); + method.setAccessible(true); + + // Valid test cases + Assert.assertTrue((boolean) method.invoke(null, "0.5.0")); // Major version 0, Minor version 5 + Assert.assertTrue((boolean) method.invoke(null, "1.0.0")); // Major version 1 + Assert.assertTrue((boolean) method.invoke(null, "0.6.3 (revision: a6a8a22)")); // Major version 0, Minor version 6 + Assert.assertFalse((boolean) method.invoke(null, "0.4.2 (revision: 1513b27)")); // Major version 0, Minor version 4 + + // Invalid version formats + try { + method.invoke(null, "invalid.version"); // Invalid version format + Assert.fail("Expected JdbcClientException for invalid version 'invalid.version'"); + } catch (Exception e) { + Assert.assertTrue(e.getCause() instanceof JdbcClientException); + Assert.assertTrue(e.getCause().getMessage().contains("Invalid clickhouse driver version format")); + } + + try { + method.invoke(null, ""); // Empty version + Assert.fail("Expected JdbcClientException for empty version"); + } catch (Exception e) { + Assert.assertTrue(e.getCause() instanceof JdbcClientException); + Assert.assertTrue(e.getCause().getMessage().contains("Invalid clickhouse driver version format")); + } + + try { + method.invoke(null, (Object) null); // Null version + Assert.fail("Expected JdbcClientException for null version"); + } catch (Exception e) { + Assert.assertTrue(e.getCause() instanceof JdbcClientException); + Assert.assertTrue(e.getCause().getMessage().contains("Driver version cannot be null")); + } + } catch (Exception e) { + Assert.fail("Exception occurred while testing isNewClickHouseDriver: " + e.getMessage()); + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcClientExceptionTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/client/JdbcClientExceptionTest.java similarity index 97% rename from fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcClientExceptionTest.java rename to fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/client/JdbcClientExceptionTest.java index 1bbf54e9438512..c99f2bcfe26dbc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcClientExceptionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/client/JdbcClientExceptionTest.java @@ -15,9 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.datasource.jdbc; - -import org.apache.doris.datasource.jdbc.client.JdbcClientException; +package org.apache.doris.datasource.jdbc.client; import org.junit.Assert; import org.junit.Test; diff --git a/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out index 22f85579a83b532e1c615a93e2f79578b23981a9..910338e08622431116bc5636c9f6e8bd649c963e 100644 GIT binary patch delta 282 zcmbQM^uun0te}Lhu7YB6PG)j;Mt*5=YP@-TN@`kSX->(;Rcja}ut~(1l%;_rWpGKx zCl(cL*5^@WoGie_DuSXaIk6-$CqEt2Y^VfwvymiG%oav5yEr)`H8&Atv^W-Vn90&O K