Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 0 additions & 20 deletions be/src/exec/broker_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,26 +342,6 @@ Status BrokerScanNode::scanner_scan(
continue;
}

// The reason we check if partition_expr_ctxs is empty is when loading data to
// a unpartitioned table who has no partition_expr_ctxs, user can specify
// a partition name. And we check here to avoid crash and make our
// process run as normal
if (scan_range.params.__isset.partition_ids && !partition_expr_ctxs.empty()) {
int64_t partition_id = get_partition_id(partition_expr_ctxs, row);
if (partition_id == -1 ||
!std::binary_search(scan_range.params.partition_ids.begin(),
scan_range.params.partition_ids.end(),
partition_id)) {
counter->num_rows_filtered++;

std::stringstream error_msg;
error_msg << "No corresponding partition, partition id: " << partition_id;
_runtime_state->append_error_msg_to_file(Tuple::to_string(tuple, *_tuple_desc),
error_msg.str());
continue;
}
}

// eval conjuncts of this row.
if (eval_conjuncts(&conjunct_ctxs[0], conjunct_ctxs.size(), row)) {
row_batch->commit_last_row();
Expand Down
31 changes: 0 additions & 31 deletions fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Table;
Expand Down Expand Up @@ -124,8 +123,6 @@ public int compare(TBrokerFileStatus o1, TBrokerFileStatus o2) {
private int nextBe = 0;

private Analyzer analyzer;

private List<DataSplitSink.EtlRangePartitionInfo> partitionInfos;
private List<Expr> partitionExprs;

private static class ParamCreateContext {
Expand Down Expand Up @@ -205,22 +202,6 @@ public void setLoadInfo(long loadJobId,
this.strictMode = strictMode;
}

private void createPartitionInfos() throws AnalysisException {
if (partitionInfos != null) {
return;
}

Map<String, Expr> exprByName = Maps.newHashMap();

for (SlotDescriptor slotDesc : desc.getSlots()) {
exprByName.put(slotDesc.getColumn().getName(), new SlotRef(slotDesc));
}

partitionExprs = Lists.newArrayList();
partitionInfos = DataSplitSink.EtlRangePartitionInfo.createParts(
(OlapTable) targetTable, exprByName, null, partitionExprs);
}

// Called from init, construct source tuple information
private void initParams(ParamCreateContext context) throws AnalysisException, UserException {
TBrokerScanRangeParams params = new TBrokerScanRangeParams();
Expand All @@ -230,14 +211,6 @@ private void initParams(ParamCreateContext context) throws AnalysisException, Us
params.setColumn_separator(fileGroup.getValueSeparator().getBytes(Charset.forName("UTF-8"))[0]);
params.setLine_delimiter(fileGroup.getLineDelimiter().getBytes(Charset.forName("UTF-8"))[0]);
params.setStrict_mode(strictMode);

// Parse partition information
List<Long> partitionIds = fileGroup.getPartitionIds();
if (partitionIds != null && partitionIds.size() > 0) {
params.setPartition_ids(partitionIds);
createPartitionInfos();
}

params.setProperties(brokerDesc.getProperties());
initColumns(context);
}
Expand Down Expand Up @@ -742,10 +715,6 @@ public void finalize(Analyzer analyzer) throws UserException {
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.BROKER_SCAN_NODE;
TBrokerScanNode brokerScanNode = new TBrokerScanNode(desc.getId().asInt());
if (partitionInfos != null) {
brokerScanNode.setPartition_exprs(Expr.treesToThrift(partitionExprs));
brokerScanNode.setPartition_infos(DataSplitSink.EtlRangePartitionInfo.listToNonDistThrift(partitionInfos));
}
msg.setBroker_scan_node(brokerScanNode);
}

Expand Down
4 changes: 2 additions & 2 deletions fe/src/main/java/org/apache/doris/planner/DataSplitSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ private static EtlPartitionRange createPartitionRange(
}

public static List<EtlRangePartitionInfo> createParts(
OlapTable tbl, Map<String, Expr> exprByCol, Set<String> targetPartitions,
OlapTable tbl, Map<String, Expr> exprByCol, Set<Long> targetPartitionIds,
List<Expr> partitionExprs) throws AnalysisException {
List<EtlRangePartitionInfo> parts = Lists.newArrayList();
PartitionInfo partInfo = tbl.getPartitionInfo();
Expand All @@ -372,7 +372,7 @@ public static List<EtlRangePartitionInfo> createParts(
partitionExprs.add(exprByCol.get(col.getName()));
}
for (Partition part : tbl.getPartitions()) {
if (targetPartitions != null && !targetPartitions.contains(part.getName())) {
if (targetPartitionIds != null && !targetPartitionIds.contains(part.getId())) {
continue;
}
DistributionInfo distInfo = part.getDistributionInfo();
Expand Down