Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
Expand All @@ -46,7 +47,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

// @Ignore
@Ignore("enable this after RTO/RPO retry")
@RunWith(IoTDBTestRunner.class)
@Category({LocalStandaloneIT.class, ClusterIT.class})
public class IoTDBRestartIT {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -142,6 +141,7 @@ private boolean mayWait() {
this.awaitForSignal.await(PROBING_INTERVAL_MS, TimeUnit.MILLISECONDS);
return true;
} catch (InterruptedException e) {
// we deliberately do not restore the interrupt flag here, since this thread may be reused.
return false;
}
}
Expand Down Expand Up @@ -219,35 +219,54 @@ private synchronized void topologyProbing() {
final Map<Integer, Set<Integer>> latestTopology =
dataNodeLocations.stream()
.collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, k -> new HashSet<>()));
final Map<Integer, Set<Integer>> partitioned = new HashMap<>();
for (final Map.Entry<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> entry :
heartbeats.entrySet()) {
final int fromId = entry.getKey().getLeft();
final int toId = entry.getKey().getRight();

if (!entry.getValue().isEmpty()
&& !failureDetector.isAvailable(entry.getKey(), entry.getValue())) {
LOGGER.debug("Connection from DataNode {} to DataNode {} is broken", fromId, toId);
partitioned.computeIfAbsent(fromId, id -> new HashSet<>()).add(toId);
} else {
latestTopology.get(fromId).add(toId);
}
}

if (!partitioned.isEmpty()) {
logAsymmetricPartition(partitioned);
}
logAsymmetricPartition(latestTopology);

// 5. notify the listeners on topology change
if (shouldRun.get()) {
topologyChangeListener.accept(latestTopology);
}
}

private void logAsymmetricPartition(final Map<Integer, Set<Integer>> partitioned) {
for (final int fromId : partitioned.keySet()) {
for (final int toId : partitioned.get(fromId)) {
if (partitioned.get(toId) == null || !partitioned.get(toId).contains(fromId)) {
LOGGER.warn("[Topology] Asymmetric network partition from {} to {}", fromId, toId);
/**
 * We only warn about (one vs. remaining) network partitions. If we ever need to cover more
 * complicated scenarios, such as (many vs. many) network partitions, we will need proper graph
 * algorithms.
 */
private void logAsymmetricPartition(final Map<Integer, Set<Integer>> topology) {
final Set<Integer> nodes = topology.keySet();
if (nodes.size() == 1) {
// 1 DataNode
return;
}

for (int from : nodes) {
for (int to : nodes) {
if (from >= to) {
continue;
}

// whether we have asymmetric partition [from -> to]
final Set<Integer> reachableFrom = topology.get(from);
final Set<Integer> reachableTo = topology.get(to);
if (reachableFrom.size() <= 1 || reachableTo.size() <= 1) {
// symmetric partition for (from) or (to)
continue;
}
if (!reachableTo.contains(from) && !reachableFrom.contains(to)) {
LOGGER.warn("[Topology] Asymmetric network partition from {} to {}", from, to);
}
}
}
Expand Down Expand Up @@ -275,20 +294,23 @@ public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {
startingDataNodes.remove(nodeId);
}

final Set<Pair<Integer, Integer>> affectedPairs =
heartbeats.keySet().stream()
.filter(
pair ->
Objects.equals(pair.getLeft(), nodeId)
|| Objects.equals(pair.getRight(), nodeId))
.collect(Collectors.toSet());

if (changeEvent.getRight() == null) {
// datanode removed from cluster, clean up probing history
final Set<Pair<Integer, Integer>> toRemove =
heartbeats.keySet().stream()
.filter(
pair ->
Objects.equals(pair.getLeft(), nodeId)
|| Objects.equals(pair.getRight(), nodeId))
.collect(Collectors.toSet());
toRemove.forEach(heartbeats::remove);
affectedPairs.forEach(heartbeats::remove);
} else {
// we only trigger probing immediately if node comes around from UNKNOWN to RUNNING
if (NodeStatus.Unknown.equals(changeEvent.getLeft().getStatus())
&& NodeStatus.Running.equals(changeEvent.getRight().getStatus())) {
// let's clear the history when a new node comes around
affectedPairs.forEach(pair -> heartbeats.put(pair, new ArrayList<>()));
awaitForSignal.signal();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,12 @@
package org.apache.iotdb.db.queryengine.plan.planner.distribution;

import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.partition.QueryExecutor;
import org.apache.iotdb.commons.partition.StorageExecutor;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
import org.apache.iotdb.db.queryengine.plan.ClusterTopology;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner;
import org.apache.iotdb.db.queryengine.plan.planner.exceptions.ReplicaSetUnreachableException;
import org.apache.iotdb.db.queryengine.plan.planner.plan.AbstractFragmentParallelPlanner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
Expand All @@ -46,15 +39,11 @@
import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -63,8 +52,7 @@
* A simple implementation of IFragmentParallelPlaner. This planner will transform one PlanFragment
* into only one FragmentInstance.
*/
public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
private static final Logger logger = LoggerFactory.getLogger(SimpleFragmentParallelPlanner.class);
public class SimpleFragmentParallelPlanner extends AbstractFragmentParallelPlanner {

private final SubPlan subPlan;
private final Analysis analysis;
Expand All @@ -82,6 +70,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {

public SimpleFragmentParallelPlanner(
SubPlan subPlan, Analysis analysis, MPPQueryContext context) {
super(context);
this.subPlan = subPlan;
this.analysis = analysis;
this.queryContext = context;
Expand Down Expand Up @@ -153,46 +142,12 @@ private void produceFragmentInstance(PlanFragment fragment) {
queryContext.isExplainAnalyze(),
fragment.isRoot());

// Get the target region for origin PlanFragment, then its instance will be distributed one
// of them.
TRegionReplicaSet regionReplicaSet = fragment.getTargetRegionForTreeModel();
if (regionReplicaSet != null
&& !CollectionUtils.isEmpty(regionReplicaSet.getDataNodeLocations())) {
regionReplicaSet = topology.getValidatedReplicaSet(regionReplicaSet);
if (regionReplicaSet.getDataNodeLocations().isEmpty()) {
throw new ReplicaSetUnreachableException(fragment.getTargetRegionForTreeModel());
}
}

// Set ExecutorType and target host for the instance
// We need to store all the replica host in case of the scenario that the instance need to be
// redirected
// to another host when scheduling
if (regionReplicaSet == null || regionReplicaSet.getRegionId() == null) {
TDataNodeLocation dataNodeLocation = fragment.getTargetLocation();
if (dataNodeLocation != null) {
// now only the case ShowQueries will enter here
fragmentInstance.setExecutorAndHost(new QueryExecutor(dataNodeLocation));
} else {
// no data region && no dataNodeLocation, we need to execute this FI on local
// now only the case AggregationQuery has schemaengine but no data region will enter here
fragmentInstance.setExecutorAndHost(
new QueryExecutor(DataNodeEndPoints.getLocalDataNodeLocation()));
}
} else {
fragmentInstance.setExecutorAndHost(new StorageExecutor(regionReplicaSet));
fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet));
}

dataNodeFIMap.compute(
fragmentInstance.getHostDataNode(),
(k, v) -> {
if (v == null) {
v = new ArrayList<>();
}
v.add(fragmentInstance);
return v;
});
selectExecutorAndHost(
fragment,
fragmentInstance,
fragment::getTargetRegionForTreeModel,
topology::getValidatedReplicaSet,
dataNodeFIMap);

if (analysis.getTreeStatement() instanceof QueryStatement
|| analysis.getTreeStatement() instanceof ExplainAnalyzeStatement
Expand All @@ -205,62 +160,6 @@ private void produceFragmentInstance(PlanFragment fragment) {
fragmentInstanceList.add(fragmentInstance);
}

private TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplicaSet) {
if (regionReplicaSet == null
|| regionReplicaSet.getDataNodeLocations() == null
|| regionReplicaSet.getDataNodeLocations().isEmpty()) {
throw new IllegalArgumentException(
String.format("regionReplicaSet is invalid: %s", regionReplicaSet));
}
String readConsistencyLevel =
IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel();
// TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel as static variable or
// enums
boolean selectRandomDataNode = "weak".equals(readConsistencyLevel);

// When planning fragment onto specific DataNode, the DataNode whose endPoint is in
// black list won't be considered because it may have connection issue now.
List<TDataNodeLocation> availableDataNodes =
filterAvailableTDataNode(regionReplicaSet.getDataNodeLocations());
if (availableDataNodes.isEmpty()) {
String errorMsg =
String.format(
"All replicas for region[%s] are not available in these DataNodes[%s]",
regionReplicaSet.getRegionId(), regionReplicaSet.getDataNodeLocations());
throw new IllegalArgumentException(errorMsg);
}
if (regionReplicaSet.getDataNodeLocationsSize() != availableDataNodes.size()) {
logger.info("available replicas: {}", availableDataNodes);
}
int targetIndex;
if (!selectRandomDataNode || queryContext.getSession() == null) {
targetIndex = 0;
} else {
targetIndex = (int) (queryContext.getSession().getSessionId() % availableDataNodes.size());
}
return availableDataNodes.get(targetIndex);
}

private List<TDataNodeLocation> filterAvailableTDataNode(
List<TDataNodeLocation> originalDataNodeList) {
List<TDataNodeLocation> result = new LinkedList<>();
for (TDataNodeLocation dataNodeLocation : originalDataNodeList) {
if (isAvailableDataNode(dataNodeLocation)) {
result.add(dataNodeLocation);
}
}
return result;
}

private boolean isAvailableDataNode(TDataNodeLocation dataNodeLocation) {
for (TEndPoint endPoint : queryContext.getEndPointBlackList()) {
if (endPoint.equals(dataNodeLocation.internalEndPoint)) {
return false;
}
}
return true;
}

private void calculateNodeTopologyBetweenInstance() {
for (FragmentInstance instance : fragmentInstanceList) {
PlanNode rootNode = instance.getFragment().getPlanNodeTree();
Expand All @@ -273,7 +172,8 @@ private void calculateNodeTopologyBetweenInstance() {
// Set target Endpoint for FragmentSinkNode
PlanNodeId downStreamNodeId =
new PlanNodeId(downStreamChannelLocation.getRemotePlanNodeId());
FragmentInstance downStreamInstance = findDownStreamInstance(downStreamNodeId);
FragmentInstance downStreamInstance =
findDownStreamInstance(planNodeMap, instanceMap, downStreamNodeId);
downStreamChannelLocation.setRemoteEndpoint(
downStreamInstance.getHostDataNode().getMPPDataExchangeEndPoint());
downStreamChannelLocation.setRemoteFragmentInstanceId(
Expand All @@ -291,10 +191,6 @@ private void calculateNodeTopologyBetweenInstance() {
}
}

private FragmentInstance findDownStreamInstance(PlanNodeId exchangeNodeId) {
return instanceMap.get(planNodeMap.get(exchangeNodeId).left);
}

private void recordPlanNodeRelation(PlanNode root, PlanFragmentId planFragmentId) {
planNodeMap.put(root.getPlanNodeId(), new Pair<>(planFragmentId, root));
for (PlanNode child : root.getChildren()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
/**
 * Thrown when the root FragmentInstance of a distributed plan cannot be placed on any DataNode.
 * The status code ties this failure to a network partition detected during planning.
 */
public class RootFIPlacementException extends IoTDBRuntimeException {
  /**
   * @param replicaSets the candidate region replica sets considered for root FragmentInstance
   *     placement; rendered into the exception message for diagnostics
   */
  public RootFIPlacementException(Collection<TRegionReplicaSet> replicaSets) {
    super(
        // NOTE(review): replicaSets.toString() will NPE if replicaSets is null — callers are
        // presumably expected to pass a non-null (possibly empty) collection; confirm.
        "Root FragmentInstance placement error: " + replicaSets.toString(),
        TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode());
  }
}
Loading
Loading