diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 37e5f265bd652b..7ddc51224b7f4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -131,8 +131,6 @@ public enum OlapTableState { WAITING_STABLE } - public static long ROW_COUNT_BEFORE_REPORT = -1; - @SerializedName(value = "state") private volatile OlapTableState state; @@ -1519,12 +1517,12 @@ public long getRowCountForIndex(long indexId, boolean strict) { if (index == null) { LOG.warn("Index {} not exist in partition {}, table {}, {}", indexId, entry.getValue().getName(), id, name); - return ROW_COUNT_BEFORE_REPORT; + return UNKNOWN_ROW_COUNT; } if (strict && !index.getRowCountReported()) { - return ROW_COUNT_BEFORE_REPORT; + return UNKNOWN_ROW_COUNT; } - rowCount += index.getRowCount() == -1 ? 0 : index.getRowCount(); + rowCount += index.getRowCount() == UNKNOWN_ROW_COUNT ? 0 : index.getRowCount(); } return rowCount; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index 862d6c1878e026..8d648df33567f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -586,7 +586,7 @@ public List getChunkSizes() { @Override public long fetchRowCount() { - return 0; + return UNKNOWN_ROW_COUNT; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index d42a32ef8d2e1f..8f9594e82c5c33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -56,6 +56,8 @@ public interface TableIf { Logger LOG = LogManager.getLogger(TableIf.class); + long UNKNOWN_ROW_COUNT = -1; + default void readLock() { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java index faf01a493844b1..0826187317adda 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java @@ -94,7 +94,7 @@ protected Optional doLoad(RowCountKey rowCountKey) { } /** - * Get cached row count for the given table. Return 0 if cached not loaded or table not exists. + * Get cached row count for the given table. Return -1 if cached not loaded or table not exists. * Cached will be loaded async. * @param catalogId * @param dbId @@ -106,13 +106,13 @@ public long getCachedRowCount(long catalogId, long dbId, long tableId) { try { CompletableFuture> f = rowCountCache.get(key); if (f.isDone()) { - return f.get().orElse(0L); + return f.get().orElse(TableIf.UNKNOWN_ROW_COUNT); } LOG.info("Row count for table {}.{}.{} is still processing.", catalogId, dbId, tableId); } catch (Exception e) { LOG.warn("Unexpected exception while returning row count", e); } - return 0; + return TableIf.UNKNOWN_ROW_COUNT; } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 71ac00e48e6b09..590a4cbe04625c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -200,7 +200,7 @@ public long getRowCount() { makeSureInitialized(); } catch (Exception e) { LOG.warn("Failed to initialize table {}.{}.{}", catalog.getName(), dbName, name, e); - return 0; + return TableIf.UNKNOWN_ROW_COUNT; } // All external table should get external row count from cache. return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(), dbId, id); @@ -226,7 +226,7 @@ public long getCachedRowCount() { * This is called by ExternalRowCountCache to load row count cache. */ public long fetchRowCount() { - return 0; + return UNKNOWN_ROW_COUNT; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 5f2c8cbddf3c31..5df44fda4769fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -344,7 +344,7 @@ public long getCreateTime() { } private long getRowCountFromExternalSource() { - long rowCount; + long rowCount = UNKNOWN_ROW_COUNT; switch (dlaType) { case HIVE: rowCount = StatisticsUtil.getHiveRowCount(this); @@ -358,7 +358,7 @@ private long getRowCountFromExternalSource() { } rowCount = -1; } - return rowCount; + return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT; } @Override @@ -532,7 +532,7 @@ public long fetchRowCount() { // Get row count from hive metastore property. long rowCount = getRowCountFromExternalSource(); // Only hive table supports estimate row count by listing file. - if (rowCount == -1 && dlaType.equals(DLAType.HIVE)) { + if (rowCount == UNKNOWN_ROW_COUNT && dlaType.equals(DLAType.HIVE)) { LOG.info("Will estimate row count for table {} from file list.", name); rowCount = getRowCountFromFileList(); } @@ -838,11 +838,11 @@ public boolean isPartitionColumnAllowNull() { */ private long getRowCountFromFileList() { if (!GlobalVariable.enable_get_row_count_from_file_list) { - return -1; + return UNKNOWN_ROW_COUNT; } if (isView()) { - LOG.info("Table {} is view, return 0.", name); - return 0; + LOG.info("Table {} is view, return -1.", name); + return UNKNOWN_ROW_COUNT; } HiveMetaStoreCache.HivePartitionValues partitionValues = getAllPartitionValues(); @@ -869,8 +869,8 @@ private long getRowCountFromFileList() { estimatedRowSize += column.getDataType().getSlotSize(); } if (estimatedRowSize == 0) { - LOG.warn("Table {} estimated size is 0, return 0.", name); - return 0; + LOG.warn("Table {} estimated size is 0, return -1.", name); + return UNKNOWN_ROW_COUNT; } int totalPartitionSize = partitionValues == null ? 1 : partitionValues.getIdToPartitionItem().size(); @@ -882,7 +882,7 @@ private long getRowCountFromFileList() { long rows = totalSize / estimatedRowSize; LOG.info("Table {} rows {}, total size is {}, estimatedRowSize is {}", name, rows, totalSize, estimatedRowSize); - return rows; + return rows > 0 ? rows : UNKNOWN_ROW_COUNT; } // Get all partition values from cache. diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java index d4361a47797a2e..feded88ea326f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java @@ -83,7 +83,8 @@ public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { @Override public long fetchRowCount() { makeSureInitialized(); - return IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(), getName()); + long rowCount = IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(), getName()); + return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT; } public Table getIcebergTable() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java index 62d260dacaf5cb..58519d92636962 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java @@ -41,6 +41,7 @@ import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.StructField; import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.common.UserException; import org.apache.doris.common.info.SimpleTableInfo; @@ -604,7 +605,7 @@ public static long getIcebergRowCount(ExternalCatalog catalog, String dbName, St if (snapshot == null) { LOG.info("Iceberg table {}.{}.{} is empty, return row count 0.", catalog.getName(), dbName, tbName); // empty table - return 0; + return TableIf.UNKNOWN_ROW_COUNT; } Map summary = snapshot.summary(); long rows = Long.parseLong(summary.get(TOTAL_RECORDS)) @@ -614,7 +615,7 @@ public static long getIcebergRowCount(ExternalCatalog catalog, String dbName, St } catch (Exception e) { LOG.warn("Fail to collect row count for db {} table {}", dbName, tbName, e); } - return -1; + return TableIf.UNKNOWN_ROW_COUNT; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 618c51caea1cc4..196b01efe2c0f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -194,16 +194,16 @@ public long fetchRowCount() { Table paimonTable = schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()) .orElse(null); if (paimonTable == null) { - return -1; + return UNKNOWN_ROW_COUNT; } List splits = paimonTable.newReadBuilder().newScan().plan().splits(); for (Split split : splits) { rowCount += split.rowCount(); } - return rowCount; + return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT; } catch (Exception e) { LOG.warn("Fail to collect row count for db {} table {}", dbName, name, e); } - return -1; + return UNKNOWN_ROW_COUNT; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index a7c2fc6365ba29..62d3a5b2946dd1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -181,7 +181,7 @@ protected void createAnalyzeJobForTbl(DatabaseIf db, ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL; if (table instanceof OlapTable && analysisMethod.equals(AnalysisMethod.SAMPLE)) { OlapTable ot = (OlapTable) table; - if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == OlapTable.ROW_COUNT_BEFORE_REPORT) { + if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == TableIf.UNKNOWN_ROW_COUNT) { LOG.info("Table {} row count is not fully reported, skip auto analyzing this time.", ot.getName()); return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 775138480d93bc..288eb88e95fc23 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -560,19 +560,19 @@ public static int getTableHealth(long totalRows, long updatedRows) { public static long getHiveRowCount(HMSExternalTable table) { Map parameters = table.getRemoteTable().getParameters(); if (parameters == null) { - return -1; + return TableIf.UNKNOWN_ROW_COUNT; } // Table parameters contains row count, simply get and return it. if (parameters.containsKey(NUM_ROWS)) { long rows = Long.parseLong(parameters.get(NUM_ROWS)); // Sometimes, the NUM_ROWS in hms is 0 but actually is not. Need to check TOTAL_SIZE if NUM_ROWS is 0. - if (rows != 0) { + if (rows > 0) { LOG.info("Get row count {} for hive table {} in table parameters.", rows, table.getName()); return rows; } } if (!parameters.containsKey(TOTAL_SIZE)) { - return -1; + return TableIf.UNKNOWN_ROW_COUNT; } // Table parameters doesn't contain row count but contain total size. Estimate row count : totalSize/rowSize long totalSize = Long.parseLong(parameters.get(TOTAL_SIZE)); @@ -582,7 +582,7 @@ public static long getHiveRowCount(HMSExternalTable table) { } if (estimatedRowSize == 0) { LOG.warn("Hive table {} estimated row size is invalid {}", table.getName(), estimatedRowSize); - return -1; + return TableIf.UNKNOWN_ROW_COUNT; } long rows = totalSize / estimatedRowSize; LOG.info("Get row count {} for hive table {} by total size {} and row size {}", diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java new file mode 100644 index 00000000000000..81605f93dcd1c1 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.ThreadPoolManager; + +import mockit.Mock; +import mockit.MockUp; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; + +public class ExternalRowCountCacheTest { + @Test + public void testLoadWithException() throws Exception { + ThreadPoolExecutor executor = ThreadPoolManager.newDaemonFixedThreadPool( + 1, Integer.MAX_VALUE, "TEST", true); + AtomicInteger counter = new AtomicInteger(0); + + new MockUp() { + @Mock + protected Optional doLoad(ExternalRowCountCache.RowCountKey rowCountKey) { + counter.incrementAndGet(); + return null; + } + }; + ExternalRowCountCache cache = new ExternalRowCountCache(executor); + long cachedRowCount = cache.getCachedRowCount(1, 1, 1); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount); + for (int i = 0; i < 60; i++) { + if (counter.get() == 1) { + break; + } + Thread.sleep(1000); + } + Assertions.assertEquals(1, counter.get()); + + new MockUp() { + @Mock + protected Optional doLoad(ExternalRowCountCache.RowCountKey rowCountKey) { + counter.incrementAndGet(); + return Optional.of(100L); + } + }; + cache.getCachedRowCount(1, 1, 1); + for (int i = 0; i < 60; i++) { + cachedRowCount = cache.getCachedRowCount(1, 1, 1); + if (cachedRowCount != TableIf.UNKNOWN_ROW_COUNT) { + Assertions.assertEquals(100, cachedRowCount); + break; + } + Thread.sleep(1000); + } + cachedRowCount = cache.getCachedRowCount(1, 1, 1); + Assertions.assertEquals(100, cachedRowCount); + Assertions.assertEquals(2, counter.get()); + + new MockUp() { + @Mock + protected Optional doLoad(ExternalRowCountCache.RowCountKey rowCountKey) { + counter.incrementAndGet(); + try { + Thread.sleep(1000000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return Optional.of(100L); + } + }; + cachedRowCount = cache.getCachedRowCount(2, 2, 2); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount); + Thread.sleep(1000); + cachedRowCount = cache.getCachedRowCount(2, 2, 2); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount); + for (int i = 0; i < 60; i++) { + if (counter.get() == 3) { + break; + } + Thread.sleep(1000); + } + Assertions.assertEquals(3, counter.get()); + } +}