From 174dcab2cb06fdbe7a8cff505fe91730695d8a0d Mon Sep 17 00:00:00 2001 From: szywilliam Date: Sun, 23 Mar 2025 10:54:13 +0800 Subject: [PATCH 01/11] fix small issues --- .../manager/load/service/TopologyService.java | 10 +- .../SimpleFragmentParallelPlanner.java | 153 +------------ .../exceptions/RootFIPlacementException.java | 2 +- .../plan/AbstractFragmentParallelPlanner.java | 201 ++++++++++++++++++ .../distribute/TableDistributedPlanner.java | 2 +- .../TableModelQueryFragmentPlanner.java | 155 +------------- .../iotdb/db/utils/ErrorHandlingUtils.java | 5 +- 7 files changed, 227 insertions(+), 301 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index 7aa3ac91c59dd..103fb414a8089 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -234,7 +234,7 @@ private synchronized void topologyProbing() { } if (!partitioned.isEmpty()) { - logAsymmetricPartition(partitioned); + logAsymmetricPartition(partitioned, dataNodeIds.size()); } // 5. notify the listeners on topology change @@ -243,8 +243,14 @@ private synchronized void topologyProbing() { } } - private void logAsymmetricPartition(final Map> partitioned) { + private void logAsymmetricPartition( + final Map> partitioned, final int totalNodes) { for (final int fromId : partitioned.keySet()) { + final Set unreachable = partitioned.get(fromId); + if (unreachable.size() >= totalNodes - 1) { + // this node is partitioned symmetrically from other nodes + continue; + } for (final int toId : partitioned.get(fromId)) { if (partitioned.get(toId) == null || !partitioned.get(toId).contains(fromId)) { LOGGER.warn("[Topology] Asymmetric network partition from {} to {}", fromId, toId); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java index ee11364947b97..4bbd442840206 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -19,42 +19,29 @@ package org.apache.iotdb.db.queryengine.plan.planner.distribution; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; -import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; -import org.apache.iotdb.commons.partition.QueryExecutor; -import org.apache.iotdb.commons.partition.StorageExecutor; -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; import org.apache.iotdb.db.queryengine.plan.ClusterTopology; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.expression.Expression; -import org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner; -import org.apache.iotdb.db.queryengine.plan.planner.exceptions.ReplicaSetUnreachableException; +import org.apache.iotdb.db.queryengine.plan.planner.plan.AbstractFragmentParallelPlanner; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.TreeModelTimePredicate; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastSeriesSourceNode; import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; -import org.apache.commons.collections4.CollectionUtils; import org.apache.tsfile.read.common.Path; import org.apache.tsfile.utils.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -63,8 +50,7 @@ * A simple implementation of IFragmentParallelPlaner. This planner will transform one PlanFragment * into only one FragmentInstance. */ -public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { - private static final Logger logger = LoggerFactory.getLogger(SimpleFragmentParallelPlanner.class); +public class SimpleFragmentParallelPlanner extends AbstractFragmentParallelPlanner { private final SubPlan subPlan; private final Analysis analysis; @@ -82,6 +68,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { public SimpleFragmentParallelPlanner( SubPlan subPlan, Analysis analysis, MPPQueryContext context) { + super(context); this.subPlan = subPlan; this.analysis = analysis; this.queryContext = context; @@ -94,7 +81,7 @@ public SimpleFragmentParallelPlanner( @Override public List parallelPlan() { prepare(); - calculateNodeTopologyBetweenInstance(); + calculateNodeTopologyBetweenInstance(fragmentInstanceList, planNodeMap, instanceMap); return fragmentInstanceList; } @@ -153,46 +140,8 @@ private void produceFragmentInstance(PlanFragment fragment) { queryContext.isExplainAnalyze(), fragment.isRoot()); - // Get the target region for origin PlanFragment, then its instance will be distributed one - // of them. - TRegionReplicaSet regionReplicaSet = fragment.getTargetRegionForTreeModel(); - if (regionReplicaSet != null - && !CollectionUtils.isEmpty(regionReplicaSet.getDataNodeLocations())) { - regionReplicaSet = topology.getValidatedReplicaSet(regionReplicaSet); - if (regionReplicaSet.getDataNodeLocations().isEmpty()) { - throw new ReplicaSetUnreachableException(fragment.getTargetRegionForTreeModel()); - } - } - - // Set ExecutorType and target host for the instance - // We need to store all the replica host in case of the scenario that the instance need to be - // redirected - // to another host when scheduling - if (regionReplicaSet == null || regionReplicaSet.getRegionId() == null) { - TDataNodeLocation dataNodeLocation = fragment.getTargetLocation(); - if (dataNodeLocation != null) { - // now only the case ShowQueries will enter here - fragmentInstance.setExecutorAndHost(new QueryExecutor(dataNodeLocation)); - } else { - // no data region && no dataNodeLocation, we need to execute this FI on local - // now only the case AggregationQuery has schemaengine but no data region will enter here - fragmentInstance.setExecutorAndHost( - new QueryExecutor(DataNodeEndPoints.getLocalDataNodeLocation())); - } - } else { - fragmentInstance.setExecutorAndHost(new StorageExecutor(regionReplicaSet)); - fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet)); - } - - dataNodeFIMap.compute( - fragmentInstance.getHostDataNode(), - (k, v) -> { - if (v == null) { - v = new ArrayList<>(); - } - v.add(fragmentInstance); - return v; - }); + selectExecutorAndHost( + fragment, fragmentInstance, topology::getValidatedReplicaSet, dataNodeFIMap); if (analysis.getTreeStatement() instanceof QueryStatement || analysis.getTreeStatement() instanceof ExplainAnalyzeStatement @@ -205,96 +154,6 @@ private void produceFragmentInstance(PlanFragment fragment) { fragmentInstanceList.add(fragmentInstance); } - private TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplicaSet) { - if (regionReplicaSet == null - || regionReplicaSet.getDataNodeLocations() == null - || regionReplicaSet.getDataNodeLocations().isEmpty()) { - throw new IllegalArgumentException( - String.format("regionReplicaSet is invalid: %s", regionReplicaSet)); - } - String readConsistencyLevel = - IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel(); - // TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel as static variable or - // enums - boolean selectRandomDataNode = "weak".equals(readConsistencyLevel); - - // When planning fragment onto specific DataNode, the DataNode whose endPoint is in - // black list won't be considered because it may have connection issue now. - List availableDataNodes = - filterAvailableTDataNode(regionReplicaSet.getDataNodeLocations()); - if (availableDataNodes.isEmpty()) { - String errorMsg = - String.format( - "All replicas for region[%s] are not available in these DataNodes[%s]", - regionReplicaSet.getRegionId(), regionReplicaSet.getDataNodeLocations()); - throw new IllegalArgumentException(errorMsg); - } - if (regionReplicaSet.getDataNodeLocationsSize() != availableDataNodes.size()) { - logger.info("available replicas: {}", availableDataNodes); - } - int targetIndex; - if (!selectRandomDataNode || queryContext.getSession() == null) { - targetIndex = 0; - } else { - targetIndex = (int) (queryContext.getSession().getSessionId() % availableDataNodes.size()); - } - return availableDataNodes.get(targetIndex); - } - - private List filterAvailableTDataNode( - List originalDataNodeList) { - List result = new LinkedList<>(); - for (TDataNodeLocation dataNodeLocation : originalDataNodeList) { - if (isAvailableDataNode(dataNodeLocation)) { - result.add(dataNodeLocation); - } - } - return result; - } - - private boolean isAvailableDataNode(TDataNodeLocation dataNodeLocation) { - for (TEndPoint endPoint : queryContext.getEndPointBlackList()) { - if (endPoint.equals(dataNodeLocation.internalEndPoint)) { - return false; - } - } - return true; - } - - private void calculateNodeTopologyBetweenInstance() { - for (FragmentInstance instance : fragmentInstanceList) { - PlanNode rootNode = instance.getFragment().getPlanNodeTree(); - if (rootNode instanceof MultiChildrenSinkNode) { - MultiChildrenSinkNode sinkNode = (MultiChildrenSinkNode) rootNode; - sinkNode - .getDownStreamChannelLocationList() - .forEach( - downStreamChannelLocation -> { - // Set target Endpoint for FragmentSinkNode - PlanNodeId downStreamNodeId = - new PlanNodeId(downStreamChannelLocation.getRemotePlanNodeId()); - FragmentInstance downStreamInstance = findDownStreamInstance(downStreamNodeId); - downStreamChannelLocation.setRemoteEndpoint( - downStreamInstance.getHostDataNode().getMPPDataExchangeEndPoint()); - downStreamChannelLocation.setRemoteFragmentInstanceId( - downStreamInstance.getId().toThrift()); - - // Set upstream info for corresponding ExchangeNode in downstream FragmentInstance - PlanNode downStreamExchangeNode = planNodeMap.get(downStreamNodeId).right; - ((ExchangeNode) downStreamExchangeNode) - .setUpstream( - instance.getHostDataNode().getMPPDataExchangeEndPoint(), - instance.getId(), - sinkNode.getPlanNodeId()); - }); - } - } - } - - private FragmentInstance findDownStreamInstance(PlanNodeId exchangeNodeId) { - return instanceMap.get(planNodeMap.get(exchangeNodeId).left); - } - private void recordPlanNodeRelation(PlanNode root, PlanFragmentId planFragmentId) { planNodeMap.put(root.getPlanNodeId(), new Pair<>(planFragmentId, root)); for (PlanNode child : root.getChildren()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/RootFIPlacementException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/RootFIPlacementException.java index 15b6c640aa029..8910673ef3847 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/RootFIPlacementException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/RootFIPlacementException.java @@ -33,7 +33,7 @@ public class RootFIPlacementException extends IoTDBRuntimeException { public RootFIPlacementException(Collection replicaSets) { super( - "root FragmentInstance placement error: " + replicaSets.toString(), + "Root FragmentInstance placement error: " + replicaSets.toString(), TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java new file mode 100644 index 0000000000000..dffed3bee7b3b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.plan; + +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.partition.QueryExecutor; +import org.apache.iotdb.commons.partition.StorageExecutor; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation; +import org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner; +import org.apache.iotdb.db.queryengine.plan.planner.exceptions.ReplicaSetUnreachableException; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableModelQueryFragmentPlanner; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +public abstract class AbstractFragmentParallelPlanner implements IFragmentParallelPlaner { + private static final Logger LOGGER = + LoggerFactory.getLogger(TableModelQueryFragmentPlanner.class); + + protected MPPQueryContext queryContext; + + protected AbstractFragmentParallelPlanner(MPPQueryContext queryContext) { + this.queryContext = queryContext; + } + + protected void selectExecutorAndHost( + PlanFragment fragment, + FragmentInstance fragmentInstance, + Function validator, + Map> dataNodeFIMap) { + // Get the target region for origin PlanFragment, then its instance will be distributed one + // of them. + TRegionReplicaSet regionReplicaSet = fragment.getTargetRegionForTreeModel(); + if (regionReplicaSet != null + && !CollectionUtils.isEmpty(regionReplicaSet.getDataNodeLocations())) { + regionReplicaSet = validator.apply(regionReplicaSet); + if (regionReplicaSet.getDataNodeLocations().isEmpty()) { + throw new ReplicaSetUnreachableException(fragment.getTargetRegionForTreeModel()); + } + } + // Set ExecutorType and target host for the instance + // We need to store all the replica host in case of the scenario that the instance need to be + // redirected + // to another host when scheduling + if (regionReplicaSet == null || regionReplicaSet.getRegionId() == null) { + TDataNodeLocation dataNodeLocation = fragment.getTargetLocation(); + if (dataNodeLocation != null) { + // now only the case ShowQueries will enter here + fragmentInstance.setExecutorAndHost(new QueryExecutor(dataNodeLocation)); + } else { + // no data region && no dataNodeLocation, we need to execute this FI on local + // now only the case AggregationQuery has schemaengine but no data region will enter here + fragmentInstance.setExecutorAndHost( + new QueryExecutor(DataNodeEndPoints.getLocalDataNodeLocation())); + } + } else { + fragmentInstance.setExecutorAndHost(new StorageExecutor(regionReplicaSet)); + fragmentInstance.setHostDataNode(this.selectTargetDataNode(regionReplicaSet)); + } + + dataNodeFIMap.compute( + fragmentInstance.getHostDataNode(), + (k, v) -> { + if (v == null) { + v = new ArrayList<>(); + } + v.add(fragmentInstance); + return v; + }); + } + + protected TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplicaSet) { + if (regionReplicaSet == null + || regionReplicaSet.getDataNodeLocations() == null + || regionReplicaSet.getDataNodeLocations().isEmpty()) { + throw new IllegalArgumentException( + String.format("regionReplicaSet is invalid: %s", regionReplicaSet)); + } + String readConsistencyLevel = + IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel(); + // TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel as static variable or + // enums + boolean selectRandomDataNode = "weak".equals(readConsistencyLevel); + + // When planning fragment onto specific DataNode, the DataNode whose endPoint is in + // black list won't be considered because it may have connection issue now. + List availableDataNodes = + filterAvailableTDataNode(regionReplicaSet.getDataNodeLocations()); + if (availableDataNodes.isEmpty()) { + String errorMsg = + String.format( + "All replicas for region[%s] are not available in these DataNodes[%s]", + regionReplicaSet.getRegionId(), regionReplicaSet.getDataNodeLocations()); + throw new IllegalArgumentException(errorMsg); + } + if (regionReplicaSet.getDataNodeLocationsSize() != availableDataNodes.size()) { + LOGGER.info("available replicas: {}", availableDataNodes); + } + int targetIndex; + if (!selectRandomDataNode || queryContext.getSession() == null) { + targetIndex = 0; + } else { + targetIndex = (int) (queryContext.getSession().getSessionId() % availableDataNodes.size()); + } + return availableDataNodes.get(targetIndex); + } + + protected void calculateNodeTopologyBetweenInstance( + List fragmentInstanceList, + Map> planNodeMap, + Map instanceMap) { + for (FragmentInstance instance : fragmentInstanceList) { + PlanNode rootNode = instance.getFragment().getPlanNodeTree(); + if (rootNode instanceof MultiChildrenSinkNode) { + MultiChildrenSinkNode sinkNode = (MultiChildrenSinkNode) rootNode; + for (DownStreamChannelLocation downStreamChannelLocation : + sinkNode.getDownStreamChannelLocationList()) { + // Set target Endpoint for FragmentSinkNode + PlanNodeId downStreamNodeId = + new PlanNodeId(downStreamChannelLocation.getRemotePlanNodeId()); + FragmentInstance downStreamInstance = + findDownStreamInstance(planNodeMap, instanceMap, downStreamNodeId); + downStreamChannelLocation.setRemoteEndpoint( + downStreamInstance.getHostDataNode().getMPPDataExchangeEndPoint()); + downStreamChannelLocation.setRemoteFragmentInstanceId( + downStreamInstance.getId().toThrift()); + + // Set upstream info for corresponding ExchangeNode in downstream FragmentInstance + PlanNode downStreamExchangeNode = planNodeMap.get(downStreamNodeId).right; + ((ExchangeNode) downStreamExchangeNode) + .setUpstream( + instance.getHostDataNode().getMPPDataExchangeEndPoint(), + instance.getId(), + sinkNode.getPlanNodeId()); + } + } + } + } + + private FragmentInstance findDownStreamInstance( + Map> planNodeMap, + Map instanceMap, + PlanNodeId exchangeNodeId) { + return instanceMap.get(planNodeMap.get(exchangeNodeId).left); + } + + private List filterAvailableTDataNode( + List originalDataNodeList) { + List result = new LinkedList<>(); + for (TDataNodeLocation dataNodeLocation : originalDataNodeList) { + if (isAvailableDataNode(dataNodeLocation)) { + result.add(dataNodeLocation); + } + } + return result; + } + + private boolean isAvailableDataNode(TDataNodeLocation dataNodeLocation) { + for (TEndPoint endPoint : queryContext.getEndPointBlackList()) { + if (endPoint.equals(dataNodeLocation.internalEndPoint)) { + return false; + } + } + return true; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java index f1b903b3047be..6a33149c6807e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java @@ -168,7 +168,7 @@ private DistributedQueryPlan generateDistributedPlan( mppQueryContext.getQueryType() == QueryType.READ ? new TableModelQueryFragmentPlanner( subPlan, analysis, mppQueryContext, nodeDistributionMap) - .plan() + .parallelPlan() : new WriteFragmentParallelPlanner( subPlan, analysis, mppQueryContext, WritePlanNode::splitByPartition) .parallelPlan(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java index 2d152dc49f578..28fe3852cabf1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java @@ -20,46 +20,30 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; -import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; -import org.apache.iotdb.commons.partition.QueryExecutor; -import org.apache.iotdb.commons.partition.StorageExecutor; -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; -import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation; import org.apache.iotdb.db.queryengine.plan.ClusterTopology; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution; -import org.apache.iotdb.db.queryengine.plan.planner.exceptions.ReplicaSetUnreachableException; +import org.apache.iotdb.db.queryengine.plan.planner.plan.AbstractFragmentParallelPlanner; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; -import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; -import org.apache.commons.collections4.CollectionUtils; import org.apache.tsfile.utils.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; -public class TableModelQueryFragmentPlanner { - - private static final Logger LOGGER = - LoggerFactory.getLogger(TableModelQueryFragmentPlanner.class); +public class TableModelQueryFragmentPlanner extends AbstractFragmentParallelPlanner { private final SubPlan subPlan; @@ -86,15 +70,17 @@ public class TableModelQueryFragmentPlanner { Analysis analysis, MPPQueryContext queryContext, final Map nodeDistributionMap) { + super(queryContext); this.subPlan = subPlan; this.analysis = analysis; this.queryContext = queryContext; this.nodeDistributionMap = nodeDistributionMap; } - public List plan() { + @Override + public List parallelPlan() { prepare(); - calculateNodeTopologyBetweenInstance(); + calculateNodeTopologyBetweenInstance(fragmentInstanceList, planNodeMap, instanceMap); return fragmentInstanceList; } @@ -125,47 +111,8 @@ private void produceFragmentInstance( queryContext.isExplainAnalyze(), fragment.isRoot()); - // Get the target region for origin PlanFragment, then its instance will be distributed one - // of them. - TRegionReplicaSet regionReplicaSet = fragment.getTargetRegionForTableModel(nodeDistributionMap); - if (regionReplicaSet != null - && !CollectionUtils.isEmpty(regionReplicaSet.getDataNodeLocations())) { - regionReplicaSet = topology.getValidatedReplicaSet(regionReplicaSet); - if (regionReplicaSet.getDataNodeLocations().isEmpty()) { - throw new ReplicaSetUnreachableException( - fragment.getTargetRegionForTableModel(nodeDistributionMap)); - } - } - - // Set ExecutorType and target host for the instance, - // We need to store all the replica host in case of the scenario that the instance need to be - // redirected - // to another host when scheduling - if (regionReplicaSet == null || regionReplicaSet.getRegionId() == null) { - TDataNodeLocation dataNodeLocation = fragment.getTargetLocation(); - if (dataNodeLocation != null) { - // now only the case ShowStatement will enter here - fragmentInstance.setExecutorAndHost(new QueryExecutor(dataNodeLocation)); - } else { - // no data region && no dataNodeLocation, we need to execute this FI on local - // now only the case AggregationQuery has schemaengine but no data region will enter here - fragmentInstance.setExecutorAndHost( - new QueryExecutor(DataNodeEndPoints.getLocalDataNodeLocation())); - } - } else { - fragmentInstance.setExecutorAndHost(new StorageExecutor(regionReplicaSet)); - fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet)); - } - - dataNodeFIMap.compute( - fragmentInstance.getHostDataNode(), - (k, v) -> { - if (v == null) { - v = new ArrayList<>(); - } - v.add(fragmentInstance); - return v; - }); + selectExecutorAndHost( + fragment, fragmentInstance, topology::getValidatedReplicaSet, dataNodeFIMap); final Statement statement = analysis.getStatement(); if (analysis.isQuery() || statement instanceof ShowDevice || statement instanceof CountDevice) { @@ -174,90 +121,4 @@ private void produceFragmentInstance( instanceMap.putIfAbsent(fragment.getId(), fragmentInstance); fragmentInstanceList.add(fragmentInstance); } - - private TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplicaSet) { - if (regionReplicaSet == null - || regionReplicaSet.getDataNodeLocations() == null - || regionReplicaSet.getDataNodeLocations().isEmpty()) { - throw new IllegalArgumentException( - String.format("RegionReplicaSet is invalid: %s", regionReplicaSet)); - } - String readConsistencyLevel = - IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel(); - boolean selectRandomDataNode = "weak".equals(readConsistencyLevel); - - // When planning fragment onto specific DataNode, the DataNode whose endPoint is in - // black list won't be considered because it may have connection issue now. - List availableDataNodes = - filterAvailableTDataNode(regionReplicaSet.getDataNodeLocations()); - if (availableDataNodes.isEmpty()) { - String errorMsg = - String.format( - "All replicas for region[%s] are not available in these DataNodes[%s]", - regionReplicaSet.getRegionId(), regionReplicaSet.getDataNodeLocations()); - throw new IllegalArgumentException(errorMsg); - } - if (regionReplicaSet.getDataNodeLocationsSize() != availableDataNodes.size()) { - LOGGER.info("Available replicas: {}", availableDataNodes); - } - int targetIndex; - if (!selectRandomDataNode || queryContext.getSession() == null) { - targetIndex = 0; - } else { - targetIndex = (int) (queryContext.getSession().getSessionId() % availableDataNodes.size()); - } - return availableDataNodes.get(targetIndex); - } - - private List filterAvailableTDataNode( - List originalDataNodeList) { - List result = new LinkedList<>(); - for (TDataNodeLocation dataNodeLocation : originalDataNodeList) { - if (isAvailableDataNode(dataNodeLocation)) { - result.add(dataNodeLocation); - } - } - return result; - } - - private boolean isAvailableDataNode(TDataNodeLocation dataNodeLocation) { - for (TEndPoint endPoint : queryContext.getEndPointBlackList()) { - if (endPoint.equals(dataNodeLocation.internalEndPoint)) { - return false; - } - } - return true; - } - - private void calculateNodeTopologyBetweenInstance() { - for (FragmentInstance instance : fragmentInstanceList) { - PlanNode rootNode = instance.getFragment().getPlanNodeTree(); - if (rootNode instanceof MultiChildrenSinkNode) { - MultiChildrenSinkNode sinkNode = (MultiChildrenSinkNode) rootNode; - for (DownStreamChannelLocation downStreamChannelLocation : - sinkNode.getDownStreamChannelLocationList()) { - // Set target Endpoint for FragmentSinkNode - PlanNodeId downStreamNodeId = - new PlanNodeId(downStreamChannelLocation.getRemotePlanNodeId()); - FragmentInstance downStreamInstance = findDownStreamInstance(downStreamNodeId); - downStreamChannelLocation.setRemoteEndpoint( - downStreamInstance.getHostDataNode().getMPPDataExchangeEndPoint()); - downStreamChannelLocation.setRemoteFragmentInstanceId( - downStreamInstance.getId().toThrift()); - - // Set upstream info for corresponding ExchangeNode in downstream FragmentInstance - PlanNode downStreamExchangeNode = planNodeMap.get(downStreamNodeId).right; - ((ExchangeNode) downStreamExchangeNode) - .setUpstream( - instance.getHostDataNode().getMPPDataExchangeEndPoint(), - instance.getId(), - sinkNode.getPlanNodeId()); - } - } - } - } - - private FragmentInstance findDownStreamInstance(PlanNodeId exchangeNodeId) { - return instanceMap.get(planNodeMap.get(exchangeNodeId).left); - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java index 49a8086d6354b..2634beab8b244 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java @@ -154,9 +154,8 @@ private static TSStatus tryCatchQueryException(Exception e) { } else if (t instanceof QueryInBatchStatementException) { return RpcUtils.getStatus( TSStatusCode.QUERY_NOT_ALLOWED, INFO_NOT_ALLOWED_IN_BATCH_ERROR + rootCause.getMessage()); - } else if (t instanceof RootFIPlacementException) { - return RpcUtils.getStatus(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION, rootCause.getMessage()); - } else if (t instanceof ReplicaSetUnreachableException) { + } else if (t instanceof RootFIPlacementException + || t instanceof ReplicaSetUnreachableException) { return RpcUtils.getStatus(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION, rootCause.getMessage()); } else if (t instanceof IoTDBException) { return RpcUtils.getStatus(((IoTDBException) t).getErrorCode(), rootCause.getMessage()); From 2b89681157dfd99f4185404e172ba475699d96d1 Mon Sep 17 00:00:00 2001 From: szywilliam Date: Sun, 23 Mar 2025 22:57:25 +0800 Subject: [PATCH 02/11] fix CI --- .../SimpleFragmentParallelPlanner.java | 35 +++++++++++++++++- .../plan/AbstractFragmentParallelPlanner.java | 37 +------------------ .../TableModelQueryFragmentPlanner.java | 34 ++++++++++++++++- 3 files changed, 68 insertions(+), 38 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java index 4bbd442840206..47e61a89a18d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -31,6 +31,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.TreeModelTimePredicate; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastSeriesSourceNode; import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement; @@ -81,7 +83,7 @@ public SimpleFragmentParallelPlanner( @Override public List parallelPlan() { prepare(); - calculateNodeTopologyBetweenInstance(fragmentInstanceList, planNodeMap, instanceMap); + calculateNodeTopologyBetweenInstance(); return fragmentInstanceList; } @@ -154,6 +156,37 @@ private void produceFragmentInstance(PlanFragment fragment) { fragmentInstanceList.add(fragmentInstance); } + void calculateNodeTopologyBetweenInstance() { + for (FragmentInstance instance : fragmentInstanceList) { + PlanNode rootNode = instance.getFragment().getPlanNodeTree(); + if (rootNode instanceof MultiChildrenSinkNode) { + MultiChildrenSinkNode sinkNode = (MultiChildrenSinkNode) rootNode; + sinkNode + .getDownStreamChannelLocationList() + .forEach( + downStreamChannelLocation -> { + // Set target Endpoint for FragmentSinkNode + PlanNodeId downStreamNodeId = + new PlanNodeId(downStreamChannelLocation.getRemotePlanNodeId()); + FragmentInstance downStreamInstance = + findDownStreamInstance(planNodeMap, instanceMap, downStreamNodeId); + downStreamChannelLocation.setRemoteEndpoint( + downStreamInstance.getHostDataNode().getMPPDataExchangeEndPoint()); + downStreamChannelLocation.setRemoteFragmentInstanceId( + downStreamInstance.getId().toThrift()); + + // Set upstream info for corresponding ExchangeNode in downstream FragmentInstance + PlanNode downStreamExchangeNode = planNodeMap.get(downStreamNodeId).right; + ((ExchangeNode) downStreamExchangeNode) + .setUpstream( + instance.getHostDataNode().getMPPDataExchangeEndPoint(), + instance.getId(), + sinkNode.getPlanNodeId()); + }); + } + } + } + private void recordPlanNodeRelation(PlanNode root, PlanFragmentId planFragmentId) { planNodeMap.put(root.getPlanNodeId(), new Pair<>(planFragmentId, root)); for (PlanNode child : root.getChildren()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java index dffed3bee7b3b..0912f37915c5e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java @@ -28,14 +28,11 @@ import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; -import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation; import org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner; import org.apache.iotdb.db.queryengine.plan.planner.exceptions.ReplicaSetUnreachableException; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableModelQueryFragmentPlanner; -import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode; import org.apache.commons.collections4.CollectionUtils; import org.apache.tsfile.utils.Pair; @@ -140,39 +137,7 @@ protected TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplica return availableDataNodes.get(targetIndex); } - protected void calculateNodeTopologyBetweenInstance( - List fragmentInstanceList, - Map> planNodeMap, - Map instanceMap) { - for (FragmentInstance instance : fragmentInstanceList) { - PlanNode rootNode = instance.getFragment().getPlanNodeTree(); - if (rootNode instanceof MultiChildrenSinkNode) { - MultiChildrenSinkNode sinkNode = (MultiChildrenSinkNode) rootNode; - for (DownStreamChannelLocation downStreamChannelLocation : - sinkNode.getDownStreamChannelLocationList()) { - // Set target Endpoint for FragmentSinkNode - PlanNodeId downStreamNodeId = - new PlanNodeId(downStreamChannelLocation.getRemotePlanNodeId()); - FragmentInstance downStreamInstance = - findDownStreamInstance(planNodeMap, instanceMap, downStreamNodeId); - downStreamChannelLocation.setRemoteEndpoint( - downStreamInstance.getHostDataNode().getMPPDataExchangeEndPoint()); - downStreamChannelLocation.setRemoteFragmentInstanceId( - downStreamInstance.getId().toThrift()); - - // Set upstream info for corresponding ExchangeNode in downstream FragmentInstance - PlanNode downStreamExchangeNode = planNodeMap.get(downStreamNodeId).right; - ((ExchangeNode) downStreamExchangeNode) - .setUpstream( - instance.getHostDataNode().getMPPDataExchangeEndPoint(), - instance.getId(), - sinkNode.getPlanNodeId()); - } - } - } - } - - private FragmentInstance findDownStreamInstance( + protected FragmentInstance findDownStreamInstance( Map> planNodeMap, Map instanceMap, PlanNodeId exchangeNodeId) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java index 28fe3852cabf1..138335c8e6524 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation; import org.apache.iotdb.db.queryengine.plan.ClusterTopology; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution; @@ -31,7 +32,9 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; @@ -80,7 +83,7 @@ public class TableModelQueryFragmentPlanner extends AbstractFragmentParallelPlan @Override public List parallelPlan() { prepare(); - calculateNodeTopologyBetweenInstance(fragmentInstanceList, planNodeMap, instanceMap); + calculateNodeTopologyBetweenInstance(); return fragmentInstanceList; } @@ -99,6 +102,35 @@ private void recordPlanNodeRelation(PlanNode root, PlanFragmentId planFragmentId root.getChildren().forEach(child -> recordPlanNodeRelation(child, planFragmentId)); } + private void calculateNodeTopologyBetweenInstance() { + for (FragmentInstance instance : fragmentInstanceList) { + PlanNode rootNode = instance.getFragment().getPlanNodeTree(); + if (rootNode instanceof MultiChildrenSinkNode) { + MultiChildrenSinkNode sinkNode = (MultiChildrenSinkNode) rootNode; + for (DownStreamChannelLocation downStreamChannelLocation : + sinkNode.getDownStreamChannelLocationList()) { + // Set target Endpoint for FragmentSinkNode + PlanNodeId downStreamNodeId = + new PlanNodeId(downStreamChannelLocation.getRemotePlanNodeId()); + FragmentInstance downStreamInstance = + findDownStreamInstance(planNodeMap, instanceMap, downStreamNodeId); + downStreamChannelLocation.setRemoteEndpoint( + downStreamInstance.getHostDataNode().getMPPDataExchangeEndPoint()); + downStreamChannelLocation.setRemoteFragmentInstanceId( + downStreamInstance.getId().toThrift()); + + // Set upstream info for corresponding ExchangeNode in downstream FragmentInstance + PlanNode downStreamExchangeNode = planNodeMap.get(downStreamNodeId).right; + ((ExchangeNode) downStreamExchangeNode) + .setUpstream( + instance.getHostDataNode().getMPPDataExchangeEndPoint(), + instance.getId(), + sinkNode.getPlanNodeId()); + } + } + } + } + private void produceFragmentInstance( PlanFragment fragment, final Map nodeDistributionMap) { FragmentInstance fragmentInstance = From 35e1f6ecd67a6578fdfd1d9e4db883fb5afdd468 Mon Sep 17 00:00:00 2001 From: szywilliam Date: Sun, 23 Mar 2025 22:59:11 +0800 Subject: [PATCH 03/11] fix CI --- .../planner/distribution/SimpleFragmentParallelPlanner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java index 47e61a89a18d1..c7c78b4b109a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -156,7 +156,7 @@ private void produceFragmentInstance(PlanFragment fragment) { fragmentInstanceList.add(fragmentInstance); } - void calculateNodeTopologyBetweenInstance() { + private void calculateNodeTopologyBetweenInstance() { for (FragmentInstance instance : fragmentInstanceList) { PlanNode rootNode = instance.getFragment().getPlanNodeTree(); if (rootNode instanceof MultiChildrenSinkNode) { From 1fb043a52619e8eb7b16c91545a989c46e1184a7 Mon Sep 17 00:00:00 2001 From: szywilliam Date: Mon, 24 Mar 2025 16:50:23 +0800 Subject: [PATCH 04/11] fix CI --- .../planner/distribution/SimpleFragmentParallelPlanner.java | 6 +++++- .../plan/planner/plan/AbstractFragmentParallelPlanner.java | 6 ++++-- .../planner/distribute/TableModelQueryFragmentPlanner.java | 6 +++++- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java index c7c78b4b109a1..2cadce4e51a2b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -143,7 +143,11 @@ private void produceFragmentInstance(PlanFragment fragment) { fragment.isRoot()); selectExecutorAndHost( - fragment, fragmentInstance, topology::getValidatedReplicaSet, dataNodeFIMap); + fragment, + fragmentInstance, + fragment::getTargetRegionForTreeModel, + topology::getValidatedReplicaSet, + dataNodeFIMap); if (analysis.getTreeStatement() instanceof QueryStatement || analysis.getTreeStatement() instanceof ExplainAnalyzeStatement diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java index 0912f37915c5e..cbad76a79dd7a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java @@ -44,6 +44,7 @@ import java.util.List; import java.util.Map; import java.util.function.Function; +import java.util.function.Supplier; public abstract class AbstractFragmentParallelPlanner implements IFragmentParallelPlaner { private static final Logger LOGGER = @@ -58,11 +59,12 @@ protected AbstractFragmentParallelPlanner(MPPQueryContext queryContext) { protected void selectExecutorAndHost( PlanFragment fragment, FragmentInstance fragmentInstance, + Supplier replicaSetProvider, Function validator, Map> dataNodeFIMap) { // Get the target region for origin PlanFragment, then its instance will be distributed one // of them. - TRegionReplicaSet regionReplicaSet = fragment.getTargetRegionForTreeModel(); + TRegionReplicaSet regionReplicaSet = replicaSetProvider.get(); if (regionReplicaSet != null && !CollectionUtils.isEmpty(regionReplicaSet.getDataNodeLocations())) { regionReplicaSet = validator.apply(regionReplicaSet); @@ -87,7 +89,7 @@ protected void selectExecutorAndHost( } } else { fragmentInstance.setExecutorAndHost(new StorageExecutor(regionReplicaSet)); - fragmentInstance.setHostDataNode(this.selectTargetDataNode(regionReplicaSet)); + fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet)); } dataNodeFIMap.compute( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java index 138335c8e6524..560e8a844a29b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java @@ -144,7 +144,11 @@ private void produceFragmentInstance( fragment.isRoot()); selectExecutorAndHost( - fragment, fragmentInstance, topology::getValidatedReplicaSet, dataNodeFIMap); + fragment, + fragmentInstance, + () -> fragment.getTargetRegionForTableModel(nodeDistributionMap), + topology::getValidatedReplicaSet, + dataNodeFIMap); final Statement statement = analysis.getStatement(); if (analysis.isQuery() || statement instanceof ShowDevice || statement instanceof CountDevice) { From b4ae865515988308bdfe63e4ff253351a96ab612 Mon Sep 17 00:00:00 2001 From: szywilliam Date: Tue, 25 Mar 2025 09:52:49 +0800 Subject: [PATCH 05/11] address review issues --- .../manager/load/service/TopologyService.java | 40 +++++++++++-------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index 103fb414a8089..d46181d5f7fe3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -49,7 +49,6 @@ import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -219,7 +218,6 @@ private synchronized void topologyProbing() { final Map> latestTopology = dataNodeLocations.stream() .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, k -> new HashSet<>())); - final Map> partitioned = new HashMap<>(); for (final Map.Entry, List> entry : heartbeats.entrySet()) { final int fromId = entry.getKey().getLeft(); @@ -227,15 +225,12 @@ private synchronized void topologyProbing() { if (!entry.getValue().isEmpty() && !failureDetector.isAvailable(entry.getKey(), entry.getValue())) { LOGGER.debug("Connection from DataNode {} to DataNode {} is broken", fromId, toId); - partitioned.computeIfAbsent(fromId, id -> new HashSet<>()).add(toId); } else { latestTopology.get(fromId).add(toId); } } - if (!partitioned.isEmpty()) { - logAsymmetricPartition(partitioned, dataNodeIds.size()); - } + logAsymmetricPartition(latestTopology); // 5. notify the listeners on topology change if (shouldRun.get()) { @@ -243,17 +238,28 @@ private synchronized void topologyProbing() { } } - private void logAsymmetricPartition( - final Map> partitioned, final int totalNodes) { - for (final int fromId : partitioned.keySet()) { - final Set unreachable = partitioned.get(fromId); - if (unreachable.size() >= totalNodes - 1) { - // this node is partitioned symmetrically from other nodes - continue; - } - for (final int toId : partitioned.get(fromId)) { - if (partitioned.get(toId) == null || !partitioned.get(toId).contains(fromId)) { - LOGGER.warn("[Topology] Asymmetric network partition from {} to {}", fromId, toId); + private void logAsymmetricPartition(final Map> topology) { + final Set nodes = topology.keySet(); + if (nodes.size() == 1) { + // 1 DataNode + return; + } + + for (int from : nodes) { + for (int to : nodes) { + if (from == to) { + continue; + } + + // whether we have asymmetric partition [from -> to] + final Set reachableFrom = topology.get(from); + final Set reachableTo = topology.get(to); + if (reachableFrom.size() <= 1 || reachableTo.size() <= 1) { + // symmetric partition for (from) or (to) + continue; + } + if (reachableTo.contains(from) && !reachableFrom.contains(to)) { + LOGGER.warn("[Topology] Asymmetric network partition from {} to {}", from, to); } } } From 04410adb15c9222ef1e000c5c4f8986c0b61a1e6 Mon Sep 17 00:00:00 2001 From: szywilliam Date: Thu, 27 Mar 2025 16:42:49 +0800 Subject: [PATCH 06/11] address review issues --- .../manager/load/service/TopologyService.java | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index d46181d5f7fe3..8e46965e213fc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -85,6 +85,7 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { /* (fromDataNodeId, toDataNodeId) -> heartbeat history */ private final Map, List> heartbeats; + private final Map, Integer> accumulatedFailures; private final List startingDataNodes = new CopyOnWriteArrayList<>(); private final IFailureDetector failureDetector; @@ -95,6 +96,7 @@ public TopologyService( this.configManager = configManager; this.topologyChangeListener = topologyChangeListener; this.heartbeats = new ConcurrentHashMap<>(); + this.accumulatedFailures = new ConcurrentHashMap<>(); this.shouldRun = new AtomicBoolean(false); this.awaitForSignal = new AwaitForSignal(this.getClass().getSimpleName()); @@ -129,6 +131,7 @@ public synchronized void stopTopologyService() { future.cancel(true); future = null; heartbeats.clear(); + accumulatedFailures.clear(); LOGGER.info("Topology Probing has stopped successfully"); } @@ -141,6 +144,7 @@ private boolean mayWait() { this.awaitForSignal.await(PROBING_INTERVAL_MS, TimeUnit.MILLISECONDS); return true; } catch (InterruptedException e) { + // we don't reset the interrupt flag here since we may reuse this thread again. return false; } } @@ -222,8 +226,19 @@ private synchronized void topologyProbing() { heartbeats.entrySet()) { final int fromId = entry.getKey().getLeft(); final int toId = entry.getKey().getRight(); - if (!entry.getValue().isEmpty() - && !failureDetector.isAvailable(entry.getKey(), entry.getValue())) { + + final boolean isReachable = !entry.getValue().isEmpty() + && !failureDetector.isAvailable(entry.getKey(), entry.getValue()); + + if (isReachable) { + // reset the failure accumulator + accumulatedFailures.put(entry.getKey(), 0); + } else { + accumulatedFailures.compute(entry.getKey(), (k, ov) -> Optional.ofNullable(ov).orElse(0) + 1); + } + + // if 3 subsequent probing all failed, then it is highly possible a network partition. + if (accumulatedFailures.get(entry.getKey()) >= 3) { LOGGER.debug("Connection from DataNode {} to DataNode {} is broken", fromId, toId); } else { latestTopology.get(fromId).add(toId); @@ -238,6 +253,10 @@ private synchronized void topologyProbing() { } } + /** + * We only consider warning (one vs remaining) network partition. If we need to cover more + * complicated scenarios like (many vs many) network partition, we shall use graph algorithms then. + */ private void logAsymmetricPartition(final Map> topology) { final Set nodes = topology.keySet(); if (nodes.size() == 1) { @@ -247,7 +266,7 @@ private void logAsymmetricPartition(final Map> topology) { for (int from : nodes) { for (int to : nodes) { - if (from == to) { + if (from >= to) { continue; } @@ -258,7 +277,7 @@ private void logAsymmetricPartition(final Map> topology) { // symmetric partition for (from) or (to) continue; } - if (reachableTo.contains(from) && !reachableFrom.contains(to)) { + if (!reachableTo.contains(from) && !reachableFrom.contains(to)) { LOGGER.warn("[Topology] Asymmetric network partition from {} to {}", from, to); } } @@ -297,6 +316,7 @@ public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) { || Objects.equals(pair.getRight(), nodeId)) .collect(Collectors.toSet()); toRemove.forEach(heartbeats::remove); + toRemove.forEach(accumulatedFailures::remove); } else { // we only trigger probing immediately if node comes around from UNKNOWN to RUNNING if (NodeStatus.Unknown.equals(changeEvent.getLeft().getStatus()) From b4b2b5ca02b6e897f347e247415a238c85ee34fb Mon Sep 17 00:00:00 2001 From: szywilliam Date: Thu, 27 Mar 2025 16:43:43 +0800 Subject: [PATCH 07/11] address review issues --- .../manager/load/service/TopologyService.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index 8e46965e213fc..2a35e4afc2b02 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -227,14 +227,16 @@ private synchronized void topologyProbing() { final int fromId = entry.getKey().getLeft(); final int toId = entry.getKey().getRight(); - final boolean isReachable = !entry.getValue().isEmpty() - && !failureDetector.isAvailable(entry.getKey(), entry.getValue()); + final boolean isReachable = + !entry.getValue().isEmpty() + && !failureDetector.isAvailable(entry.getKey(), entry.getValue()); if (isReachable) { // reset the failure accumulator accumulatedFailures.put(entry.getKey(), 0); } else { - accumulatedFailures.compute(entry.getKey(), (k, ov) -> Optional.ofNullable(ov).orElse(0) + 1); + accumulatedFailures.compute( + entry.getKey(), (k, ov) -> Optional.ofNullable(ov).orElse(0) + 1); } // if 3 subsequent probing all failed, then it is highly possible a network partition. @@ -255,7 +257,8 @@ private synchronized void topologyProbing() { /** * We only consider warning (one vs remaining) network partition. If we need to cover more - * complicated scenarios like (many vs many) network partition, we shall use graph algorithms then. + * complicated scenarios like (many vs many) network partition, we shall use graph algorithms + * then. */ private void logAsymmetricPartition(final Map> topology) { final Set nodes = topology.keySet(); From 3c62226a7ba1ce0f6483a1ae8fca62dc37d3e89a Mon Sep 17 00:00:00 2001 From: szywilliam Date: Thu, 27 Mar 2025 16:50:11 +0800 Subject: [PATCH 08/11] address review issues --- .../manager/load/service/TopologyService.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index 2a35e4afc2b02..885e220c9bcf7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -309,21 +309,25 @@ public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) { startingDataNodes.remove(nodeId); } + final Set> affectedPairs = + heartbeats.keySet().stream() + .filter( + pair -> + Objects.equals(pair.getLeft(), nodeId) + || Objects.equals(pair.getRight(), nodeId)) + .collect(Collectors.toSet()); + if (changeEvent.getRight() == null) { // datanode removed from cluster, clean up probing history - final Set> toRemove = - heartbeats.keySet().stream() - .filter( - pair -> - Objects.equals(pair.getLeft(), nodeId) - || Objects.equals(pair.getRight(), nodeId)) - .collect(Collectors.toSet()); - toRemove.forEach(heartbeats::remove); - toRemove.forEach(accumulatedFailures::remove); + affectedPairs.forEach(heartbeats::remove); + affectedPairs.forEach(accumulatedFailures::remove); } else { // we only trigger probing immediately if node comes around from UNKNOWN to RUNNING if (NodeStatus.Unknown.equals(changeEvent.getLeft().getStatus()) && NodeStatus.Running.equals(changeEvent.getRight().getStatus())) { + // let's clear the history when a new node comes around + affectedPairs.forEach(pair -> heartbeats.put(pair, new ArrayList<>())); + affectedPairs.forEach(pair -> accumulatedFailures.put(pair, 0)); awaitForSignal.signal(); } } From e186df5546b68a21e4236d93792566f7c5478f27 Mon Sep 17 00:00:00 2001 From: szywilliam Date: Fri, 28 Mar 2025 11:06:51 +0800 Subject: [PATCH 09/11] address review issues --- .../manager/load/service/TopologyService.java | 21 ++----------------- 1 file changed, 2 insertions(+), 19 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index 885e220c9bcf7..1176486f825dc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -85,7 +85,6 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { /* (fromDataNodeId, toDataNodeId) -> heartbeat history */ private final Map, List> heartbeats; - private final Map, Integer> accumulatedFailures; private final List startingDataNodes = new CopyOnWriteArrayList<>(); private final IFailureDetector failureDetector; @@ -96,7 +95,6 @@ public TopologyService( this.configManager = configManager; this.topologyChangeListener = topologyChangeListener; this.heartbeats = new ConcurrentHashMap<>(); - this.accumulatedFailures = new ConcurrentHashMap<>(); this.shouldRun = new AtomicBoolean(false); this.awaitForSignal = new AwaitForSignal(this.getClass().getSimpleName()); @@ -131,7 +129,6 @@ public synchronized void stopTopologyService() { future.cancel(true); future = null; heartbeats.clear(); - accumulatedFailures.clear(); LOGGER.info("Topology Probing has stopped successfully"); } @@ -227,20 +224,8 @@ private synchronized void topologyProbing() { final int fromId = entry.getKey().getLeft(); final int toId = entry.getKey().getRight(); - final boolean isReachable = - !entry.getValue().isEmpty() - && !failureDetector.isAvailable(entry.getKey(), entry.getValue()); - - if (isReachable) { - // reset the failure accumulator - accumulatedFailures.put(entry.getKey(), 0); - } else { - accumulatedFailures.compute( - entry.getKey(), (k, ov) -> Optional.ofNullable(ov).orElse(0) + 1); - } - - // if 3 subsequent probing all failed, then it is highly possible a network partition. - if (accumulatedFailures.get(entry.getKey()) >= 3) { + if (!entry.getValue().isEmpty() + && !failureDetector.isAvailable(entry.getKey(), entry.getValue())) { LOGGER.debug("Connection from DataNode {} to DataNode {} is broken", fromId, toId); } else { latestTopology.get(fromId).add(toId); @@ -320,14 +305,12 @@ public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) { if (changeEvent.getRight() == null) { // datanode removed from cluster, clean up probing history affectedPairs.forEach(heartbeats::remove); - affectedPairs.forEach(accumulatedFailures::remove); } else { // we only trigger probing immediately if node comes around from UNKNOWN to RUNNING if (NodeStatus.Unknown.equals(changeEvent.getLeft().getStatus()) && NodeStatus.Running.equals(changeEvent.getRight().getStatus())) { // let's clear the history when a new node comes around affectedPairs.forEach(pair -> heartbeats.put(pair, new ArrayList<>())); - affectedPairs.forEach(pair -> accumulatedFailures.put(pair, 0)); awaitForSignal.signal(); } } From fc2b64b5af7f290ef116b18073622568b98c4414 Mon Sep 17 00:00:00 2001 From: szywilliam Date: Fri, 28 Mar 2025 11:07:47 +0800 Subject: [PATCH 10/11] address review issues --- .../iotdb/confignode/manager/load/service/TopologyService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index 1176486f825dc..c1a48dd49cb23 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -225,7 +225,7 @@ private synchronized void topologyProbing() { final int toId = entry.getKey().getRight(); if (!entry.getValue().isEmpty() - && !failureDetector.isAvailable(entry.getKey(), entry.getValue())) { + && !failureDetector.isAvailable(entry.getKey(), entry.getValue())) { LOGGER.debug("Connection from DataNode {} to DataNode {} is broken", fromId, toId); } else { latestTopology.get(fromId).add(toId); From a2026b58bb754dba213de37adc648c82eff41b63 Mon Sep 17 00:00:00 2001 From: szywilliam Date: Fri, 28 Mar 2025 12:32:50 +0800 Subject: [PATCH 11/11] address review issues --- .../src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java index 960c687a6ca09..603196e871a85 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java @@ -29,6 +29,7 @@ import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -46,7 +47,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; -// @Ignore +@Ignore("enable this after RTO/RPO retry") @RunWith(IoTDBTestRunner.class) @Category({LocalStandaloneIT.class, ClusterIT.class}) public class IoTDBRestartIT {