diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java index 960c687a6ca09..603196e871a85 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java @@ -29,6 +29,7 @@ import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -46,7 +47,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; -// @Ignore +@Ignore("enable this after RTO/RPO retry") @RunWith(IoTDBTestRunner.class) @Category({LocalStandaloneIT.class, ClusterIT.class}) public class IoTDBRestartIT { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index 7aa3ac91c59dd..c1a48dd49cb23 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -49,7 +49,6 @@ import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -142,6 +141,7 @@ private boolean mayWait() { this.awaitForSignal.await(PROBING_INTERVAL_MS, TimeUnit.MILLISECONDS); return true; } catch (InterruptedException e) { + // we don't reset the interrupt flag here since we may reuse this thread again. return false; } } @@ -219,23 +219,20 @@ private synchronized void topologyProbing() { final Map> latestTopology = dataNodeLocations.stream() .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, k -> new HashSet<>())); - final Map> partitioned = new HashMap<>(); for (final Map.Entry, List> entry : heartbeats.entrySet()) { final int fromId = entry.getKey().getLeft(); final int toId = entry.getKey().getRight(); + if (!entry.getValue().isEmpty() && !failureDetector.isAvailable(entry.getKey(), entry.getValue())) { LOGGER.debug("Connection from DataNode {} to DataNode {} is broken", fromId, toId); - partitioned.computeIfAbsent(fromId, id -> new HashSet<>()).add(toId); } else { latestTopology.get(fromId).add(toId); } } - if (!partitioned.isEmpty()) { - logAsymmetricPartition(partitioned); - } + logAsymmetricPartition(latestTopology); // 5. notify the listeners on topology change if (shouldRun.get()) { @@ -243,11 +240,33 @@ private synchronized void topologyProbing() { } } - private void logAsymmetricPartition(final Map> partitioned) { - for (final int fromId : partitioned.keySet()) { - for (final int toId : partitioned.get(fromId)) { - if (partitioned.get(toId) == null || !partitioned.get(toId).contains(fromId)) { - LOGGER.warn("[Topology] Asymmetric network partition from {} to {}", fromId, toId); + /** + * We only consider warning (one vs remaining) network partition. If we need to cover more + * complicated scenarios like (many vs many) network partition, we shall use graph algorithms + * then. + */ + private void logAsymmetricPartition(final Map> topology) { + final Set nodes = topology.keySet(); + if (nodes.size() == 1) { + // 1 DataNode + return; + } + + for (int from : nodes) { + for (int to : nodes) { + if (from >= to) { + continue; + } + + // whether we have asymmetric partition [from -> to] + final Set reachableFrom = topology.get(from); + final Set reachableTo = topology.get(to); + if (reachableFrom.size() <= 1 || reachableTo.size() <= 1) { + // symmetric partition for (from) or (to) + continue; + } + if (!reachableTo.contains(from) && !reachableFrom.contains(to)) { + LOGGER.warn("[Topology] Asymmetric network partition from {} to {}", from, to); } } } @@ -275,20 +294,23 @@ public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) { startingDataNodes.remove(nodeId); } + final Set> affectedPairs = + heartbeats.keySet().stream() + .filter( + pair -> + Objects.equals(pair.getLeft(), nodeId) + || Objects.equals(pair.getRight(), nodeId)) + .collect(Collectors.toSet()); + if (changeEvent.getRight() == null) { // datanode removed from cluster, clean up probing history - final Set> toRemove = - heartbeats.keySet().stream() - .filter( - pair -> - Objects.equals(pair.getLeft(), nodeId) - || Objects.equals(pair.getRight(), nodeId)) - .collect(Collectors.toSet()); - toRemove.forEach(heartbeats::remove); + affectedPairs.forEach(heartbeats::remove); } else { // we only trigger probing immediately if node comes around from UNKNOWN to RUNNING if (NodeStatus.Unknown.equals(changeEvent.getLeft().getStatus()) && NodeStatus.Running.equals(changeEvent.getRight().getStatus())) { + // let's clear the history when a new node comes around + affectedPairs.forEach(pair -> heartbeats.put(pair, new ArrayList<>())); awaitForSignal.signal(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java index ee11364947b97..2cadce4e51a2b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -19,19 +19,12 @@ package org.apache.iotdb.db.queryengine.plan.planner.distribution; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; -import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; -import org.apache.iotdb.commons.partition.QueryExecutor; -import org.apache.iotdb.commons.partition.StorageExecutor; -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; import org.apache.iotdb.db.queryengine.plan.ClusterTopology; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.expression.Expression; -import org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner; -import org.apache.iotdb.db.queryengine.plan.planner.exceptions.ReplicaSetUnreachableException; +import org.apache.iotdb.db.queryengine.plan.planner.plan.AbstractFragmentParallelPlanner; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; @@ -46,15 +39,11 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; -import org.apache.commons.collections4.CollectionUtils; import org.apache.tsfile.read.common.Path; import org.apache.tsfile.utils.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -63,8 +52,7 @@ * A simple implementation of IFragmentParallelPlaner. This planner will transform one PlanFragment * into only one FragmentInstance. */ -public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { - private static final Logger logger = LoggerFactory.getLogger(SimpleFragmentParallelPlanner.class); +public class SimpleFragmentParallelPlanner extends AbstractFragmentParallelPlanner { private final SubPlan subPlan; private final Analysis analysis; @@ -82,6 +70,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { public SimpleFragmentParallelPlanner( SubPlan subPlan, Analysis analysis, MPPQueryContext context) { + super(context); this.subPlan = subPlan; this.analysis = analysis; this.queryContext = context; @@ -153,46 +142,12 @@ private void produceFragmentInstance(PlanFragment fragment) { queryContext.isExplainAnalyze(), fragment.isRoot()); - // Get the target region for origin PlanFragment, then its instance will be distributed one - // of them. - TRegionReplicaSet regionReplicaSet = fragment.getTargetRegionForTreeModel(); - if (regionReplicaSet != null - && !CollectionUtils.isEmpty(regionReplicaSet.getDataNodeLocations())) { - regionReplicaSet = topology.getValidatedReplicaSet(regionReplicaSet); - if (regionReplicaSet.getDataNodeLocations().isEmpty()) { - throw new ReplicaSetUnreachableException(fragment.getTargetRegionForTreeModel()); - } - } - - // Set ExecutorType and target host for the instance - // We need to store all the replica host in case of the scenario that the instance need to be - // redirected - // to another host when scheduling - if (regionReplicaSet == null || regionReplicaSet.getRegionId() == null) { - TDataNodeLocation dataNodeLocation = fragment.getTargetLocation(); - if (dataNodeLocation != null) { - // now only the case ShowQueries will enter here - fragmentInstance.setExecutorAndHost(new QueryExecutor(dataNodeLocation)); - } else { - // no data region && no dataNodeLocation, we need to execute this FI on local - // now only the case AggregationQuery has schemaengine but no data region will enter here - fragmentInstance.setExecutorAndHost( - new QueryExecutor(DataNodeEndPoints.getLocalDataNodeLocation())); - } - } else { - fragmentInstance.setExecutorAndHost(new StorageExecutor(regionReplicaSet)); - fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet)); - } - - dataNodeFIMap.compute( - fragmentInstance.getHostDataNode(), - (k, v) -> { - if (v == null) { - v = new ArrayList<>(); - } - v.add(fragmentInstance); - return v; - }); + selectExecutorAndHost( + fragment, + fragmentInstance, + fragment::getTargetRegionForTreeModel, + topology::getValidatedReplicaSet, + dataNodeFIMap); if (analysis.getTreeStatement() instanceof QueryStatement || analysis.getTreeStatement() instanceof ExplainAnalyzeStatement @@ -205,62 +160,6 @@ private void produceFragmentInstance(PlanFragment fragment) { fragmentInstanceList.add(fragmentInstance); } - private TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplicaSet) { - if (regionReplicaSet == null - || regionReplicaSet.getDataNodeLocations() == null - || regionReplicaSet.getDataNodeLocations().isEmpty()) { - throw new IllegalArgumentException( - String.format("regionReplicaSet is invalid: %s", regionReplicaSet)); - } - String readConsistencyLevel = - IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel(); - // TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel as static variable or - // enums - boolean selectRandomDataNode = "weak".equals(readConsistencyLevel); - - // When planning fragment onto specific DataNode, the DataNode whose endPoint is in - // black list won't be considered because it may have connection issue now. - List availableDataNodes = - filterAvailableTDataNode(regionReplicaSet.getDataNodeLocations()); - if (availableDataNodes.isEmpty()) { - String errorMsg = - String.format( - "All replicas for region[%s] are not available in these DataNodes[%s]", - regionReplicaSet.getRegionId(), regionReplicaSet.getDataNodeLocations()); - throw new IllegalArgumentException(errorMsg); - } - if (regionReplicaSet.getDataNodeLocationsSize() != availableDataNodes.size()) { - logger.info("available replicas: {}", availableDataNodes); - } - int targetIndex; - if (!selectRandomDataNode || queryContext.getSession() == null) { - targetIndex = 0; - } else { - targetIndex = (int) (queryContext.getSession().getSessionId() % availableDataNodes.size()); - } - return availableDataNodes.get(targetIndex); - } - - private List filterAvailableTDataNode( - List originalDataNodeList) { - List result = new LinkedList<>(); - for (TDataNodeLocation dataNodeLocation : originalDataNodeList) { - if (isAvailableDataNode(dataNodeLocation)) { - result.add(dataNodeLocation); - } - } - return result; - } - - private boolean isAvailableDataNode(TDataNodeLocation dataNodeLocation) { - for (TEndPoint endPoint : queryContext.getEndPointBlackList()) { - if (endPoint.equals(dataNodeLocation.internalEndPoint)) { - return false; - } - } - return true; - } - private void calculateNodeTopologyBetweenInstance() { for (FragmentInstance instance : fragmentInstanceList) { PlanNode rootNode = instance.getFragment().getPlanNodeTree(); @@ -273,7 +172,8 @@ private void calculateNodeTopologyBetweenInstance() { // Set target Endpoint for FragmentSinkNode PlanNodeId downStreamNodeId = new PlanNodeId(downStreamChannelLocation.getRemotePlanNodeId()); - FragmentInstance downStreamInstance = findDownStreamInstance(downStreamNodeId); + FragmentInstance downStreamInstance = + findDownStreamInstance(planNodeMap, instanceMap, downStreamNodeId); downStreamChannelLocation.setRemoteEndpoint( downStreamInstance.getHostDataNode().getMPPDataExchangeEndPoint()); downStreamChannelLocation.setRemoteFragmentInstanceId( @@ -291,10 +191,6 @@ private void calculateNodeTopologyBetweenInstance() { } } - private FragmentInstance findDownStreamInstance(PlanNodeId exchangeNodeId) { - return instanceMap.get(planNodeMap.get(exchangeNodeId).left); - } - private void recordPlanNodeRelation(PlanNode root, PlanFragmentId planFragmentId) { planNodeMap.put(root.getPlanNodeId(), new Pair<>(planFragmentId, root)); for (PlanNode child : root.getChildren()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/RootFIPlacementException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/RootFIPlacementException.java index 15b6c640aa029..8910673ef3847 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/RootFIPlacementException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/RootFIPlacementException.java @@ -33,7 +33,7 @@ public class RootFIPlacementException extends IoTDBRuntimeException { public RootFIPlacementException(Collection replicaSets) { super( - "root FragmentInstance placement error: " + replicaSets.toString(), + "Root FragmentInstance placement error: " + replicaSets.toString(), TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java new file mode 100644 index 0000000000000..cbad76a79dd7a --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.plan; + +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.partition.QueryExecutor; +import org.apache.iotdb.commons.partition.StorageExecutor; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner; +import org.apache.iotdb.db.queryengine.plan.planner.exceptions.ReplicaSetUnreachableException; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableModelQueryFragmentPlanner; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; + +public abstract class AbstractFragmentParallelPlanner implements IFragmentParallelPlaner { + private static final Logger LOGGER = + LoggerFactory.getLogger(TableModelQueryFragmentPlanner.class); + + protected MPPQueryContext queryContext; + + protected AbstractFragmentParallelPlanner(MPPQueryContext queryContext) { + this.queryContext = queryContext; + } + + protected void selectExecutorAndHost( + PlanFragment fragment, + FragmentInstance fragmentInstance, + Supplier replicaSetProvider, + Function validator, + Map> dataNodeFIMap) { + // Get the target region for origin PlanFragment, then its instance will be distributed one + // of them. + TRegionReplicaSet regionReplicaSet = replicaSetProvider.get(); + if (regionReplicaSet != null + && !CollectionUtils.isEmpty(regionReplicaSet.getDataNodeLocations())) { + regionReplicaSet = validator.apply(regionReplicaSet); + if (regionReplicaSet.getDataNodeLocations().isEmpty()) { + throw new ReplicaSetUnreachableException(fragment.getTargetRegionForTreeModel()); + } + } + // Set ExecutorType and target host for the instance + // We need to store all the replica host in case of the scenario that the instance need to be + // redirected + // to another host when scheduling + if (regionReplicaSet == null || regionReplicaSet.getRegionId() == null) { + TDataNodeLocation dataNodeLocation = fragment.getTargetLocation(); + if (dataNodeLocation != null) { + // now only the case ShowQueries will enter here + fragmentInstance.setExecutorAndHost(new QueryExecutor(dataNodeLocation)); + } else { + // no data region && no dataNodeLocation, we need to execute this FI on local + // now only the case AggregationQuery has schemaengine but no data region will enter here + fragmentInstance.setExecutorAndHost( + new QueryExecutor(DataNodeEndPoints.getLocalDataNodeLocation())); + } + } else { + fragmentInstance.setExecutorAndHost(new StorageExecutor(regionReplicaSet)); + fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet)); + } + + dataNodeFIMap.compute( + fragmentInstance.getHostDataNode(), + (k, v) -> { + if (v == null) { + v = new ArrayList<>(); + } + v.add(fragmentInstance); + return v; + }); + } + + protected TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplicaSet) { + if (regionReplicaSet == null + || regionReplicaSet.getDataNodeLocations() == null + || regionReplicaSet.getDataNodeLocations().isEmpty()) { + throw new IllegalArgumentException( + String.format("regionReplicaSet is invalid: %s", regionReplicaSet)); + } + String readConsistencyLevel = + IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel(); + // TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel as static variable or + // enums + boolean selectRandomDataNode = "weak".equals(readConsistencyLevel); + + // When planning fragment onto specific DataNode, the DataNode whose endPoint is in + // black list won't be considered because it may have connection issue now. + List availableDataNodes = + filterAvailableTDataNode(regionReplicaSet.getDataNodeLocations()); + if (availableDataNodes.isEmpty()) { + String errorMsg = + String.format( + "All replicas for region[%s] are not available in these DataNodes[%s]", + regionReplicaSet.getRegionId(), regionReplicaSet.getDataNodeLocations()); + throw new IllegalArgumentException(errorMsg); + } + if (regionReplicaSet.getDataNodeLocationsSize() != availableDataNodes.size()) { + LOGGER.info("available replicas: {}", availableDataNodes); + } + int targetIndex; + if (!selectRandomDataNode || queryContext.getSession() == null) { + targetIndex = 0; + } else { + targetIndex = (int) (queryContext.getSession().getSessionId() % availableDataNodes.size()); + } + return availableDataNodes.get(targetIndex); + } + + protected FragmentInstance findDownStreamInstance( + Map> planNodeMap, + Map instanceMap, + PlanNodeId exchangeNodeId) { + return instanceMap.get(planNodeMap.get(exchangeNodeId).left); + } + + private List filterAvailableTDataNode( + List originalDataNodeList) { + List result = new LinkedList<>(); + for (TDataNodeLocation dataNodeLocation : originalDataNodeList) { + if (isAvailableDataNode(dataNodeLocation)) { + result.add(dataNodeLocation); + } + } + return result; + } + + private boolean isAvailableDataNode(TDataNodeLocation dataNodeLocation) { + for (TEndPoint endPoint : queryContext.getEndPointBlackList()) { + if (endPoint.equals(dataNodeLocation.internalEndPoint)) { + return false; + } + } + return true; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java index f1b903b3047be..6a33149c6807e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java @@ -168,7 +168,7 @@ private DistributedQueryPlan generateDistributedPlan( mppQueryContext.getQueryType() == QueryType.READ ? new TableModelQueryFragmentPlanner( subPlan, analysis, mppQueryContext, nodeDistributionMap) - .plan() + .parallelPlan() : new WriteFragmentParallelPlanner( subPlan, analysis, mppQueryContext, WritePlanNode::splitByPartition) .parallelPlan(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java index 2d152dc49f578..560e8a844a29b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java @@ -20,19 +20,13 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; -import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; -import org.apache.iotdb.commons.partition.QueryExecutor; -import org.apache.iotdb.commons.partition.StorageExecutor; -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation; import org.apache.iotdb.db.queryengine.plan.ClusterTopology; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution; -import org.apache.iotdb.db.queryengine.plan.planner.exceptions.ReplicaSetUnreachableException; +import org.apache.iotdb.db.queryengine.plan.planner.plan.AbstractFragmentParallelPlanner; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; @@ -45,21 +39,14 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; -import org.apache.commons.collections4.CollectionUtils; import org.apache.tsfile.utils.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; -public class TableModelQueryFragmentPlanner { - - private static final Logger LOGGER = - LoggerFactory.getLogger(TableModelQueryFragmentPlanner.class); +public class TableModelQueryFragmentPlanner extends AbstractFragmentParallelPlanner { private final SubPlan subPlan; @@ -86,13 +73,15 @@ public class TableModelQueryFragmentPlanner { Analysis analysis, MPPQueryContext queryContext, final Map nodeDistributionMap) { + super(queryContext); this.subPlan = subPlan; this.analysis = analysis; this.queryContext = queryContext; this.nodeDistributionMap = nodeDistributionMap; } - public List plan() { + @Override + public List parallelPlan() { prepare(); calculateNodeTopologyBetweenInstance(); return fragmentInstanceList; @@ -113,122 +102,6 @@ private void recordPlanNodeRelation(PlanNode root, PlanFragmentId planFragmentId root.getChildren().forEach(child -> recordPlanNodeRelation(child, planFragmentId)); } - private void produceFragmentInstance( - PlanFragment fragment, final Map nodeDistributionMap) { - FragmentInstance fragmentInstance = - new FragmentInstance( - fragment, - fragment.getId().genFragmentInstanceId(), - QueryType.READ, - queryContext.getTimeOut() - (System.currentTimeMillis() - queryContext.getStartTime()), - queryContext.getSession(), - queryContext.isExplainAnalyze(), - fragment.isRoot()); - - // Get the target region for origin PlanFragment, then its instance will be distributed one - // of them. - TRegionReplicaSet regionReplicaSet = fragment.getTargetRegionForTableModel(nodeDistributionMap); - if (regionReplicaSet != null - && !CollectionUtils.isEmpty(regionReplicaSet.getDataNodeLocations())) { - regionReplicaSet = topology.getValidatedReplicaSet(regionReplicaSet); - if (regionReplicaSet.getDataNodeLocations().isEmpty()) { - throw new ReplicaSetUnreachableException( - fragment.getTargetRegionForTableModel(nodeDistributionMap)); - } - } - - // Set ExecutorType and target host for the instance, - // We need to store all the replica host in case of the scenario that the instance need to be - // redirected - // to another host when scheduling - if (regionReplicaSet == null || regionReplicaSet.getRegionId() == null) { - TDataNodeLocation dataNodeLocation = fragment.getTargetLocation(); - if (dataNodeLocation != null) { - // now only the case ShowStatement will enter here - fragmentInstance.setExecutorAndHost(new QueryExecutor(dataNodeLocation)); - } else { - // no data region && no dataNodeLocation, we need to execute this FI on local - // now only the case AggregationQuery has schemaengine but no data region will enter here - fragmentInstance.setExecutorAndHost( - new QueryExecutor(DataNodeEndPoints.getLocalDataNodeLocation())); - } - } else { - fragmentInstance.setExecutorAndHost(new StorageExecutor(regionReplicaSet)); - fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet)); - } - - dataNodeFIMap.compute( - fragmentInstance.getHostDataNode(), - (k, v) -> { - if (v == null) { - v = new ArrayList<>(); - } - v.add(fragmentInstance); - return v; - }); - - final Statement statement = analysis.getStatement(); - if (analysis.isQuery() || statement instanceof ShowDevice || statement instanceof CountDevice) { - fragmentInstance.getFragment().generateTableModelTypeProvider(queryContext.getTypeProvider()); - } - instanceMap.putIfAbsent(fragment.getId(), fragmentInstance); - fragmentInstanceList.add(fragmentInstance); - } - - private TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplicaSet) { - if (regionReplicaSet == null - || regionReplicaSet.getDataNodeLocations() == null - || regionReplicaSet.getDataNodeLocations().isEmpty()) { - throw new IllegalArgumentException( - String.format("RegionReplicaSet is invalid: %s", regionReplicaSet)); - } - String readConsistencyLevel = - IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel(); - boolean selectRandomDataNode = "weak".equals(readConsistencyLevel); - - // When planning fragment onto specific DataNode, the DataNode whose endPoint is in - // black list won't be considered because it may have connection issue now. - List availableDataNodes = - filterAvailableTDataNode(regionReplicaSet.getDataNodeLocations()); - if (availableDataNodes.isEmpty()) { - String errorMsg = - String.format( - "All replicas for region[%s] are not available in these DataNodes[%s]", - regionReplicaSet.getRegionId(), regionReplicaSet.getDataNodeLocations()); - throw new IllegalArgumentException(errorMsg); - } - if (regionReplicaSet.getDataNodeLocationsSize() != availableDataNodes.size()) { - LOGGER.info("Available replicas: {}", availableDataNodes); - } - int targetIndex; - if (!selectRandomDataNode || queryContext.getSession() == null) { - targetIndex = 0; - } else { - targetIndex = (int) (queryContext.getSession().getSessionId() % availableDataNodes.size()); - } - return availableDataNodes.get(targetIndex); - } - - private List filterAvailableTDataNode( - List originalDataNodeList) { - List result = new LinkedList<>(); - for (TDataNodeLocation dataNodeLocation : originalDataNodeList) { - if (isAvailableDataNode(dataNodeLocation)) { - result.add(dataNodeLocation); - } - } - return result; - } - - private boolean isAvailableDataNode(TDataNodeLocation dataNodeLocation) { - for (TEndPoint endPoint : queryContext.getEndPointBlackList()) { - if (endPoint.equals(dataNodeLocation.internalEndPoint)) { - return false; - } - } - return true; - } - private void calculateNodeTopologyBetweenInstance() { for (FragmentInstance instance : fragmentInstanceList) { PlanNode rootNode = instance.getFragment().getPlanNodeTree(); @@ -239,7 +112,8 @@ private void calculateNodeTopologyBetweenInstance() { // Set target Endpoint for FragmentSinkNode PlanNodeId downStreamNodeId = new PlanNodeId(downStreamChannelLocation.getRemotePlanNodeId()); - FragmentInstance downStreamInstance = findDownStreamInstance(downStreamNodeId); + FragmentInstance downStreamInstance = + findDownStreamInstance(planNodeMap, instanceMap, downStreamNodeId); downStreamChannelLocation.setRemoteEndpoint( downStreamInstance.getHostDataNode().getMPPDataExchangeEndPoint()); downStreamChannelLocation.setRemoteFragmentInstanceId( @@ -257,7 +131,30 @@ private void calculateNodeTopologyBetweenInstance() { } } - private FragmentInstance findDownStreamInstance(PlanNodeId exchangeNodeId) { - return instanceMap.get(planNodeMap.get(exchangeNodeId).left); + private void produceFragmentInstance( + PlanFragment fragment, final Map nodeDistributionMap) { + FragmentInstance fragmentInstance = + new FragmentInstance( + fragment, + fragment.getId().genFragmentInstanceId(), + QueryType.READ, + queryContext.getTimeOut() - (System.currentTimeMillis() - queryContext.getStartTime()), + queryContext.getSession(), + queryContext.isExplainAnalyze(), + fragment.isRoot()); + + selectExecutorAndHost( + fragment, + fragmentInstance, + () -> fragment.getTargetRegionForTableModel(nodeDistributionMap), + topology::getValidatedReplicaSet, + dataNodeFIMap); + + final Statement statement = analysis.getStatement(); + if (analysis.isQuery() || statement instanceof ShowDevice || statement instanceof CountDevice) { + fragmentInstance.getFragment().generateTableModelTypeProvider(queryContext.getTypeProvider()); + } + instanceMap.putIfAbsent(fragment.getId(), fragmentInstance); + fragmentInstanceList.add(fragmentInstance); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java index 49a8086d6354b..2634beab8b244 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java @@ -154,9 +154,8 @@ private static TSStatus tryCatchQueryException(Exception e) { } else if (t instanceof QueryInBatchStatementException) { return RpcUtils.getStatus( TSStatusCode.QUERY_NOT_ALLOWED, INFO_NOT_ALLOWED_IN_BATCH_ERROR + rootCause.getMessage()); - } else if (t instanceof RootFIPlacementException) { - return RpcUtils.getStatus(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION, rootCause.getMessage()); - } else if (t instanceof ReplicaSetUnreachableException) { + } else if (t instanceof RootFIPlacementException + || t instanceof ReplicaSetUnreachableException) { return RpcUtils.getStatus(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION, rootCause.getMessage()); } else if (t instanceof IoTDBException) { return RpcUtils.getStatus(((IoTDBException) t).getErrorCode(), rootCause.getMessage());