From 202ffc054c346357a0a6b1cb1eeff4c9d4a2df8d Mon Sep 17 00:00:00 2001 From: yujun Date: Tue, 20 Aug 2024 23:01:53 +0800 Subject: [PATCH 1/4] query prefer chose tablets on good disks --- be/src/agent/task_worker_pool.cpp | 19 +++-- be/src/service/doris_main.cpp | 2 + .../org/apache/doris/catalog/DiskInfo.java | 4 + .../java/org/apache/doris/catalog/Tablet.java | 28 +++++-- .../apache/doris/master/ReportHandler.java | 29 ++++++- .../apache/doris/planner/OlapScanNode.java | 14 +++- .../apache/doris/catalog/QueryTabletTest.java | 84 +++++++++++++++++++ .../doris/utframe/MockedBackendFactory.java | 31 +++++++ 8 files changed, 190 insertions(+), 21 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index e79f45f3d17fa8..4d3cd876b5c8c3 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -919,13 +919,6 @@ void report_task_callback(const TMasterInfo& master_info) { } void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info) { - // Random sleep 1~5 seconds before doing report. - // In order to avoid the problem that the FE receives many report requests at the same time - // and can not be processed. - if (config::report_random_wait) { - random_sleep(5); - } - TReportRequest request; request.__set_backend(BackendOptions::get_local_backend()); request.__isset.disks = true; @@ -966,8 +959,16 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf request.__set_backend(BackendOptions::get_local_backend()); request.__isset.tablets = true; - uint64_t report_version = s_report_version; - engine.tablet_manager()->build_all_report_tablets_info(&request.tablets); + uint64_t report_version; + for (int i = 0; i < 5; i++) { + request.tablets.clear(); + report_version = s_report_version; + engine.tablet_manager()->build_all_report_tablets_info(&request.tablets); + if (report_version == s_report_version) { + break; + } + } + if (report_version < s_report_version) { // TODO llj This can only reduce the possibility for report error, but can't avoid it. // If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not be included in this diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 52469ac1081d33..05b4f00b8d276f 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -595,6 +595,8 @@ int main(int argc, char** argv) { exit(1); } + exec_env->storage_engine().notify_listeners(); + while (!doris::k_doris_exit) { #if defined(LEAK_SANITIZER) __lsan_do_leak_check(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java index 934e7f75fb0363..38d8037befc0d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java @@ -151,6 +151,10 @@ public boolean hasPathHash() { return pathHash != 0; } + public boolean isAlive() { + return state == DiskState.ONLINE; + } + public boolean isStorageMediumMatch(TStorageMedium storageMedium) { return this.storageMedium == storageMedium; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index 61251084afacd9..d1740e8616ba13 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -281,9 +281,11 @@ public Multimap getNormalReplicaBackendPathMap() { } // for query - public List getQueryableReplicas(long visibleVersion, boolean allowFailedVersion) { + public List getQueryableReplicas(long visibleVersion, Map> backendAlivePathHashs, + boolean allowFailedVersion) { List allQueryableReplica = Lists.newArrayListWithCapacity(replicas.size()); List auxiliaryReplica = Lists.newArrayListWithCapacity(replicas.size()); + List deadPathReplica = Lists.newArrayList(); for (Replica replica : replicas) { if (replica.isBad()) { continue; @@ -294,21 +296,31 @@ public List getQueryableReplicas(long visibleVersion, boolean allowFail continue; } + if (!replica.checkVersionCatchUp(visibleVersion, false)) { + continue; + } + + Set thisBeAlivePaths = backendAlivePathHashs.get(replica.getBackendId()); ReplicaState state = replica.getState(); - if (state.canQuery()) { - if (replica.checkVersionCatchUp(visibleVersion, false)) { - allQueryableReplica.add(replica); - } + // if thisBeAlivePaths contains pathHash = 0, it mean this be hadn't report disks state. + // should ignore this case. + if (replica.getPathHash() != -1 && thisBeAlivePaths != null + && !thisBeAlivePaths.contains(replica.getPathHash()) + && !thisBeAlivePaths.contains(0L)) { + deadPathReplica.add(replica); + } else if (state.canQuery()) { + allQueryableReplica.add(replica); } else if (state == ReplicaState.DECOMMISSION) { - if (replica.checkVersionCatchUp(visibleVersion, false)) { - auxiliaryReplica.add(replica); - } + auxiliaryReplica.add(replica); } } if (allQueryableReplica.isEmpty()) { allQueryableReplica = auxiliaryReplica; } + if (allQueryableReplica.isEmpty()) { + allQueryableReplica = deadPathReplica; + } if (Config.skip_compaction_slower_replica && allQueryableReplica.size() > 1) { long minVersionCount = allQueryableReplica.stream().mapToLong(Replica::getVisibleVersionCount) diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index af62171007b571..0e2b14ed3fca74 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.ColocateGroupSchema; import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.MaterializedIndex.IndexState; @@ -808,6 +809,15 @@ private static void deleteFromMeta(ListMultimap tabletDeleteFromMeta AgentBatchTask createReplicaBatchTask = new AgentBatchTask(); TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); Map objectPool = new HashMap(); + Backend backend = Env.getCurrentSystemInfo().getBackend(backendId); + Set backendHealthPathHashs; + if (backend == null) { + backendHealthPathHashs = Sets.newHashSet(); + } else { + backendHealthPathHashs = backend.getDisks().values().stream() + .filter(DiskInfo::isAlive) + .map(DiskInfo::getPathHash).collect(Collectors.toSet()); + } for (Long dbId : tabletDeleteFromMeta.keySet()) { Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); if (db == null) { @@ -863,7 +873,24 @@ private static void deleteFromMeta(ListMultimap tabletDeleteFromMeta long currentBackendReportVersion = Env.getCurrentSystemInfo() .getBackendReportVersion(backendId); if (backendReportVersion < currentBackendReportVersion) { - continue; + + // if backendHealthPathHashs contains health path hash 0, + // it means this backend hadn't reported disks state, + // should ignore this case. + boolean thisReplicaOnBadDisk = replica.getPathHash() != -1L + && !backendHealthPathHashs.contains(replica.getPathHash()) + && !backendHealthPathHashs.contains(0L); + + boolean existsOtherHealthReplica = tablet.getReplicas().stream() + .anyMatch(r -> r.getBackendId() != replica.getBackendId() + && r.getVersion() >= replica.getVersion() + && r.getLastFailedVersion() == -1L + && !r.isBad()); + + // if replica is on bad disks and there are other health replicas, still delete it. + if (!(thisReplicaOnBadDisk && existsOtherHealthReplica)) { + continue; + } } BinlogConfig binlogConfig = new BinlogConfig(olapTable.getBinlogConfig()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 09d944d746ae41..a35f960ba1aed7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -40,6 +40,7 @@ import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HashDistributionInfo; @@ -732,7 +733,7 @@ private Collection distributionPrune( } private void addScanRangeLocations(Partition partition, - List tablets) throws UserException { + List tablets, Map> backendAlivePathHashs) throws UserException { long visibleVersion = partition.getVisibleVersion(); String visibleVersionStr = String.valueOf(visibleVersion); @@ -776,7 +777,8 @@ private void addScanRangeLocations(Partition partition, paloRange.setTabletId(tabletId); // random shuffle List && only collect one copy - List replicas = tablet.getQueryableReplicas(visibleVersion, skipMissingVersion); + List replicas = tablet.getQueryableReplicas(visibleVersion, + backendAlivePathHashs, skipMissingVersion); if (replicas.isEmpty()) { if (ConnectContext.get().getSessionVariable().skipBadTablet) { continue; @@ -1125,6 +1127,12 @@ private void computeTabletInfo() throws UserException { */ Preconditions.checkState(scanBackendIds.size() == 0); Preconditions.checkState(scanTabletIds.size() == 0); + Map> backendAlivePathHashs = Maps.newHashMap(); + for (Backend backend : Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values()) { + backendAlivePathHashs.put(backend.getId(), backend.getDisks().values().stream() + .filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet())); + } + for (Long partitionId : selectedPartitionIds) { final Partition partition = olapTable.getPartition(partitionId); final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId); @@ -1166,7 +1174,7 @@ private void computeTabletInfo() throws UserException { totalTabletsNum += selectedTable.getTablets().size(); selectedSplitNum += tablets.size(); - addScanRangeLocations(partition, tablets); + addScanRangeLocations(partition, tablets, backendAlivePathHashs); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java new file mode 100644 index 00000000000000..32929523a53624 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import org.apache.doris.system.Backend; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class QueryTabletTest extends TestWithFeService { + + @Override + protected int backendNum() { + return 3; + } + + @Test + public void testTabletOnBadDisks() throws Exception { + createDatabase("db1"); + createTable("create table db1.tbl1(k1 int) distributed by hash(k1) buckets 1" + + " properties('replication_num' = '3')"); + + Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("db1"); + OlapTable tbl = (OlapTable) db.getTableOrMetaException("tbl1"); + Assertions.assertNotNull(tbl); + Tablet tablet = tbl.getPartitions().iterator().next() + .getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next() + .getTablets().iterator().next(); + + List replicas = tablet.getReplicas(); + Assertions.assertEquals(3, replicas.size()); + for (Replica replica : replicas) { + Assertions.assertTrue(replica.getPathHash() != -1L); + } + + Assertions.assertEquals(replicas, + tablet.getQueryableReplicas(1L, getAlivePathHashs(), false)); + + // disk mark as bad + Env.getCurrentSystemInfo().getBackend(replicas.get(0).getBackendId()) + .getDisks().values().forEach(disk -> disk.setState(DiskInfo.DiskState.OFFLINE)); + + // lost disk + replicas.get(1).setPathHash(-123321L); + + Assertions.assertEquals(Lists.newArrayList(replicas.get(2)), + tablet.getQueryableReplicas(1L, getAlivePathHashs(), false)); + } + + private Map> getAlivePathHashs() { + Map> backendAlivePathHashs = Maps.newHashMap(); + for (Backend backend : Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values()) { + backendAlivePathHashs.put(backend.getId(), backend.getDisks().values().stream() + .filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet())); + } + + return backendAlivePathHashs; + } + +} + diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index 122733316340f6..53f075c29bd3cd 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -39,6 +39,7 @@ import org.apache.doris.thrift.TCancelPlanFragmentResult; import org.apache.doris.thrift.TCheckStorageFormatResult; import org.apache.doris.thrift.TCloneReq; +import org.apache.doris.thrift.TCreateTabletReq; import org.apache.doris.thrift.TDiskTrashInfo; import org.apache.doris.thrift.TDropTabletReq; import org.apache.doris.thrift.TExecPlanFragmentParams; @@ -83,7 +84,9 @@ import java.io.IOException; import java.util.List; +import java.util.Random; import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; /* * This class is used to create mock backends. @@ -191,6 +194,9 @@ public void run() { TTaskType taskType = request.getTaskType(); switch (taskType) { case CREATE: + ++reportVersion; + handleCreateTablet(request, finishTaskRequest); + break; case ALTER: ++reportVersion; break; @@ -198,6 +204,7 @@ public void run() { handleDropTablet(request, finishTaskRequest); break; case CLONE: + ++reportVersion; handleCloneTablet(request, finishTaskRequest); break; case STORAGE_MEDIUM_MIGRATE: @@ -223,6 +230,30 @@ public void run() { } } + private void handleCreateTablet(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) { + TCreateTabletReq req = request.getCreateTabletReq(); + List candDisks = backendInFe.getDisks().values().stream() + .filter(disk -> req.storage_medium == disk.getStorageMedium() && disk.isAlive()) + .collect(Collectors.toList()); + if (candDisks.isEmpty()) { + candDisks = backendInFe.getDisks().values().stream() + .filter(DiskInfo::isAlive) + .collect(Collectors.toList()); + } + DiskInfo choseDisk = candDisks.isEmpty() ? null + : candDisks.get(new Random().nextInt(candDisks.size())); + + List tabletInfos = Lists.newArrayList(); + TTabletInfo tabletInfo = new TTabletInfo(); + tabletInfo.setTabletId(req.tablet_id); + tabletInfo.setVersion(req.version); + tabletInfo.setPathHash(choseDisk == null ? -1L : choseDisk.getPathHash()); + tabletInfo.setReplicaId(req.replica_id); + tabletInfo.setUsed(true); + tabletInfos.add(tabletInfo); + finishTaskRequest.setFinishTabletInfos(tabletInfos); + } + private void handleDropTablet(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) { TDropTabletReq req = request.getDropTabletReq(); long dataSize = Math.max(1, CatalogTestUtil.getTabletDataSize(req.tablet_id)); From c77571c997262287d1cc80c605caa3e336e470de Mon Sep 17 00:00:00 2001 From: yujun777 Date: Tue, 20 Aug 2024 23:09:47 +0800 Subject: [PATCH 2/4] update --- .../src/main/java/org/apache/doris/planner/OlapScanNode.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index a35f960ba1aed7..d133d359b27d31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -1128,7 +1128,7 @@ private void computeTabletInfo() throws UserException { Preconditions.checkState(scanBackendIds.size() == 0); Preconditions.checkState(scanTabletIds.size() == 0); Map> backendAlivePathHashs = Maps.newHashMap(); - for (Backend backend : Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values()) { + for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) { backendAlivePathHashs.put(backend.getId(), backend.getDisks().values().stream() .filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet())); } From d00fddf8e02b990dc9f5ac9909d7b34a4e0911e1 Mon Sep 17 00:00:00 2001 From: yujun777 Date: Tue, 20 Aug 2024 23:28:21 +0800 Subject: [PATCH 3/4] update --- .../src/test/java/org/apache/doris/catalog/QueryTabletTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java index 32929523a53624..c11447341907f5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java @@ -72,7 +72,7 @@ public void testTabletOnBadDisks() throws Exception { private Map> getAlivePathHashs() { Map> backendAlivePathHashs = Maps.newHashMap(); - for (Backend backend : Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values()) { + for (Backend backend : Env.getCurrentSystemInfo().getAllBackends()) { backendAlivePathHashs.put(backend.getId(), backend.getDisks().values().stream() .filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet())); } From c0d5c2bc8b1f2f761430540057e686af19325cd7 Mon Sep 17 00:00:00 2001 From: yujun777 Date: Thu, 22 Aug 2024 11:33:12 +0800 Subject: [PATCH 4/4] update --- be/src/service/doris_main.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 05b4f00b8d276f..b85790f9ec1bb4 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -595,7 +595,7 @@ int main(int argc, char** argv) { exit(1); } - exec_env->storage_engine().notify_listeners(); + exec_env->get_storage_engine()->notify_listeners(); while (!doris::k_doris_exit) { #if defined(LEAK_SANITIZER)