30 changes: 12 additions & 18 deletions docs/help/Contents/Data Definition/ddl_stmt.md
@@ -156,13 +156,7 @@
2) If data backfill is required, consider making the first partition an empty partition so that partitions can be added later

5. distribution_desc
1) Random bucketing
Syntax:
DISTRIBUTED BY RANDOM [BUCKETS num]
Description:
Bucket by hashing all key columns. Defaults to 10 buckets

2) Hash bucketing
1) Hash bucketing
Syntax:
DISTRIBUTED BY HASH (k1[,k2 ...]) [BUCKETS num]
Description:
@@ -171,10 +165,10 @@
Recommendation: Hash bucketing is recommended

6. PROPERTIES
1) If the ENGINE type is olap, row or column storage can be specified in properties
1) If the ENGINE type is olap, column storage can be specified in properties (only column storage is currently supported)

PROPERTIES (
"storage_type" = "[row|column]",
"storage_type" = "[column]",
)

2) If the ENGINE type is olap
@@ -210,8 +204,8 @@
)

## example
1. Create an olap table with Random bucketing and column storage, where records with the same key are aggregated
CREATE TABLE example_db.table_random
1. Create an olap table with HASH bucketing and column storage, where records with the same key are aggregated
CREATE TABLE example_db.table_hash
(
k1 TINYINT,
k2 DECIMAL(10, 2) DEFAULT "10.5",
@@ -220,10 +214,10 @@
)
ENGINE=olap
AGGREGATE KEY(k1, k2)
DISTRIBUTED BY RANDOM BUCKETS 32
DISTRIBUTED BY HASH(k1) BUCKETS 32
PROPERTIES ("storage_type"="column");

2. Create an olap table with Hash bucketing and row storage, where records with the same key are overwritten,
2. Create an olap table with Hash bucketing and column storage, where records with the same key are overwritten,
setting the initial storage medium and cooldown time
CREATE TABLE example_db.table_hash
(
@@ -236,7 +230,7 @@
UNIQUE KEY(k1, k2)
DISTRIBUTED BY HASH (k1, k2) BUCKETS 32
PROPERTIES(
"storage_type"="row",
"storage_type"="column",
"storage_medium" = "SSD",
"storage_cooldown_time" = "2015-06-04 00:00:00"
);
@@ -322,7 +316,7 @@
)
ENGINE=olap
AGGREGATE KEY(k1, k2)
DISTRIBUTED BY RANDOM BUCKETS 32
DISTRIBUTED BY HASH(k1) BUCKETS 32
PROPERTIES ("storage_type"="column");

7. Create two tables, t1 and t2, that support Colocate Join
@@ -387,7 +381,7 @@
1. Add a partition
Syntax:
ADD PARTITION [IF NOT EXISTS] partition_name VALUES LESS THAN [MAXVALUE|("value1")] ["key"="value"]
[DISTRIBUTED BY RANDOM [BUCKETS num] | DISTRIBUTED BY HASH (k1[,k2 ...]) [BUCKETS num]]
[DISTRIBUTED BY HASH (k1[,k2 ...]) [BUCKETS num]]
Note:
1) Partitions are left-closed, right-open intervals; the user specifies the right boundary and the system determines the left boundary automatically
2) If no bucketing method is specified, the bucketing method used at table creation is applied automatically
@@ -512,10 +506,10 @@
ALTER TABLE example_db.my_table
ADD PARTITION p1 VALUES LESS THAN ("2014-01-01");

2. Add a partition, using a new bucketing method
2. Add a partition, using a new bucket count
ALTER TABLE example_db.my_table
ADD PARTITION p1 VALUES LESS THAN ("2015-01-01")
DISTRIBUTED BY RANDOM BUCKETS 20;
DISTRIBUTED BY HASH(k1) BUCKETS 20;

3. Drop a partition
ALTER TABLE example_db.my_table
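The left-closed, right-open partition rule described in the ADD PARTITION notes can be sketched as a tiny model (class and method names are illustrative, not Doris code): each partition owns the range from the previous upper bound up to, but excluding, its own upper bound, so a key belongs to the first partition whose upper bound is strictly greater than it.

```java
import java.util.List;

// Illustrative model of left-closed, right-open range partitions.
// Partition i covers [previous upper bound, upperBounds.get(i)).
public class RangePartitionModel {
    // upperBounds must be sorted ascending; returns the index of the
    // partition holding `key`, or -1 if key is beyond the last bound
    // (a new partition, or MAXVALUE, would be needed).
    public static int partitionFor(long key, List<Long> upperBounds) {
        for (int i = 0; i < upperBounds.size(); i++) {
            if (key < upperBounds.get(i)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Long> bounds = List.of(20140101L, 20150101L);
        // 20141231 falls in [20140101, 20150101) -> partition 1
        System.out.println(partitionFor(20141231L, bounds));
        // the right boundary itself is excluded -> -1
        System.out.println(partitionFor(20150101L, bounds));
    }
}
```

This matches the note above: the user only supplies right boundaries (`VALUES LESS THAN ...`), and each left boundary is implied by the preceding partition.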
2 changes: 1 addition & 1 deletion docs/help/Contents/Data Manipulation/streaming.md
@@ -30,7 +30,7 @@
Example 2: The table has 3 columns "c1, c2, c3", and the first three columns of the source file map to them in order, but the file has 1 extra column; then -H "columns: c1, c2, c3, xxx" must be specified;
any name can be given to the last column as a placeholder
Example 3: The table has 3 columns "year, month, day", and the source file has only one datetime column, in the format "2018-06-01 01:02:03";
then -H "columns: col, year = year(col), month=mont(col), day=day(col)" can be specified to complete the load
then -H "columns: col, year = year(col), month=month(col), day=day(col)" can be specified to complete the load

where: used to extract a subset of the data. Users who need to filter out unwanted data can achieve that by setting this option.
Example 1: To load only the rows where column k1 equals 20180601, specify -H "where: k1 = 20180601" at load time
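The derived-column mapping in Example 3 can be sketched as follows (an illustrative model with assumed names; the real `year()`/`month()`/`day()` evaluation runs inside the stream load, not in client code): one source column `col` fills the three table columns.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Sketch of Example 3's mapping: a single source datetime column "col"
// in "2018-06-01 01:02:03" format populates table columns year, month, day.
public class ColumnMappingSketch {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Returns {year, month, day} extracted from the source value,
    // mirroring year(col), month(col), day(col) in the columns header.
    public static int[] yearMonthDay(String col) {
        LocalDateTime t = LocalDateTime.parse(col, FMT);
        return new int[] { t.getYear(), t.getMonthValue(), t.getDayOfMonth() };
    }

    public static void main(String[] args) {
        int[] ymd = yearMonthDay("2018-06-01 01:02:03");
        System.out.println(ymd[0] + " " + ymd[1] + " " + ymd[2]); // 2018 6 1
    }
}
```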
13 changes: 13 additions & 0 deletions fe/src/main/java/org/apache/doris/catalog/Replica.java
@@ -247,6 +247,11 @@ public void updateVersionInfoForRecovery(
* V is larger or equal to LFV, reset LFV. And if V is less than LSV, just set V to LSV. This may
* happen when a clone task finished and report version V, but the LSV is already larger than V,
* And we know that version between V and LSV is valid, so move V forward to LSV.
*
* Case 5:
* This is a bug case whose cause is unknown; it may have been introduced by some previous version.
* It looks like V(hash) equals LSV(hash) and V equals LFV, but the LFV hash is 0 or some unknown
* number. We simply reset the LFV(hash) to recover this replica.
*/
private void updateReplicaInfo(long newVersion, long newVersionHash,
long lastFailedVersion, long lastFailedVersionHash,
@@ -321,6 +326,14 @@ private void updateReplicaInfo(long newVersion, long newVersionHash,
}
}

// case 5:
if (this.version == this.lastSuccessVersion && this.versionHash == this.lastSuccessVersionHash
&& this.version == this.lastFailedVersion && this.versionHash != this.lastFailedVersionHash) {
this.lastFailedVersion = -1;
this.lastFailedVersionHash = 0;
this.lastFailedTimestamp = -1;
}

LOG.debug("after update {}", this.toString());
}

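The case-5 repair added above reduces to a self-contained condition check (field names mirror Replica for readability, but this is a sketch, not the actual class): when version(hash) matches lastSuccessVersion(hash) and version equals lastFailedVersion while the failed hash disagrees, the last-failed markers are reset.

```java
// Minimal model of the "case 5" repair in updateReplicaInfo().
public class Case5Sketch {
    long version = 10, versionHash = 1234;
    long lastSuccessVersion = 10, lastSuccessVersionHash = 1234;
    long lastFailedVersion = 10, lastFailedVersionHash = 0; // bogus hash
    long lastFailedTimestamp = 99;

    // Reset the last-failed markers when the only inconsistency left
    // is a bogus lastFailedVersionHash, recovering the replica.
    void maybeResetLastFailed() {
        if (version == lastSuccessVersion && versionHash == lastSuccessVersionHash
                && version == lastFailedVersion && versionHash != lastFailedVersionHash) {
            lastFailedVersion = -1;
            lastFailedVersionHash = 0;
            lastFailedTimestamp = -1;
        }
    }

    public static void main(String[] args) {
        Case5Sketch r = new Case5Sketch();
        r.maybeResetLastFailed();
        System.out.println(r.lastFailedVersion); // -1: replica recovered
    }
}
```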
19 changes: 14 additions & 5 deletions fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -799,14 +799,24 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
// job being submitted, this delta version become a residual version.
// we just let this pass
LOG.warn("replica's last failed version equals to report version: "
+ replica.getLastFailedTimestamp() + " but hash is different: "
+ replica.getLastFailedVersion() + " but hash is different: "
+ replica.getLastFailedVersionHash() + " vs. "
+ reportedTablet.getVersion_hash() + ", but we let it pass.");
+ reportedTablet.getVersion_hash() + ", but we let it pass."
+ " tablet: {}, backend: {}", tabletId, replica.getBackendId());
} else if (replica.getVersion() == replica.getLastSuccessVersion()
&& replica.getVersionHash() == replica.getLastSuccessVersionHash()
&& replica.getVersion() == replica.getLastFailedVersion()) {
// see replica.updateVersionInfo()'s case 5
LOG.warn("replica's version(hash) and last success version(hash) are equal to "
+ "last failed version: {}, but last failed version hash is invalid: {}."
+ " we let it pass. tablet: {}, backend: {}",
replica.getVersion(), replica.getLastFailedVersionHash(), tabletId, replica.getBackendId());

} else {
// do not throw exception, cause we want this clone task retry again.
throw new SchedException(Status.RUNNING_FAILED,
"replica's last failed version equals to report version: "
+ replica.getLastFailedTimestamp() + " but hash is different: "
+ replica.getLastFailedVersion() + " but hash is different: "
+ replica.getLastFailedVersionHash() + " vs. "
+ reportedTablet.getVersion_hash());
}
@@ -846,7 +856,6 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
throw new SchedException(Status.UNRECOVERABLE, e.getMessage());
}
throw e;

} finally {
db.writeUnlock();
}
@@ -991,7 +1000,7 @@ public int compareTo(TabletSchedCtx o) {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("tablet id: ").append(tabletId).append(", status: ").append(tabletStatus.name());
sb.append(", state: ").append(state.name());
sb.append(", state: ").append(state.name()).append(", type: ").append(type.name());
if (srcReplica != null) {
sb.append(". from backend: ").append(srcReplica.getBackendId());
sb.append(", src path hash: ").append(srcPathHash);
10 changes: 8 additions & 2 deletions fe/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -366,6 +366,12 @@ private void schedulePendingTablets() {
if (tabletCtx.getFailedSchedCounter() > 10) {
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED,
"schedule failed too many times and " + e.getMessage());
} else {
// we must release the resources it currently holds, so it can be scheduled again
tabletCtx.releaseResource(this);
// adjust the priority to avoid some higher-priority tablet always being first in pendingTablets
stat.counterTabletScheduledFailed.incrementAndGet();
dynamicAdjustPrioAndAddBackToPendingTablets(tabletCtx, e.getMessage());
}
}
} else {
@@ -1253,8 +1259,8 @@ public synchronized long takeAnAvailBalanceSlotFrom(Set<Long> pathHashs) {
return -1;
}

public void freeBalanceSlot(long destPathHash) {
Slot slot = pathSlots.get(destPathHash);
public synchronized void freeBalanceSlot(long pathHash) {
Slot slot = pathSlots.get(pathHash);
if (slot == null) {
return;
}
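The new retry branch in schedulePendingTablets() boils down to one decision (a sketch with assumed names, not the actual scheduler classes): after a scheduling failure, a tablet context is cancelled once it has failed too many times; otherwise it releases its resources and is re-queued with an adjusted priority.

```java
// Sketch of the failure-handling decision added to schedulePendingTablets().
public class RetrySketch {
    static final int MAX_FAILURES = 10;

    enum Outcome { CANCELLED, REQUEUED }

    static Outcome onScheduleFailure(int failedSchedCounter) {
        if (failedSchedCounter > MAX_FAILURES) {
            // corresponds to finalizeTabletCtx(..., CANCELLED, ...)
            return Outcome.CANCELLED;
        }
        // corresponds to releaseResource(...) followed by
        // dynamicAdjustPrioAndAddBackToPendingTablets(...)
        return Outcome.REQUEUED;
    }

    public static void main(String[] args) {
        System.out.println(onScheduleFailure(3));
        System.out.println(onScheduleFailure(11));
    }
}
```

Releasing held resources before re-queueing matters: a context that keeps its slots while waiting in pendingTablets would starve other tablets of those slots.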