@@ -129,7 +129,7 @@ public class OutFileClause {
private static final String HADOOP_FS_PROP_PREFIX = "dfs.";
private static final String HADOOP_PROP_PREFIX = "hadoop.";
private static final String BROKER_PROP_PREFIX = "broker.";
private static final String PROP_BROKER_NAME = "broker.name";
public static final String PROP_BROKER_NAME = "broker.name";
public static final String PROP_COLUMN_SEPARATOR = "column_separator";
public static final String PROP_LINE_DELIMITER = "line_delimiter";
public static final String PROP_MAX_FILE_SIZE = "max_file_size";
@@ -94,7 +94,7 @@ public SlotRef(SlotDescriptor desc) {
this.desc = desc;
this.type = desc.getType();
// TODO(zc): label is meaningful
this.label = null;
this.label = desc.getLabel();
if (this.type.equals(Type.CHAR)) {
this.type = Type.VARCHAR;
}
@@ -244,10 +244,12 @@ public PhysicalPlanTranslator(PlanTranslatorContext context, StatsErrorEstimator
*/
public PlanFragment translatePlan(PhysicalPlan physicalPlan) {
PlanFragment rootFragment = physicalPlan.accept(this, context);
List<Expr> outputExprs = Lists.newArrayList();
physicalPlan.getOutput().stream().map(Slot::getExprId)
.forEach(exprId -> outputExprs.add(context.findSlotRef(exprId)));
rootFragment.setOutputExprs(outputExprs);
if (CollectionUtils.isEmpty(rootFragment.getOutputExprs())) {
List<Expr> outputExprs = Lists.newArrayList();
physicalPlan.getOutput().stream().map(Slot::getExprId)
.forEach(exprId -> outputExprs.add(context.findSlotRef(exprId)));
rootFragment.setOutputExprs(outputExprs);
}
Collections.reverse(context.getPlanFragments());
// TODO: maybe we need to translate nullable directly? Then we could remove the call to computeMemLayout
context.getDescTable().computeMemLayout();
@@ -368,8 +370,8 @@ public PlanFragment visitPhysicalDistribute(PhysicalDistribute<? extends Plan> d
public PlanFragment visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink,
PlanTranslatorContext context) {
PlanFragment planFragment = physicalResultSink.child().accept(this, context);
TResultSinkType resultSinkType = context.getConnectContext() != null ? context.getConnectContext()
.getResultSinkType() : null;
TResultSinkType resultSinkType = context.getConnectContext() != null
? context.getConnectContext().getResultSinkType() : null;
planFragment.setSink(new ResultSink(planFragment.getPlanRoot().getId(), resultSinkType));
return planFragment;
}
@@ -425,7 +427,7 @@ public PlanFragment visitPhysicalOlapTableSink(PhysicalOlapTableSink<? extends P
@Override
public PlanFragment visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink,
PlanTranslatorContext context) {
PlanFragment rootFragment = fileSink.child().accept(this, context);
PlanFragment sinkFragment = fileSink.child().accept(this, context);
OutFileClause outFile = new OutFileClause(
fileSink.getFilePath(),
fileSink.getFormat(),
@@ -435,7 +437,7 @@ public PlanFragment visitPhysicalFileSink(PhysicalFileS
List<Expr> outputExprs = Lists.newArrayList();
fileSink.getOutput().stream().map(Slot::getExprId)
.forEach(exprId -> outputExprs.add(context.findSlotRef(exprId)));
rootFragment.setOutputExprs(outputExprs);
sinkFragment.setOutputExprs(outputExprs);

// generate colLabels
List<String> labels = fileSink.getOutput().stream().map(NamedExpression::getName).collect(Collectors.toList());
@@ -446,11 +448,49 @@ public PlanFragment visitPhysicalFileSink(PhysicalFileS
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
ResultFileSink sink = new ResultFileSink(rootFragment.getPlanRoot().getId(), outFile,
ResultFileSink resultFileSink = new ResultFileSink(sinkFragment.getPlanRoot().getId(), outFile,
(ArrayList<String>) labels);

rootFragment.setSink(sink);
return rootFragment;
sinkFragment.setSink(resultFileSink);

// TODO: build the parallel sink in Nereids itself; it is implemented here temporarily
// because doing it in Nereids would affect too many things
if (fileSink.requestProperties(context.getConnectContext()).equals(PhysicalProperties.GATHER)) {
return sinkFragment;
} else {
// create output tuple
TupleDescriptor fileStatusDesc = ResultFileSink.constructFileStatusTupleDesc(context.getDescTable());

// create exchange node
ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), sinkFragment.getPlanRoot());
exchangeNode.setPartitionType(TPartitionType.UNPARTITIONED);
exchangeNode.setNumInstances(1);

// create final result sink
TResultSinkType resultSinkType = context.getConnectContext() != null
? context.getConnectContext().getResultSinkType() : null;
ResultSink resultSink = new ResultSink(exchangeNode.getId(), resultSinkType);

// create top fragment
PlanFragment topFragment = new PlanFragment(context.nextFragmentId(), exchangeNode,
DataPartition.UNPARTITIONED);
topFragment.addChild(sinkFragment);
topFragment.setSink(resultSink);
context.addPlanFragment(topFragment);

// update sink fragment and result file sink
DataStreamSink streamSink = new DataStreamSink(exchangeNode.getId());
streamSink.setOutputPartition(DataPartition.UNPARTITIONED);
resultFileSink.resetByDataStreamSink(streamSink);
resultFileSink.setOutputTupleId(fileStatusDesc.getId());
sinkFragment.setDestination(exchangeNode);

// set the output exprs and tuple correctly
exchangeNode.resetTupleIds(Lists.newArrayList(fileStatusDesc.getId()));
topFragment.resetOutputExprs(fileStatusDesc);

return topFragment;
}
}

/* ********************************************************************************************
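The parallel branch of visitPhysicalFileSink above yields a two-fragment plan: each instance of the sink fragment writes its own files, and a single-instance top fragment gathers one file-status row per writer through the exchange. A rough sketch of the resulting plan shape (an illustration reconstructed from the code above, not planner output):

    topFragment (DataPartition.UNPARTITIONED)
      ResultSink                        -- returns the gathered file-status rows
      ExchangeNode (numInstances = 1)   -- tuple: FileNumber, TotalRows, FileSize, URL
        sinkFragment (parallel instances)
          ResultFileSink                -- each instance writes files, then streams
                                        -- its status row to the exchange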
@@ -331,7 +331,7 @@ public Void visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, PlanConte

@Override
public Void visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink, PlanContext context) {
addRequestPropertyToChildren(PhysicalProperties.GATHER);
addRequestPropertyToChildren(fileSink.requestProperties(connectContext));
return null;
}

@@ -17,6 +17,7 @@

package org.apache.doris.nereids.trees.plans.physical;

import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
@@ -27,6 +28,7 @@
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.Statistics;

import com.google.common.base.Preconditions;
@@ -83,6 +85,21 @@ public Map<String, String> getProperties() {
return properties;
}

/**
* If parallel outfile is enabled and this is not a broker export, request ANY here;
* PhysicalPlanTranslator will then add a top fragment to summarize the export result.
*/
public PhysicalProperties requestProperties(ConnectContext ctx) {
if (!ctx.getSessionVariable().enableParallelOutfile
|| ctx.getSessionVariable().getEnablePipelineEngine()
|| ctx.getSessionVariable().getEnablePipelineXEngine()
|| properties.containsKey(OutFileClause.PROP_BROKER_NAME)) {
return PhysicalProperties.GATHER;
}
// reaching here means parallel outfile export is enabled
return PhysicalProperties.ANY;
}

@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "PhysicalFileSink only accepts one child");
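The gating in requestProperties is compact; the following standalone sketch restates it. The class, enum, and parameter names are invented for illustration; the real method reads these flags from Doris's SessionVariable and returns PhysicalProperties:

import java.util.Map;

public class OutfileDistributionSketch {
    enum Distribution { GATHER, ANY }

    // Parallel outfile is only requested when the session enables it, neither
    // pipeline engine is active, and no broker is involved in the export.
    static Distribution requestDistribution(boolean enableParallelOutfile,
            boolean pipelineEngine, boolean pipelineXEngine,
            Map<String, String> sinkProperties) {
        if (!enableParallelOutfile
                || pipelineEngine
                || pipelineXEngine
                || sinkProperties.containsKey("broker.name")) {
            return Distribution.GATHER; // a single instance writes all files
        }
        return Distribution.ANY; // every instance writes files in parallel
    }
}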
@@ -37,8 +37,6 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
@@ -385,7 +383,7 @@ private void pushDownResultFileSink(Analyzer analyzer) {
return;
}
// create result file sink desc
TupleDescriptor fileStatusDesc = constructFileStatusTupleDesc(analyzer);
TupleDescriptor fileStatusDesc = ResultFileSink.constructFileStatusTupleDesc(analyzer.getDescTbl());
resultFileSink.resetByDataStreamSink((DataStreamSink) secondPlanFragment.getSink());
resultFileSink.setOutputTupleId(fileStatusDesc.getId());
secondPlanFragment.setOutputExprs(topPlanFragment.getOutputExprs());
@@ -554,41 +552,6 @@ private void checkAndSetTopnOpt(PlanNode node) {
}
}

/**
* Construct a tuple for file status, the tuple schema as following:
* | FileNumber | Int |
* | TotalRows | Bigint |
* | FileSize | Bigint |
* | URL | Varchar |
*/
private TupleDescriptor constructFileStatusTupleDesc(Analyzer analyzer) {
TupleDescriptor resultFileStatusTupleDesc =
analyzer.getDescTbl().createTupleDescriptor("result_file_status");
resultFileStatusTupleDesc.setIsMaterialized(true);
SlotDescriptor fileNumber = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
fileNumber.setLabel("FileNumber");
fileNumber.setType(ScalarType.createType(PrimitiveType.INT));
fileNumber.setIsMaterialized(true);
fileNumber.setIsNullable(false);
SlotDescriptor totalRows = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
totalRows.setLabel("TotalRows");
totalRows.setType(ScalarType.createType(PrimitiveType.BIGINT));
totalRows.setIsMaterialized(true);
totalRows.setIsNullable(false);
SlotDescriptor fileSize = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
fileSize.setLabel("FileSize");
fileSize.setType(ScalarType.createType(PrimitiveType.BIGINT));
fileSize.setIsMaterialized(true);
fileSize.setIsNullable(false);
SlotDescriptor url = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
url.setLabel("URL");
url.setType(ScalarType.createType(PrimitiveType.VARCHAR));
url.setIsMaterialized(true);
url.setIsNullable(false);
resultFileStatusTupleDesc.computeStatAndMemLayout();
return resultFileStatusTupleDesc;
}

private static class QueryStatisticsTransferOptimizer {
private final PlanFragment root;

@@ -17,9 +17,15 @@

package org.apache.doris.planner;

import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
@@ -136,4 +142,43 @@ public PlanNodeId getExchNodeId() {
public DataPartition getOutputPartition() {
return outputPartition;
}

/**
* Construct a tuple for the file status; the tuple schema is as follows:
* | FileNumber | Int |
* | TotalRows | Bigint |
* | FileSize | Bigint |
* | URL | Varchar |
*/
public static TupleDescriptor constructFileStatusTupleDesc(DescriptorTable descriptorTable) {
TupleDescriptor resultFileStatusTupleDesc =
descriptorTable.createTupleDescriptor("result_file_status");
resultFileStatusTupleDesc.setIsMaterialized(true);
SlotDescriptor fileNumber = descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
fileNumber.setLabel("FileNumber");
fileNumber.setType(ScalarType.createType(PrimitiveType.INT));
fileNumber.setColumn(new Column("FileNumber", ScalarType.createType(PrimitiveType.INT)));
fileNumber.setIsMaterialized(true);
fileNumber.setIsNullable(false);
SlotDescriptor totalRows = descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
totalRows.setLabel("TotalRows");
totalRows.setType(ScalarType.createType(PrimitiveType.BIGINT));
totalRows.setColumn(new Column("TotalRows", ScalarType.createType(PrimitiveType.BIGINT)));
totalRows.setIsMaterialized(true);
totalRows.setIsNullable(false);
SlotDescriptor fileSize = descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
fileSize.setLabel("FileSize");
fileSize.setType(ScalarType.createType(PrimitiveType.BIGINT));
fileSize.setColumn(new Column("FileSize", ScalarType.createType(PrimitiveType.BIGINT)));
fileSize.setIsMaterialized(true);
fileSize.setIsNullable(false);
SlotDescriptor url = descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
url.setLabel("URL");
url.setType(ScalarType.createType(PrimitiveType.VARCHAR));
url.setColumn(new Column("URL", ScalarType.createType(PrimitiveType.VARCHAR)));
url.setIsMaterialized(true);
url.setIsNullable(false);
resultFileStatusTupleDesc.computeStatAndMemLayout();
return resultFileStatusTupleDesc;
}
}
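The tuple built by constructFileStatusTupleDesc is what a client receives as the result of an INTO OUTFILE query. A minimal JDBC sketch of consuming that result (host, port, database, table, and output path are illustrative assumptions):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class OutfileStatusExample {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:9030/db1", "root", "");
                Statement stmt = conn.createStatement()) {
            // Parallel writing only takes effect when requestProperties
            // (see PhysicalFileSink above) returns ANY for this session.
            stmt.execute("SET enable_parallel_outfile = true");
            try (ResultSet rs = stmt.executeQuery(
                    "SELECT k1, k2 FROM tbl1 "
                            + "INTO OUTFILE \"hdfs://host:8020/tmp/result_\" "
                            + "FORMAT AS CSV")) {
                // Columns match the tuple: FileNumber, TotalRows, FileSize, URL.
                while (rs.next()) {
                    System.out.printf("files=%d rows=%d bytes=%d url=%s%n",
                            rs.getInt("FileNumber"), rs.getLong("TotalRows"),
                            rs.getLong("FileSize"), rs.getString("URL"));
                }
            }
        }
    }
}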
@@ -264,7 +264,7 @@ public void testRewriteHavingClauseWithOrderBy() throws Exception {
+ subquery + ") order by a;";
LOG.info("EXPLAIN:{}", dorisAssert.query(query).explainQuery());
dorisAssert.query(query).explainContains("CROSS JOIN",
"order by: <slot 10> `$a$1`.`$c$1` ASC");
"order by: `$a$1`.`$c$1` ASC");
}

/**
@@ -376,7 +376,7 @@ public void testRewriteHavingClauseMissingAggregationColumn() throws Exception {
LOG.info("EXPLAIN:{}", dorisAssert.query(query).explainQuery());
dorisAssert.query(query).explainContains("group by: `empid`",
"CROSS JOIN",
"order by: <slot 10> `$a$1`.`$c$2` ASC",
"order by: `$a$1`.`$c$2` ASC",
"OUTPUT EXPRS:\n <slot 11> `$a$1`.`$c$1`");
}

@@ -490,8 +490,8 @@ public void testRewriteHavingClauseWithAlias() throws Exception {
LOG.info("EXPLAIN:{}", dorisAssert.query(query).explainQuery());
dorisAssert.query(query).explainContains("group by: `empid`",
"CROSS JOIN",
"order by: <slot 10> `$a$1`.`$c$2` ASC",
"OUTPUT EXPRS:\n <slot 11> `$a$1`.`$c$1`\n <slot 10> `$a$1`.`$c$2`");
"order by: `$a$1`.`$c$2` ASC",
"OUTPUT EXPRS:\n <slot 11> `$a$1`.`$c$1`\n `$a$1`.`$c$2`");
}

/**
@@ -603,8 +603,8 @@ public void testRewriteHavingClausewWithLimit() throws Exception {
LOG.info("EXPLAIN:{}", dorisAssert.query(query).explainQuery());
dorisAssert.query(query).explainContains("group by: `empid`",
"CROSS JOIN",
"order by: <slot 10> `$a$1`.`$c$2` ASC",
"OUTPUT EXPRS:\n <slot 11> `$a$1`.`$c$1`\n <slot 10> `$a$1`.`$c$2`");
"order by: `$a$1`.`$c$2` ASC",
"OUTPUT EXPRS:\n <slot 11> `$a$1`.`$c$1`\n `$a$1`.`$c$2`");
}

/**
@@ -380,7 +380,7 @@ public void aggInlineView() throws Exception {
String sql = "desc verbose select /*+ SET_VAR(enable_nereids_planner=false) */ e1 from (select k2 as c1 from db1.tbl1 group by c1) a lateral view explode_split(c1, \",\") tmp1 as e1 ";
String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql, true);
Assert.assertTrue(UtFrameUtils.checkPlanResultContainsNode(explainString, 2, "TABLE FUNCTION NODE"));
Assert.assertTrue(explainString.contains("table function: explode_split( `k2`, ',')"));
Assert.assertTrue(explainString.contains("table function: explode_split(`k2`, ',')"));
Assert.assertTrue(explainString.contains("lateral view tuple id: 3"));
Assert.assertTrue(explainString.contains("output slot id: 3"));
Assert.assertTrue(explainString.contains("tuple ids: 1 3"));
@@ -397,7 +397,7 @@ public void aggColumnInlineViewInTB() throws Exception {
+ "lateral view explode_split(c2, \",\") tmp1 as e1";
String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql, true);
Assert.assertTrue(UtFrameUtils.checkPlanResultContainsNode(explainString, 2, "TABLE FUNCTION NODE"));
Assert.assertTrue(explainString.contains("table function: explode_split(<slot 3> min(`k2`), ',')"));
Assert.assertTrue(explainString.contains("table function: explode_split(min(`k2`), ',')"));
Assert.assertTrue(explainString.contains("lateral view tuple id: 3"));
Assert.assertTrue(explainString.contains("output slot id: 2 6"));
Assert.assertTrue(explainString.contains("tuple ids: 1 3"));
@@ -480,7 +480,7 @@ public void aggColumnInOuterQuery() throws Exception {
+ "lateral view explode_split(c2, \",\") tmp1 as e1) tmp2";
String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql, true);
Assert.assertTrue(UtFrameUtils.checkPlanResultContainsNode(explainString, 2, "TABLE FUNCTION NODE"));
Assert.assertTrue(explainString.contains("table function: explode_split(<slot 3> min(`k2`), ',')"));
Assert.assertTrue(explainString.contains("table function: explode_split(min(`k2`), ',')"));
Assert.assertTrue(explainString.contains("lateral view tuple id: 3"));
Assert.assertTrue(explainString.contains("output slot id: 2"));
Assert.assertTrue(explainString.contains("tuple ids: 1 3"));