Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import org.apache.doris.statistics.HistogramTask;
import org.apache.doris.statistics.MVAnalysisTask;
import org.apache.doris.statistics.OlapAnalysisTask;
import org.apache.doris.statistics.TableStats;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TColumn;
Expand Down Expand Up @@ -1122,6 +1124,32 @@ public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
return new MVAnalysisTask(info);
}

@Override
public boolean needReAnalyzeTable(TableStats tblStats) {
    // TODO: Do we need to analyze an empty table?
    long totalRows = getRowCount();
    if (totalRows == 0) {
        return false;
    }
    // Re-analyze only when the table health score has dropped below the
    // configured threshold; health is derived from how many rows changed
    // since statistics were last collected.
    long updatedRows = tblStats.updatedRows.get();
    int health = StatisticsUtil.getTableHealth(totalRows, updatedRows);
    return health < Config.table_stats_health_threshold;
}

@Override
public Set<String> findReAnalyzeNeededPartitions(TableStats tableStats) {
    // When no stats have ever been collected (tableStats == null), every
    // partition holding data is a candidate. Otherwise only partitions whose
    // visible version changed at or after the last stats update qualify.
    // Both cases share one pipeline; the null check folds into the filter.
    return getPartitionNames().stream()
            .map(this::getPartition)
            .filter(Partition::hasData)
            .filter(p -> tableStats == null || p.getVisibleVersionTime() >= tableStats.updatedTime)
            .map(Partition::getName)
            .collect(Collectors.toSet());
}

@Override
public long getRowCount() {
long rowCount = 0;
Expand Down
12 changes: 11 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.TableStats;
import org.apache.doris.thrift.TTableDescriptor;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -571,6 +572,15 @@ public Optional<ColumnStatistic> getColumnStatistic(String colName) {
return Optional.empty();
}

public void analyze(String dbName) {
public void analyze(String dbName) {}

@Override
public boolean needReAnalyzeTable(TableStats tblStats) {
    // Conservative default for generic tables: always report that re-analysis
    // is needed. Subclasses (e.g. OlapTable) override this with a
    // health-threshold based check.
    return true;
}

@Override
public Set<String> findReAnalyzeNeededPartitions(TableStats tableStats) {
    // Generic tables expose no partition information here, so there is never
    // a partition-level analysis target; subclasses with real partitions
    // override this.
    return Collections.emptySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.TableStats;
import org.apache.doris.thrift.TTableDescriptor;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -136,6 +137,10 @@ default int getBaseColumnIdxByName(String colName) {

Optional<ColumnStatistic> getColumnStatistic(String colName);

/**
 * Decides whether this table's statistics are stale enough that the table
 * should be analyzed again.
 *
 * @param tblStats last recorded statistics status for this table
 * @return true if the table needs re-analysis
 */
boolean needReAnalyzeTable(TableStats tblStats);

/**
 * Finds the partitions whose statistics need to be (re)collected, based on
 * the previously recorded statistics status.
 *
 * @param tableStats last recorded statistics status; implementations should
 *         handle null (no stats collected yet)
 * @return names of partitions needing analysis
 */
Set<String> findReAnalyzeNeededPartitions(TableStats tableStats);

void write(DataOutput out) throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.TableStats;
import org.apache.doris.thrift.TTableDescriptor;

import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import org.apache.commons.lang3.NotImplementedException;
Expand All @@ -46,8 +48,10 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand Down Expand Up @@ -375,4 +379,19 @@ public void gsonPostProcess() throws IOException {
rwLock = new ReentrantReadWriteLock(true);
objectCreated = false;
}

@Override
public boolean needReAnalyzeTable(TableStats tblStats) {
    // TODO: Find a way to decide if this external table need to be reanalyzed.
    // For now, simply return true for all external tables.
    // (External sources give no cheap change-tracking signal here yet.)
    return true;
}

@Override
public Set<String> findReAnalyzeNeededPartitions(TableStats tableStats) {
    // TODO: Find a way to collect external table partitions that need to be analyzed.
    // Until per-partition metadata is available for external tables, return a
    // single placeholder entry standing for the whole table.
    Set<String> placeholder = new HashSet<>();
    placeholder.add("Dummy Partition");
    return placeholder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ protected synchronized void makeSureInitialized() {
}
}
objectCreated = true;
estimatedRowCount = getRowCountFromExternalSource();
estimatedRowCount = getRowCountFromExternalSource(true);
}
}

Expand Down Expand Up @@ -277,19 +277,19 @@ public long getUpdateTime() {
@Override
public long getRowCount() {
makeSureInitialized();
long rowCount = getRowCountFromExternalSource();
long rowCount = getRowCountFromExternalSource(false);
if (rowCount == -1) {
LOG.debug("Will estimate row count from file list.");
rowCount = StatisticsUtil.getRowCountFromFileList(this);
}
return rowCount;
}

private long getRowCountFromExternalSource() {
private long getRowCountFromExternalSource(boolean isInit) {
long rowCount;
switch (dlaType) {
case HIVE:
rowCount = StatisticsUtil.getHiveRowCount(this);
rowCount = StatisticsUtil.getHiveRowCount(this, isInit);
break;
case ICEBERG:
rowCount = StatisticsUtil.getIcebergRowCount(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,6 @@ default CatalogLog constructEditLog() {
// Return a copy of all db collection.
@SuppressWarnings({"rawtypes", "unchecked"})
public Collection<DatabaseIf> getAllDbs();

public boolean enableAutoAnalyze();
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public abstract class ExternalCatalog
implements CatalogIf<ExternalDatabase<? extends ExternalTable>>, Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(ExternalCatalog.class);

public static final String ENABLE_AUTO_ANALYZE = "enable.auto.analyze";

// Unique id of this catalog, will be assigned after catalog is loaded.
@SerializedName(value = "id")
protected long id;
Expand Down Expand Up @@ -587,6 +589,20 @@ public boolean useSelfSplitter() {

@Override
public Collection<DatabaseIf> getAllDbs() {
makeSureInitialized();
return new HashSet<>(idToDb.values());
}

@Override
public boolean enableAutoAnalyze() {
    // By default, external catalog disables auto analyze; users can opt in by
    // setting the catalog property: "enable.auto.analyze" = "true".
    Map<String, String> properties = catalogProperty.getProperties();
    // Single lookup instead of containsKey + get. Calling equalsIgnoreCase on
    // the constant side is null-safe: a missing key yields null and the
    // comparison is simply false, matching the previous behavior exactly.
    return "true".equalsIgnoreCase(properties.get(ENABLE_AUTO_ANALYZE));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3134,4 +3134,9 @@ public void replayAutoIncrementIdUpdateLog(AutoIncrementIdUpdateLog log) throws
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(log.getTableId(), TableType.OLAP);
olapTable.getAutoIncrementGenerator().applyChange(log.getColumnId(), log.getBatchEndId());
}

@Override
public boolean enableAutoAnalyze() {
    // The internal (OLAP) catalog always participates in auto analyze;
    // only external catalogs gate this behind a catalog property.
    return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.common.Config;
Expand Down Expand Up @@ -88,7 +87,9 @@ protected void runAfterCatalogReady() {
private void analyzeAll() {
Set<CatalogIf> catalogs = Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog();
for (CatalogIf ctl : catalogs) {

if (!ctl.enableAutoAnalyze()) {
continue;
}
Collection<DatabaseIf> dbs = ctl.getAllDbs();
for (DatabaseIf<TableIf> databaseIf : dbs) {
if (StatisticConstants.STATISTICS_DB_BLACK_LIST.contains(databaseIf.getFullName())) {
Expand Down Expand Up @@ -158,11 +159,11 @@ protected AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) {
.findTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName);
TableStats tblStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());

if (!(tblStats == null || needReanalyzeTable(table, tblStats))) {
if (!(tblStats == null || table.needReAnalyzeTable(tblStats))) {
return null;
}

Set<String> needRunPartitions = findReAnalyzeNeededPartitions(table, tblStats);
Set<String> needRunPartitions = table.findReAnalyzeNeededPartitions(tblStats);

if (needRunPartitions.isEmpty()) {
return null;
Expand All @@ -171,31 +172,6 @@ protected AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) {
return getAnalysisJobInfo(jobInfo, table, needRunPartitions);
}

@VisibleForTesting
protected Set<String> findReAnalyzeNeededPartitions(TableIf table, TableStats tableStats) {
if (tableStats == null) {
return table.getPartitionNames().stream().map(table::getPartition)
.filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet());
}
return table.getPartitionNames().stream()
.map(table::getPartition)
.filter(Partition::hasData)
.filter(partition ->
partition.getVisibleVersionTime() >= tableStats.updatedTime).map(Partition::getName)
.collect(Collectors.toSet());
}

private boolean needReanalyzeTable(TableIf table, TableStats tblStats) {
long rowCount = table.getRowCount();
// TODO: Do we need to analyze an empty table?
if (rowCount == 0) {
return false;
}
long updateRows = tblStats.updatedRows.get();
int tblHealth = StatisticsUtil.getTableHealth(rowCount, updateRows);
return tblHealth < Config.table_stats_health_threshold;
}

@VisibleForTesting
public AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf table,
Set<String> needRunPartitions) {
Expand Down Expand Up @@ -247,6 +223,9 @@ protected void createSystemAnalysisJob(AnalysisInfo jobInfo)
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
analysisManager.createTaskForEachColumns(jobInfo, analysisTaskInfos, false);
if (StatisticsUtil.isExternalTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName)) {
analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, false);
}
Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTaskInfos);
analysisTaskInfos.values().forEach(analysisTaskExecutor::submitTask);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,9 +489,10 @@ public static int getTableHealth(long totalRows, long updatedRows) {
* First get it from remote table parameters. If not found, estimate it : totalSize/estimatedRowSize
*
* @param table Hive HMSExternalTable to estimate row count.
* @param isInit Flag to indicate if this is called during init. To avoid recursively get schema.
* @return estimated row count
*/
public static long getHiveRowCount(HMSExternalTable table) {
public static long getHiveRowCount(HMSExternalTable table, boolean isInit) {
Map<String, String> parameters = table.getRemoteTable().getParameters();
if (parameters == null) {
return -1;
Expand All @@ -500,7 +501,7 @@ public static long getHiveRowCount(HMSExternalTable table) {
if (parameters.containsKey(NUM_ROWS)) {
return Long.parseLong(parameters.get(NUM_ROWS));
}
if (!parameters.containsKey(TOTAL_SIZE)) {
if (!parameters.containsKey(TOTAL_SIZE) || isInit) {
return -1;
}
// Table parameters doesn't contain row count but contain total size. Estimate row count : totalSize/rowSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -140,14 +139,25 @@ public List<Column> getBaseSchema() {
}

@Test
public void testGetReAnalyzeRequiredPart0(@Mocked TableIf tableIf) {
public void testGetReAnalyzeRequiredPart0() {

new Expectations() {
{
tableIf.getRowCount();
result = 100;
TableIf tableIf = new OlapTable();

new MockUp<OlapTable>() {
@Mock
protected Set<String> findReAnalyzeNeededPartitions(TableStats tableStats) {
Set<String> partitionNames = new HashSet<>();
partitionNames.add("p1");
partitionNames.add("p2");
return partitionNames;
}

@Mock
public long getRowCount() {
return 100;
}
};

new MockUp<StatisticsUtil>() {
@Mock
public TableIf findTable(String catalogName, String dbName, String tblName) {
Expand Down Expand Up @@ -176,14 +186,6 @@ public TableStats findTableStatsStatus(long tblId) {
};

new MockUp<StatisticsAutoAnalyzer>() {
@Mock
protected Set<String> findReAnalyzeNeededPartitions(TableIf table, TableStats tableStats) {
Set<String> partitionNames = new HashSet<>();
partitionNames.add("p1");
partitionNames.add("p2");
return partitionNames;
}

@Mock
public AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf table,
Set<String> needRunPartitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ suite("test_hive_statistic", "p2,external,hive,external_remote,external_remote_h
);
"""
logger.info("catalog " + catalog_name + " created")

// Test analyze table without init.
sql """analyze table ${catalog_name}.tpch_1000_parquet.region with sync"""

sql """switch ${catalog_name};"""
logger.info("switched to catalog " + catalog_name)
sql """use statistics;"""
Expand Down