From e2ede7ad06a6aef3dc9e28f149ab06a1c1e6bf76 Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 20 Aug 2019 11:38:04 +0800 Subject: [PATCH 1/2] Reduce the number of partition info in BrokerScanNode param If the user has already set target partitions to load, we should only add the info of those partitions to the BrokerScanNode param, instead of adding all partitions' info. Otherwise the RPC packet can become too large. --- be/src/exec/broker_scan_node.cpp | 8 ++++---- .../java/org/apache/doris/planner/BrokerScanNode.java | 7 ++++--- .../main/java/org/apache/doris/planner/DataSplitSink.java | 4 ++-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index e1623dae35350a..f18ee8f697ca20 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -342,10 +342,10 @@ Status BrokerScanNode::scanner_scan( continue; } - // The reason we check if partition_expr_ctxs is empty is when loading data to - // a unpartitioned table who has no partition_expr_ctxs, user can specify - // a partition name. And we check here to avoid crash and make our - // process run as normal + // user may not specify the target partitions, or the table is unpartitioned. + // so here we only check if target partition is set. + // the OlapTableSink will check partition again, so don't worry about passing + // invalid rows. 
if (scan_range.params.__isset.partition_ids && !partition_expr_ctxs.empty()) { int64_t partition_id = get_partition_id(partition_expr_ctxs, row); if (partition_id == -1 || diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java index d3a7c45d5be605..b79255281d175a 100644 --- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -68,6 +68,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -205,7 +206,7 @@ public void setLoadInfo(long loadJobId, this.strictMode = strictMode; } - private void createPartitionInfos() throws AnalysisException { + private void createPartitionInfos(List targetPartitionIds) throws AnalysisException { if (partitionInfos != null) { return; } @@ -218,7 +219,7 @@ private void createPartitionInfos() throws AnalysisException { partitionExprs = Lists.newArrayList(); partitionInfos = DataSplitSink.EtlRangePartitionInfo.createParts( - (OlapTable) targetTable, exprByName, null, partitionExprs); + (OlapTable) targetTable, exprByName, Sets.newHashSet(targetPartitionIds), partitionExprs); } // Called from init, construct source tuple information @@ -235,7 +236,7 @@ private void initParams(ParamCreateContext context) throws AnalysisException, Us List partitionIds = fileGroup.getPartitionIds(); if (partitionIds != null && partitionIds.size() > 0) { params.setPartition_ids(partitionIds); - createPartitionInfos(); + createPartitionInfos(partitionIds); } params.setProperties(brokerDesc.getProperties()); diff --git a/fe/src/main/java/org/apache/doris/planner/DataSplitSink.java b/fe/src/main/java/org/apache/doris/planner/DataSplitSink.java index 8e8f25ba8f0def..6bdfb2e81d648f 100644 --- 
a/fe/src/main/java/org/apache/doris/planner/DataSplitSink.java +++ b/fe/src/main/java/org/apache/doris/planner/DataSplitSink.java @@ -362,7 +362,7 @@ private static EtlPartitionRange createPartitionRange( } public static List createParts( - OlapTable tbl, Map exprByCol, Set targetPartitions, + OlapTable tbl, Map exprByCol, Set targetPartitionIds, List partitionExprs) throws AnalysisException { List parts = Lists.newArrayList(); PartitionInfo partInfo = tbl.getPartitionInfo(); @@ -372,7 +372,7 @@ public static List createParts( partitionExprs.add(exprByCol.get(col.getName())); } for (Partition part : tbl.getPartitions()) { - if (targetPartitions != null && !targetPartitions.contains(part.getName())) { + if (targetPartitionIds != null && !targetPartitionIds.contains(part.getId())) { continue; } DistributionInfo distInfo = part.getDistributionInfo(); From 27cdd47682cc6a567df997cd2f026743816ca473 Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 20 Aug 2019 17:19:26 +0800 Subject: [PATCH 2/2] remove unused partition info --- be/src/exec/broker_scan_node.cpp | 20 ------------ .../apache/doris/planner/BrokerScanNode.java | 32 ------------------- 2 files changed, 52 deletions(-) diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index f18ee8f697ca20..5e9fccf4ddea88 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -342,26 +342,6 @@ Status BrokerScanNode::scanner_scan( continue; } - // user may not specify the target partitions, or the table is unpartitioned. - // so here we only check if target partition is set. - // the OlapTableSink will check partition again, so don't worry about passing - // invalid rows. 
- if (scan_range.params.__isset.partition_ids && !partition_expr_ctxs.empty()) { - int64_t partition_id = get_partition_id(partition_expr_ctxs, row); - if (partition_id == -1 || - !std::binary_search(scan_range.params.partition_ids.begin(), - scan_range.params.partition_ids.end(), - partition_id)) { - counter->num_rows_filtered++; - - std::stringstream error_msg; - error_msg << "No corresponding partition, partition id: " << partition_id; - _runtime_state->append_error_msg_to_file(Tuple::to_string(tuple, *_tuple_desc), - error_msg.str()); - continue; - } - } - // eval conjuncts of this row. if (eval_conjuncts(&conjunct_ctxs[0], conjunct_ctxs.size(), row)) { row_batch->commit_last_row(); diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java index b79255281d175a..b1742753a87b28 100644 --- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -38,7 +38,6 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.FsBroker; -import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Table; @@ -68,7 +67,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -125,8 +123,6 @@ public int compare(TBrokerFileStatus o1, TBrokerFileStatus o2) { private int nextBe = 0; private Analyzer analyzer; - - private List partitionInfos; private List partitionExprs; private static class ParamCreateContext { @@ -206,22 +202,6 @@ public void setLoadInfo(long loadJobId, this.strictMode = strictMode; } - private void createPartitionInfos(List targetPartitionIds) throws 
AnalysisException { - if (partitionInfos != null) { - return; - } - - Map exprByName = Maps.newHashMap(); - - for (SlotDescriptor slotDesc : desc.getSlots()) { - exprByName.put(slotDesc.getColumn().getName(), new SlotRef(slotDesc)); - } - - partitionExprs = Lists.newArrayList(); - partitionInfos = DataSplitSink.EtlRangePartitionInfo.createParts( - (OlapTable) targetTable, exprByName, Sets.newHashSet(targetPartitionIds), partitionExprs); - } - // Called from init, construct source tuple information private void initParams(ParamCreateContext context) throws AnalysisException, UserException { TBrokerScanRangeParams params = new TBrokerScanRangeParams(); @@ -231,14 +211,6 @@ private void initParams(ParamCreateContext context) throws AnalysisException, Us params.setColumn_separator(fileGroup.getValueSeparator().getBytes(Charset.forName("UTF-8"))[0]); params.setLine_delimiter(fileGroup.getLineDelimiter().getBytes(Charset.forName("UTF-8"))[0]); params.setStrict_mode(strictMode); - - // Parse partition information - List partitionIds = fileGroup.getPartitionIds(); - if (partitionIds != null && partitionIds.size() > 0) { - params.setPartition_ids(partitionIds); - createPartitionInfos(partitionIds); - } - params.setProperties(brokerDesc.getProperties()); initColumns(context); } @@ -743,10 +715,6 @@ public void finalize(Analyzer analyzer) throws UserException { protected void toThrift(TPlanNode msg) { msg.node_type = TPlanNodeType.BROKER_SCAN_NODE; TBrokerScanNode brokerScanNode = new TBrokerScanNode(desc.getId().asInt()); - if (partitionInfos != null) { - brokerScanNode.setPartition_exprs(Expr.treesToThrift(partitionExprs)); - brokerScanNode.setPartition_infos(DataSplitSink.EtlRangePartitionInfo.listToNonDistThrift(partitionInfos)); - } msg.setBroker_scan_node(brokerScanNode); }