Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1442,6 +1442,18 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int cbo_default_sample_percentage = 10;

/*
* if true, will allow the system to collect statistics automatically
*/
@ConfField(mutable = true, masterOnly = true)
public static boolean enable_auto_collect_statistics = true;

/*
* the system automatically checks the time interval for statistics
*/
@ConfField(mutable = true, masterOnly = true)
public static int auto_check_statistics_in_sec = 300;

/**
* If this configuration is enabled, you should also specify the trace_export_url.
*/
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ terminal String
KW_PASSWORD_REUSE,
KW_PATH,
KW_PAUSE,
KW_PERIOD,
KW_PIPE,
KW_PLUGIN,
KW_PLUGINS,
Expand Down Expand Up @@ -5802,6 +5803,12 @@ with_analysis_properties ::=
put("num.buckets", String.valueOf(numBuckets.intValue()));
}};
:}
| KW_PERIOD INTEGER_LITERAL:periodInSec
{:
RESULT = new HashMap<String, String>() {{
put("period.seconds", String.valueOf(periodInSec.intValue()));
}};
:}
;

opt_with_analysis_properties ::=
Expand Down Expand Up @@ -7487,6 +7494,10 @@ keyword ::=
{: RESULT = id; :}
| KW_SAMPLE:id
{: RESULT = id; :}
| KW_INCREMENTAL:id
{: RESULT = id; :}
| KW_PERIOD:id
{: RESULT = id; :}
;

// Identifier that contain keyword
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMode;
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisTaskInfo.ScheduleType;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
Expand All @@ -45,6 +47,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -85,6 +88,7 @@ public class AnalyzeStmt extends DdlStmt {
public static final String PROPERTY_SAMPLE_ROWS = "sample.rows";
public static final String PROPERTY_NUM_BUCKETS = "num.buckets";
public static final String PROPERTY_ANALYSIS_TYPE = "analysis.type";
public static final String PROPERTY_PERIOD_SECONDS = "period.seconds";

private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(PROPERTY_SYNC)
Expand All @@ -93,6 +97,7 @@ public class AnalyzeStmt extends DdlStmt {
.add(PROPERTY_SAMPLE_ROWS)
.add(PROPERTY_NUM_BUCKETS)
.add(PROPERTY_ANALYSIS_TYPE)
.add(PROPERTY_PERIOD_SECONDS)
.build();

private final TableName tableName;
Expand Down Expand Up @@ -232,6 +237,11 @@ private void checkProperties() throws UserException {
1, Integer.MAX_VALUE, true, "needs at least 1 buckets");
}

if (properties.containsKey(PROPERTY_PERIOD_SECONDS)) {
checkNumericProperty(PROPERTY_PERIOD_SECONDS, properties.get(PROPERTY_PERIOD_SECONDS),
1, Integer.MAX_VALUE, true, "needs at least 1 seconds");
}

if (properties.containsKey(PROPERTY_ANALYSIS_TYPE)) {
try {
AnalysisType.valueOf(properties.get(PROPERTY_ANALYSIS_TYPE));
Expand Down Expand Up @@ -328,15 +338,30 @@ public int getNumBuckets() {
return Integer.parseInt(properties.get(PROPERTY_NUM_BUCKETS));
}

public long getPeriodTimeInMs() {
if (!properties.containsKey(PROPERTY_PERIOD_SECONDS)) {
return 0;
}
int minutes = Integer.parseInt(properties.get(PROPERTY_PERIOD_SECONDS));
return TimeUnit.SECONDS.toMillis(minutes);
}

public AnalysisMode getAnalysisMode() {
return isIncremental() ? AnalysisMode.INCREMENTAL : AnalysisMode.FULL;
}

public AnalysisType getAnalysisType() {
return AnalysisType.valueOf(properties.get(PROPERTY_ANALYSIS_TYPE));
}

public AnalysisMethod getAnalysisMethod() {
if (getSamplePercent() > 0 || getSampleRows() > 0) {
return AnalysisMethod.SAMPLE;
}
return AnalysisMethod.FULL;
double samplePercent = getSamplePercent();
int sampleRows = getSampleRows();
return (samplePercent > 0 || sampleRows > 0) ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
}

public ScheduleType getScheduleType() {
return getPeriodTimeInMs() > 0 ? ScheduleType.PERIOD : ScheduleType.ONCE;
}

@Override
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.AnalysisTaskScheduler;
import org.apache.doris.statistics.StatisticsAutoAnalyzer;
import org.apache.doris.statistics.StatisticsCache;
import org.apache.doris.statistics.StatisticsCleaner;
import org.apache.doris.system.Backend;
Expand Down Expand Up @@ -452,6 +453,8 @@ public class Env {

private StatisticsCleaner statisticsCleaner;

private StatisticsAutoAnalyzer statisticsAutoAnalyzer;

public List<Frontend> getFrontends(FrontendNodeType nodeType) {
if (nodeType == null) {
// get all
Expand Down Expand Up @@ -653,6 +656,7 @@ private Env(boolean isCheckpointCatalog) {
if (Config.enable_stats && !isCheckpointCatalog) {
this.analysisManager = new AnalysisManager();
this.statisticsCleaner = new StatisticsCleaner();
this.statisticsAutoAnalyzer = new StatisticsAutoAnalyzer();
}
this.globalFunctionMgr = new GlobalFunctionMgr();
this.resourceGroupMgr = new ResourceGroupMgr();
Expand Down Expand Up @@ -880,6 +884,9 @@ public void initialize(String[] args) throws Exception {
if (statisticsCleaner != null) {
statisticsCleaner.start();
}
if (statisticsAutoAnalyzer != null) {
statisticsAutoAnalyzer.start();
}
}

// wait until FE is ready.
Expand Down Expand Up @@ -5378,4 +5385,8 @@ public GlobalFunctionMgr getGlobalFunctionMgr() {
public StatisticsCleaner getStatisticsCleaner() {
return statisticsCleaner;
}

public StatisticsAutoAnalyzer getStatisticsAutoAnalyzer() {
return statisticsAutoAnalyzer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DistributionDesc;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.HashDistributionDesc;
import org.apache.doris.analysis.KeysDesc;
import org.apache.doris.analysis.TableName;
Expand All @@ -41,10 +42,12 @@
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class InternalSchemaInitializer extends Thread {

Expand All @@ -55,6 +58,12 @@ public class InternalSchemaInitializer extends Thread {
*/
public static final int TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS = 5;

/**
* Used when an internal table schema changes.
* TODO remove this code after the table structure is stable
*/
private boolean isSchemaChanged = false;

public void run() {
if (FeConstants.disableInternalSchemaDb) {
return;
Expand Down Expand Up @@ -183,12 +192,26 @@ public CreateTableStmt buildAnalysisJobTblStmt() throws UserException {
columnDefs.add(new ColumnDef("tbl_name", TypeDef.createVarchar(1024)));
columnDefs.add(new ColumnDef("col_name", TypeDef.createVarchar(1024)));
columnDefs.add(new ColumnDef("index_id", TypeDef.create(PrimitiveType.BIGINT)));
columnDefs.add(new ColumnDef("col_partitions", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)));
columnDefs.add(new ColumnDef("job_type", TypeDef.createVarchar(32)));
columnDefs.add(new ColumnDef("analysis_type", TypeDef.createVarchar(32)));
columnDefs.add(new ColumnDef("message", TypeDef.createVarchar(1024)));
columnDefs.add(new ColumnDef("last_exec_time_in_ms", TypeDef.create(PrimitiveType.BIGINT)));
columnDefs.add(new ColumnDef("state", TypeDef.createVarchar(32)));
columnDefs.add(new ColumnDef("analysis_mode", TypeDef.createVarchar(32)));
columnDefs.add(new ColumnDef("analysis_method", TypeDef.createVarchar(32)));
columnDefs.add(new ColumnDef("schedule_type", TypeDef.createVarchar(32)));
columnDefs.add(new ColumnDef("state", TypeDef.createVarchar(32)));
columnDefs.add(new ColumnDef("sample_percent", TypeDef.create(PrimitiveType.BIGINT)));
columnDefs.add(new ColumnDef("sample_rows", TypeDef.create(PrimitiveType.BIGINT)));
columnDefs.add(new ColumnDef("max_bucket_num", TypeDef.create(PrimitiveType.BIGINT)));
columnDefs.add(new ColumnDef("period_time_in_ms", TypeDef.create(PrimitiveType.BIGINT)));
columnDefs.add(new ColumnDef("last_exec_time_in_ms", TypeDef.create(PrimitiveType.BIGINT)));
columnDefs.add(new ColumnDef("message", TypeDef.createVarchar(1024)));
// TODO remove this code after the table structure is stable
if (!isSchemaChanged && isTableChanged(tableName, columnDefs)) {
isSchemaChanged = true;
DropTableStmt dropTableStmt = new DropTableStmt(true, tableName, true);
StatisticsUtil.analyze(dropTableStmt);
Env.getCurrentEnv().getInternalCatalog().dropTable(dropTableStmt);
}
String engineName = "olap";
ArrayList<String> uniqueKeys = Lists.newArrayList("job_id", "task_id",
"catalog_name", "db_name", "tbl_name", "col_name", "index_id");
Expand Down Expand Up @@ -218,9 +241,51 @@ private boolean created() {
return false;
}
Database db = optionalDatabase.get();
return db.getTable(StatisticConstants.STATISTIC_TBL_NAME).isPresent()
// TODO remove this code after the table structure is stable
try {
buildAnalysisJobTblStmt();
} catch (UserException ignored) {
// CHECKSTYLE IGNORE THIS LINE
}
return !isSchemaChanged
&& db.getTable(StatisticConstants.STATISTIC_TBL_NAME).isPresent()
&& db.getTable(StatisticConstants.HISTOGRAM_TBL_NAME).isPresent()
&& db.getTable(StatisticConstants.ANALYSIS_JOB_TABLE).isPresent();
}

/**
* Compare whether the current internal table schema meets expectations,
* delete and rebuild if it does not meet the table schema.
* TODO remove this code after the table structure is stable
*/
private boolean isTableChanged(TableName tableName, List<ColumnDef> columnDefs) {
try {
String catalogName = Env.getCurrentEnv().getInternalCatalog().getName();
String dbName = SystemInfoService.DEFAULT_CLUSTER + ":" + tableName.getDb();
TableIf table = StatisticsUtil.findTable(catalogName, dbName, tableName.getTbl());
List<Column> existColumns = table.getBaseSchema(false);
existColumns.sort(Comparator.comparing(Column::getName));
List<Column> columns = columnDefs.stream()
.map(ColumnDef::toColumn)
.sorted(Comparator.comparing(Column::getName))
.collect(Collectors.toList());
if (columns.size() != existColumns.size()) {
return true;
}
for (int i = 0; i < columns.size(); i++) {
Column c1 = columns.get(i);
Column c2 = existColumns.get(i);
if (!c1.getName().equals(c2.getName())
|| c1.getDataType() != c2.getDataType()) {
return true;
}
}
return false;
} catch (Throwable t) {
LOG.warn("Failed to check table schema", t);
return false;
}
}

}

13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_CBO_STATISTICS = "enable_cbo_statistics";

public static final String ENABLE_SAVE_STATISTICS_SYNC_JOB = "enable_save_statistics_sync_job";

public static final String ENABLE_ELIMINATE_SORT_NODE = "enable_eliminate_sort_node";

public static final String NEREIDS_TRACE_EVENT_MODE = "nereids_trace_event_mode";
Expand Down Expand Up @@ -731,6 +733,13 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_CBO_STATISTICS)
public boolean enableCboStatistics = false;

/**
* If true, when synchronously collecting statistics, the information of
* the statistics job will be saved, currently mainly used for p0 test
*/
@VariableMgr.VarAttr(name = ENABLE_SAVE_STATISTICS_SYNC_JOB)
public boolean enableSaveStatisticsSyncJob = false;

@VariableMgr.VarAttr(name = ENABLE_ELIMINATE_SORT_NODE)
public boolean enableEliminateSortNode = true;

Expand Down Expand Up @@ -1411,6 +1420,10 @@ public boolean getEnableCboStatistics() {
return enableCboStatistics;
}

public boolean isEnableSaveStatisticsSyncJob() {
return enableSaveStatisticsSyncJob;
}

public long getFileSplitSize() {
return fileSplitSize;
}
Expand Down
Loading