From cec0e3b8683acb86afc8bdcc6b329f00d1119ae6 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Mon, 18 Jan 2021 16:56:54 +0800 Subject: [PATCH] [Bug] Colocate Join and Bucket shuffle join may scan some tablet twice time. Fix issue #5255 --- .../java/org/apache/doris/qe/Coordinator.java | 19 ++++++++----------- .../org/apache/doris/qe/CoordinatorTest.java | 12 +++++++++++- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 0cd3f0ca101f07..c0fba828b7413a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1205,13 +1205,11 @@ private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId, int par for (Map> nodeScanRangeMap : perInstanceScanRange) { for (Map.Entry> nodeScanRange : nodeScanRangeMap.entrySet()) { if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) { - range.put(nodeScanRange.getKey(), nodeScanRange.getValue()); - instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), nodeScanRange.getValue()); - } else { - range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); - instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); + range.put(nodeScanRange.getKey(), Lists.newArrayList()); + instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), Lists.newArrayList()); } - + range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); + instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); } } params.instanceExecParams.add(instanceParam); @@ -1665,12 +1663,11 @@ private void computeInstanceParam(PlanFragmentId fragmentId, int parallelExecIns instanceParam.addBucketSeq(nodeScanRangeMap.first); for (Map.Entry> nodeScanRange : nodeScanRangeMap.second.entrySet()) { if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) { - range.put(nodeScanRange.getKey(), nodeScanRange.getValue()); - instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), nodeScanRange.getValue()); - } else { - range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); - instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); + range.put(nodeScanRange.getKey(), Lists.newArrayList()); + instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), Lists.newArrayList()); } + range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); + instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); } } params.instanceExecParams.add(instanceParam); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index ea4adfab5c48e1..41e1755c21a880 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -110,7 +110,10 @@ public void testComputeColocateJoinInstanceParam() { // 2. set bucketSeqToScanRange in coordinator BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange(); Map> ScanRangeMap = new HashMap<>(); - ScanRangeMap.put(scanNodeId, new ArrayList<>()); + List scanRangeParamsList = new ArrayList<>(); + scanRangeParamsList.add(new TScanRangeParams()); + + ScanRangeMap.put(scanNodeId, scanRangeParamsList); for (int i = 0; i < 3; i++) { bucketSeqToScanRange.put(i, ScanRangeMap); } @@ -120,6 +123,13 @@ public void testComputeColocateJoinInstanceParam() { Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params); Assert.assertEquals(1, params.instanceExecParams.size()); + // check whether one instance have 3 tablet to scan + for (FInstanceExecParam instanceExecParam : params.instanceExecParams) { + for (List tempScanRangeParamsList :instanceExecParam.perNodeScanRanges.values()) { + Assert.assertEquals(3, tempScanRangeParamsList.size()); + } + } + params = new FragmentExecParams(null); Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 2, params); Assert.assertEquals(2, params.instanceExecParams.size());