Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ public BalanceStatus isFit(long tabletSize, TStorageMedium medium,
for (int i = 0; i < pathStatistics.size(); i++) {
RootPathLoadStatistic pathStatistic = pathStatistics.get(i);
// if this is a supplement task, ignore the storage medium
if (!isSupplement && pathStatistic.getStorageMedium() != medium) {
if (!isSupplement && medium != null && pathStatistic.getStorageMedium() != medium) {
LOG.debug("backend {} path {}'s storage medium {} is not {} storage medium, actual: {}",
beId, pathStatistic.getPath(), pathStatistic.getStorageMedium(), medium);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,7 @@ private boolean relocateAndBalance(GroupId groupId, Tag tag, Set<Long> unavailab
globalColocateStatistic.getBucketTotalReplicaDataSize(groupId, bucketIndex);

resultPaths.clear();
BalanceStatus st = beStat.isFit(bucketDataSize, null, resultPaths, true);
BalanceStatus st = beStat.isFit(bucketDataSize, null, resultPaths, false);
if (!st.ok()) {
LOG.debug("backend {} is unable to fit in group {}, tablet order idx {}, data size {}",
destBeId, groupId, bucketIndex, bucketDataSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,20 @@ public BalanceStatus isFit(long tabletSize, boolean isSupplement) {
toString() + " does not fit tablet with size: " + tabletSize + ", offline");
}

double newUsagePerc = (usedCapacityB + tabletSize) / (double) capacityB;
long newLeftCapacity = capacityB - usedCapacityB - tabletSize;
if (isSupplement) {
if ((usedCapacityB + tabletSize) / (double) capacityB > (Config.storage_flood_stage_usage_percent / 100.0)
&& capacityB - usedCapacityB - tabletSize < Config.storage_flood_stage_left_capacity_bytes) {
if (newUsagePerc > (Config.storage_flood_stage_usage_percent / 100.0)
|| newLeftCapacity < Config.storage_flood_stage_left_capacity_bytes) {
return new BalanceStatus(ErrCode.COMMON_ERROR,
toString() + " does not fit tablet with size: " + tabletSize + ", limitation reached");
} else {
return BalanceStatus.OK;
}
}

if ((usedCapacityB + tabletSize) / (double) capacityB > (Config.storage_high_watermark_usage_percent / 100.0)
|| capacityB - usedCapacityB - tabletSize < Config.storage_min_left_capacity_bytes) {
if (newUsagePerc > (Config.storage_high_watermark_usage_percent / 100.0)
|| newLeftCapacity < Config.storage_min_left_capacity_bytes) {
return new BalanceStatus(ErrCode.COMMON_ERROR,
toString() + " does not fit tablet with size: " + tabletSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1318,7 +1318,8 @@ private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx tabletCtx,

// get all available paths which this tablet can fit in.
// beStatistics is sorted by mix load score in ascend order, so select from first to last.
List<BePathLoadStatPair> allFitPaths = Lists.newArrayList();
List<BePathLoadStatPair> allFitPathsSameMedium = Lists.newArrayList();
List<BePathLoadStatPair> allFitPathsDiffMedium = Lists.newArrayList();
for (BackendLoadStatistic bes : beStatistics) {
if (!bes.isAvailable()) {
LOG.debug("backend {} is not available, skip. tablet: {}", bes.getBeId(), tabletCtx.getTabletId());
Expand Down Expand Up @@ -1349,42 +1350,39 @@ private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx tabletCtx,

List<RootPathLoadStatistic> resultPaths = Lists.newArrayList();
BalanceStatus st = bes.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(),
resultPaths, tabletCtx.getTabletStatus() != TabletStatus.REPLICA_RELOCATING
/* if REPLICA_RELOCATING, then it is not a supplement task */);
if (!st.ok()) {
LOG.debug("unable to find path for tablet: {}. {}", tabletCtx, st);
// This is to solve, when we decommission some BEs with SSD disks,
// if there are no SSD disks on the remaining BEs, it will be impossible to select a
// suitable destination path.
// In this case, we need to ignore the storage medium property
// and try to select the destination path again.
// Set `isSupplement` to true will ignore the storage medium property.
st = bes.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(),
resultPaths, true);
if (!st.ok()) {
LOG.debug("unable to find path for supplementing tablet: {}. {}", tabletCtx, st);
continue;
resultPaths, false);
if (st.ok()) {
resultPaths.stream().forEach(path -> allFitPathsSameMedium.add(new BePathLoadStatPair(bes, path)));
} else {
LOG.debug("backend {} unable to find path for tablet: {}. {}", bes.getBeId(), tabletCtx, st);
resultPaths.clear();
st = bes.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), resultPaths, true);
if (st.ok()) {
resultPaths.stream().forEach(path -> allFitPathsDiffMedium.add(new BePathLoadStatPair(bes, path)));
} else {
LOG.debug("backend {} unable to find path for supplementing tablet: {}. {}",
bes.getBeId(), tabletCtx, st);
}
}

resultPaths.stream().forEach(path -> allFitPaths.add(new BePathLoadStatPair(bes, path)));
}

// all fit paths have already been sorted by load score in 'allFitPaths' in ascending order.
// just get the first available path.
// we try to find a path with the specified medium type; if none is found, arbitrarily use one.
List<BePathLoadStatPair> allFitPaths =
!allFitPathsSameMedium.isEmpty() ? allFitPathsSameMedium : allFitPathsDiffMedium;
if (allFitPaths.isEmpty()) {
throw new SchedException(Status.UNRECOVERABLE, "unable to find dest path for new replica");
}

BePathLoadStatPairComparator comparator = new BePathLoadStatPairComparator(allFitPaths);
Collections.sort(allFitPaths, comparator);

// all fit paths have already been sorted by load score in 'allFitPaths' in ascending order.
// just get the first available path.
// we try to find a path with the specified medium type; if none is found, arbitrarily use one.
for (BePathLoadStatPair bePathLoadStat : allFitPaths) {
RootPathLoadStatistic rootPathLoadStatistic = bePathLoadStat.getPathLoadStatistic();
if (rootPathLoadStatistic.getStorageMedium() != tabletCtx.getStorageMedium()) {
LOG.debug("backend {}'s path {}'s storage medium {} "
+ "is not equal to tablet's storage medium {}, skip. tablet: {}",
+ "is not equal to tablet's storage medium {}, skip. tablet: {}",
rootPathLoadStatistic.getBeId(), rootPathLoadStatistic.getPathHash(),
rootPathLoadStatistic.getStorageMedium(), tabletCtx.getStorageMedium(),
tabletCtx.getTabletId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.clone;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.Config;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.utframe.TestWithFeService;

import com.google.common.collect.Lists;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.stream.Collectors;

/**
 * Checks where replicas end up when a backend is decommissioned while one of the
 * remaining backends only has SSD disks and the table asks for HDD storage.
 *
 * <p>The outcome asserted by this test is that every replica of the decommissioned
 * backend is moved to the two remaining HDD backends and that the SSD backend
 * stays empty.
 */
public class AddReplicaChoseMediumTest extends TestWithFeService {

    /** How many times the tablet distribution is re-read after the initial check. */
    private static final int MAX_POLL_RETRIES = 10;

    /** Pause between two consecutive reads of the tablet distribution, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 1000L;

    /**
     * Adjusts the FE configuration before the connect context is created.
     *
     * <p>NOTE(review): the short checker/scheduler intervals presumably let the
     * relocation finish within the polling window of the test below - confirm
     * against the Config documentation.
     */
    @Override
    protected void beforeCreatingConnectContext() throws Exception {
        Config.enable_round_robin_create_tablet = true;
        Config.allow_replica_on_same_host = true;
        Config.tablet_checker_interval_ms = 100;
        Config.tablet_schedule_interval_ms = 100;
        Config.schedule_slot_num_per_hdd_path = 1;
    }

    /**
     * Returns the number of backends this test runs with.
     *
     * @return always 4
     */
    @Override
    protected int backendNum() {
        return 4;
    }

    /**
     * Creates an HDD table on a cluster whose last backend is SSD-only, decommissions
     * the first backend and waits for the tablet distribution to become
     * {@code [0, 12, 12, 0]}.
     *
     * @throws Exception if cluster setup, statement execution or the polling sleep fails
     */
    @Test
    public void testAddReplicaChoseMedium() throws Exception {
        TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
        List<Backend> backends = Env.getCurrentSystemInfo().getAllBackends();
        Assertions.assertEquals(backendNum(), backends.size());
        for (Backend be : backends) {
            Assertions.assertEquals(0, invertedIndex.getTabletNumByBackendId(be.getId()));
        }

        // Turn every disk of the last backend into an SSD disk with doubled capacity.
        Backend beWithSsd = backends.get(3);
        beWithSsd.getDisks().values().forEach(disk -> {
            disk.setStorageMedium(TStorageMedium.SSD);
            disk.setTotalCapacityB(disk.getTotalCapacityB() * 2);
        });

        createDatabase("test");
        createTable("CREATE TABLE test.tbl1 (k INT) DISTRIBUTED BY HASH(k) "
                + " BUCKETS 12 PROPERTIES ( \"replication_num\" = \"2\","
                + " \"storage_medium\" = \"HDD\")");
        RebalancerTestUtil.updateReplicaPathHash();

        // 12 buckets * 2 replicas = 24 replicas, expected as 8 on each of the three
        // HDD backends and none on the SSD backend.
        Assertions.assertEquals(0, invertedIndex.getTabletNumByBackendId(beWithSsd.getId()));
        for (Backend be : backends) {
            if (be.getId() != beWithSsd.getId()) {
                Assertions.assertEquals(8, invertedIndex.getTabletNumByBackendId(be.getId()));
            }
        }

        Backend decommissionBe = backends.get(0);
        String decommissionStmtStr = "alter system decommission backend \"" + decommissionBe.getHost()
                + ":" + decommissionBe.getHeartbeatPort() + "\"";
        Assertions.assertNotNull(getSqlStmtExecutor(decommissionStmtStr));
        Assertions.assertTrue(decommissionBe.isDecommissioned());

        // Poll until the expected distribution shows up. The distribution is always
        // re-read AFTER sleeping, so the value asserted below is never older than the
        // last wait; the total waiting time stays bounded by
        // MAX_POLL_RETRIES * POLL_INTERVAL_MS.
        List<Integer> expectTabletNums = Lists.newArrayList(0, 12, 12, 0);
        List<Integer> gotTabletNums = tabletNumsPerBackend(invertedIndex, backends);
        for (int retry = 0; retry < MAX_POLL_RETRIES && !expectTabletNums.equals(gotTabletNums); retry++) {
            Thread.sleep(POLL_INTERVAL_MS);
            gotTabletNums = tabletNumsPerBackend(invertedIndex, backends);
        }

        Assertions.assertEquals(expectTabletNums, gotTabletNums);
    }

    /** Reads the current tablet count of each backend, in the order of {@code backends}. */
    private static List<Integer> tabletNumsPerBackend(TabletInvertedIndex invertedIndex,
            List<Backend> backends) {
        return backends.stream()
                .map(be -> invertedIndex.getTabletNumByBackendId(be.getId()))
                .collect(Collectors.toList());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public static void beforeClass() throws Exception {
Map<String, TDisk> backendDisks = Maps.newHashMap();
TDisk tDisk1 = new TDisk();
tDisk1.setRootPath("/home/doris1.HDD");
tDisk1.setDiskTotalCapacity(20000000);
tDisk1.setDiskTotalCapacity(10L << 30);
tDisk1.setDataUsedCapacity(1);
tDisk1.setUsed(true);
tDisk1.setDiskAvailableCapacity(tDisk1.disk_total_capacity - tDisk1.data_used_capacity);
Expand All @@ -95,7 +95,7 @@ public static void beforeClass() throws Exception {

TDisk tDisk2 = new TDisk();
tDisk2.setRootPath("/home/doris2.HHD");
tDisk2.setDiskTotalCapacity(20000000);
tDisk2.setDiskTotalCapacity(10L << 30);
tDisk2.setDataUsedCapacity(1);
tDisk2.setUsed(true);
tDisk2.setDiskAvailableCapacity(tDisk2.disk_total_capacity - tDisk2.data_used_capacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public static void beforeClass() throws Exception {
Map<String, TDisk> backendDisks = Maps.newHashMap();
TDisk tDisk1 = new TDisk();
tDisk1.setRootPath("/home/doris.HDD");
tDisk1.setDiskTotalCapacity(2000000000);
tDisk1.setDiskTotalCapacity(10L << 30);
tDisk1.setDataUsedCapacity(1);
tDisk1.setUsed(true);
tDisk1.setDiskAvailableCapacity(tDisk1.disk_total_capacity - tDisk1.data_used_capacity);
Expand All @@ -138,7 +138,7 @@ public static void beforeClass() throws Exception {

TDisk tDisk2 = new TDisk();
tDisk2.setRootPath("/home/doris.SSD");
tDisk2.setDiskTotalCapacity(2000000000);
tDisk2.setDiskTotalCapacity(10L << 30);
tDisk2.setDataUsedCapacity(1);
tDisk2.setUsed(true);
tDisk2.setDiskAvailableCapacity(tDisk2.disk_total_capacity - tDisk2.data_used_capacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,8 @@ beHttpPort, beArrowFlightSqlPort, new DefaultHeartbeatServiceImpl(beThriftPort,
Backend be = new Backend(Env.getCurrentEnv().getNextId(), backend.getHost(), backend.getHeartbeatPort());
DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
diskInfo1.setPathHash(be.getId());
diskInfo1.setTotalCapacityB(1000000);
diskInfo1.setAvailableCapacityB(500000);
diskInfo1.setTotalCapacityB(10L << 30);
diskInfo1.setAvailableCapacityB(5L << 30);
diskInfo1.setDataUsedCapacityB(480000);
diskInfo1.setPathHash(be.getId());
Map<String, DiskInfo> disks = Maps.newHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,8 @@ beHttpPort, beArrowFlightSqlPort, new DefaultHeartbeatServiceImpl(beThriftPort,
Backend be = new Backend(Env.getCurrentEnv().getNextId(), backend.getHost(), backend.getHeartbeatPort());
Map<String, DiskInfo> disks = Maps.newHashMap();
DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
diskInfo1.setTotalCapacityB(1000000);
diskInfo1.setAvailableCapacityB(500000);
diskInfo1.setTotalCapacityB(10L << 30);
diskInfo1.setAvailableCapacityB(5L << 30);
diskInfo1.setDataUsedCapacityB(480000);
diskInfo1.setPathHash(be.getId());
disks.put(diskInfo1.getRootPath(), diskInfo1);
Expand Down