From bbc91a12b7831c445687e6d91d738f6bd5aa8091 Mon Sep 17 00:00:00 2001 From: Weihao Li <18110526956@163.com> Date: Wed, 4 Mar 2026 14:35:24 +0800 Subject: [PATCH 1/5] FE & UT Signed-off-by: Weihao Li <18110526956@163.com> --- .../TableDistributedPlanGenerator.java | 299 ++++++++++++++---- ...gnedAggregationTreeDeviceViewScanNode.java | 113 +++++++ ...gnedAggregationTreeDeviceViewScanNode.java | 113 +++++++ .../PushAggregationIntoTableScan.java | 4 - .../relational/analyzer/TreeViewTest.java | 82 ++++- .../planner/assertions/PlanMatchPattern.java | 34 ++ 6 files changed, 579 insertions(+), 66 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AlignedAggregationTreeDeviceViewScanNode.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/NonAlignedAggregationTreeDeviceViewScanNode.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 7072b5f519f73..e6bbf92b4a9f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -57,6 +57,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; @@ -72,6 +73,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode; @@ -1188,70 +1190,19 @@ private boolean prefixMatched(OrderingScheme childOrdering, List preGrou @Override public List visitAggregationTableScan( AggregationTableScanNode node, PlanContext context) { - String dbName = - node instanceof AggregationTreeDeviceViewScanNode - ? ((AggregationTreeDeviceViewScanNode) node).getTreeDBName() - : node.getQualifiedObjectName().getDatabaseName(); + String dbName = node.getQualifiedObjectName().getDatabaseName(); DataPartition dataPartition = analysis.getDataPartitionInfo(); if (dbName == null || dataPartition == null) { node.setRegionReplicaSet(NOT_ASSIGNED); return Collections.singletonList(node); } - boolean needSplit = false; - List> regionReplicaSetsList = new ArrayList<>(); - if (dataPartition != null) { - Map>> seriesSlotMap = - dataPartition.getDataPartitionMap().get(dbName); - if (seriesSlotMap == null) { - throw new SemanticException( - String.format("Given queried database: %s is not exist!", dbName)); - } - Map> cachedSeriesSlotWithRegions = new HashMap<>(); - for (DeviceEntry deviceEntry : node.getDeviceEntries()) { - List regionReplicaSets = - getDeviceReplicaSets( - dataPartition, - seriesSlotMap, - deviceEntry.getDeviceID(), - node.getTimeFilter(), - cachedSeriesSlotWithRegions); - if (regionReplicaSets.size() > 1) { - needSplit = true; - context.deviceCrossRegion = true; - queryContext.setNeedUpdateScanNumForLastQuery(node.mayUseLastCache()); - } - regionReplicaSetsList.add(regionReplicaSets); - } - } - - if (regionReplicaSetsList.isEmpty()) { - regionReplicaSetsList = Collections.singletonList(Collections.singletonList(NOT_ASSIGNED)); - } + AggregationDistributionInfo distributionInfo = + prepareAggregationDistribution(node, dbName, dataPartition, context); Map regionNodeMap = new HashMap<>(); - // Step is SINGLE and device data in more than one region, we need to final aggregate the result - // from different region here, so split - // this node into two-stage - needSplit = needSplit && node.getStep() == SINGLE; - AggregationNode finalAggregation = null; - if (needSplit) { - Pair splitResult = - split(node, symbolAllocator, queryId); - finalAggregation = splitResult.left; - AggregationTableScanNode partialAggregation = splitResult.right; - - // cover case: complete push-down + group by + streamable - if (!context.hasSortProperty && finalAggregation.isStreamable()) { - OrderingScheme expectedOrderingSchema = - constructOrderingSchema(node.getPreGroupedSymbols()); - context.setExpectedOrderingScheme(expectedOrderingSchema); - } - - buildRegionNodeMap(node, regionReplicaSetsList, regionNodeMap, partialAggregation); - } else { - buildRegionNodeMap(node, regionReplicaSetsList, regionNodeMap, node); - } + buildRegionNodeMap( + node, distributionInfo.regionReplicaSetsList, regionNodeMap, distributionInfo.templateNode); List resultTableScanNodeList = new ArrayList<>(); TRegionReplicaSet mostUsedDataRegion = null; @@ -1276,6 +1227,165 @@ public List visitAggregationTableScan( processSortProperty(node, resultTableScanNodeList, context); } + if (distributionInfo.needSplit) { + AggregationNode finalAggregation = distributionInfo.finalAggregation; + if (resultTableScanNodeList.size() == 1) { + finalAggregation.setChild(resultTableScanNodeList.get(0)); + } else if (resultTableScanNodeList.size() > 1) { + OrderingScheme childOrdering = + nodeOrderingMap.get(resultTableScanNodeList.get(0).getPlanNodeId()); + finalAggregation.setChild( + mergeChildrenViaCollectOrMergeSort(childOrdering, resultTableScanNodeList)); + } else { + throw new IllegalStateException("List.size should >= 1, but now is 0"); + } + resultTableScanNodeList = Collections.singletonList(finalAggregation); + } + + return resultTableScanNodeList; + } + + @Override + public List visitAggregationTreeDeviceViewScan( + AggregationTreeDeviceViewScanNode node, PlanContext context) { + String dbName = node.getTreeDBName(); + DataPartition dataPartition = analysis.getDataPartitionInfo(); + if (dbName == null || dataPartition == null) { + node.setRegionReplicaSet(NOT_ASSIGNED); + return Collections.singletonList(node); + } + + AggregationDistributionInfo distributionInfo = + prepareAggregationDistribution(node, dbName, dataPartition, context); + + List> regionReplicaSetsList = distributionInfo.regionReplicaSetsList; + AggregationTableScanNode templateNode = distributionInfo.templateNode; + AggregationNode finalAggregation = distributionInfo.finalAggregation; + boolean needSplit = distributionInfo.needSplit; + + Map< + TRegionReplicaSet, + Pair< + AlignedAggregationTreeDeviceViewScanNode, + NonAlignedAggregationTreeDeviceViewScanNode>> + tableScanNodeMap = new HashMap<>(); + + for (int i = 0; i < regionReplicaSetsList.size(); i++) { + DeviceEntry deviceEntry = node.getDeviceEntries().get(i); + List regionReplicaSets = regionReplicaSetsList.get(i); + + for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) { + boolean aligned = deviceEntry instanceof AlignedDeviceEntry; + Pair + pair = tableScanNodeMap.computeIfAbsent(regionReplicaSet, k -> new Pair<>(null, null)); + + if (aligned && pair.left == null) { + AlignedAggregationTreeDeviceViewScanNode scanNode = + new AlignedAggregationTreeDeviceViewScanNode( + queryId.genPlanNodeId(), + templateNode.getQualifiedObjectName(), + templateNode.getOutputSymbols(), + templateNode.getAssignments(), + new ArrayList<>(), + templateNode.getTagAndAttributeIndexMap(), + templateNode.getScanOrder(), + templateNode.getTimePredicate().orElse(null), + templateNode.getPushDownPredicate(), + templateNode.getPushDownLimit(), + templateNode.getPushDownOffset(), + templateNode.isPushLimitToEachDevice(), + templateNode.containsNonAlignedDevice(), + templateNode.getProjection(), + templateNode.getAggregations(), + templateNode.getGroupingSets(), + templateNode.getPreGroupedSymbols(), + templateNode.getStep(), + templateNode.getGroupIdSymbol(), + node.getTreeDBName(), + node.getMeasurementColumnNameMap()); + scanNode.setRegionReplicaSet(regionReplicaSet); + pair.left = scanNode; + } + + if (!aligned && pair.right == null) { + NonAlignedAggregationTreeDeviceViewScanNode scanNode = + new NonAlignedAggregationTreeDeviceViewScanNode( + queryId.genPlanNodeId(), + templateNode.getQualifiedObjectName(), + templateNode.getOutputSymbols(), + templateNode.getAssignments(), + new ArrayList<>(), + templateNode.getTagAndAttributeIndexMap(), + templateNode.getScanOrder(), + templateNode.getTimePredicate().orElse(null), + templateNode.getPushDownPredicate(), + templateNode.getPushDownLimit(), + templateNode.getPushDownOffset(), + templateNode.isPushLimitToEachDevice(), + templateNode.containsNonAlignedDevice(), + templateNode.getProjection(), + templateNode.getAggregations(), + templateNode.getGroupingSets(), + templateNode.getPreGroupedSymbols(), + templateNode.getStep(), + templateNode.getGroupIdSymbol(), + node.getTreeDBName(), + node.getMeasurementColumnNameMap()); + scanNode.setRegionReplicaSet(regionReplicaSet); + pair.right = scanNode; + } + + if (aligned) { + pair.left.appendDeviceEntry(deviceEntry); + } else { + pair.right.appendDeviceEntry(deviceEntry); + } + } + } + + if (tableScanNodeMap.isEmpty()) { + node.setRegionReplicaSet(NOT_ASSIGNED); + return Collections.singletonList(node); + } + + List resultTableScanNodeList = new ArrayList<>(); + TRegionReplicaSet mostUsedDataRegion = null; + int maxDeviceEntrySizeOfTableScan = 0; + for (Map.Entry< + TRegionReplicaSet, + Pair< + AlignedAggregationTreeDeviceViewScanNode, + NonAlignedAggregationTreeDeviceViewScanNode>> + entry : topology.filterReachableCandidates(tableScanNodeMap.entrySet())) { + TRegionReplicaSet regionReplicaSet = entry.getKey(); + Pair + pair = entry.getValue(); + int currentDeviceEntrySize = 0; + + if (pair.left != null) { + currentDeviceEntrySize += pair.left.getDeviceEntries().size(); + resultTableScanNodeList.add(pair.left); + } + + if (pair.right != null) { + currentDeviceEntrySize += pair.right.getDeviceEntries().size(); + resultTableScanNodeList.add(pair.right); + } + + if (mostUsedDataRegion == null || currentDeviceEntrySize > maxDeviceEntrySizeOfTableScan) { + mostUsedDataRegion = regionReplicaSet; + maxDeviceEntrySizeOfTableScan = currentDeviceEntrySize; + } + } + if (mostUsedDataRegion == null) { + throw new RootFIPlacementException(tableScanNodeMap.keySet()); + } + context.mostUsedRegion = mostUsedDataRegion; + + if (context.hasSortProperty) { + processSortProperty(node, resultTableScanNodeList, context); + } + if (needSplit) { if (resultTableScanNodeList.size() == 1) { finalAggregation.setChild(resultTableScanNodeList.get(0)); @@ -1293,6 +1403,83 @@ public List visitAggregationTableScan( return resultTableScanNodeList; } + private static class AggregationDistributionInfo { + private final List> regionReplicaSetsList; + private final AggregationTableScanNode templateNode; + private final AggregationNode finalAggregation; + private final boolean needSplit; + + AggregationDistributionInfo( + List> regionReplicaSetsList, + AggregationTableScanNode templateNode, + AggregationNode finalAggregation, + boolean needSplit) { + this.regionReplicaSetsList = regionReplicaSetsList; + this.templateNode = templateNode; + this.finalAggregation = finalAggregation; + this.needSplit = needSplit; + } + } + + private AggregationDistributionInfo prepareAggregationDistribution( + AggregationTableScanNode node, + String dbName, + DataPartition dataPartition, + PlanContext context) { + boolean needSplit = false; + List> regionReplicaSetsList = new ArrayList<>(); + + Map>> seriesSlotMap = + dataPartition.getDataPartitionMap().get(dbName); + if (seriesSlotMap == null) { + throw new SemanticException( + String.format("Given queried database: %s is not exist!", dbName)); + } + + Map> cachedSeriesSlotWithRegions = new HashMap<>(); + for (DeviceEntry deviceEntry : node.getDeviceEntries()) { + List regionReplicaSets = + getDeviceReplicaSets( + dataPartition, + seriesSlotMap, + deviceEntry.getDeviceID(), + node.getTimeFilter(), + cachedSeriesSlotWithRegions); + if (regionReplicaSets.size() > 1) { + needSplit = true; + context.deviceCrossRegion = true; + queryContext.setNeedUpdateScanNumForLastQuery(node.mayUseLastCache()); + } + regionReplicaSetsList.add(regionReplicaSets); + } + + if (regionReplicaSetsList.isEmpty()) { + regionReplicaSetsList = Collections.singletonList(Collections.singletonList(NOT_ASSIGNED)); + } + + AggregationTableScanNode templateNode = node; + AggregationNode finalAggregation = null; + // Step is SINGLE and device data in more than one region, we need to final aggregate the result + // from different region here, so split this node into two-stage + needSplit = needSplit && node.getStep() == SINGLE; + if (needSplit) { + Pair splitResult = + split(node, symbolAllocator, queryId); + finalAggregation = splitResult.left; + templateNode = splitResult.right; + + // cover case: complete push-down + group by + streamable + if (!context.hasSortProperty && finalAggregation.isStreamable()) { + OrderingScheme expectedOrderingSchema = + constructOrderingSchema(node.getPreGroupedSymbols()); + context.setExpectedOrderingScheme(expectedOrderingSchema); + } + } + + return new AggregationDistributionInfo( + regionReplicaSetsList, templateNode, finalAggregation, needSplit); + } + private List getDeviceReplicaSets( DataPartition dataPartition, Map>> seriesSlotMap, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AlignedAggregationTreeDeviceViewScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AlignedAggregationTreeDeviceViewScanNode.java new file mode 100644 index 0000000000000..9c879717321bd --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AlignedAggregationTreeDeviceViewScanNode.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.node; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class AlignedAggregationTreeDeviceViewScanNode extends AggregationTreeDeviceViewScanNode { + + public AlignedAggregationTreeDeviceViewScanNode( + PlanNodeId id, + QualifiedObjectName qualifiedObjectName, + List outputSymbols, + Map assignments, + List deviceEntries, + Map tagAndAttributeIndexMap, + Ordering scanOrder, + Expression timePredicate, + Expression pushDownPredicate, + long pushDownLimit, + long pushDownOffset, + boolean pushLimitToEachDevice, + boolean containsNonAlignedDevice, + Assignments projection, + Map aggregations, + AggregationNode.GroupingSetDescriptor groupingSets, + List preGroupedSymbols, + AggregationNode.Step step, + Optional groupIdSymbol, + String treeDBName, + Map measurementColumnNameMap) { + super( + id, + qualifiedObjectName, + outputSymbols, + assignments, + deviceEntries, + tagAndAttributeIndexMap, + scanOrder, + timePredicate, + pushDownPredicate, + pushDownLimit, + pushDownOffset, + pushLimitToEachDevice, + containsNonAlignedDevice, + projection, + aggregations, + groupingSets, + preGroupedSymbols, + step, + groupIdSymbol, + treeDBName, + measurementColumnNameMap); + } + + @Override + public AlignedAggregationTreeDeviceViewScanNode clone() { + return new AlignedAggregationTreeDeviceViewScanNode( + getPlanNodeId(), + qualifiedObjectName, + outputSymbols, + assignments, + deviceEntries, + tagAndAttributeIndexMap, + scanOrder, + timePredicate, + pushDownPredicate, + pushDownLimit, + pushDownOffset, + pushLimitToEachDevice, + containsNonAlignedDevice, + projection, + aggregations, + groupingSets, + preGroupedSymbols, + step, + groupIdSymbol, + getTreeDBName(), + getMeasurementColumnNameMap()); + } + + @Override + public String toString() { + return "AlignedAggregationTreeDeviceViewScanNode-" + this.getPlanNodeId(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/NonAlignedAggregationTreeDeviceViewScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/NonAlignedAggregationTreeDeviceViewScanNode.java new file mode 100644 index 0000000000000..2649023b9065e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/NonAlignedAggregationTreeDeviceViewScanNode.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.node; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class NonAlignedAggregationTreeDeviceViewScanNode extends AggregationTreeDeviceViewScanNode { + + public NonAlignedAggregationTreeDeviceViewScanNode( + PlanNodeId id, + QualifiedObjectName qualifiedObjectName, + List outputSymbols, + Map assignments, + List deviceEntries, + Map tagAndAttributeIndexMap, + Ordering scanOrder, + Expression timePredicate, + Expression pushDownPredicate, + long pushDownLimit, + long pushDownOffset, + boolean pushLimitToEachDevice, + boolean containsNonAlignedDevice, + Assignments projection, + Map aggregations, + AggregationNode.GroupingSetDescriptor groupingSets, + List preGroupedSymbols, + AggregationNode.Step step, + Optional groupIdSymbol, + String treeDBName, + Map measurementColumnNameMap) { + super( + id, + qualifiedObjectName, + outputSymbols, + assignments, + deviceEntries, + tagAndAttributeIndexMap, + scanOrder, + timePredicate, + pushDownPredicate, + pushDownLimit, + pushDownOffset, + pushLimitToEachDevice, + containsNonAlignedDevice, + projection, + aggregations, + groupingSets, + preGroupedSymbols, + step, + groupIdSymbol, + treeDBName, + measurementColumnNameMap); + } + + @Override + public NonAlignedAggregationTreeDeviceViewScanNode clone() { + return new NonAlignedAggregationTreeDeviceViewScanNode( + getPlanNodeId(), + qualifiedObjectName, + outputSymbols, + assignments, + deviceEntries, + tagAndAttributeIndexMap, + scanOrder, + timePredicate, + pushDownPredicate, + pushDownLimit, + pushDownOffset, + pushLimitToEachDevice, + containsNonAlignedDevice, + projection, + aggregations, + groupingSets, + preGroupedSymbols, + step, + groupIdSymbol, + getTreeDBName(), + getMeasurementColumnNameMap()); + } + + @Override + public String toString() { + return "NonAlignedAggregationTreeDeviceViewScanNode-" + this.getPlanNodeId(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java index a47794fd329ce..3bcb6f1935b25 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java @@ -105,10 +105,6 @@ public PlanNode visitAggregation(AggregationNode node, Context context) { return node; } - if (tableScanNode.containsNonAlignedDevice()) { - return node; - } - PushDownLevel pushDownLevel = calculatePushDownLevel( node.getAggregations().values(), diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TreeViewTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TreeViewTest.java index 351bcb7f605f9..1dcd7d5487af5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TreeViewTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TreeViewTest.java @@ -140,7 +140,7 @@ public void rawDataQueryTest() { public void aggregationQueryTest() { PlanTester planTester = new PlanTester(); - // has non-aligned DeviceEntry, no push-down + // has non-aligned DeviceEntry LogicalQueryPlan logicalQueryPlan = planTester.createPlan( "select tag1, count(s1) from " @@ -149,14 +149,83 @@ public void aggregationQueryTest() { PlanMatchPattern expectedPlanPattern = output( aggregation( - ImmutableMap.of("count", aggregationFunction("count", ImmutableList.of("s1"))), - treeDeviceViewTableScan( + ImmutableMap.of("count", aggregationFunction("count", ImmutableList.of("count_0"))), + aggregationTreeDeviceViewTableScan( + singleGroupingSet("tag1"), + ImmutableList.of("tag1"), + Optional.empty(), + PARTIAL, DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME, - ImmutableList.of("tag1", "s1"), + ImmutableList.of("tag1", "count_0"), ImmutableSet.of("tag1", "s1")))); assertPlan(logicalQueryPlan, expectedPlanPattern); - // only aligned DeviceEntry, do push-down + assertPlan( + planTester.getFragmentPlan(0), + output( + aggregation( + ImmutableMap.of("count", aggregationFunction("count", ImmutableList.of("count_1"))), + FINAL, + mergeSort(exchange(), exchange(), exchange(), exchange())))); + + assertPlan( + planTester.getFragmentPlan(1), + aggregation( + ImmutableMap.of("count", aggregationFunction("count", ImmutableList.of("count_0"))), + INTERMEDIATE, + aggregationTreeDeviceViewTableScan( + singleGroupingSet("tag1"), + ImmutableList.of("tag1"), + Optional.empty(), + PARTIAL, + DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME, + ImmutableList.of("tag1", "count_0"), + ImmutableSet.of("tag1", "s1"), + true))); + assertPlan( + planTester.getFragmentPlan(2), + aggregation( + ImmutableMap.of("count", aggregationFunction("count", ImmutableList.of("count_0"))), + INTERMEDIATE, + aggregationTreeDeviceViewTableScan( + singleGroupingSet("tag1"), + ImmutableList.of("tag1"), + Optional.empty(), + PARTIAL, + DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME, + ImmutableList.of("tag1", "count_0"), + ImmutableSet.of("tag1", "s1"), + false))); + assertPlan( + planTester.getFragmentPlan(3), + aggregation( + ImmutableMap.of("count", aggregationFunction("count", ImmutableList.of("count_0"))), + INTERMEDIATE, + aggregationTreeDeviceViewTableScan( + singleGroupingSet("tag1"), + ImmutableList.of("tag1"), + Optional.empty(), + PARTIAL, + DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME, + ImmutableList.of("tag1", "count_0"), + ImmutableSet.of("tag1", "s1"), + true))); + assertPlan( + planTester.getFragmentPlan(4), + aggregation( + ImmutableMap.of("count", aggregationFunction("count", ImmutableList.of("count_0"))), + INTERMEDIATE, + aggregationTreeDeviceViewTableScan( + singleGroupingSet("tag1"), + ImmutableList.of("tag1"), + Optional.empty(), + PARTIAL, + DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME, + ImmutableList.of("tag1", "count_0"), + ImmutableSet.of("tag1", "s1"), + false))); + + // only aligned DeviceEntry logicalQueryPlan = planTester.createPlan( "select tag1, count(s1) from " @@ -199,7 +268,8 @@ public void aggregationQueryTest() { PARTIAL, DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME, ImmutableList.of("tag1", "count_0"), - ImmutableSet.of("tag1", "s1")))); + ImmutableSet.of("tag1", "s1"), + true))); } } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java index 03f79fd2ec29a..2ca57e5296a5b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode; @@ -43,6 +44,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; @@ -429,6 +431,38 @@ public static PlanMatchPattern aggregationTreeDeviceViewTableScan( return result; } + public static PlanMatchPattern aggregationTreeDeviceViewTableScan( + GroupingSetDescriptor groupingSets, + List preGroupedSymbols, + Optional groupId, + AggregationNode.Step step, + String expectedTableName, + List outputSymbols, + Set assignmentsKeys, + boolean aligned) { + PlanMatchPattern result = + aligned + ? node(AlignedAggregationTreeDeviceViewScanNode.class) + : node(NonAlignedAggregationTreeDeviceViewScanNode.class); + + result.with( + new AggregationDeviceTableScanMatcher( + groupingSets, + preGroupedSymbols, + ImmutableList.of(), + groupId, + step, + expectedTableName, + Optional.empty(), + outputSymbols, + assignmentsKeys)); + + outputSymbols.forEach( + outputSymbol -> + result.withAlias(outputSymbol, new ColumnReference(expectedTableName, outputSymbol))); + return result; + } + // Attention: Now we only pass aliases according to outputSymbols, but we don't verify the output // column if exists in Table and their order because there maybe partial Agg-result. public static PlanMatchPattern aggregationTableScan( From b3616e7a256da2a164d4559a1c264bf32ae94986 Mon Sep 17 00:00:00 2001 From: Weihao Li <18110526956@163.com> Date: Fri, 13 Mar 2026 07:47:27 +0800 Subject: [PATCH 2/5] BE Signed-off-by: Weihao Li <18110526956@163.com> --- .../AbstractAggTableScanOperator.java | 23 +- .../DeviceIteratorScanOperator.java | 4 +- .../LastQueryAggTableScanOperator.java | 2 +- ...gnedDeviceViewAggregationScanOperator.java | 219 ++++++++++++++++++ .../plan/planner/TableOperatorGenerator.java | 84 ++++++- .../plan/planner/plan/node/PlanVisitor.java | 12 + ...gnedAggregationTreeDeviceViewScanNode.java | 6 + ...gnedAggregationTreeDeviceViewScanNode.java | 6 + 8 files changed, 341 insertions(+), 15 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java index a59ec643b071f..9d3979bafecd2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; @@ -71,7 +72,7 @@ public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOperator { private boolean finished = false; - private TsBlock inputTsBlock; + protected TsBlock inputTsBlock; protected List tableAggregators; protected final List groupingKeySchemas; @@ -104,11 +105,11 @@ public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOpe // e.g. for aggregation `last(s1), count(s2), count(s1)`, the inputChannels should be [0, 1, 0] protected List aggregatorInputChannels; - private QueryDataSource queryDataSource; + protected QueryDataSource queryDataSource; protected ITableTimeRangeIterator timeIterator; - private boolean allAggregatorsHasFinalResult = false; + protected boolean allAggregatorsHasFinalResult = false; protected AbstractAggTableScanOperator(AbstractAggTableScanOperatorParameter parameter) { @@ -193,7 +194,7 @@ protected void constructAlignedSeriesScanUtil() { } /** Return true if we have the result of this timeRange. */ - protected Optional calculateAggregationResultForCurrentTimeRange() { + protected Optional calculateAggregationResultForCurrentTimeRange() throws Exception { try { if (calcFromCachedData()) { updateResultTsBlock(); @@ -706,7 +707,7 @@ public boolean isAllAggregatorsHasFinalResult(List aggregators) return true; } - private void checkIfAllAggregatorHasFinalResult() { + protected void checkIfAllAggregatorHasFinalResult() throws Exception { if (allAggregatorsHasFinalResult && (timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR || tableAggregators.isEmpty())) { @@ -729,7 +730,7 @@ private void checkIfAllAggregatorHasFinalResult() { } } - private void nextDevice() { + protected void nextDevice() throws Exception { currentDeviceIndex++; this.operatorContext.recordSpecifiedInfo( CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex)); @@ -812,6 +813,8 @@ public static class AbstractAggTableScanOperatorParameter { protected List deviceEntries; protected int deviceCount; + private List outputSymbols; + public AbstractAggTableScanOperatorParameter( PlanNodeId sourceId, OperatorContext context, @@ -830,7 +833,8 @@ public AbstractAggTableScanOperatorParameter( boolean ascending, boolean canUseStatistics, List aggregatorInputChannels, - String timeColumnName) { + String timeColumnName, + List outputSymbols) { this.sourceId = sourceId; this.context = context; this.aggColumnSchemas = aggColumnSchemas; @@ -849,6 +853,11 @@ public AbstractAggTableScanOperatorParameter( this.canUseStatistics = canUseStatistics; this.aggregatorInputChannels = aggregatorInputChannels; this.timeColumnName = timeColumnName; + this.outputSymbols = outputSymbols; + } + + public List getOutputSymbols() { + return outputSymbols; } public OperatorContext getOperatorContext() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java index f088e448fd727..0c01fce0bce9e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java @@ -113,7 +113,7 @@ private void constructCurrentDeviceOperatorTree() { } DeviceEntry deviceEntry = this.deviceEntries.get(this.currentDeviceIndex); - deviceChildOperatorTreeGenerator.generateCurrentDeviceOperatorTree(deviceEntry); + deviceChildOperatorTreeGenerator.generateCurrentDeviceOperatorTree(deviceEntry, true); currentDeviceRootOperator = deviceChildOperatorTreeGenerator.getCurrentDeviceRootOperator(); dataSourceOperators = deviceChildOperatorTreeGenerator.getCurrentDeviceDataSourceOperators(); currentDeviceInit = false; @@ -217,7 +217,7 @@ public interface DeviceChildOperatorTreeGenerator { boolean keepOffsetAndLimitOperatorAfterDeviceIterator(); // Generate the following operator subtree based on the current deviceEntry - void generateCurrentDeviceOperatorTree(DeviceEntry deviceEntry); + void generateCurrentDeviceOperatorTree(DeviceEntry deviceEntry, boolean needAdaptor); // Returns the root operator of the subtree Operator getCurrentDeviceRootOperator(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java index 9b4c37995de63..2beccf46d8e6c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java @@ -156,7 +156,7 @@ public TsBlock next() throws Exception { } /** Main process logic, calc the last aggregation results of current device. */ - private void processCurrentDevice() { + private void processCurrentDevice() throws Exception { if (currentHitCacheIndex < hitCachesIndexes.size() && outputDeviceIndex == hitCachesIndexes.get(currentHitCacheIndex)) { currentDeviceEntry = cachedDeviceEntries.get(currentHitCacheIndex); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java new file mode 100644 index 0000000000000..7b6255c54023d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.List; +import java.util.Optional; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.CURRENT_DEVICE_INDEX_STRING; + +public class TreeNonAlignedDeviceViewAggregationScanOperator + extends AbstractDefaultAggTableScanOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance( + TreeNonAlignedDeviceViewAggregationScanOperator.class); + + private final IDeviceID.TreeDeviceIdColumnValueExtractor extractor; + private final DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator childOperatorGenerator; + + private Operator child; + private List dataSourceOperators; + + public TreeNonAlignedDeviceViewAggregationScanOperator( + AbstractAggTableScanOperatorParameter parameter, + IDeviceID.TreeDeviceIdColumnValueExtractor extractor, + DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator childOperatorGenerator) { + super(parameter); + this.extractor = extractor; + this.childOperatorGenerator = childOperatorGenerator; + constructCurrentDeviceOperatorTree(); + } + + @Override + public ListenableFuture isBlocked() { + return child.isBlocked(); + } + + @Override + String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) { + return (String) extractor.extract(deviceEntry.getDeviceID(), idColumnIndex); + } + + @Override + protected Optional calculateAggregationResultForCurrentTimeRange() { + try { + // First try to calculate from cached data + if (calcFromCachedData()) { + updateResultTsBlock(); + checkIfAllAggregatorHasFinalResult(); + return Optional.of(true); + } + + // Read from child operator + if (readAndCalcFromChild()) { + updateResultTsBlock(); + checkIfAllAggregatorHasFinalResult(); + return Optional.of(true); + } + + // No more data from child, finish the current device + if (!child.hasNext()) { + updateResultTsBlock(); + timeIterator.resetCurTimeRange(); + nextDevice(); + + if (currentDeviceIndex >= deviceCount) { + // All devices consumed + timeIterator.setFinished(); + return Optional.of(true); + } else { + // More devices to process, child should provide next device's data + return Optional.of(false); + } + } + + return Optional.of(false); + } catch (Exception e) { + throw new RuntimeException("Error while processing aggregation from child operator", e); + } + } + + /** Read data from child operator and calculate aggregation. */ + private boolean readAndCalcFromChild() throws Exception { + long start = System.nanoTime(); + + while (child.hasNext()) { + // Get next TsBlock from child + TsBlock tsBlock = child.nextWithTimer(); + if (tsBlock == null || tsBlock.isEmpty()) { + continue; + } + + // Calculate aggregation from raw data + if (calcUsingRawData(tsBlock)) { + return true; + } + + // If not finished, continue reading from child + } + + return false; + } + + @Override + protected void nextDevice() throws Exception { + currentDeviceIndex++; + childOperatorGenerator.getCurrentDeviceStartCloseOperator().close(); + if (currentDeviceIndex >= deviceEntries.size()) { + return; + } + constructCurrentDeviceOperatorTree(); + queryDataSource.reset(); + initQueryDataSource(queryDataSource); + this.operatorContext.recordSpecifiedInfo( + CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex)); + } + + private void constructCurrentDeviceOperatorTree() { + if (this.deviceEntries.isEmpty()) { + return; + } + if (this.deviceEntries.get(this.currentDeviceIndex) == null) { + throw new IllegalStateException( + "Device entries of index " + this.currentDeviceIndex + " is empty"); + } + DeviceEntry deviceEntry = this.deviceEntries.get(this.currentDeviceIndex); + + childOperatorGenerator.generateCurrentDeviceOperatorTree(deviceEntry, false); + child = childOperatorGenerator.getCurrentDeviceRootOperator(); + dataSourceOperators = childOperatorGenerator.getCurrentDeviceDataSourceOperators(); + } + + /** same with {@link DeviceIteratorScanOperator#initQueryDataSource(IQueryDataSource)} */ + @Override + public void initQueryDataSource(IQueryDataSource dataSource) { + + this.queryDataSource = (QueryDataSource) dataSource; + this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); + if (dataSourceOperators == null || dataSourceOperators.isEmpty()) { + return; + } + for (Operator operator : dataSourceOperators) { + ((AbstractDataSourceOperator) operator).initQueryDataSource(dataSource); + } + } + + @Override + protected void checkIfAllAggregatorHasFinalResult() throws Exception { + if (allAggregatorsHasFinalResult + && (timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR + || tableAggregators.isEmpty())) { + nextDevice(); + inputTsBlock = null; + + if (currentDeviceIndex >= deviceCount) { + // all devices have been consumed + timeIterator.setFinished(); + } + + allAggregatorsHasFinalResult = false; + } + } + + @Override + public long calculateMaxPeekMemory() { + return child.calculateMaxPeekMemory(); + } + + @Override + public long calculateMaxReturnSize() { + return child.calculateMaxReturnSize(); + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return child.calculateRetainedSizeAfterCallingNext(); + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId) + + (resultTsBlockBuilder == null ? 0 : resultTsBlockBuilder.getRetainedSizeInBytes()) + + RamUsageEstimator.sizeOfCollection(deviceEntries) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(child); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 1bc788251a764..8ad9a1d62cdce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -135,6 +135,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewAggregationScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewScanOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeNonAlignedDeviceViewAggregationScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeToTableViewAdaptorOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastByDescAccumulator; @@ -185,6 +186,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode; @@ -203,6 +205,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.Measure; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode; @@ -591,9 +594,15 @@ public boolean keepOffsetAndLimitOperatorAfterDeviceIterator() { } @Override - public void generateCurrentDeviceOperatorTree(DeviceEntry deviceEntry) { + public void generateCurrentDeviceOperatorTree( + DeviceEntry deviceEntry, boolean needAdaptor) { calculateSeriesScanOptionsList(); - operator = constructTreeToTableViewAdaptorOperator(deviceEntry); + if (needAdaptor) { + operator = constructTreeToTableViewAdaptorOperator(deviceEntry); + } else { + seriesScanOperators = new ArrayList<>(measurementSchemas.size()); + operator = constructAndJoinScanOperators(deviceEntry); + } boolean needToPruneColumn = node.getAssignments().size() != node.getOutputSymbols().size(); if (isSingleColumn) { @@ -2821,8 +2830,8 @@ private GroupedAggregator buildGroupByAggregator( } @Override - public Operator visitAggregationTreeDeviceViewScan( - AggregationTreeDeviceViewScanNode node, LocalExecutionPlanContext context) { + public Operator visitAlignedAggregationTreeDeviceViewScan( + AlignedAggregationTreeDeviceViewScanNode node, LocalExecutionPlanContext context) { QualifiedObjectName qualifiedObjectName = node.getQualifiedObjectName(); TsTable tsTable = DataNodeTableCache.getInstance() @@ -2852,6 +2861,67 @@ public Operator visitAggregationTreeDeviceViewScan( return treeAlignedDeviceViewAggregationScanOperator; } + @Override + public Operator visitNonAlignedAggregationTreeDeviceViewScan( + NonAlignedAggregationTreeDeviceViewScanNode node, LocalExecutionPlanContext context) { + QualifiedObjectName qualifiedObjectName = node.getQualifiedObjectName(); + TsTable tsTable = + DataNodeTableCache.getInstance() + .getTable(qualifiedObjectName.getDatabaseName(), qualifiedObjectName.getObjectName()); + IDeviceID.TreeDeviceIdColumnValueExtractor idColumnValueExtractor = + createTreeDeviceIdColumnValueExtractor(DataNodeTreeViewSchemaUtils.getPrefixPath(tsTable)); + + AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter parameter = + constructAbstractAggTableScanOperatorParameter( + node, + context, + TreeAlignedDeviceViewAggregationScanOperator.class.getSimpleName(), + node.getMeasurementColumnNameMap(), + tsTable.getCachedTableTTL()); + + TreeNonAlignedDeviceViewScanNode scanNode = + new TreeNonAlignedDeviceViewScanNode( + node.getPlanNodeId(), + node.getQualifiedObjectName(), + // The outputSymbols of AggTableScanNode is not equals with TableScanNode + parameter.getOutputSymbols(), + node.getAssignments(), + node.getDeviceEntries(), + node.getTagAndAttributeIndexMap(), + node.getScanOrder(), + node.getTimePredicate().orElse(null), + node.getPushDownPredicate(), + node.getPushDownLimit(), + node.getPushDownOffset(), + node.isPushLimitToEachDevice(), + true, + node.getTreeDBName(), + node.getMeasurementColumnNameMap()); + + DeviceIteratorScanOperator.TreeNonAlignedDeviceViewScanParameters params = + constructTreeNonAlignedDeviceViewScanOperatorParameter( + scanNode, + context, + TreeNonAlignedDeviceViewScanNode.class.getSimpleName(), + node.getMeasurementColumnNameMap(), + idColumnValueExtractor, + tsTable.getCachedTableTTL()); + + TreeNonAlignedDeviceViewAggregationScanOperator aggTableScanOperator = + new TreeNonAlignedDeviceViewAggregationScanOperator( + parameter, idColumnValueExtractor, params.generator); + + addSource( + aggTableScanOperator, + context, + node, + parameter.getMeasurementColumnNames(), + parameter.getMeasurementSchemas(), + parameter.getAllSensors(), + AggregationTableScanNode.class.getSimpleName()); + return aggTableScanOperator; + } + private AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter constructAbstractAggTableScanOperatorParameter( AggregationTableScanNode node, @@ -2882,6 +2952,8 @@ public Operator visitAggregationTreeDeviceViewScan( Map aggColumnLayout = new HashMap<>(aggDistinctArgumentCount); int[] aggColumnsIndexArray = new int[aggDistinctArgumentCount]; + List outputSymbols = new ArrayList<>(); + String timeColumnName = null; int channel = 0; int measurementColumnCount = 0; @@ -2907,6 +2979,7 @@ public Operator visitAggregationTreeDeviceViewScan( measurementSchemas.add( new MeasurementSchema(realMeasurementName, getTSDataType(schema.getType()))); measurementColumnsIndexMap.put(symbol.getName(), measurementColumnCount - 1); + outputSymbols.add(symbol); break; case TIME: aggColumnsIndexArray[channel] = -1; @@ -3046,7 +3119,8 @@ public Operator visitAggregationTreeDeviceViewScan( scanAscending, canUseStatistic, aggregatorInputChannels, - timeColumnName); + timeColumnName, + outputSymbols); } // used for AggregationTableScanNode diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 44f1cd8bc1f67..7f148f5a3d5c2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -123,6 +123,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.GroupReference; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExceptNode; @@ -132,6 +133,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowNumberNode; @@ -841,6 +843,16 @@ public R visitAggregationTreeDeviceViewScan(AggregationTreeDeviceViewScanNode no return visitAggregationTableScan(node, context); } + public R visitAlignedAggregationTreeDeviceViewScan( + AlignedAggregationTreeDeviceViewScanNode node, C context) { + return visitAggregationTreeDeviceViewScan(node, context); + } + + public R visitNonAlignedAggregationTreeDeviceViewScan( + NonAlignedAggregationTreeDeviceViewScanNode node, C context) { + return visitAggregationTreeDeviceViewScan(node, context); + } + public R visitTreeAlignedDeviceViewScan(TreeAlignedDeviceViewScanNode node, C context) { return visitTreeDeviceViewScan(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AlignedAggregationTreeDeviceViewScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AlignedAggregationTreeDeviceViewScanNode.java index 9c879717321bd..acc08e0a3d865 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AlignedAggregationTreeDeviceViewScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AlignedAggregationTreeDeviceViewScanNode.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.node; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; @@ -80,6 +81,11 @@ public AlignedAggregationTreeDeviceViewScanNode( measurementColumnNameMap); } + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitAlignedAggregationTreeDeviceViewScan(this, context); + } + @Override public AlignedAggregationTreeDeviceViewScanNode clone() { return new AlignedAggregationTreeDeviceViewScanNode( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/NonAlignedAggregationTreeDeviceViewScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/NonAlignedAggregationTreeDeviceViewScanNode.java index 2649023b9065e..04dfcff65900a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/NonAlignedAggregationTreeDeviceViewScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/NonAlignedAggregationTreeDeviceViewScanNode.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.node; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; @@ -80,6 +81,11 @@ public NonAlignedAggregationTreeDeviceViewScanNode( measurementColumnNameMap); } + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitNonAlignedAggregationTreeDeviceViewScan(this, context); + } + @Override public NonAlignedAggregationTreeDeviceViewScanNode clone() { return new NonAlignedAggregationTreeDeviceViewScanNode( From 921ad10651df5c69139717d1d7af3223d3694e16 Mon Sep 17 00:00:00 2001 From: Weihao Li <18110526956@163.com> Date: Fri, 13 Mar 2026 07:55:32 +0800 Subject: [PATCH 3/5] fix some Signed-off-by: Weihao Li <18110526956@163.com> --- .../execution/operator/DeviceIteratorScanOperatorTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceIteratorScanOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceIteratorScanOperatorTest.java index 951c2b895a1ee..69c1864aaecdd 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceIteratorScanOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceIteratorScanOperatorTest.java @@ -132,7 +132,8 @@ public boolean keepOffsetAndLimitOperatorAfterDeviceIterator() { } @Override - public void generateCurrentDeviceOperatorTree(DeviceEntry deviceEntry) { + public void generateCurrentDeviceOperatorTree( + DeviceEntry deviceEntry, boolean needAdaptor) { AlignedFullPath alignedPath = new AlignedFullPath( deviceEntry.getDeviceID(), From 09e02ecac33fa5d4bd6806c0cc050b0f8a6a5ee9 Mon Sep 17 00:00:00 2001 From: Weihao Li <18110526956@163.com> Date: Fri, 13 Mar 2026 15:13:45 +0800 Subject: [PATCH 4/5] fix some Signed-off-by: Weihao Li <18110526956@163.com> --- .../TreeNonAlignedDeviceViewAggregationScanOperator.java | 4 ++-- .../db/queryengine/plan/planner/TableOperatorGenerator.java | 1 + .../planner/distribute/TableDistributedPlanGenerator.java | 2 ++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java index 7b6255c54023d..bf3f4715a4e5a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java @@ -113,7 +113,7 @@ protected Optional calculateAggregationResultForCurrentTimeRange() { private boolean readAndCalcFromChild() throws Exception { long start = System.nanoTime(); - while (child.hasNext()) { + while (System.nanoTime() - start < leftRuntimeOfOneNextCall && child.hasNext()) { // Get next TsBlock from child TsBlock tsBlock = child.nextWithTimer(); if (tsBlock == null || tsBlock.isEmpty()) { @@ -125,7 +125,7 @@ private boolean readAndCalcFromChild() throws Exception { return true; } - // If not finished, continue reading from child + // If not finished, continue reading from child } return false; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 8ad9a1d62cdce..d35fe73f7efcd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -2952,6 +2952,7 @@ public Operator visitNonAlignedAggregationTreeDeviceViewScan( Map aggColumnLayout = new HashMap<>(aggDistinctArgumentCount); int[] aggColumnsIndexArray = new int[aggDistinctArgumentCount]; + // TODO MODIFY List outputSymbols = new ArrayList<>(); String timeColumnName = null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index e6bbf92b4a9f5..95e7e68bef9d2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -1270,6 +1270,8 @@ public List visitAggregationTreeDeviceViewScan( NonAlignedAggregationTreeDeviceViewScanNode>> tableScanNodeMap = new HashMap<>(); + // construct AlignedAggregationTreeDeviceViewScanNode and + // NonAlignedAggregationTreeDeviceViewScanNode for each region for (int i = 0; i < regionReplicaSetsList.size(); i++) { DeviceEntry deviceEntry = node.getDeviceEntries().get(i); List regionReplicaSets = regionReplicaSetsList.get(i); From 17a45590343211f2b2348f0fbe693113a703eafc Mon Sep 17 00:00:00 2001 From: Weihao Li <18110526956@163.com> Date: Fri, 13 Mar 2026 15:25:10 +0800 Subject: [PATCH 5/5] fix finally Signed-off-by: Weihao Li <18110526956@163.com> --- .../DeviceIteratorScanOperator.java | 4 ++ .../plan/planner/TableOperatorGenerator.java | 48 ++++++++++--------- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java index 0c01fce0bce9e..14ef601d05ca9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java @@ -188,6 +188,10 @@ public long ramBytesUsed() { + RamUsageEstimator.sizeOfCollection(deviceEntries); } + public DeviceChildOperatorTreeGenerator getDeviceChildOperatorTreeGenerator() { + return deviceChildOperatorTreeGenerator; + } + public static class TreeNonAlignedDeviceViewScanParameters { public final OperatorContext context; public final List deviceEntries; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index d35fe73f7efcd..e2497028a6971 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -314,6 +314,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.getOnlyElement; import static java.util.Objects.requireNonNull; @@ -2879,11 +2880,12 @@ public Operator visitNonAlignedAggregationTreeDeviceViewScan( node.getMeasurementColumnNameMap(), tsTable.getCachedTableTTL()); + // construct source operator (generator) TreeNonAlignedDeviceViewScanNode scanNode = new TreeNonAlignedDeviceViewScanNode( node.getPlanNodeId(), node.getQualifiedObjectName(), - // The outputSymbols of AggTableScanNode is not equals with TableScanNode + // the outputSymbols of AggTableScanNode is not equals with TableScanNode parameter.getOutputSymbols(), node.getAssignments(), node.getDeviceEntries(), @@ -2898,28 +2900,30 @@ public Operator visitNonAlignedAggregationTreeDeviceViewScan( node.getTreeDBName(), node.getMeasurementColumnNameMap()); - DeviceIteratorScanOperator.TreeNonAlignedDeviceViewScanParameters params = - constructTreeNonAlignedDeviceViewScanOperatorParameter( - scanNode, - context, - TreeNonAlignedDeviceViewScanNode.class.getSimpleName(), - node.getMeasurementColumnNameMap(), - idColumnValueExtractor, - tsTable.getCachedTableTTL()); - - TreeNonAlignedDeviceViewAggregationScanOperator aggTableScanOperator = - new TreeNonAlignedDeviceViewAggregationScanOperator( - parameter, idColumnValueExtractor, params.generator); + Operator sourceOperator = visitTreeNonAlignedDeviceViewScan(scanNode, context); + if (sourceOperator instanceof DeviceIteratorScanOperator) { + // Use deviceChildOperatorTreeGenerator directly, we will control switch of devices in + // TreeNonAlignedDeviceViewAggregationScanOperator + TreeNonAlignedDeviceViewAggregationScanOperator aggTableScanOperator = + new TreeNonAlignedDeviceViewAggregationScanOperator( + parameter, + idColumnValueExtractor, + ((DeviceIteratorScanOperator) sourceOperator).getDeviceChildOperatorTreeGenerator()); - addSource( - aggTableScanOperator, - context, - node, - parameter.getMeasurementColumnNames(), - parameter.getMeasurementSchemas(), - parameter.getAllSensors(), - AggregationTableScanNode.class.getSimpleName()); - return aggTableScanOperator; + addSource( + aggTableScanOperator, + context, + node, + parameter.getMeasurementColumnNames(), + parameter.getMeasurementSchemas(), + parameter.getAllSensors(), + AggregationTableScanNode.class.getSimpleName()); + return aggTableScanOperator; + } else { + checkState(sourceOperator instanceof EmptyDataOperator, ""); + // source data is empty, return directly + return sourceOperator; + } } private AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter