From 444bbfaf2ec7b0130a12a1061a119648ec8827a7 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Tue, 17 Jun 2025 15:33:36 +0800 Subject: [PATCH] [feat](warmup) display tables in SHOW WARM UP JOB results (#51594) Enhance the `SHOW WARM UP JOB` command to display tables for "warm up by tables" jobs. Table names are shown in the format `db.table.partition`, separated by commas. For example: ``` mysql> show warm up job; +---------------+--------------+----------+-------+-------------------------+-------------+----------+-------------------------+--------+------------------+ | JobId | ComputeGroup | Status | Type | CreateTime | FinishBatch | AllBatch | FinishTime | ErrMsg | Tables | +---------------+--------------+----------+-------+-------------------------+-------------+----------+-------------------------+--------+------------------+ | 1749488013342 | cluster1 | FINISHED | TABLE | 2025-06-10 01:01:43.670 | 1 | 1 | 2025-06-10 01:01:46.344 | | test.y, test.t.t | | 1749488013329 | cluster1 | FINISHED | TABLE | 2025-06-10 00:59:41.135 | 1 | 1 | 2025-06-10 00:59:43.336 | | test.y, test.t | | 1749488013306 | cluster1 | FINISHED | TABLE | 2025-06-10 00:58:43.781 | 1 | 1 | 2025-06-10 00:58:46.332 | | test.y | +---------------+--------------+----------+-------+-------------------------+-------------+----------+-------------------------+--------+------------------+ ``` None --- .../java/org/apache/doris/common/Triple.java | 109 ++++++++++++++++++ .../doris/analysis/ShowCloudWarmUpStmt.java | 1 + .../doris/analysis/WarmUpClusterStmt.java | 6 +- .../doris/cloud/CacheHotspotManager.java | 10 +- .../apache/doris/cloud/CloudWarmUpJob.java | 22 ++++ .../cloud/cache/CacheHotspotManagerTest.java | 2 +- .../warm_up/table/test_warm_up_table.groovy | 6 + .../warm_up/table/test_warm_up_tables.groovy | 8 ++ 8 files changed, 157 insertions(+), 7 deletions(-) create mode 100644 fe/fe-common/src/main/java/org/apache/doris/common/Triple.java diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Triple.java b/fe/fe-common/src/main/java/org/apache/doris/common/Triple.java new file mode 100644 index 00000000000000..6e5291354f064d --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Triple.java @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/Pair.java +// and modified by Doris + +package org.apache.doris.common; + +import com.google.gson.annotations.SerializedName; + +import java.util.Comparator; +import java.util.Objects; + +/** + * The equivalent of a {@link Pair} but with three elements: left, middle, and right. + *

+ * Notice: When using Triple for persistence, users need to guarantee that L, M, and R can be serialized through Gson + */ +public class Triple { + public static TripleComparator> TRIPLE_VALUE_COMPARATOR = new TripleComparator<>(); + + @SerializedName(value = "left") + public L left; + @SerializedName(value = "middle") + public M middle; + @SerializedName(value = "right") + public R right; + + private Triple(L left, M middle, R right) { + this.left = left; + this.middle = middle; + this.right = right; + } + + public static Triple ofSame(K same) { + return new Triple<>(same, same, same); + } + + public static Triple of(L left, M middle, R right) { + return new Triple<>(left, middle, right); + } + + public L getLeft() { + return left; + } + + public M getMiddle() { + return middle; + } + + public R getRight() { + return right; + } + + /** + * A triple is equal if all three parts are equal(). + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o instanceof Triple) { + Triple other = (Triple) o; + + boolean leftEqual = Objects.isNull(left) ? other.left == null : left.equals(other.left); + boolean middleEqual = Objects.isNull(middle) ? other.middle == null : middle.equals(other.middle); + boolean rightEqual = Objects.isNull(right) ? other.right == null : right.equals(other.right); + + return leftEqual && middleEqual && rightEqual; + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(left, middle, right); + } + + @Override + public String toString() { + String leftStr = Objects.nonNull(left) ? left.toString() : ""; + String middleStr = Objects.nonNull(middle) ? middle.toString() : ""; + String rightStr = Objects.nonNull(right) ? right.toString() : ""; + return leftStr + ":" + middleStr + ":" + rightStr; + } + + public static class TripleComparator> implements Comparator { + @Override + public int compare(T o1, T o2) { + return o1.right.compareTo(o2.right); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCloudWarmUpStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCloudWarmUpStmt.java index f823aeb9c15636..9ec063d3f76164 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCloudWarmUpStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCloudWarmUpStmt.java @@ -44,6 +44,7 @@ public class ShowCloudWarmUpStmt extends ShowStmt implements NotFallbackInParser .add("AllBatch") .add("FinishTime") .add("ErrMsg") + .add("Tables") .build(); public ShowCloudWarmUpStmt(Expr whereClause) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/WarmUpClusterStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/WarmUpClusterStmt.java index 9f386a686a298a..cca21d5c259c06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/WarmUpClusterStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/WarmUpClusterStmt.java @@ -26,11 +26,10 @@ import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.Triple; import org.apache.doris.common.UserException; import com.google.common.base.Strings; -import org.apache.commons.lang3.tuple.ImmutableTriple; -import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -108,8 +107,7 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { if (partitionName.length() != 0 && !table.containsPartition(partitionName)) { throw new AnalysisException("The partition " + partitionName + " doesn't exist"); } - Triple part = - new ImmutableTriple<>(dbName, tableName.getTbl(), partitionName); + Triple part = Triple.of(dbName, tableName.getTbl(), partitionName); tables.add(part); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java index 1e86fda15db9af..b73e467836d91c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java @@ -38,6 +38,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.common.Triple; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.rpc.RpcException; @@ -52,7 +53,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -633,7 +633,13 @@ public long createJob(WarmUpClusterStmt stmt) throws AnalysisException { Map>> beToTabletIdBatches = splitBatch(beToWarmUpTablets); CloudWarmUpJob.JobType jobType = stmt.isWarmUpWithTable() ? JobType.TABLE : JobType.CLUSTER; - CloudWarmUpJob warmUpJob = new CloudWarmUpJob(jobId, stmt.getDstClusterName(), beToTabletIdBatches, jobType); + CloudWarmUpJob warmUpJob; + if (jobType == JobType.TABLE) { + warmUpJob = new CloudWarmUpJob(jobId, stmt.getDstClusterName(), beToTabletIdBatches, jobType, + stmt.getTables(), stmt.isForce()); + } else { + warmUpJob = new CloudWarmUpJob(jobId, stmt.getDstClusterName(), beToTabletIdBatches, jobType); + } addCloudWarmUpJob(warmUpJob); Env.getCurrentEnv().getEditLog().logModifyCloudWarmUpJob(warmUpJob); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java index 79c00e322c2509..463a37c4635dc5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java @@ -24,6 +24,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Triple; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.TimeUtils; @@ -42,6 +43,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -52,6 +54,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class CloudWarmUpJob implements Writable { private static final Logger LOG = LogManager.getLogger(CloudWarmUpJob.class); @@ -99,6 +102,12 @@ public enum JobType { @SerializedName(value = "JobType") protected JobType jobType; + @SerializedName(value = "tables") + protected List> tables = new ArrayList<>(); + + @SerializedName(value = "force") + protected boolean force = false; + private Map beToClient; private Map beToAddr; @@ -128,6 +137,14 @@ public CloudWarmUpJob(long jobId, String cloudClusterName, } } + public CloudWarmUpJob(long jobId, String cloudClusterName, + Map>> beToTabletIdBatches, JobType jobType, + List> tables, boolean force) { + this(jobId, cloudClusterName, beToTabletIdBatches, jobType); + this.tables = tables; + this.force = force; + } + public long getJobId() { return jobId; } @@ -182,6 +199,11 @@ public List getJobInfo() { info.add(Long.toString(maxBatchSize)); info.add(TimeUtils.longToTimeStringWithms(finishedTimeMs)); info.add(errMsg); + info.add(tables.stream() + .map(t -> StringUtils.isEmpty(t.getRight()) + ? t.getLeft() + "." + t.getMiddle() + : t.getLeft() + "." + t.getMiddle() + "." + t.getRight()) + .collect(Collectors.joining(", "))); return info; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java index ff42ea31bcb83f..04155b250994a6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java @@ -23,11 +23,11 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.cloud.CacheHotspotManager; import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.common.Triple; import org.apache.doris.system.Backend; import mockit.Mock; import mockit.MockUp; -import org.apache.commons.lang3.tuple.Triple; import org.junit.Assert; import org.junit.Test; diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/table/test_warm_up_table.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/table/test_warm_up_table.groovy index 3f9dc93d550071..b7eb8761951049 100644 --- a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/table/test_warm_up_table.groovy +++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/table/test_warm_up_table.groovy @@ -23,6 +23,10 @@ suite("test_warm_up_table") { def jobStateResult = sql """ SHOW WARM UP JOB WHERE ID = ${jobId} """ return jobStateResult[0][2] } + def getTablesFromShowCommand = { jobId -> + def jobStateResult = sql """ SHOW WARM UP JOB WHERE ID = ${jobId} """ + return jobStateResult[0][9] + } List ipList = new ArrayList<>(); List hbPortList = new ArrayList<>() @@ -154,6 +158,8 @@ suite("test_warm_up_table") { sql "cancel warm up job where id = ${jobId[0][0]}" assertTrue(false); } + def tablesString = getTablesFromShowCommand(jobId[0][0]) + assertTrue(tablesString.contains("customer"), tablesString) sleep(30000) long ttl_cache_size = 0 getMetricsMethod.call(ipList[0], brpcPortList[0]) { diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/table/test_warm_up_tables.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/table/test_warm_up_tables.groovy index bf39e922802576..77286717117578 100644 --- a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/table/test_warm_up_tables.groovy +++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/table/test_warm_up_tables.groovy @@ -23,6 +23,10 @@ suite("test_warm_up_tables") { def jobStateResult = sql """ SHOW WARM UP JOB WHERE ID = ${jobId} """ return jobStateResult[0][2] } + def getTablesFromShowCommand = { jobId -> + def jobStateResult = sql """ SHOW WARM UP JOB WHERE ID = ${jobId} """ + return jobStateResult[0][9] + } List ipList = new ArrayList<>(); List hbPortList = new ArrayList<>() @@ -164,6 +168,10 @@ suite("test_warm_up_tables") { jobId_ = sql "warm up cluster regression_cluster_name1 with table customer partition p3 and table supplier;" waitJobDone(jobId_); + def tablesString = getTablesFromShowCommand(jobId_[0][0]) + assertTrue(tablesString.contains("customer.p3"), tablesString) + assertTrue(tablesString.contains("supplier"), tablesString) + sleep(30000) long ttl_cache_size = 0 getMetricsMethod.call(ipList[0], brpcPortList[0]) {