Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 51 additions & 14 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ public void exec() throws Exception {
fragment.getFragmentId().asInt(), jobId);
}
}

futures.add(Pair.create(execState, execState.execRemoteFragmentAsync()));

backendId++;
Expand Down Expand Up @@ -1140,17 +1141,26 @@ private long getScanRangeLength(final TScanRange scanRange) {

private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params) {
Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragmentId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe making bucketSeqToScanRange a per-fragment map, like fragmentIdToSeqToAddressMap, would be a
clearer way to solve this problem?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will address it when refactoring them.

Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);

// 1. count each node in one fragment should scan how many tablet, gather them in one list
Map<TNetworkAddress, List<Map<Integer, List<TScanRangeParams>>>> addressToScanRanges = Maps.newHashMap();
for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges : bucketSeqToScanRange.entrySet()) {
TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey());
Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue();

// We only care about the node scan ranges of scan nodes which belong to this fragment
Map<Integer, List<TScanRangeParams>> filteredNodeScanRanges = Maps.newHashMap();
for (Integer scanNodeId : nodeScanRanges.keySet()) {
if (scanNodeIds.contains(scanNodeId)) {
filteredNodeScanRanges.put(scanNodeId, nodeScanRanges.get(scanNodeId));
}
}

if (!addressToScanRanges.containsKey(address)) {
addressToScanRanges.put(address, Lists.newArrayList());
}
addressToScanRanges.get(address).add(nodeScanRanges);
addressToScanRanges.get(address).add(filteredNodeScanRanges);
}

for (Map.Entry<TNetworkAddress, List<Map<Integer, List<TScanRangeParams>>>> addressScanRange : addressToScanRanges.entrySet()) {
Expand Down Expand Up @@ -1195,8 +1205,14 @@ private void computeScanRangeAssignment() throws Exception {
continue;
}

FragmentScanRangeAssignment assignment =
fragmentExecParamsMap.get(scanNode.getFragmentId()).scanRangeAssignment;
Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(scanNode.getFragmentId());
if (scanNodeIds == null) {
scanNodeIds = Sets.newHashSet();
fragmentIdToScanNodeIds.put(scanNode.getFragmentId(), scanNodeIds);
}
scanNodeIds.add(scanNode.getId().asInt());

FragmentScanRangeAssignment assignment = fragmentExecParamsMap.get(scanNode.getFragmentId()).scanRangeAssignment;
if (isColocateJoin(scanNode.getFragment().getPlanRoot())) {
computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignment);
} else if (bucketShuffleJoinController.isBucketShuffleJoin(scanNode.getFragmentId().asInt(), scanNode.getFragment().getPlanRoot())) {
Expand Down Expand Up @@ -1427,6 +1443,7 @@ class FragmentScanRangeAssignment
extends HashMap<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> {
}

// Maps a bucket sequence number -> (scan node id -> list of TScanRangeParams for that scan node).
// Marker subclass used to group scan ranges per tablet bucket; populated during scan range
// assignment and consumed when computing per-instance execution params for colocate and
// bucket-shuffle joins (see the surrounding computeInstanceParam/computeColocateJoinInstanceParam).
class BucketSeqToScanRange extends HashMap<Integer, Map<Integer, List<TScanRangeParams>>> {

}
Expand All @@ -1444,6 +1461,13 @@ class BucketShuffleJoinController {
// cache the bucketShuffleFragmentIds
private Set<Integer> bucketShuffleFragmentIds = new HashSet<>();

private Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds;

// TODO(cmy): Should refactor this Controller to unify bucket shuffle join and colocate join
public BucketShuffleJoinController(Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds) {
this.fragmentIdToScanNodeIds = fragmentIdToScanNodeIds;
}

// check whether the node fragment is bucket shuffle join fragment
private boolean isBucketShuffleJoin(int fragmentId, PlanNode node) {
if (ConnectContext.get() != null) {
Expand Down Expand Up @@ -1553,38 +1577,48 @@ private void computeScanRangeAssignmentByBucket(
private void computeInstanceParam(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params) {
Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragmentId);
BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(fragmentId);
Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);

// 1. count each node in one fragment should scan how many tablet, gather them in one list
Map<TNetworkAddress, List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges = Maps.newHashMap();
Map<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges = Maps.newHashMap();
for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges : bucketSeqToScanRange.entrySet()) {
TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey());
Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue();

// We only care about the node scan ranges of scan nodes which belong to this fragment
Map<Integer, List<TScanRangeParams>> filteredNodeScanRanges = Maps.newHashMap();
for (Integer scanNodeId : nodeScanRanges.keySet()) {
if (scanNodeIds.contains(scanNodeId)) {
filteredNodeScanRanges.put(scanNodeId, nodeScanRanges.get(scanNodeId));
}
}
Pair<Integer, Map<Integer, List<TScanRangeParams>>> filteredScanRanges = Pair.create(scanRanges.getKey(), filteredNodeScanRanges);

if (!addressToScanRanges.containsKey(address)) {
addressToScanRanges.put(address, Lists.newArrayList());
}
addressToScanRanges.get(address).add(scanRanges);
addressToScanRanges.get(address).add(filteredScanRanges);
}

for (Map.Entry<TNetworkAddress, List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange : addressToScanRanges.entrySet()) {
List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>> scanRange = addressScanRange.getValue();
for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange : addressToScanRanges.entrySet()) {
List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> scanRange = addressScanRange.getValue();
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
//the scan instance num should not larger than the tablets num
expectedInstanceNum = Math.min(scanRange.size(), parallelExecInstanceNum);
}

// 2. split how many scanRange one instance should scan
List<List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>>> perInstanceScanRanges = ListUtil.splitBySize(scanRange,
List<List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> perInstanceScanRanges = ListUtil.splitBySize(scanRange,
expectedInstanceNum);

// 3.constuct instanceExecParam add the scanRange should be scan by instance
for (List<Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>>> perInstanceScanRange : perInstanceScanRanges) {
// 3.construct instanceExecParam add the scanRange should be scan by instance
for (List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> perInstanceScanRange : perInstanceScanRanges) {
FInstanceExecParam instanceParam = new FInstanceExecParam(null, addressScanRange.getKey(), 0, params);

for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : perInstanceScanRange) {
instanceParam.addBucketSeq(nodeScanRangeMap.getKey());
for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange : nodeScanRangeMap.getValue().entrySet()) {
for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : perInstanceScanRange) {
instanceParam.addBucketSeq(nodeScanRangeMap.first);
for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange : nodeScanRangeMap.second.entrySet()) {
if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), nodeScanRange.getValue());
} else {
Expand All @@ -1598,11 +1632,13 @@ private void computeInstanceParam(PlanFragmentId fragmentId, int parallelExecIns
}
}

private BucketShuffleJoinController bucketShuffleJoinController = new BucketShuffleJoinController();

private BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange();
private Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap();
// cache the fragment id to its scan node ids. Used for colocate join.
private Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = Maps.newHashMap();
private Set<Integer> colocateFragmentIds = new HashSet<>();
private BucketShuffleJoinController bucketShuffleJoinController = new BucketShuffleJoinController(fragmentIdToScanNodeIds);

// record backend execute state
// TODO(zhaochun): add profile information and others
Expand Down Expand Up @@ -1951,3 +1987,4 @@ private void attachInstanceProfileToFragmentProfile() {
}
}


Loading