From a54b64b8de8cd71a6fe49359673e57de97fd6870 Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 14 Sep 2021 13:59:27 +0800 Subject: [PATCH 1/2] 1 2 3 4 add tablet order fix slot bug 1. show proc location default 2. no proper tag 3. random distribution 4. send token shutdown remove log fix codestyle --- be/src/exec/tablet_sink.cpp | 1 + be/src/olap/schema_change.cpp | 8 ++- be/src/olap/tablet.cpp | 2 +- .../apache/doris/analysis/ShowLoadStmt.java | 2 +- .../apache/doris/analysis/ShowTabletStmt.java | 1 + .../org/apache/doris/catalog/Replica.java | 6 +++ .../java/org/apache/doris/catalog/Tablet.java | 10 ++-- .../apache/doris/clone/TabletSchedCtx.java | 52 +++++++++---------- .../apache/doris/clone/TabletScheduler.java | 10 ++-- .../common/proc/ClusterLoadStatByTag.java | 30 ++++++----- .../doris/common/proc/StatisticProcDir.java | 31 +++++++---- .../org/apache/doris/load/LoadChecker.java | 3 +- .../doris/planner/DistributedPlanner.java | 4 +- .../apache/doris/planner/OlapScanNode.java | 8 ++- .../java/org/apache/doris/qe/Coordinator.java | 2 +- .../org/apache/doris/qe/ShowExecutor.java | 17 +++--- .../org/apache/doris/qe/StmtExecutor.java | 2 +- .../doris/analysis/ShowLoadStmtTest.java | 10 +++- .../doris/clone/TabletSchedCtxTest.java | 37 +++++++++++++ 19 files changed, 156 insertions(+), 80 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index f780971f8ccd4a..9c1e31c7b6f67a 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -845,6 +845,7 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { _stop_background_threads_latch.count_down(); if (_sender_thread) { _sender_thread->join(); + _send_batch_thread_pool_token->shutdown(); } Expr::close(_output_expr_ctxs, state); diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 04e5070f11f710..49a7f961f49193 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -1573,7 +1573,11 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe } for (auto& rs_reader : rs_readers) { - rs_reader->init(&reader_context); + res = rs_reader->init(&reader_context); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "failed to init rowset reader: " << base_tablet->full_name(); + break; + } } } while (0); @@ -1717,7 +1721,7 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl RowsetReaderSharedPtr rowset_reader; RETURN_NOT_OK((*base_rowset)->create_reader(_mem_tracker, &rowset_reader)); - rowset_reader->init(&reader_context); + RETURN_NOT_OK(rowset_reader->init(&reader_context)); RowsetWriterContext writer_context; writer_context.rowset_id = StorageEngine::instance()->next_rowset_id(); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 1746c5d8c6e489..c1471fa4797b39 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1345,8 +1345,8 @@ int64_t Tablet::prepare_compaction_and_calculate_permits(CompactionType compacti OLAPStatus res = _base_compaction->prepare_compact(); if (res != OLAP_SUCCESS) { set_last_base_compaction_failure_time(UnixMillis()); - DorisMetrics::instance()->base_compaction_request_failed->increment(1); if (res != OLAP_ERR_BE_NO_SUITABLE_VERSION) { + DorisMetrics::instance()->base_compaction_request_failed->increment(1); LOG(WARNING) << "failed to pick rowsets for base compaction. res=" << res << ", tablet=" << full_name(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java index 610477e4dfe2cc..afee7145c159c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java @@ -219,7 +219,7 @@ private void analyzeSubPredicate(Expr subExpr) throws AnalysisException { break CHECK; } - if (!isAccurateMatch && !value.contains("%")) { + if (hasLabel && !isAccurateMatch && !value.contains("%")) { value = "%" + value + "%"; } if (hasLabel) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java index ac8063a5a49945..d31db2655d580a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java @@ -284,6 +284,7 @@ public ShowResultSetMetaData getMetaData() { builder.addColumn(new Column("PartitionId", ScalarType.createVarchar(30))); builder.addColumn(new Column("IndexId", ScalarType.createVarchar(30))); builder.addColumn(new Column("IsSync", ScalarType.createVarchar(30))); + builder.addColumn(new Column("Order", ScalarType.createVarchar(30))); builder.addColumn(new Column("DetailCmd", ScalarType.createVarchar(30))); } else { for (String title : TabletsProcDir.TITLE_NAMES) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index 42a77900c65857..70f4709f3cb4a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -563,4 +563,10 @@ public void setWatermarkTxnId(long watermarkTxnId) { public long getWatermarkTxnId() { return watermarkTxnId; } + + public boolean isAlive() { + return getState() != ReplicaState.CLONE + && getState() != ReplicaState.DECOMMISSION + && !isBad(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index f1d3b01416077e..cec8d8fd31affd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -27,9 +27,6 @@ import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -37,6 +34,9 @@ import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -419,9 +419,7 @@ public Pair getHealthStatusWithPriority( Set hosts = Sets.newHashSet(); for (Replica replica : replicas) { Backend backend = systemInfoService.getBackend(replica.getBackendId()); - if (backend == null || !backend.isAlive() || replica.getState() == ReplicaState.CLONE - || replica.getState() == ReplicaState.DECOMMISSION - || replica.isBad() || !hosts.add(backend.getHost())) { + if (backend == null || !backend.isAlive() || !replica.isAlive() || !hosts.add(backend.getHost())) { // this replica is not alive, // or if this replica is on same host with another replica, we also treat it as 'dead', // so that Tablet Scheduler will create a new replica on different host. diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index 18e66b15f71e86..1977e5da182f2c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -54,6 +54,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; @@ -97,7 +99,9 @@ public class TabletSchedCtx implements Comparable { * from the tablet scheduler. */ private static final int RUNNING_FAILED_COUNTER_THRESHOLD = 3; - + + private static VersionCountComparator VERSION_COUNTER_COMPARATOR = new VersionCountComparator(); + public enum Type { BALANCE, REPAIR } @@ -504,50 +508,33 @@ public void chooseSrcReplica(Map backendsWorkingSlots) throws Sc if (replica.getLastFailedVersion() > 0) { continue; } - + if (!replica.checkVersionCatchUp(visibleVersion, visibleVersionHash, false)) { continue; } - + candidates.add(replica); } - + if (candidates.isEmpty()) { throw new SchedException(Status.UNRECOVERABLE, "unable to find source replica"); } // choose a replica which slot is available from candidates. - long minVersionCount = Long.MAX_VALUE; - boolean findSrcReplica = false; + // sort replica by version count asc, so that we prefer to choose replicas with fewer versions + Collections.sort(candidates, VERSION_COUNTER_COMPARATOR); for (Replica srcReplica : candidates) { PathSlot slot = backendsWorkingSlots.get(srcReplica.getBackendId()); if (slot == null) { continue; } - + long srcPathHash = slot.takeSlot(srcReplica.getPathHash()); if (srcPathHash != -1) { - if (!findSrcReplica) { - // version count is set by report process, so it may not be set yet and default value is -1. - // so we need to check it. - minVersionCount = srcReplica.getVersionCount() == -1 ? Long.MAX_VALUE : srcReplica.getVersionCount(); - setSrc(srcReplica); - findSrcReplica = true; - } else { - long curVerCount = srcReplica.getVersionCount() == -1 ? Long.MAX_VALUE : srcReplica.getVersionCount(); - if (curVerCount < minVersionCount) { - minVersionCount = curVerCount; - setSrc(srcReplica); - findSrcReplica = true; - } - } + setSrc(srcReplica); + return; } } - - if (findSrcReplica) { - return; - } - throw new SchedException(Status.SCHEDULE_FAILED, "unable to find source slot"); } @@ -623,7 +610,6 @@ public void chooseDestReplicaForVersionIncomplete(Map backendsWo if (destPathHash == -1) { throw new SchedException(Status.SCHEDULE_FAILED, "unable to take slot of dest path"); } - if (chosenReplica.getState() == ReplicaState.DECOMMISSION) { // Since this replica is selected as the repair object of VERSION_INCOMPLETE, // it means that this replica needs to be able to accept loading data. @@ -666,7 +652,7 @@ public void releaseResource(TabletScheduler tabletScheduler, boolean reserveTabl } } } - + if (destPathHash != -1) { PathSlot slot = tabletScheduler.getBackendsWorkingSlots().get(destBackendId); if (slot != null) { @@ -1102,4 +1088,14 @@ public String toString() { } return sb.toString(); } + + // Comparator to sort the replica with version count, asc + public static class VersionCountComparator implements Comparator { + @Override + public int compare(Replica r1, Replica r2) { + long verCount1 = r1.getVersionCount() == -1 ? Integer.MAX_VALUE : r1.getVersionCount(); + long verCount2 = r2.getVersionCount() == -1 ? Integer.MAX_VALUE : r2.getVersionCount(); + return (int) (verCount1 - verCount2); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 907c917f83fc04..d63772c8d98b98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -638,7 +638,7 @@ private Tag chooseProperTag(TabletSchedCtx tabletCtx, boolean forMissingReplica) Map currentAllocMap = Maps.newHashMap(); for (Replica replica : replicas) { Backend be = infoService.getBackend(replica.getBackendId()); - if (be != null) { + if (be != null && be.isAlive() && replica.isAlive()) { Short num = currentAllocMap.getOrDefault(be.getTag(), (short) 0); currentAllocMap.put(be.getTag(), (short) (num + 1)); } @@ -1196,11 +1196,11 @@ private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx tabletCtx, PathSlot slot = backendsWorkingSlots.get(rootPathLoadStatistic.getBeId()); if (slot == null) { - LOG.debug("backend {} does not found when getting slots", rootPathLoadStatistic.getBeId()); continue; } - if (slot.takeSlot(rootPathLoadStatistic.getPathHash()) != -1) { + long pathHash = slot.takeSlot(rootPathLoadStatistic.getPathHash()); + if (pathHash != -1) { return rootPathLoadStatistic; } } @@ -1209,11 +1209,11 @@ private RootPathLoadStatistic chooseAvailableDestPath(TabletSchedCtx tabletCtx, for (RootPathLoadStatistic rootPathLoadStatistic : allFitPaths) { PathSlot slot = backendsWorkingSlots.get(rootPathLoadStatistic.getBeId()); if (slot == null) { - LOG.debug("backend {} does not found when getting slots", rootPathLoadStatistic.getBeId()); continue; } - if (slot.takeSlot(rootPathLoadStatistic.getPathHash()) != -1) { + long pathHash = slot.takeSlot(rootPathLoadStatistic.getPathHash()); + if (pathHash != -1) { return rootPathLoadStatistic; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatByTag.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatByTag.java index 095143ccd96078..0f2fb8bf3e3f96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatByTag.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatByTag.java @@ -34,25 +34,15 @@ // SHOW PROC "/cluster_balance/cluster_load_stat" public class ClusterLoadStatByTag implements ProcDirInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder().add( - "StorageMedium").build(); - - private Map tagMap = Maps.newHashMap(); + "Tag").build(); @Override public ProcResult fetchResult() throws AnalysisException { BaseProcResult result = new BaseProcResult(); result.setNames(TITLE_NAMES); - List beIds = Catalog.getCurrentSystemInfo().getBackendIds(false); - Set tags = Sets.newHashSet(); - for (long beId : beIds) { - Backend be = Catalog.getCurrentSystemInfo().getBackend(beId); - if (be != null) { - tags.add(be.getTag()); - } - } + Set tags = genTagMap(); for (Tag tag : tags) { result.addRow(Lists.newArrayList(tag.toKey())); - tagMap.put(tag.toKey(), tag); } return result; } @@ -64,6 +54,11 @@ public boolean register(String name, ProcNodeInterface node) { @Override public ProcNodeInterface lookup(String name) throws AnalysisException { + Set tags = genTagMap(); + Map tagMap = Maps.newHashMap(); + for (Tag tag : tags) { + tagMap.put(tag.toKey(), tag); + } Tag tag = tagMap.get(name); if (tag == null) { throw new AnalysisException("No such tag: " + name); @@ -71,4 +66,15 @@ public ProcNodeInterface lookup(String name) throws AnalysisException { return new ClusterLoadStatByTagAndMedium(tag); } + private Set genTagMap() { + Set tags = Sets.newHashSet(); + List beIds = Catalog.getCurrentSystemInfo().getBackendIds(false); + for (long beId : beIds) { + Backend be = Catalog.getCurrentSystemInfo().getBackend(beId); + if (be != null) { + tags.add(be.getTag()); + } + } + return tags; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java index e8255f4f12815c..36707477f55861 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java @@ -18,6 +18,7 @@ package org.apache.doris.common.proc; import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.MaterializedIndex.IndexExtState; @@ -133,10 +134,13 @@ static class DBStatistic { .stream().map(AgentTask::getTabletId).collect(Collectors.toSet()); SystemInfoService infoService = Catalog.getCurrentSystemInfo(); + ColocateTableIndex colocateTableIndex = Catalog.getCurrentColocateIndex(); List aliveBeIdsInCluster = infoService.getClusterBackendIds(db.getClusterName(), true); db.getTables().stream().filter(t -> t != null && t.getType() == TableType.OLAP).forEach(t -> { ++tableNum; OlapTable olapTable = (OlapTable) t; + ColocateTableIndex.GroupId groupId = colocateTableIndex.isColocateTable(olapTable.getId()) ? + colocateTableIndex.getGroup(olapTable.getId()) : null; olapTable.readLock(); try { for (Partition partition : olapTable.getAllPartitions()) { @@ -144,21 +148,30 @@ static class DBStatistic { ++partitionNum; for (MaterializedIndex materializedIndex : partition.getMaterializedIndices(IndexExtState.VISIBLE)) { ++indexNum; - for (Tablet tablet : materializedIndex.getTablets()) { + List tablets = materializedIndex.getTablets(); + for (int i = 0; i < tablets.size(); ++i) { + Tablet tablet = tablets.get(i); ++tabletNum; replicaNum += tablet.getReplicas().size(); - Pair res = tablet.getHealthStatusWithPriority( - infoService, db.getClusterName(), - partition.getVisibleVersion(), partition.getVisibleVersionHash(), - replicaAlloc, aliveBeIdsInCluster); + TabletStatus res = null; + if (groupId != null) { + Set backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, i); + res = tablet.getColocateHealthStatus(partition.getVisibleVersion(), replicaAlloc, backendsSet); + } else { + Pair pair = tablet.getHealthStatusWithPriority( + infoService, db.getClusterName(), + partition.getVisibleVersion(), partition.getVisibleVersionHash(), + replicaAlloc, aliveBeIdsInCluster); + res = pair.first; + } // here we treat REDUNDANT as HEALTHY, for user friendly. - if (res.first != TabletStatus.HEALTHY && res.first != TabletStatus.REDUNDANT - && res.first != TabletStatus.COLOCATE_REDUNDANT && res.first != TabletStatus.NEED_FURTHER_REPAIR - && res.first != TabletStatus.UNRECOVERABLE) { + if (res != TabletStatus.HEALTHY && res != TabletStatus.REDUNDANT + && res != TabletStatus.COLOCATE_REDUNDANT && res != TabletStatus.NEED_FURTHER_REPAIR + && res != TabletStatus.UNRECOVERABLE) { unhealthyTabletIds.add(tablet.getId()); - } else if (res.first == TabletStatus.UNRECOVERABLE) { + } else if (res == TabletStatus.UNRECOVERABLE) { unrecoverableTabletIds.add(tablet.getId()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java index 32f36f663e18f8..3793acc3e4c6f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java @@ -170,7 +170,8 @@ private void runPendingJobs() { task = new HadoopLoadPendingTask(job); break; default: - LOG.warn("unknown etl job type. type: {}", etlJobType.name()); + LOG.warn("unknown etl job type. type: {}, job id: {}, label: {}, db: {}", + etlJobType.name(), job.getId(), job.getLabel(), job.getDbId()); break; } if (task != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 8c3b6091dd0d20..7a166ec452a3bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -182,7 +182,7 @@ PlanFragment createInsertFragment( */ private PlanFragment createPlanFragments( PlanNode root, boolean isPartitioned, - long perNodeMemLimit, ArrayList fragments) throws UserException, AnalysisException { + long perNodeMemLimit, ArrayList fragments) throws UserException { ArrayList childFragments = Lists.newArrayList(); for (PlanNode child : root.getChildren()) { // allow child fragments to be partitioned, unless they contain a limit clause @@ -272,7 +272,7 @@ private PlanFragment createMergeFragment(PlanFragment inputFragment) * fragment * TODO: hbase scans are range-partitioned on the row key */ - private PlanFragment createScanFragment(PlanNode node) { + private PlanFragment createScanFragment(PlanNode node) throws UserException { if (node instanceof MysqlScanNode || node instanceof OdbcScanNode) { return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.UNPARTITIONED); } else if (node instanceof SchemaScanNode) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 342348748d1ae6..246788cb087c33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -903,14 +903,18 @@ private void filterDeletedRows(Analyzer analyzer) throws AnalysisException { The reason is that @coordicator will not set the scan range for the fragment, when data partition of fragment is UNPARTITION. */ - public DataPartition constructInputPartitionByDistributionInfo() { + public DataPartition constructInputPartitionByDistributionInfo() throws UserException { ColocateTableIndex colocateTableIndex = Catalog.getCurrentColocateIndex(); if ((colocateTableIndex.isColocateTable(olapTable.getId()) && !colocateTableIndex.isGroupUnstable(colocateTableIndex.getGroup(olapTable.getId()))) || olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED || olapTable.getPartitions().size() == 1) { DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo(); - Preconditions.checkState(distributionInfo instanceof HashDistributionInfo); + if (!(distributionInfo instanceof HashDistributionInfo)) { + // There may be some random distribution table left, throw exception here. + // And these table should be modified to hash distribution by ALTER TABLE operation. + throw new UserException("Table with non hash distribution is not supported: " + olapTable.getName()); + } List distributeColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns(); List dataDistributeExprs = Lists.newArrayList(); for (Column column : distributeColumns) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 2ded32b34af976..e4196dcbb99674 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -596,7 +596,7 @@ private void sendFragment() throws TException, RpcException, UserException { cancelInternal(InternalService.PPlanFragmentCancelReason.INTERNAL_ERROR); switch (code) { case TIMEOUT: - throw new UserException("send fragment timeout. backend id: " + pair.first.backend.getId()); + throw new RpcException(pair.first.backend.getHost(), "send fragment timeout. backend id: " + pair.first.backend.getId()); case THRIFT_RPC_ERROR: SimpleScheduler.addToBlacklist(pair.first.backend.getId(), errMsg); throw new RpcException(pair.first.backend.getHost(), "rpc failed"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 8778104433c616..e7f700353a7226 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -155,16 +155,16 @@ import org.apache.doris.thrift.TUnit; import org.apache.doris.transaction.GlobalTransactionMgr; -import org.apache.commons.lang3.tuple.Triple; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.commons.lang3.tuple.Triple; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; @@ -1387,6 +1387,7 @@ private void handleShowTablet() throws AnalysisException { String indexName = FeConstants.null_string; Boolean isSync = true; + int tabletIdx = -1; // check real meta do { Database db = catalog.getDbNullable(dbId); @@ -1425,6 +1426,8 @@ private void handleShowTablet() throws AnalysisException { break; } + tabletIdx = index.getTabletOrderIdx(tablet.getId()); + List replicas = tablet.getReplicas(); for (Replica replica : replicas) { Replica tmp = invertedIndex.getReplica(tabletId, replica.getBackendId()); @@ -1447,9 +1450,9 @@ private void handleShowTablet() throws AnalysisException { String detailCmd = String.format("SHOW PROC '/dbs/%d/%d/partitions/%d/%d/%d';", dbId, tableId, partitionId, indexId, tabletId); rows.add(Lists.newArrayList(dbName, tableName, partitionName, indexName, - dbId.toString(), tableId.toString(), - partitionId.toString(), indexId.toString(), - isSync.toString(), detailCmd)); + dbId.toString(), tableId.toString(), + partitionId.toString(), indexId.toString(), + isSync.toString(), String.valueOf(tabletIdx), detailCmd)); } else { Database db = catalog.getDbOrAnalysisException(showStmt.getDbName()); OlapTable olapTable = db.getOlapTableOrAnalysisException(showStmt.getTableName()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index c8852734ccd080..d581e704128b98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -402,7 +402,7 @@ public void execute(TUniqueId queryId) throws Exception { throw e; } catch (UserException e) { // analysis exception only print message, not print the stack - LOG.warn("execute Exception. {}", e); + LOG.warn("execute Exception. {}", e.getMessage()); context.getState().setError(e.getMessage()); context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR); } catch (Exception e) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowLoadStmtTest.java index b9027f360be5ef..66d807ade97232 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowLoadStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowLoadStmtTest.java @@ -17,7 +17,6 @@ package org.apache.doris.analysis; -import mockit.Expectations; import org.apache.doris.analysis.BinaryPredicate.Operator; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.FakeCatalog; @@ -29,6 +28,8 @@ import org.junit.Before; import org.junit.Test; +import mockit.Expectations; + public class ShowLoadStmtTest { private Analyzer analyzer; private Catalog catalog; @@ -110,10 +111,15 @@ public void testWhere() throws UserException, AnalysisException { StringLiteral stringLiteralLike = new StringLiteral("ab%"); LikePredicate likePredicate = new LikePredicate(org.apache.doris.analysis.LikePredicate.Operator.LIKE, - slotRef, stringLiteralLike); + slotRef, stringLiteralLike); stmt = new ShowLoadStmt(null, likePredicate, null, new LimitElement(10)); stmt.analyze(analyzer); Assert.assertEquals("SHOW LOAD FROM `testCluster:testDb` WHERE `label` LIKE \'ab%\' LIMIT 10", stmt.toString()); + + BinaryPredicate statePredicate = new BinaryPredicate(Operator.EQ, new SlotRef(null, "state"), new StringLiteral("PENDING")); + stmt = new ShowLoadStmt(null, statePredicate, null, new LimitElement(10)); + stmt.analyze(analyzer); + Assert.assertEquals("SHOW LOAD FROM `testCluster:testDb` WHERE `state` = \'PENDING\' LIMIT 10", stmt.toString()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java index b9d9ed24048daf..887a67dd5bd45e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java @@ -17,13 +17,18 @@ package org.apache.doris.clone; +import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.clone.TabletSchedCtx.Priority; import org.apache.doris.clone.TabletSchedCtx.Type; +import com.clearspring.analytics.util.Lists; + import org.junit.Assert; import org.junit.Test; +import java.util.Collections; +import java.util.List; import java.util.PriorityQueue; public class TabletSchedCtxTest { @@ -75,4 +80,36 @@ public void testPriorityCompare() { Assert.assertEquals(ctx2.getTabletId(), expectedCtx.getTabletId()); } + @Test + public void testVersionCountComparator() { + TabletSchedCtx.VersionCountComparator countComparator = new TabletSchedCtx.VersionCountComparator(); + List replicaList = Lists.newArrayList(); + Replica replica1 = new Replica(); + replica1.setVersionCount(100); + replica1.setState(Replica.ReplicaState.NORMAL); + + Replica replica2 = new Replica(); + replica2.setVersionCount(50); + replica2.setState(Replica.ReplicaState.NORMAL); + + Replica replica3 = new Replica(); + replica3.setVersionCount(-1); + replica3.setState(Replica.ReplicaState.NORMAL); + + Replica replica4 = new Replica(); + replica4.setVersionCount(200); + replica4.setState(Replica.ReplicaState.NORMAL); + + replicaList.add(replica1); + replicaList.add(replica2); + replicaList.add(replica3); + replicaList.add(replica4); + + Collections.sort(replicaList, countComparator); + Assert.assertEquals(50, replicaList.get(0).getVersionCount()); + Assert.assertEquals(100, replicaList.get(1).getVersionCount()); + Assert.assertEquals(200, replicaList.get(2).getVersionCount()); + Assert.assertEquals(-1, replicaList.get(3).getVersionCount()); + } + } From c9ebd624eb44ab40bc6cc6cb136f72f107fd4f8c Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 17 Sep 2021 00:30:37 +0800 Subject: [PATCH 2/2] remove ut --- fe/fe-core/AlterRoutineLoadOperationLogTest | Bin 478 -> 0 bytes fe/fe-core/diskInfoTest | Bin 158 -> 0 bytes .../org/apache/doris/clone/TabletSchedCtx.java | 12 +++++++++--- .../apache/doris/clone/TabletSchedCtxTest.java | 2 +- 4 files changed, 10 insertions(+), 4 deletions(-) delete mode 100644 fe/fe-core/AlterRoutineLoadOperationLogTest delete mode 100644 fe/fe-core/diskInfoTest diff --git a/fe/fe-core/AlterRoutineLoadOperationLogTest b/fe/fe-core/AlterRoutineLoadOperationLogTest deleted file mode 100644 index ae3953e57df97846669fee9c3d828d9fdc1c9340..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 478 zcmZuuK~BRk5G>ceBJvqQio}ImI8;JGqEdT9sB)8ayM%b7wR0$_ALTWyw@!rwNAJ$e z&WvZXtM9LQQJ8nyJ+8ZfF_Wbi{1>u{yk&dM7$Wtxn*3Xg7}zt7!wn`2ra$T{ z^^qG90EQfDNxZ>cYjyS`!0!ScnL6NS)O=>@M$c|&z$U3r(Oag+KeZ))!T(%5<`nqQ&5$dSeTldLF9o(IhG}6<|HQNq@pUav@|s|G&3?chbk>D TNi0bP+3)Y? { @Override public int compare(Replica r1, Replica r2) { - long verCount1 = r1.getVersionCount() == -1 ? Integer.MAX_VALUE : r1.getVersionCount(); - long verCount2 = r2.getVersionCount() == -1 ? Integer.MAX_VALUE : r2.getVersionCount(); - return (int) (verCount1 - verCount2); + long verCount1 = r1.getVersionCount() == -1 ? Long.MAX_VALUE : r1.getVersionCount(); + long verCount2 = r2.getVersionCount() == -1 ? Long.MAX_VALUE : r2.getVersionCount(); + if (verCount1 < verCount2) { + return -1; + } else if (verCount1 > verCount2) { + return 1; + } else { + return 0; + } } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java index 887a67dd5bd45e..e9d31197732901 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java @@ -22,7 +22,7 @@ import org.apache.doris.clone.TabletSchedCtx.Priority; import org.apache.doris.clone.TabletSchedCtx.Type; -import com.clearspring.analytics.util.Lists; +import com.google.common.collect.Lists; import org.junit.Assert; import org.junit.Test;