@@ -272,6 +272,10 @@ public boolean forceFull() {
         return properties.containsKey(PROPERTY_FORCE_FULL);
     }
 
+    public boolean isSampleRows() {
+        return properties.containsKey(PROPERTY_SAMPLE_ROWS);
+    }
+
     public String toSQL() {
         StringBuilder sb = new StringBuilder();
         sb.append("PROPERTIES(");
@@ -166,11 +166,9 @@ public void check() throws AnalysisException {
         analyzeProperties.check();
 
         // TODO support external table
-        if (analyzeProperties.isSample()) {
-            if (!(table instanceof OlapTable)) {
-                throw new AnalysisException("Sampling statistics "
-                        + "collection of external tables is not supported");
-            }
+        if (analyzeProperties.isSampleRows() && !(table instanceof OlapTable)) {
+            throw new AnalysisException("Sampling statistics "
+                    + "collection of external tables is not supported with rows, use percent instead.");
         }
         if (analyzeProperties.isSync()
                 && (analyzeProperties.isAutomatic() || analyzeProperties.getPeriodTimeInMs() != 0)) {
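The asymmetry in this check is deliberate: WITH SAMPLE ROWS is presumably tied to OLAP tablet sampling, which external tables do not have, while WITH SAMPLE PERCENT can be mapped onto file-level sampling for HMS tables (see the HiveScanNode changes below). A minimal standalone sketch of the rule being enforced, using hypothetical names rather than the real AnalyzeProperties API:

// Sketch only: row-based sampling stays OLAP-only; percent-based sampling
// remains legal for both OLAP and external (e.g. HMS) tables.
static void checkSampleSupport(boolean isOlapTable, boolean sampleByRows) {
    if (sampleByRows && !isOlapTable) {
        throw new IllegalArgumentException("Sampling statistics collection of external"
                + " tables is not supported with rows, use percent instead.");
    }
}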
11 changes: 9 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
@@ -470,9 +470,16 @@ protected void analyzeSortHints() throws AnalysisException {
     }
 
     protected void analyzeSample() throws AnalysisException {
-        if ((sampleTabletIds != null || tableSample != null) && desc.getTable().getType() != TableIf.TableType.OLAP) {
+        if ((sampleTabletIds != null || tableSample != null)
+                && desc.getTable().getType() != TableIf.TableType.OLAP
+                && desc.getTable().getType() != TableIf.TableType.HMS_EXTERNAL_TABLE) {
             throw new AnalysisException("Sample table " + desc.getTable().getName()
-                    + " type " + desc.getTable().getType() + " is not OLAP");
+                    + " type " + desc.getTable().getType() + " is not supported");
         }
+        if (tableSample != null && TableIf.TableType.HMS_EXTERNAL_TABLE.equals(desc.getTable().getType())) {
+            if (!tableSample.isPercent()) {
+                throw new AnalysisException("HMS table doesn't support sample rows, use percent instead.");
+            }
+        }
     }
 
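The same rows-versus-percent split now applies at query time in analyzeSample(): an HMS external table may be sampled, but only with the percent form. An illustrative pair of queries (catalog and table names are made up, and the TABLESAMPLE syntax is assumed from Doris's OLAP sampling clause):

// Illustrative only: which TABLESAMPLE forms pass analyzeSample() on an HMS table.
String accepted = "SELECT COUNT(*) FROM hive.tpch.part TABLESAMPLE(10 PERCENT)";
String rejected = "SELECT COUNT(*) FROM hive.tpch.part TABLESAMPLE(1000 ROWS)";
// The second fails analysis: "HMS table doesn't support sample rows, use percent instead."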
@@ -1071,6 +1071,9 @@ public static class HiveFileStatus {
         long length;
         long blockSize;
         long modificationTime;
+        boolean splittable;
+        List<String> partitionValues;
+        AcidInfo acidInfo;
     }
 
     @Data
@@ -2024,6 +2024,7 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s
                 break;
             case HIVE:
                 scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
+                ((HiveScanNode) scanNode).setTableSample(tblRef.getTableSample());
                 break;
             default:
                 throw new UserException("Not supported table type: " + ((HMSExternalTable) table).getDlaType());
@@ -20,6 +20,7 @@
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotId;
+import org.apache.doris.analysis.TableSample;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
@@ -92,6 +93,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
     protected Map<String, SlotDescriptor> destSlotDescByName;
     protected TFileScanRangeParams params;
 
+    protected TableSample tableSample;
+
     /**
      * External file scan node for Query hms table
      * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
@@ -200,6 +203,10 @@ private void updateRequiredSlots() throws UserException {
         setColumnPositionMapping();
     }
 
+    public void setTableSample(TableSample tSample) {
+        this.tableSample = tSample;
+    }
+
     @Override
     public void finalize(Analyzer analyzer) throws UserException {
         doFinalize();
@@ -63,6 +63,7 @@
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -218,6 +219,11 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartiti
         if (ConnectContext.get().getExecutor() != null) {
             ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
         }
+        if (tableSample != null) {
+            List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses = selectFiles(fileCaches);
+            splitAllFiles(allFiles, hiveFileStatuses);
+            return;
+        }
         for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
             // This if branch is to support old splitter, will remove later.
             if (fileCacheValue.getSplits() != null) {
@@ -235,6 +241,42 @@
             }
         }
 
+    private void splitAllFiles(List<Split> allFiles,
+            List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses) throws IOException {
+        for (HiveMetaStoreCache.HiveFileStatus status : hiveFileStatuses) {
+            allFiles.addAll(splitFile(status.getPath(), status.getBlockSize(),
+                    status.getBlockLocations(), status.getLength(), status.getModificationTime(),
+                    status.isSplittable(), status.getPartitionValues(),
+                    new HiveSplitCreator(status.getAcidInfo())));
+        }
+    }
+
+    private List<HiveMetaStoreCache.HiveFileStatus> selectFiles(List<FileCacheValue> inputCacheValue) {
+        List<HiveMetaStoreCache.HiveFileStatus> fileList = Lists.newArrayList();
+        long totalSize = 0;
+        for (FileCacheValue value : inputCacheValue) {
+            for (HiveMetaStoreCache.HiveFileStatus file : value.getFiles()) {
+                file.setSplittable(value.isSplittable());
+                file.setPartitionValues(value.getPartitionValues());
+                file.setAcidInfo(value.getAcidInfo());
+                fileList.add(file);
+                totalSize += file.getLength();
+            }
+        }
+        long sampleSize = totalSize * tableSample.getSampleValue() / 100;
+        long selectedSize = 0;
+        Collections.shuffle(fileList);
+        int index = 0;
+        for (HiveMetaStoreCache.HiveFileStatus file : fileList) {
+            selectedSize += file.getLength();
+            index += 1;
+            if (selectedSize >= sampleSize) {
+                break;
+            }
+        }
+        return fileList.subList(0, index);
+    }
+
     private List<FileCacheValue> getFileSplitByTransaction(HiveMetaStoreCache cache, List<HivePartition> partitions) {
         for (HivePartition partition : partitions) {
             if (partition.getPartitionValues() == null || partition.getPartitionValues().isEmpty()) {
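selectFiles() implements percent sampling at file granularity: it flattens the per-partition file lists, shuffles them, and keeps a random prefix whose cumulative size just reaches the requested percentage of total bytes, so the sample is approximate but never undershoots the target. A self-contained sketch of that selection logic (HiveFileStatus replaced by a hypothetical stub):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SampleBySize {
    // Hypothetical stand-in for HiveFileStatus; only the length matters here.
    static class FileStat {
        final long length;
        FileStat(long length) { this.length = length; }
    }

    // Keep a shuffled prefix of files whose cumulative size covers roughly
    // samplePercent of the total bytes, mirroring selectFiles() above.
    static List<FileStat> selectFiles(List<FileStat> files, long samplePercent) {
        long totalSize = files.stream().mapToLong(f -> f.length).sum();
        long sampleSize = totalSize * samplePercent / 100;
        List<FileStat> shuffled = new ArrayList<>(files);
        Collections.shuffle(shuffled);
        long selectedSize = 0;
        int index = 0;
        for (FileStat f : shuffled) {
            selectedSize += f.length;
            index++;
            if (selectedSize >= sampleSize) {
                break;
            }
        }
        return shuffled.subList(0, index);
    }
}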
@@ -17,10 +17,10 @@
 
 package org.apache.doris.statistics;
 
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.qe.AutoCloseConnectContext;
 import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.StmtExecutor;
@@ -31,9 +31,6 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -61,14 +58,14 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
             + "${idxId} AS idx_id, "
             + "'${colId}' AS col_id, "
             + "NULL AS part_id, "
-            + "COUNT(1) AS row_count, "
+            + "${countExpr} AS row_count, "
             + "NDV(`${colName}`) AS ndv, "
-            + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
+            + "${nullCountExpr} AS null_count, "
             + "MIN(`${colName}`) AS min, "
             + "MAX(`${colName}`) AS max, "
             + "${dataSizeFunction} AS data_size, "
             + "NOW() "
-            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
+            + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";
 
     private static final String ANALYZE_PARTITION_TEMPLATE = " SELECT "
             + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', ${partId}) AS id, "
@@ -86,8 +83,8 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
             + "${dataSizeFunction} AS data_size, "
             + "NOW() FROM `${catalogName}`.`${dbName}`.`${tblName}` where ";
 
-    private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount "
-            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
+    private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT ${countExpr} as rowCount "
+            + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";
 
     // cache stats for each partition, it would be inserted into column_statistics in a batch.
     private final List<List<ColStatsData>> buf = new ArrayList<>();
@@ -163,6 +160,7 @@ private void getTableColumnStats() throws Exception {
         params.put("colName", col.getName());
         params.put("colId", info.colName);
         params.put("dataSizeFunction", getDataSizeFunction(col));
+        params.put("nullCountExpr", getNullCountExpression());
         StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
         String sql = stringSubstitutor.replace(sb.toString());
         executeInsertSql(sql);
@@ -279,27 +277,39 @@ private Map<String, String> buildTableStatsParams(String partId) {
         commonParams.put("catalogName", catalog.getName());
         commonParams.put("dbName", db.getFullName());
         commonParams.put("tblName", tbl.getName());
+        commonParams.put("sampleExpr", getSampleExpression());
+        commonParams.put("countExpr", getCountExpression());
         if (col != null) {
             commonParams.put("type", col.getType().toString());
         }
         commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis()));
         return commonParams;
     }
 
-    private void setParameterData(Map<String, String> parameters, Map<String, String> params) {
-        String numRows = "";
-        String timestamp = "";
-        if (parameters.containsKey(NUM_ROWS)) {
-            numRows = parameters.get(NUM_ROWS);
-        }
-        if (parameters.containsKey(TIMESTAMP)) {
-            timestamp = parameters.get(TIMESTAMP);
-        }
-        params.put("numRows", numRows);
-        params.put("rowCount", numRows);
-        params.put("update_time", TimeUtils.DATETIME_FORMAT.format(
-                LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(timestamp) * 1000),
-                        ZoneId.systemDefault())));
-    }
+    protected String getCountExpression() {
+        if (info.samplePercent > 0) {
+            return String.format("ROUND(COUNT(1) * 100 / %d)", info.samplePercent);
+        } else {
+            return "COUNT(1)";
+        }
+    }
+
+    protected String getNullCountExpression() {
+        if (info.samplePercent > 0) {
+            return String.format("ROUND(SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) * 100 / %d)",
+                    info.samplePercent);
+        } else {
+            return "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END)";
+        }
+    }
+
+    protected String getDataSizeFunction(Column column) {
+        String originFunction = super.getDataSizeFunction(column);
+        if (info.samplePercent > 0 && !isPartitionOnly) {
+            return String.format("ROUND((%s) * 100 / %d)", originFunction, info.samplePercent);
+        } else {
+            return originFunction;
+        }
+    }
 
     @Override
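Taken together, the templates extrapolate sampled measurements back to full-table estimates: with samplePercent = 10, ROUND(COUNT(1) * 100 / 10) multiplies the sampled row count by ten, and null_count and data_size are scaled the same way. A hedged sketch of how the count query comes out after substitution (values and names illustrative; the exact output of getSampleExpression() is not shown in this diff, so a TABLESAMPLE clause is assumed):

import org.apache.commons.text.StringSubstitutor;

import java.util.HashMap;
import java.util.Map;

public class CountTemplateDemo {
    public static void main(String[] args) {
        String template = "SELECT ${countExpr} as rowCount "
                + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";
        int samplePercent = 10; // illustrative value
        Map<String, String> params = new HashMap<>();
        params.put("countExpr", String.format("ROUND(COUNT(1) * 100 / %d)", samplePercent));
        params.put("catalogName", "hive"); // illustrative names
        params.put("dbName", "tpch_1000_parquet");
        params.put("tblName", "part");
        // Assumption: getSampleExpression() renders something like this percent clause.
        params.put("sampleExpr", String.format("TABLESAMPLE(%d PERCENT)", samplePercent));
        System.out.println(new StringSubstitutor(params).replace(template));
        // -> SELECT ROUND(COUNT(1) * 100 / 10) as rowCount
        //    FROM `hive`.`tpch_1000_parquet`.`part` TABLESAMPLE(10 PERCENT)
    }
}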
@@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_hive_sample_statistic", "p2,external,hive,external_remote,external_remote_hive") {
String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
String catalog_name = "test_hive_sample_statistic"
sql """drop catalog if exists ${catalog_name};"""
sql """
create catalog if not exists ${catalog_name} properties (
'type'='hms',
'hadoop.username' = 'hadoop',
'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
);
"""
logger.info("catalog " + catalog_name + " created")

sql """use ${catalog_name}.tpch_1000_parquet"""
sql """analyze table part with sample percent 10 with sync;"""

def result = sql """show table stats part"""
assertTrue(result.size() == 1)
assertTrue(Long.parseLong(result[0][2]) >= 200000000)
assertTrue(Long.parseLong(result[0][2]) < 220000000)

def ctlId
result = sql """show proc '/catalogs'"""

for (int i = 0; i < result.size(); i++) {
if (result[i][1] == catalog_name) {
ctlId = result[i][0]
}
}

result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_partkey'"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] >= 200000000)
assertTrue(result[0][0] < 220000000)

result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_name'"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] >= 200000000)
assertTrue(result[0][0] < 220000000)

result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_mfgr'"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] >= 200000000)
assertTrue(result[0][0] < 220000000)

result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_brand'"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] >= 200000000)
assertTrue(result[0][0] < 220000000)

result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_type'"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] >= 200000000)
assertTrue(result[0][0] < 220000000)

result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_size'"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] >= 200000000)
assertTrue(result[0][0] < 220000000)

result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_container'"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] >= 200000000)
assertTrue(result[0][0] < 220000000)

result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_retailprice'"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] >= 200000000)
assertTrue(result[0][0] < 220000000)

result = sql """select count from internal.__internal_schema.column_statistics where catalog_id='$ctlId' and col_id='p_comment'"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] >= 200000000)
assertTrue(result[0][0] < 220000000)

sql """drop catalog ${catalog_name}""";
}
}