From 0d9696015de01c925bba0f8c577bbeb06dbd808f Mon Sep 17 00:00:00 2001 From: Jibing Li Date: Thu, 31 Oct 2024 17:15:32 +0800 Subject: [PATCH] External table getRowCount return -1 when row count is not available or row count is 0. --- .../org/apache/doris/catalog/OlapTable.java | 8 ++--- .../java/org/apache/doris/catalog/Table.java | 2 +- .../org/apache/doris/catalog/TableIf.java | 2 ++ .../datasource/ExternalRowCountCache.java | 6 ++-- .../doris/datasource/ExternalTable.java | 4 +-- .../datasource/hive/HMSExternalTable.java | 18 +++++------ .../iceberg/IcebergExternalTable.java | 3 +- .../datasource/iceberg/IcebergUtils.java | 5 +-- .../paimon/PaimonExternalTable.java | 8 +++-- .../statistics/StatisticsAutoCollector.java | 2 +- .../doris/statistics/util/StatisticsUtil.java | 8 ++--- .../datasource/ExternalRowCountCacheTest.java | 32 +++++++++++++++++-- 12 files changed, 66 insertions(+), 32 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 999e0c43995f00..b0d27ac7b5c7d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -139,8 +139,6 @@ public enum OlapTableState { WAITING_STABLE } - public static long ROW_COUNT_BEFORE_REPORT = -1; - @SerializedName(value = "tst", alternate = {"state"}) private volatile OlapTableState state; @@ -1618,12 +1616,12 @@ public long getRowCountForIndex(long indexId, boolean strict) { if (index == null) { LOG.warn("Index {} not exist in partition {}, table {}, {}", indexId, entry.getValue().getName(), id, name); - return ROW_COUNT_BEFORE_REPORT; + return UNKNOWN_ROW_COUNT; } if (strict && !index.getRowCountReported()) { - return ROW_COUNT_BEFORE_REPORT; + return UNKNOWN_ROW_COUNT; } - rowCount += index.getRowCount() == -1 ? 0 : index.getRowCount(); + rowCount += index.getRowCount() == UNKNOWN_ROW_COUNT ? 
0 : index.getRowCount(); } return rowCount; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index d98bba5edaca84..d85d98a8ea550f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -623,7 +623,7 @@ public List getChunkSizes() { @Override public long fetchRowCount() { - return 0; + return UNKNOWN_ROW_COUNT; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index 3a688a7b59d17a..8f6e924f44a54d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -55,6 +55,8 @@ public interface TableIf { Logger LOG = LogManager.getLogger(TableIf.class); + long UNKNOWN_ROW_COUNT = -1; + default void readLock() { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java index 075091e682d722..fc955c4964a30c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java @@ -102,7 +102,7 @@ protected Optional doLoad(RowCountKey rowCountKey) { } /** - * Get cached row count for the given table. Return 0 if cached not loaded or table not exists. + * Get cached row count for the given table. Return -1 if cached not loaded or table not exists. * Cached will be loaded async. 
* @return Cached row count or -1 if not exist */ @@ -111,13 +111,13 @@ public long getCachedRowCount(long catalogId, long dbId, long tableId) { try { CompletableFuture> f = rowCountCache.get(key); if (f.isDone()) { - return f.get().orElse(-1L); + return f.get().orElse(TableIf.UNKNOWN_ROW_COUNT); } LOG.info("Row count for table {}.{}.{} is still processing.", catalogId, dbId, tableId); } catch (Exception e) { LOG.warn("Unexpected exception while returning row count", e); } - return -1; + return TableIf.UNKNOWN_ROW_COUNT; } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 5c57c13b4b85ad..1eadb46fe82eed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -195,7 +195,7 @@ public long getRowCount() { makeSureInitialized(); } catch (Exception e) { LOG.warn("Failed to initialize table {}.{}.{}", catalog.getName(), dbName, name, e); - return -1; + return TableIf.UNKNOWN_ROW_COUNT; } // All external table should get external row count from cache. return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(), dbId, id); @@ -221,7 +221,7 @@ public long getCachedRowCount() { * This is called by ExternalRowCountCache to load row count cache. 
*/ public long fetchRowCount() { - return -1; + return UNKNOWN_ROW_COUNT; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index f72421da8a1134..b48b47acf1378e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -337,7 +337,7 @@ public long getCreateTime() { } private long getRowCountFromExternalSource() { - long rowCount = -1; + long rowCount = UNKNOWN_ROW_COUNT; switch (dlaType) { case HIVE: rowCount = StatisticsUtil.getHiveRowCount(this); @@ -350,7 +350,7 @@ private long getRowCountFromExternalSource() { LOG.debug("getRowCount for dlaType {} is not supported.", dlaType); } } - return rowCount; + return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT; } @Override @@ -524,7 +524,7 @@ public long fetchRowCount() { // Get row count from hive metastore property. long rowCount = getRowCountFromExternalSource(); // Only hive table supports estimate row count by listing file. 
- if (rowCount == -1 && dlaType.equals(DLAType.HIVE)) { + if (rowCount == UNKNOWN_ROW_COUNT && dlaType.equals(DLAType.HIVE)) { LOG.info("Will estimate row count for table {} from file list.", name); rowCount = getRowCountFromFileList(); } @@ -834,11 +834,11 @@ public boolean isPartitionColumnAllowNull() { */ private long getRowCountFromFileList() { if (!GlobalVariable.enable_get_row_count_from_file_list) { - return -1; + return UNKNOWN_ROW_COUNT; } if (isView()) { - LOG.info("Table {} is view, return 0.", name); - return 0; + LOG.info("Table {} is view, return -1.", name); + return UNKNOWN_ROW_COUNT; } HiveMetaStoreCache.HivePartitionValues partitionValues = getAllPartitionValues(); @@ -865,8 +865,8 @@ private long getRowCountFromFileList() { estimatedRowSize += column.getDataType().getSlotSize(); } if (estimatedRowSize == 0) { - LOG.warn("Table {} estimated size is 0, return 0.", name); - return 0; + LOG.warn("Table {} estimated size is 0, return -1.", name); + return UNKNOWN_ROW_COUNT; } int totalPartitionSize = partitionValues == null ? 1 : partitionValues.getIdToPartitionItem().size(); @@ -878,7 +878,7 @@ private long getRowCountFromFileList() { long rows = totalSize / estimatedRowSize; LOG.info("Table {} rows {}, total size is {}, estimatedRowSize is {}", name, rows, totalSize, estimatedRowSize); - return rows; + return rows > 0 ? rows : UNKNOWN_ROW_COUNT; } // Get all partition values from cache. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java index d4361a47797a2e..feded88ea326f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java @@ -83,7 +83,8 @@ public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { @Override public long fetchRowCount() { makeSureInitialized(); - return IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(), getName()); + long rowCount = IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(), getName()); + return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT; } public Table getIcebergTable() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java index 7ae600756f17a5..ba6d628e492c20 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java @@ -41,6 +41,7 @@ import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.StructField; import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.common.UserException; import org.apache.doris.common.info.SimpleTableInfo; @@ -604,9 +605,9 @@ public static long getIcebergRowCount(ExternalCatalog catalog, String dbName, St .getIcebergTable(catalog, dbName, tbName); Snapshot snapshot = icebergTable.currentSnapshot(); if (snapshot == null) { - LOG.info("Iceberg table {}.{}.{} is empty, return row count 0.", catalog.getName(), dbName, tbName); + LOG.info("Iceberg table {}.{}.{} is empty, return -1.", catalog.getName(), dbName, tbName); // empty table - return 0; + return 
TableIf.UNKNOWN_ROW_COUNT; } Map summary = snapshot.summary(); long rows = Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 4b364ef45ca321..c9eaf1b7df32ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -193,12 +193,16 @@ public long fetchRowCount() { Table paimonTable = schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()) .orElse(null); if (paimonTable == null) { - return -1; + LOG.info("Paimon table {} is null.", name); + return UNKNOWN_ROW_COUNT; } List splits = paimonTable.newReadBuilder().newScan().plan().splits(); for (Split split : splits) { rowCount += split.rowCount(); } - return rowCount; + if (rowCount == 0) { + LOG.info("Paimon table {} row count is 0, return -1", name); + } + return rowCount > 0 ? 
rowCount : UNKNOWN_ROW_COUNT; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 9ba52169605b43..28c2bd95c96880 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -208,7 +208,7 @@ protected AnalysisInfo createAnalyzeJobForTbl( } if (table instanceof OlapTable && analysisMethod.equals(AnalysisMethod.SAMPLE)) { OlapTable ot = (OlapTable) table; - if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == OlapTable.ROW_COUNT_BEFORE_REPORT) { + if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == TableIf.UNKNOWN_ROW_COUNT) { LOG.info("Table {} row count is not fully reported, skip auto analyzing this time.", ot.getName()); return null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index b0fc3b9c1cfab1..a9c1612eb48b9c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -619,19 +619,19 @@ public static int getTableHealth(long totalRows, long updatedRows) { public static long getHiveRowCount(HMSExternalTable table) { Map parameters = table.getRemoteTable().getParameters(); if (parameters == null) { - return -1; + return TableIf.UNKNOWN_ROW_COUNT; } // Table parameters contains row count, simply get and return it. if (parameters.containsKey(NUM_ROWS)) { long rows = Long.parseLong(parameters.get(NUM_ROWS)); // Sometimes, the NUM_ROWS in hms is 0 but actually is not. Need to check TOTAL_SIZE if NUM_ROWS is 0. 
- if (rows != 0) { + if (rows > 0) { LOG.info("Get row count {} for hive table {} in table parameters.", rows, table.getName()); return rows; } } if (!parameters.containsKey(TOTAL_SIZE)) { - return -1; + return TableIf.UNKNOWN_ROW_COUNT; } // Table parameters doesn't contain row count but contain total size. Estimate row count : totalSize/rowSize long totalSize = Long.parseLong(parameters.get(TOTAL_SIZE)); @@ -641,7 +641,7 @@ public static long getHiveRowCount(HMSExternalTable table) { } if (estimatedRowSize == 0) { LOG.warn("Hive table {} estimated row size is invalid {}", table.getName(), estimatedRowSize); - return -1; + return TableIf.UNKNOWN_ROW_COUNT; } long rows = totalSize / estimatedRowSize; LOG.info("Get row count {} for hive table {} by total size {} and row size {}", diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java index 10b6b01527cb98..81605f93dcd1c1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource; +import org.apache.doris.catalog.TableIf; import org.apache.doris.common.ThreadPoolManager; import mockit.Mock; @@ -44,7 +45,7 @@ protected Optional doLoad(ExternalRowCountCache.RowCountKey rowCountKey) { }; ExternalRowCountCache cache = new ExternalRowCountCache(executor); long cachedRowCount = cache.getCachedRowCount(1, 1, 1); - Assertions.assertEquals(-1, cachedRowCount); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount); for (int i = 0; i < 60; i++) { if (counter.get() == 1) { break; @@ -63,12 +64,39 @@ protected Optional doLoad(ExternalRowCountCache.RowCountKey rowCountKey) { cache.getCachedRowCount(1, 1, 1); for (int i = 0; i < 60; i++) { cachedRowCount = cache.getCachedRowCount(1, 1, 1); - if (cachedRowCount != 
-1) { + if (cachedRowCount != TableIf.UNKNOWN_ROW_COUNT) { Assertions.assertEquals(100, cachedRowCount); break; } Thread.sleep(1000); } + cachedRowCount = cache.getCachedRowCount(1, 1, 1); + Assertions.assertEquals(100, cachedRowCount); Assertions.assertEquals(2, counter.get()); + + new MockUp() { + @Mock + protected Optional doLoad(ExternalRowCountCache.RowCountKey rowCountKey) { + counter.incrementAndGet(); + try { + Thread.sleep(1000000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return Optional.of(100L); + } + }; + cachedRowCount = cache.getCachedRowCount(2, 2, 2); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount); + Thread.sleep(1000); + cachedRowCount = cache.getCachedRowCount(2, 2, 2); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount); + for (int i = 0; i < 60; i++) { + if (counter.get() == 3) { + break; + } + Thread.sleep(1000); + } + Assertions.assertEquals(3, counter.get()); } }