From 496d24fa75ad6ba0870a486cac8c52349cbcd50b Mon Sep 17 00:00:00 2001 From: yujun Date: Tue, 24 Oct 2023 15:18:32 +0800 Subject: [PATCH 1/2] clone add replica prefer choose the same medium --- .../doris/clone/BackendLoadStatistic.java | 2 +- .../doris/clone/RootPathLoadStatistic.java | 10 +- .../apache/doris/clone/TabletScheduler.java | 42 ++++---- .../clone/AddReplicaChoseMediumTest.java | 97 +++++++++++++++++++ .../apache/doris/clone/DecommissionTest.java | 4 +- .../clone/TabletRepairAndBalanceTest.java | 4 +- .../doris/utframe/TestWithFeService.java | 4 +- .../apache/doris/utframe/UtFrameUtils.java | 4 +- 8 files changed, 132 insertions(+), 35 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/clone/AddReplicaChoseMediumTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java index 2d6401b8eb77e6..c23bcd76329e97 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java @@ -414,7 +414,7 @@ public BalanceStatus isFit(long tabletSize, TStorageMedium medium, for (int i = 0; i < pathStatistics.size(); i++) { RootPathLoadStatistic pathStatistic = pathStatistics.get(i); // if this is a supplement task, ignore the storage medium - if (!isSupplement && pathStatistic.getStorageMedium() != medium) { + if (!isSupplement && medium != null && pathStatistic.getStorageMedium() != medium) { LOG.debug("backend {} path {}'s storage medium {} is not {} storage medium, actual: {}", beId, pathStatistic.getPath(), pathStatistic.getStorageMedium(), medium); continue; diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java index 1a51276f7d6bd1..d2f1983a831748 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java @@ -98,9 +98,11 @@ public BalanceStatus isFit(long tabletSize, boolean isSupplement) { toString() + " does not fit tablet with size: " + tabletSize + ", offline"); } + double newUsagePerc = (usedCapacityB + tabletSize) / (double) capacityB; + long newLeftCapacity = capacityB - usedCapacityB - tabletSize; if (isSupplement) { - if ((usedCapacityB + tabletSize) / (double) capacityB > (Config.storage_flood_stage_usage_percent / 100.0) - && capacityB - usedCapacityB - tabletSize < Config.storage_flood_stage_left_capacity_bytes) { + if (newUsagePerc > (Config.storage_flood_stage_usage_percent / 100.0) + || newLeftCapacity < Config.storage_flood_stage_left_capacity_bytes) { return new BalanceStatus(ErrCode.COMMON_ERROR, toString() + " does not fit tablet with size: " + tabletSize + ", limitation reached"); } else { @@ -108,8 +110,8 @@ public BalanceStatus isFit(long tabletSize, boolean isSupplement) { } } - if ((usedCapacityB + tabletSize) / (double) capacityB > (Config.storage_high_watermark_usage_percent / 100.0) - || capacityB - usedCapacityB - tabletSize < Config.storage_min_left_capacity_bytes) { + if (newUsagePerc > (Config.storage_high_watermark_usage_percent / 100.0) + || newLeftCapacity < Config.storage_min_left_capacity_bytes) { return new BalanceStatus(ErrCode.COMMON_ERROR, toString() + " does not fit tablet with size: " + tabletSize); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 7fb800bcbfd34f..24cb84c970edb4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -1311,7 +1311,8 @@ private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx tabletCtx, // get all available paths which this tablet can fit in. // beStatistics is sorted by mix load score in ascend order, so select from first to last. - List allFitPaths = Lists.newArrayList(); + List allFitPathsSameMedium = Lists.newArrayList(); + List allFitPathsDiffMedium = Lists.newArrayList(); for (BackendLoadStatistic bes : beStatistics) { if (!bes.isAvailable()) { LOG.debug("backend {} is not available, skip. tablet: {}", bes.getBeId(), tabletCtx.getTabletId()); @@ -1342,27 +1343,27 @@ private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx tabletCtx, List resultPaths = Lists.newArrayList(); BalanceStatus st = bes.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), - resultPaths, tabletCtx.getTabletStatus() != TabletStatus.REPLICA_RELOCATING - /* if REPLICA_RELOCATING, then it is not a supplement task */); - if (!st.ok()) { - LOG.debug("unable to find path for tablet: {}. {}", tabletCtx, st); - // This is to solve, when we decommission some BEs with SSD disks, - // if there are no SSD disks on the remaining BEs, it will be impossible to select a - // suitable destination path. - // In this case, we need to ignore the storage medium property - // and try to select the destination path again. - // Set `isSupplement` to true will ignore the storage medium property. - st = bes.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), - resultPaths, true); - if (!st.ok()) { - LOG.debug("unable to find path for supplementing tablet: {}. {}", tabletCtx, st); - continue; + resultPaths, false); + if (st.ok()) { + resultPaths.stream().forEach(path -> allFitPathsSameMedium.add(new BePathLoadStatPair(bes, path))); + } else { + LOG.debug("backend {} unable to find path for tablet: {}. {}", bes.getBeId(), tabletCtx, st); + resultPaths.clear(); + st = bes.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), resultPaths, true); + if (st.ok()) { + resultPaths.stream().forEach(path -> allFitPathsDiffMedium.add(new BePathLoadStatPair(bes, path))); + } else { + LOG.debug("backend {} unable to find path for supplementing tablet: {}. {}", + bes.getBeId(), tabletCtx, st); } } - - resultPaths.stream().forEach(path -> allFitPaths.add(new BePathLoadStatPair(bes, path))); } + // all fit paths has already been sorted by load score in 'allFitPaths' in ascend order. + // just get first available path. + // we try to find a path with specified media type, if not find, arbitrarily use one. + List allFitPaths = + !allFitPathsSameMedium.isEmpty() ? allFitPathsSameMedium : allFitPathsDiffMedium; if (allFitPaths.isEmpty()) { throw new SchedException(Status.UNRECOVERABLE, "unable to find dest path for new replica"); } @@ -1370,14 +1371,11 @@ private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx tabletCtx, BePathLoadStatPairComparator comparator = new BePathLoadStatPairComparator(allFitPaths); Collections.sort(allFitPaths, comparator); - // all fit paths has already been sorted by load score in 'allFitPaths' in ascend order. - // just get first available path. - // we try to find a path with specified media type, if not find, arbitrarily use one. for (BePathLoadStatPair bePathLoadStat : allFitPaths) { RootPathLoadStatistic rootPathLoadStatistic = bePathLoadStat.getPathLoadStatistic(); if (rootPathLoadStatistic.getStorageMedium() != tabletCtx.getStorageMedium()) { LOG.debug("backend {}'s path {}'s storage medium {} " - + "is not equal to tablet's storage medium {}, skip. tablet: {}", + + "is not equal to tablet's storage medium {}, skip. tablet: {}", rootPathLoadStatistic.getBeId(), rootPathLoadStatistic.getPathHash(), rootPathLoadStatistic.getStorageMedium(), tabletCtx.getStorageMedium(), tabletCtx.getTabletId()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/AddReplicaChoseMediumTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/AddReplicaChoseMediumTest.java new file mode 100644 index 00000000000000..dee048223b58e1 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/AddReplicaChoseMediumTest.java @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.clone; + +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.TabletInvertedIndex; +import org.apache.doris.common.Config; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TStorageMedium; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.collect.Lists; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.stream.Collectors; + +public class AddReplicaChoseMediumTest extends TestWithFeService { + + @Override + protected void beforeCreatingConnectContext() throws Exception { + Config.enable_round_robin_create_tablet = true; + Config.allow_replica_on_same_host = true; + Config.tablet_checker_interval_ms = 100; + Config.tablet_schedule_interval_ms = 100; + Config.schedule_slot_num_per_hdd_path = 1; + } + + @Override + protected int backendNum() { + return 4; + } + + @Test + public void testAddReplicaChoseMedium() throws Exception { + TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); + List backends = Env.getCurrentSystemInfo().getAllBackends(); + Assertions.assertEquals(backendNum(), backends.size()); + for (Backend be : backends) { + Assertions.assertEquals(0, invertedIndex.getTabletNumByBackendId(be.getId())); + } + + Backend beWithSsd = backends.get(3); + beWithSsd.getDisks().values().forEach(it -> { + it.setStorageMedium(TStorageMedium.SSD); + it.setTotalCapacityB(it.getTotalCapacityB() * 2); + }); + + createDatabase("test"); + createTable("CREATE TABLE test.tbl1 (k INT) DISTRIBUTED BY HASH(k) " + + " BUCKETS 12 PROPERTIES ( \"replication_num\" = \"2\"," + + " \"storage_medium\" = \"HDD\")"); + RebalancerTestUtil.updateReplicaPathHash(); + + Assertions.assertEquals(0, invertedIndex.getTabletNumByBackendId(beWithSsd.getId())); + for (Backend be : backends) { + if (be.getId() != beWithSsd.getId()) { + Assertions.assertEquals(8, invertedIndex.getTabletNumByBackendId(be.getId())); + } + } + + Backend decommissionBe = backends.get(0); + String decommissionStmtStr = "alter system decommission backend \"" + decommissionBe.getHost() + + ":" + decommissionBe.getHeartbeatPort() + "\""; + Assertions.assertNotNull(getSqlStmtExecutor(decommissionStmtStr)); + Assertions.assertTrue(decommissionBe.isDecommissioned()); + + List gotTabletNums = null; + List expectTabletNums = Lists.newArrayList(0, 12, 12, 0); + for (int i = 0; i < 10; i++) { + gotTabletNums = backends.stream().map(it -> invertedIndex.getTabletNumByBackendId(it.getId())) + .collect(Collectors.toList()); + if (expectTabletNums.equals(gotTabletNums)) { + break; + } + Thread.sleep(1000); + } + + Assertions.assertEquals(expectTabletNums, gotTabletNums); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java index 43ea5340bc6db5..eae28a01345c9c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java @@ -85,7 +85,7 @@ public static void beforeClass() throws Exception { Map backendDisks = Maps.newHashMap(); TDisk tDisk1 = new TDisk(); tDisk1.setRootPath("/home/doris1.HDD"); - tDisk1.setDiskTotalCapacity(20000000); + tDisk1.setDiskTotalCapacity(10L << 30); tDisk1.setDataUsedCapacity(1); tDisk1.setUsed(true); tDisk1.setDiskAvailableCapacity(tDisk1.disk_total_capacity - tDisk1.data_used_capacity); @@ -95,7 +95,7 @@ public static void beforeClass() throws Exception { TDisk tDisk2 = new TDisk(); tDisk2.setRootPath("/home/doris2.HHD"); - tDisk2.setDiskTotalCapacity(20000000); + tDisk2.setDiskTotalCapacity(10L << 30); tDisk2.setDataUsedCapacity(1); tDisk2.setUsed(true); tDisk2.setDiskAvailableCapacity(tDisk2.disk_total_capacity - tDisk2.data_used_capacity); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java index 1524ec3a8841f8..f1057de808a57f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java @@ -128,7 +128,7 @@ public static void beforeClass() throws Exception { Map backendDisks = Maps.newHashMap(); TDisk tDisk1 = new TDisk(); tDisk1.setRootPath("/home/doris.HDD"); - tDisk1.setDiskTotalCapacity(2000000000); + tDisk1.setDiskTotalCapacity(10L << 30); tDisk1.setDataUsedCapacity(1); tDisk1.setUsed(true); tDisk1.setDiskAvailableCapacity(tDisk1.disk_total_capacity - tDisk1.data_used_capacity); @@ -138,7 +138,7 @@ public static void beforeClass() throws Exception { TDisk tDisk2 = new TDisk(); tDisk2.setRootPath("/home/doris.SSD"); - tDisk2.setDiskTotalCapacity(2000000000); + tDisk2.setDiskTotalCapacity(10L << 30); tDisk2.setDataUsedCapacity(1); tDisk2.setUsed(true); tDisk2.setDiskAvailableCapacity(tDisk2.disk_total_capacity - tDisk2.data_used_capacity); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index ac4fa2660db7ea..5edf5917a392b2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -463,8 +463,8 @@ beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort Backend be = new Backend(Env.getCurrentEnv().getNextId(), backend.getHost(), backend.getHeartbeatPort()); DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId()); diskInfo1.setPathHash(be.getId()); - diskInfo1.setTotalCapacityB(1000000); - diskInfo1.setAvailableCapacityB(500000); + diskInfo1.setTotalCapacityB(10L << 30); + diskInfo1.setAvailableCapacityB(5L << 30); diskInfo1.setDataUsedCapacityB(480000); Map disks = Maps.newHashMap(); disks.put(diskInfo1.getRootPath(), diskInfo1); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java index 2e2d53edb7a952..9f24461dbfc390 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java @@ -290,8 +290,8 @@ beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort Backend be = new Backend(Env.getCurrentEnv().getNextId(), backend.getHost(), backend.getHeartbeatPort()); Map disks = Maps.newHashMap(); DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId()); - diskInfo1.setTotalCapacityB(1000000); - diskInfo1.setAvailableCapacityB(500000); + diskInfo1.setTotalCapacityB(10L << 30); + diskInfo1.setAvailableCapacityB(5L << 30); diskInfo1.setDataUsedCapacityB(480000); disks.put(diskInfo1.getRootPath(), diskInfo1); be.setDisks(ImmutableMap.copyOf(disks)); From 6b339d3f70a510e1d86a6f25f7a810740c675229 Mon Sep 17 00:00:00 2001 From: yujun777 Date: Wed, 25 Oct 2023 16:27:28 +0800 Subject: [PATCH 2/2] fix compile --- .../org/apache/doris/clone/AddReplicaChoseMediumTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/AddReplicaChoseMediumTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/AddReplicaChoseMediumTest.java index dee048223b58e1..aac92cff2e79d8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/AddReplicaChoseMediumTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/AddReplicaChoseMediumTest.java @@ -20,6 +20,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.utframe.TestWithFeService; @@ -37,8 +38,8 @@ public class AddReplicaChoseMediumTest extends TestWithFeService { protected void beforeCreatingConnectContext() throws Exception { Config.enable_round_robin_create_tablet = true; Config.allow_replica_on_same_host = true; - Config.tablet_checker_interval_ms = 100; - Config.tablet_schedule_interval_ms = 100; + FeConstants.tablet_checker_interval_ms = 100; + FeConstants.tablet_schedule_interval_ms = 100; Config.schedule_slot_num_per_hdd_path = 1; }