From 406723e2dd5ba30a54ebb97b0c89a2a85adacba5 Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 3 Nov 2020 23:45:01 +0800 Subject: [PATCH 1/2] [Bug][SQL] Fix bug that query failed when SQL contains Union and Colocate Join. SQL like: `select a join b union select c join d`; if a b is colocate join, and c d is also colocate join, the query may fail with an error like: `failed to get tablet. tablet_id=26846, with schema_hash=398972982, reason=tablet does not exist` --- .../java/org/apache/doris/qe/Coordinator.java | 66 ++++++++++++++----- 1 file changed, 48 insertions(+), 18 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index ef2bcd1f6a8946..cdae7d270881f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -81,6 +81,11 @@ import org.apache.doris.thrift.TTabletCommitInfo; import org.apache.doris.thrift.TUniqueId; +import org.apache.commons.collections.map.HashedMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; + import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; @@ -88,11 +93,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import org.apache.commons.collections.map.HashedMap; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.thrift.TException; - import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -486,6 +486,7 @@ public void exec() throws Exception { fragment.getFragmentId().asInt(), jobId); } } + futures.add(Pair.create(execState, execState.execRemoteFragmentAsync())); backendId++; @@ -1140,6 +1141,7 @@ private long getScanRangeLength(final TScanRange scanRange) { private void 
computeColocateJoinInstanceParam(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params) { Map bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragmentId); + Set scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId); // 1. count each node in one fragment should scan how many tablet, gather them in one list Map>>> addressToScanRanges = Maps.newHashMap(); @@ -1147,10 +1149,18 @@ private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId, int par TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey()); Map> nodeScanRanges = scanRanges.getValue(); + // We only care about the node scan ranges of scan nodes which belong to this fragment + Map> filteredNodeScanRanges = Maps.newHashMap(); + for (Integer scanNodeId : nodeScanRanges.keySet()) { + if (scanNodeIds.contains(scanNodeId)) { + filteredNodeScanRanges.put(scanNodeId, nodeScanRanges.get(scanNodeId)); + } + } + if (!addressToScanRanges.containsKey(address)) { addressToScanRanges.put(address, Lists.newArrayList()); } - addressToScanRanges.get(address).add(nodeScanRanges); + addressToScanRanges.get(address).add(filteredNodeScanRanges); } for (Map.Entry>>> addressScanRange : addressToScanRanges.entrySet()) { @@ -1195,8 +1205,14 @@ private void computeScanRangeAssignment() throws Exception { continue; } - FragmentScanRangeAssignment assignment = - fragmentExecParamsMap.get(scanNode.getFragmentId()).scanRangeAssignment; + Set scanNodeIds = fragmentIdToScanNodeIds.get(scanNode.getFragmentId()); + if (scanNodeIds == null) { + scanNodeIds = Sets.newHashSet(); + fragmentIdToScanNodeIds.put(scanNode.getFragmentId(), scanNodeIds); + } + scanNodeIds.add(scanNode.getId().asInt()); + + FragmentScanRangeAssignment assignment = fragmentExecParamsMap.get(scanNode.getFragmentId()).scanRangeAssignment; if (isColocateJoin(scanNode.getFragment().getPlanRoot())) { computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignment); } else if 
(bucketShuffleJoinController.isBucketShuffleJoin(scanNode.getFragmentId().asInt(), scanNode.getFragment().getPlanRoot())) { @@ -1427,6 +1443,7 @@ class FragmentScanRangeAssignment extends HashMap>> { } + // Bucket sequence -> (scan node id -> list of TScanRangeParams) class BucketSeqToScanRange extends HashMap>> { } @@ -1553,21 +1570,31 @@ private void computeScanRangeAssignmentByBucket( private void computeInstanceParam(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params) { Map bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragmentId); BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(fragmentId); + Set scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId); // 1. count each node in one fragment should scan how many tablet, gather them in one list - Map>>>> addressToScanRanges = Maps.newHashMap(); + Map>>>> addressToScanRanges = Maps.newHashMap(); for (Map.Entry>> scanRanges : bucketSeqToScanRange.entrySet()) { TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey()); Map> nodeScanRanges = scanRanges.getValue(); + // We only care about the node scan ranges of scan nodes which belong to this fragment + Map> filteredNodeScanRanges = Maps.newHashMap(); + for (Integer scanNodeId : nodeScanRanges.keySet()) { + if (scanNodeIds.contains(scanNodeId)) { + filteredNodeScanRanges.put(scanNodeId, nodeScanRanges.get(scanNodeId)); + } + } + Pair>> filteredScanRanges = Pair.create(scanRanges.getKey(), filteredNodeScanRanges); + if (!addressToScanRanges.containsKey(address)) { addressToScanRanges.put(address, Lists.newArrayList()); } - addressToScanRanges.get(address).add(scanRanges); + addressToScanRanges.get(address).add(filteredScanRanges); } - for (Map.Entry>>>> addressScanRange : addressToScanRanges.entrySet()) { - List>>> scanRange = addressScanRange.getValue(); + for (Map.Entry>>>> addressScanRange : addressToScanRanges.entrySet()) { + List>>> scanRange = addressScanRange.getValue(); int 
expectedInstanceNum = 1; if (parallelExecInstanceNum > 1) { //the scan instance num should not larger than the tablets num @@ -1575,16 +1602,16 @@ private void computeInstanceParam(PlanFragmentId fragmentId, int parallelExecIns } // 2. split how many scanRange one instance should scan - List>>>> perInstanceScanRanges = ListUtil.splitBySize(scanRange, + List>>>> perInstanceScanRanges = ListUtil.splitBySize(scanRange, expectedInstanceNum); - // 3.constuct instanceExecParam add the scanRange should be scan by instance - for (List>>> perInstanceScanRange : perInstanceScanRanges) { + // 3.construct instanceExecParam add the scanRange should be scan by instance + for (List>>> perInstanceScanRange : perInstanceScanRanges) { FInstanceExecParam instanceParam = new FInstanceExecParam(null, addressScanRange.getKey(), 0, params); - for (Map.Entry>> nodeScanRangeMap : perInstanceScanRange) { - instanceParam.addBucketSeq(nodeScanRangeMap.getKey()); - for (Map.Entry> nodeScanRange : nodeScanRangeMap.getValue().entrySet()) { + for (Pair>> nodeScanRangeMap : perInstanceScanRange) { + instanceParam.addBucketSeq(nodeScanRangeMap.first); + for (Map.Entry> nodeScanRange : nodeScanRangeMap.second.entrySet()) { if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) { instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), nodeScanRange.getValue()); } else { @@ -1602,6 +1629,8 @@ private void computeInstanceParam(PlanFragmentId fragmentId, int parallelExecIns private BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange(); private Map> fragmentIdToSeqToAddressMap = Maps.newHashMap(); + // cache the fragment id to its scan node ids. Used for colocate join. 
+ private Map> fragmentIdToScanNodeIds = Maps.newHashMap(); private Set colocateFragmentIds = new HashSet<>(); // record backend execute state @@ -1951,3 +1980,4 @@ private void attachInstanceProfileToFragmentProfile() { } } + From a7c33635bb324663d805c26311cab8bbf9a97af6 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 4 Nov 2020 10:14:24 +0800 Subject: [PATCH 2/2] fix ut --- .../java/org/apache/doris/qe/Coordinator.java | 19 ++-- .../org/apache/doris/qe/CoordinatorTest.java | 86 +++++++++++++------ 2 files changed, 72 insertions(+), 33 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index cdae7d270881f2..4d4f851aa5e2b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -81,11 +81,6 @@ import org.apache.doris.thrift.TTabletCommitInfo; import org.apache.doris.thrift.TUniqueId; -import org.apache.commons.collections.map.HashedMap; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.thrift.TException; - import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; @@ -93,6 +88,11 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.commons.collections.map.HashedMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; + import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -1461,6 +1461,13 @@ class BucketShuffleJoinController { // cache the bucketShuffleFragmentIds private Set bucketShuffleFragmentIds = new HashSet<>(); + private Map> fragmentIdToScanNodeIds; + + // TODO(cmy): Should refactor this Controller to unify bucket shuffle join and colocate join + public 
BucketShuffleJoinController(Map> fragmentIdToScanNodeIds) { + this.fragmentIdToScanNodeIds = fragmentIdToScanNodeIds; + } + // check whether the node fragment is bucket shuffle join fragment private boolean isBucketShuffleJoin(int fragmentId, PlanNode node) { if (ConnectContext.get() != null) { @@ -1625,13 +1632,13 @@ private void computeInstanceParam(PlanFragmentId fragmentId, int parallelExecIns } } - private BucketShuffleJoinController bucketShuffleJoinController = new BucketShuffleJoinController(); private BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange(); private Map> fragmentIdToSeqToAddressMap = Maps.newHashMap(); // cache the fragment id to its scan node ids. Used for colocate join. private Map> fragmentIdToScanNodeIds = Maps.newHashMap(); private Set colocateFragmentIds = new HashSet<>(); + private BucketShuffleJoinController bucketShuffleJoinController = new BucketShuffleJoinController(fragmentIdToScanNodeIds); // record backend execute state // TODO(zhaochun): add profile information and others diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index 11dc8d177cab0b..eb1bad3afb8349 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -17,10 +17,8 @@ package org.apache.doris.qe; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; import mockit.Mocked; + import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.BinaryPredicate; import org.apache.doris.analysis.Expr; @@ -34,33 +32,33 @@ import org.apache.doris.persist.EditLog; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.EmptySetNode; -import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.HashJoinNode; -import 
org.apache.doris.planner.MysqlScanNode; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; -import org.apache.doris.planner.PlanNode; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.Planner; -import org.apache.doris.planner.ScanNode; import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPartitionType; -import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; import org.apache.doris.thrift.TScanRangeParams; import org.apache.doris.thrift.TUniqueId; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableMap; + import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; public class CoordinatorTest extends Coordinator { static Planner planner = new Planner(); @@ -87,13 +85,19 @@ public CoordinatorTest() { public void testComputeColocateJoinInstanceParam() { Coordinator coordinator = new Coordinator(context, analyzer, planner); + PlanFragmentId planFragmentId = new PlanFragmentId(1); + int scanNodeId = 1; + Map> fragmentIdToScanNodeIds = new HashMap<>(); + fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>()); + fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId); + Deencapsulation.setField(coordinator, "fragmentIdToScanNodeIds", fragmentIdToScanNodeIds); + // 1. 
set fragmentToBucketSeqToAddress in coordinator Map bucketSeqToAddress = new HashMap<>(); TNetworkAddress address = new TNetworkAddress(); for (int i = 0; i < 3; i++) { bucketSeqToAddress.put(i, address); } - PlanFragmentId planFragmentId = new PlanFragmentId(1); Map> fragmentToBucketSeqToAddress = new HashMap<>(); fragmentToBucketSeqToAddress.put(planFragmentId, bucketSeqToAddress); Deencapsulation.setField(coordinator, "fragmentIdToSeqToAddressMap", fragmentToBucketSeqToAddress); @@ -101,7 +105,7 @@ public void testComputeColocateJoinInstanceParam() { // 2. set bucketSeqToScanRange in coordinator BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange(); Map> ScanRangeMap = new HashMap<>(); - ScanRangeMap.put(1, new ArrayList<>()); + ScanRangeMap.put(scanNodeId, new ArrayList<>()); for (int i = 0; i < 3; i++) { bucketSeqToScanRange.put(i, ScanRangeMap); } @@ -126,9 +130,18 @@ public void testComputeColocateJoinInstanceParam() { @Test public void testIsBucketShuffleJoin() { - Coordinator.BucketShuffleJoinController bucketShuffleJoinController = new Coordinator.BucketShuffleJoinController(); - - PlanNodeId testPaloNodeId = new PlanNodeId(-1); + PlanFragmentId planFragmentId = new PlanFragmentId(1); + int scanNodeId = 1; + // The "fragmentIdToScanNodeIds" we created here is useless in this test. + // It is only for creating the BucketShuffleJoinController. + // So the fragment id and scan node id in it is meaningless. 
+ Map> fragmentIdToScanNodeIds = new HashMap<>(); + fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>()); + fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId); + Coordinator.BucketShuffleJoinController bucketShuffleJoinController + = new Coordinator.BucketShuffleJoinController(fragmentIdToScanNodeIds); + + PlanNodeId testPlanNodeId = new PlanNodeId(-1); TupleId testTupleId = new TupleId(-1); ArrayList tupleIdArrayList = new ArrayList<>(); tupleIdArrayList.add(testTupleId); @@ -137,8 +150,8 @@ public void testIsBucketShuffleJoin() { BinaryPredicate binaryPredicate = new BinaryPredicate(); testJoinexprs.add(binaryPredicate); - HashJoinNode hashJoinNode = new HashJoinNode(testPaloNodeId, new EmptySetNode(testPaloNodeId, tupleIdArrayList), - new EmptySetNode(testPaloNodeId, tupleIdArrayList) , new TableRef(), testJoinexprs, new ArrayList<>()); + HashJoinNode hashJoinNode = new HashJoinNode(testPlanNodeId, new EmptySetNode(testPlanNodeId, tupleIdArrayList), + new EmptySetNode(testPlanNodeId, tupleIdArrayList), new TableRef(), testJoinexprs, new ArrayList<>()); hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-1), hashJoinNode, new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs))); @@ -146,7 +159,7 @@ public void testIsBucketShuffleJoin() { Assert.assertEquals(false, Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1, hashJoinNode)); - // the fragment id is differernt from hash join node + // the fragment id is different from hash join node hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-2), hashJoinNode, new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs))); hashJoinNode.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE); @@ -158,7 +171,7 @@ public void testIsBucketShuffleJoin() { Assert.assertEquals(true, Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1, hashJoinNode)); - // the framgent id is in 
cache, so not do check node again + // the fragment id is in cache, so not do check node again Assert.assertEquals(true, Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1)); @@ -166,7 +179,13 @@ public void testIsBucketShuffleJoin() { @Test public void testComputeScanRangeAssignmentByBucketq() { - Coordinator.BucketShuffleJoinController bucketShuffleJoinController = new Coordinator.BucketShuffleJoinController(); + PlanFragmentId planFragmentId = new PlanFragmentId(1); + int scanNodeId = 1; + Map> fragmentIdToScanNodeIds = new HashMap<>(); + fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>()); + fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId); + Coordinator.BucketShuffleJoinController bucketShuffleJoinController + = new Coordinator.BucketShuffleJoinController(fragmentIdToScanNodeIds); // init olap scan node of bucket shuffle join TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1)); @@ -175,7 +194,7 @@ public void testComputeScanRangeAssignmentByBucketq() { Deencapsulation.setField(olapTable, "defaultDistributionInfo", hashDistributionInfo); tupleDescriptor.setTable(olapTable); - OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(-1), tupleDescriptor, "test"); + OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(scanNodeId), tupleDescriptor, "test"); ArrayListMultimap bucketseq2localtion = ArrayListMultimap.create(); // each olaptable bucket have the same TScanRangeLocations, be id is {0, 1, 2} @@ -196,10 +215,9 @@ public void testComputeScanRangeAssignmentByBucketq() { } Deencapsulation.setField(olapScanNode, "bucketSeq2locations", bucketseq2localtion); - olapScanNode.setFragment(new PlanFragment(new PlanFragmentId(1), olapScanNode, + olapScanNode.setFragment(new PlanFragment(planFragmentId, olapScanNode, new DataPartition(TPartitionType.UNPARTITIONED))); - // init all backend Backend backend0 = new Backend(); backend0.setAlive(true); @@ -240,7 +258,13 @@ public void 
testComputeScanRangeAssignmentByBucketq() { @Test public void testComputeScanRangeAssignmentByBucket() { - Coordinator.BucketShuffleJoinController bucketShuffleJoinController = new Coordinator.BucketShuffleJoinController(); + PlanFragmentId planFragmentId = new PlanFragmentId(1); + int scanNodeId = 1; + Map> fragmentIdToScanNodeIds = new HashMap<>(); + fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>()); + fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId); + Coordinator.BucketShuffleJoinController bucketShuffleJoinController + = new Coordinator.BucketShuffleJoinController(fragmentIdToScanNodeIds); // init olap scan node of bucket shuffle join TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1)); @@ -249,7 +273,7 @@ public void testComputeScanRangeAssignmentByBucket() { Deencapsulation.setField(olapTable, "defaultDistributionInfo", hashDistributionInfo); tupleDescriptor.setTable(olapTable); - OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(-1), tupleDescriptor, "test"); + OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(scanNodeId), tupleDescriptor, "test"); ArrayListMultimap bucketseq2localtion = ArrayListMultimap.create(); // each olaptable bucket have the same TScanRangeLocations, be id is {0, 1, 2} @@ -270,7 +294,7 @@ public void testComputeScanRangeAssignmentByBucket() { } Deencapsulation.setField(olapScanNode, "bucketSeq2locations", bucketseq2localtion); - olapScanNode.setFragment(new PlanFragment(new PlanFragmentId(1), olapScanNode, + olapScanNode.setFragment(new PlanFragment(planFragmentId, olapScanNode, new DataPartition(TPartitionType.UNPARTITIONED))); @@ -315,7 +339,15 @@ public void testComputeScanRangeAssignmentByBucket() { @Test public void testComputeBucketShuffleJoinInstanceParam() { - Coordinator.BucketShuffleJoinController bucketShuffleJoinController = new Coordinator.BucketShuffleJoinController(); + PlanFragmentId planFragmentId = new PlanFragmentId(1); + int scanNodeId = 1; + + // set 
fragment id to scan node ids map + Map> fragmentIdToScanNodeIds = new HashMap<>(); + fragmentIdToScanNodeIds.put(planFragmentId, new HashSet<>()); + fragmentIdToScanNodeIds.get(planFragmentId).add(scanNodeId); + Coordinator.BucketShuffleJoinController bucketShuffleJoinController + = new Coordinator.BucketShuffleJoinController(fragmentIdToScanNodeIds); // 1. set fragmentToBucketSeqToAddress in bucketShuffleJoinController Map bucketSeqToAddress = new HashMap<>(); @@ -323,7 +355,6 @@ public void testComputeBucketShuffleJoinInstanceParam() { for (int i = 0; i < 3; i++) { bucketSeqToAddress.put(i, address); } - PlanFragmentId planFragmentId = new PlanFragmentId(1); Map> fragmentToBucketSeqToAddress = new HashMap<>(); fragmentToBucketSeqToAddress.put(planFragmentId, bucketSeqToAddress); Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdToSeqToAddressMap", fragmentToBucketSeqToAddress); @@ -332,13 +363,14 @@ public void testComputeBucketShuffleJoinInstanceParam() { Map fragmentIdBucketSeqToScanRangeMap = new HashMap<>(); BucketSeqToScanRange bucketSeqToScanRange = new BucketSeqToScanRange(); Map> ScanRangeMap = new HashMap<>(); - ScanRangeMap.put(1, new ArrayList<>()); + ScanRangeMap.put(scanNodeId, new ArrayList<>()); for (int i = 0; i < 3; i++) { bucketSeqToScanRange.put(i, ScanRangeMap); } fragmentIdBucketSeqToScanRangeMap.put(planFragmentId, bucketSeqToScanRange); Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdBucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap); + FragmentExecParams params = new FragmentExecParams(null); Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params); Assert.assertEquals(1, params.instanceExecParams.size());