From b90daa84adfb9de709bf3f91f13bc4708f349d44 Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Fri, 30 Aug 2024 19:24:09 +0800 Subject: [PATCH] [feature](statistics) Support get row count for JDBC external table. (#38889) Support get row count for JDBC external table by executing select count(1) from table. Return -1 when external table row count is unknown. --- .../datasource/jdbc/JdbcExternalTable.java | 58 +++++++++++++++++++ .../jdbc/test_mysql_jdbc_statistics.groovy | 47 +++++++++------ 2 files changed, 88 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java index b31fc5c24a9c7f..07ce183a589671 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java @@ -17,19 +17,28 @@ package org.apache.doris.datasource.jdbc; +import org.apache.doris.analysis.StatementBase; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.JdbcResource; import org.apache.doris.catalog.JdbcTable; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.qe.AutoCloseConnectContext; +import org.apache.doris.qe.StmtExecutor; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.JdbcAnalysisTask; +import org.apache.doris.statistics.ResultRow; +import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.thrift.TTableDescriptor; +import org.apache.commons.text.StringSubstitutor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; /** @@ -38,6 +47,9 @@ public class JdbcExternalTable extends ExternalTable { private static final Logger LOG = LogManager.getLogger(JdbcExternalTable.class); + public static final String MYSQL_ROW_COUNT_SQL = "SELECT * FROM QUERY" + + "(\"catalog\"=\"${ctlName}\", \"query\"=\"show table status from `${dbName}` like '${tblName}'\");"; + private JdbcTable jdbcTable; /** @@ -98,4 +110,50 @@ public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { makeSureInitialized(); return new JdbcAnalysisTask(info); } + + @Override + public long fetchRowCount() { + Map params = new HashMap<>(); + params.put("ctlName", catalog.getName()); + params.put("dbName", dbName); + params.put("tblName", name); + switch (((JdbcExternalCatalog) catalog).getDatabaseTypeName()) { + case JdbcResource.MYSQL: + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + String sql = stringSubstitutor.replace(MYSQL_ROW_COUNT_SQL); + StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql); + List resultRows = stmtExecutor.executeInternalQuery(); + if (resultRows == null || resultRows.size() != 1) { + LOG.info("No mysql status found for table {}.{}.{}", catalog.getName(), dbName, name); + return -1; + } + StatementBase parsedStmt = stmtExecutor.getParsedStmt(); + if (parsedStmt == null || parsedStmt.getColLabels() == null) { + LOG.info("No column label found for table {}.{}.{}", catalog.getName(), dbName, name); + return -1; + } + ResultRow resultRow = resultRows.get(0); + List colLabels = parsedStmt.getColLabels(); + int index = colLabels.indexOf("TABLE_ROWS"); + if (index == -1) { + LOG.info("No TABLE_ROWS in status for table {}.{}.{}", catalog.getName(), dbName, name); + return -1; + } + long rows = Long.parseLong(resultRow.get(index)); + LOG.info("Get mysql table {}.{}.{} row count {}", catalog.getName(), dbName, name, rows); + return rows; + } catch (Exception e) { + LOG.warn("Failed to fetch mysql row count for table {}.{}.{}. Reason [{}]", + catalog.getName(), dbName, name, e.getMessage()); + return -1; + } + case JdbcResource.ORACLE: + case JdbcResource.POSTGRESQL: + case JdbcResource.SQLSERVER: + default: + break; + } + return -1; + } } diff --git a/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy b/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy index 66b04ebd5136ea..e9bd59d8cb29a8 100644 --- a/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy +++ b/regression-test/suites/external_table_p0/jdbc/test_mysql_jdbc_statistics.groovy @@ -17,6 +17,7 @@ suite("test_mysql_jdbc_statistics", "p0,external,mysql,external_docker,external_docker_mysql") { String enabled = context.config.otherConfigs.get("enableJdbcTest") + logger.info("enabled " + enabled) String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") String mysql_port = context.config.otherConfigs.get("mysql_57_port"); String s3_endpoint = getS3Endpoint() @@ -35,28 +36,40 @@ suite("test_mysql_jdbc_statistics", "p0,external,mysql,external_docker,external_ );""" sql """use ${catalog_name}.doris_test""" + + def result = sql """show table stats ex_tb0""" + Thread.sleep(1000) + for (int i = 0; i < 20; i++) { + result = sql """show table stats ex_tb0"""; + if (result[0][2] != "-1") { + assertEquals("5", result[0][2]) + break; + } + logger.info("Table row count not ready yet. Wait 1 second.") + Thread.sleep(1000) + } sql """analyze table ex_tb0 with sync""" - def result = sql """show column stats ex_tb0 (name)""" - assertTrue(result.size() == 1) - assertTrue(result[0][0] == "name") - assertTrue(result[0][2] == "5.0") - assertTrue(result[0][3] == "5.0") - assertTrue(result[0][4] == "0.0") - assertTrue(result[0][5] == "15.0") - assertTrue(result[0][6] == "3.0") + result = sql """show column stats ex_tb0 (name)""" + assertEquals(result.size(), 1) + assertEquals(result[0][0], "name") + assertEquals(result[0][2], "5.0") + assertEquals(result[0][3], "5.0") + assertEquals(result[0][4], "0.0") + assertEquals(result[0][5], "15.0") + assertEquals(result[0][6], "3.0") assertEquals(result[0][7], "'abc'") assertEquals(result[0][8], "'abg'") result = sql """show column stats ex_tb0 (id)""" - assertTrue(result.size() == 1) - assertTrue(result[0][0] == "id") - assertTrue(result[0][2] == "5.0") - assertTrue(result[0][3] == "5.0") - assertTrue(result[0][4] == "0.0") - assertTrue(result[0][5] == "20.0") - assertTrue(result[0][6] == "4.0") - assertTrue(result[0][7] == "111") - assertTrue(result[0][8] == "115") + assertEquals(result.size(), 1) + assertEquals(result[0][0], "id") + assertEquals(result[0][2], "5.0") + assertEquals(result[0][3], "5.0") + assertEquals(result[0][4], "0.0") + assertEquals(result[0][5], "20.0") + assertEquals(result[0][6], "4.0") + assertEquals(result[0][7], "111") + assertEquals(result[0][8], "115") sql """drop catalog ${catalog_name}""" }