From cc0a6bd5120976398c8d50677854da2a0f8191ff Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Thu, 14 Mar 2019 12:35:29 +0800 Subject: [PATCH 1/2] Fix bug of invalid replica last failed version (#746) 1. Some previous doris version may cause some invalid replica last failed version. 2. Also modify the CREATE TABLE help doc, remove row storage type and random distribution. --- .../help/Contents/Data Definition/ddl_stmt.md | 30 ++++++++----------- .../Contents/Data Manipulation/streaming.md | 2 +- .../org/apache/doris/catalog/Replica.java | 13 ++++++++ .../apache/doris/clone/TabletSchedCtx.java | 16 ++++++++-- 4 files changed, 39 insertions(+), 22 deletions(-) diff --git a/docs/help/Contents/Data Definition/ddl_stmt.md b/docs/help/Contents/Data Definition/ddl_stmt.md index db309639b0aabd..64dc2f6d7743a5 100644 --- a/docs/help/Contents/Data Definition/ddl_stmt.md +++ b/docs/help/Contents/Data Definition/ddl_stmt.md @@ -156,13 +156,7 @@ 2) 有数据回溯需求的,可以考虑首个分区为空分区,以便后续增加分区 5. distribution_desc - 1) Random 分桶 - 语法: - DISTRIBUTED BY RANDOM [BUCKETS num] - 说明: - 使用所有 key 列进行哈希分桶。默认分区数为10 - - 2) Hash 分桶 + 1) Hash 分桶 语法: DISTRIBUTED BY HASH (k1[,k2 ...]) [BUCKETS num] 说明: @@ -171,10 +165,10 @@ 建议:建议使用Hash分桶方式 6. PROPERTIES - 1) 如果 ENGINE 类型为 olap,则可以在 properties 中指定行存或列存 + 1) 如果 ENGINE 类型为 olap,则可以在 properties 中指定列存(目前我们仅支持列存) PROPERTIES ( - "storage_type" = "[row|column]", + "storage_type" = "[column]", ) 2) 如果 ENGINE 类型为 olap @@ -210,8 +204,8 @@ ) ## example - 1. 创建一个 olap 表,使用 Random 分桶,使用列存,相同key的记录进行聚合 - CREATE TABLE example_db.table_random + 1. 创建一个 olap 表,使用 HASH 分桶,使用列存,相同key的记录进行聚合 + CREATE TABLE example_db.table_hash ( k1 TINYINT, k2 DECIMAL(10, 2) DEFAULT "10.5", @@ -220,10 +214,10 @@ ) ENGINE=olap AGGREGATE KEY(k1, k2) - DISTRIBUTED BY RANDOM BUCKETS 32 + DISTRIBUTED BY HASH(k1) BUCKETS 32 PROPERTIES ("storage_type"="column"); - 2. 创建一个 olap 表,使用 Hash 分桶,使用行存,相同key的记录进行覆盖, + 2. 创建一个 olap 表,使用 Hash 分桶,使用列存,相同key的记录进行覆盖, 设置初始存储介质和冷却时间 CREATE TABLE example_db.table_hash ( @@ -236,7 +230,7 @@ UNIQUE KEY(k1, k2) DISTRIBUTED BY HASH (k1, k2) BUCKETS 32 PROPERTIES( - "storage_type"="row", + "storage_type"="column", "storage_medium" = "SSD", "storage_cooldown_time" = "2015-06-04 00:00:00" ); @@ -322,7 +316,7 @@ ) ENGINE=olap AGGREGATE KEY(k1, k2) - DISTRIBUTED BY RANDOM BUCKETS 32 + DISTRIBUTED BY HASH(k1) BUCKETS 32 PROPERTIES ("storage_type"="column"); 7. 创建两张支持Colocat Join的表t1 和t2 @@ -387,7 +381,7 @@ 1. 增加分区 语法: ADD PARTITION [IF NOT EXISTS] partition_name VALUES LESS THAN [MAXVALUE|("value1")] ["key"="value"] - [DISTRIBUTED BY RANDOM [BUCKETS num] | DISTRIBUTED BY HASH (k1[,k2 ...]) [BUCKETS num]] + [DISTRIBUTED BY HASH (k1[,k2 ...]) [BUCKETS num]] 注意: 1) 分区为左闭右开区间,用户指定右边界,系统自动确定左边界 2) 如果没有指定分桶方式,则自动使用建表使用的分桶方式 @@ -512,10 +506,10 @@ ALTER TABLE example_db.my_table ADD PARTITION p1 VALUES LESS THAN ("2014-01-01"); - 2. 增加分区,使用新的分桶方式 + 2. 增加分区,使用新的分桶数 ALTER TABLE example_db.my_table ADD PARTITION p1 VALUES LESS THAN ("2015-01-01") - DISTRIBUTED BY RANDOM BUCKETS 20; + DISTRIBUTED BY HASH(k1) BUCKETS 20; 3. 删除分区 ALTER TABLE example_db.my_table diff --git a/docs/help/Contents/Data Manipulation/streaming.md b/docs/help/Contents/Data Manipulation/streaming.md index afb64f9ddd0a6d..ed4eeda72f23ba 100644 --- a/docs/help/Contents/Data Manipulation/streaming.md +++ b/docs/help/Contents/Data Manipulation/streaming.md @@ -30,7 +30,7 @@ 例2: 表中有3个列“c1, c2, c3", 源文件中前三列依次对应,但是有多余1列;那么需要指定-H "columns: c1, c2, c3, xxx"; 最后一个列随意指定个名称占位即可 例3: 表中有3个列“year, month, day"三个列,源文件中只有一个时间列,为”2018-06-01 01:02:03“格式; - 那么可以指定-H "columns: col, year = year(col), month=mont(col), day=day(col)"完成导入 + 那么可以指定-H "columns: col, year = year(col), month=month(col), day=day(col)"完成导入 where: 用于抽取部分数据。用户如果有需要将不需要的数据过滤掉,那么可以通过设定这个选项来达到。 例1: 只导入大于k1列等于20180601的数据,那么可以在导入时候指定-H "where: k1 = 20180601" diff --git a/fe/src/main/java/org/apache/doris/catalog/Replica.java b/fe/src/main/java/org/apache/doris/catalog/Replica.java index 89584bc6258676..8d1dcbc200a9fc 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/src/main/java/org/apache/doris/catalog/Replica.java @@ -247,6 +247,11 @@ public void updateVersionInfoForRecovery( * V is larger or equal to LFV, reset LFV. And if V is less than LSV, just set V to LSV. This may * happen when a clone task finished and report version V, but the LSV is already larger than V, * And we know that version between V and LSV is valid, so move V forward to LSV. + * + * Case 5: + * This is a bug case, I don't know why, may be some previous version introduce it. It looks like + * the V(hash) equals to LSV(hash), and V equals to LFV, but LFV hash is 0 or some unknown number. + * We just reset the LFV(hash) to recovery this replica. */ private void updateReplicaInfo(long newVersion, long newVersionHash, long lastFailedVersion, long lastFailedVersionHash, @@ -321,6 +326,14 @@ private void updateReplicaInfo(long newVersion, long newVersionHash, } } + // case 5: + if (this.version == this.lastSuccessVersion && this.versionHash == this.lastSuccessVersionHash + && this.version == this.lastFailedVersion && this.versionHash != this.lastFailedVersionHash) { + this.lastFailedVersion = -1; + this.lastFailedVersionHash = 0; + this.lastFailedTimestamp = -1; + } + LOG.debug("after update {}", this.toString()); } diff --git a/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index ab635844593ed8..62a9cde8f657c8 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -799,14 +799,24 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request) // job being submitted, this delta version become a residual version. // we just let this pass LOG.warn("replica's last failed version equals to report version: " - + replica.getLastFailedTimestamp() + " but hash is different: " + + replica.getLastFailedVersion() + " but hash is different: " + replica.getLastFailedVersionHash() + " vs. " - + reportedTablet.getVersion_hash() + ", but we let it pass."); + + reportedTablet.getVersion_hash() + ", but we let it pass." + + " tablet: {}, backend: {}", tabletId, replica.getBackendId()); + } else if (replica.getVersion() == replica.getLastSuccessVersion() + && replica.getVersionHash() == replica.getLastSuccessVersionHash() + && replica.getVersion() == replica.getLastFailedVersion()) { + // see replica.updateVersionInfo()'s case 5 + LOG.warn("replica's version(hash) and last success version(hash) are equal to " + + "last failed version: {}, but last failed version hash is invalid: {}." + + " we let it pass. tablet: {}, backend: {}", + replica.getVersion(), replica.getLastFailedVersionHash(), tabletId, replica.getBackendId()); + } else { // do not throw exception, cause we want this clone task retry again. throw new SchedException(Status.RUNNING_FAILED, "replica's last failed version equals to report version: " - + replica.getLastFailedTimestamp() + " but hash is different: " + + replica.getLastFailedVersion() + " but hash is different: " + replica.getLastFailedVersionHash() + " vs. " + reportedTablet.getVersion_hash()); } From 3a06b746a10eaf44bb646310b8a951dd12fe2ca1 Mon Sep 17 00:00:00 2001 From: morningman Date: Thu, 14 Mar 2019 20:24:12 +0800 Subject: [PATCH 2/2] Fix bug that balance slot may not be released when balance task is done --- .../java/org/apache/doris/clone/TabletSchedCtx.java | 3 +-- .../java/org/apache/doris/clone/TabletScheduler.java | 10 ++++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index 62a9cde8f657c8..f5caa75716efd0 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -856,7 +856,6 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request) throw new SchedException(Status.UNRECOVERABLE, e.getMessage()); } throw e; - } finally { db.writeUnlock(); } @@ -1001,7 +1000,7 @@ public int compareTo(TabletSchedCtx o) { public String toString() { StringBuilder sb = new StringBuilder(); sb.append("tablet id: ").append(tabletId).append(", status: ").append(tabletStatus.name()); - sb.append(", state: ").append(state.name()); + sb.append(", state: ").append(state.name()).append(", type: ").append(type.name()); if (srcReplica != null) { sb.append(". from backend: ").append(srcReplica.getBackendId()); sb.append(", src path hash: ").append(srcPathHash); diff --git a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java index c274a016c60aff..a32211894e084b 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -366,6 +366,12 @@ private void schedulePendingTablets() { if (tabletCtx.getFailedSchedCounter() > 10) { finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, "schedule failed too many times and " + e.getMessage()); + } else { + // we must release resource it current hold, and be scheduled again + tabletCtx.releaseResource(this); + // adjust priority to avoid some higher priority always be the first in pendingTablets + stat.counterTabletScheduledFailed.incrementAndGet(); + dynamicAdjustPrioAndAddBackToPendingTablets(tabletCtx, e.getMessage()); } } } else { @@ -1253,8 +1259,8 @@ public synchronized long takeAnAvailBalanceSlotFrom(Set pathHashs) { return -1; } - public void freeBalanceSlot(long destPathHash) { - Slot slot = pathSlots.get(destPathHash); + public synchronized void freeBalanceSlot(long pathHash) { + Slot slot = pathSlots.get(pathHash); if (slot == null) { return; }