From 866ca70935aa53f3aeac2a64c01aa0d763a050c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?caiconghui=20=5B=E8=94=A1=E8=81=AA=E8=BE=89=5D?= Date: Mon, 13 Jul 2020 20:47:33 +0800 Subject: [PATCH 1/9] Add thread name in fe log to make trace problem more easy --- .../src/main/java/org/apache/doris/common/Log4jConfig.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Log4jConfig.java b/fe/fe-core/src/main/java/org/apache/doris/common/Log4jConfig.java index 5912cd0d9fdf69..c3109bb743e33f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Log4jConfig.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Log4jConfig.java @@ -42,7 +42,7 @@ public class Log4jConfig extends XmlConfiguration { " \n" + " \n" + " \n" + - " %d{yyyy-MM-dd HH:mm:ss,SSS} %p %tid [%C{1}.%M():%L] %m%n\n" + + " %d{yyyy-MM-dd HH:mm:ss,SSS} %p (%t|%tid) [%C{1}.%M():%L] %m%n\n" + " \n" + " \n" + " \n" + @@ -57,7 +57,7 @@ public class Log4jConfig extends XmlConfiguration { " \n" + " \n" + " \n" + - " %d{yyyy-MM-dd HH:mm:ss,SSS} %p %tid [%C{1}.%M():%L] %m%n\n" + + " %d{yyyy-MM-dd HH:mm:ss,SSS} %p (%t|%tid) [%C{1}.%M():%L] %m%n\n" + " \n" + " \n" + " \n" + From ce1fccce624cd049989e2e8401a71e58f8d415ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?caiconghui=20=5B=E8=94=A1=E8=81=AA=E8=BE=89=5D?= Date: Mon, 13 Jul 2020 20:48:28 +0800 Subject: [PATCH 2/9] Skip to update replica version when new version is lower than replica's version --- .../src/main/java/org/apache/doris/catalog/Replica.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index c9cab33710142a..44947f10ababb4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -321,9 +321,10 @@ private void updateReplicaInfo(long newVersion, long newVersionHash, LOG.debug("before update: {}", this.toString()); if (newVersion < this.version) { - // yiguolei: could not find any reason why new version less than this.version should run??? - LOG.warn("replica {} on backend {}'s new version {} is lower than meta version {}", - id, backendId, newVersion, this.version); + // This case is that replica meta version has been updated by ReportHandler + LOG.debug("replica {} on backend {}'s new version {} is lower than meta version {}," + + "not to continue to update replica", id, backendId, newVersion, this.version); + return; } this.version = newVersion; From 371739988af7611986b419a06c4c80fefdb3d109 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?caiconghui=20=5B=E8=94=A1=E8=81=AA=E8=BE=89=5D?= Date: Mon, 13 Jul 2020 20:49:40 +0800 Subject: [PATCH 3/9] Add resend time interval check to escape duplicate agent task send --- .../java/org/apache/doris/common/Config.java | 8 +++++++ .../apache/doris/master/ReportHandler.java | 23 +++++++++++++------ .../java/org/apache/doris/task/AgentTask.java | 16 +++++++++++-- .../apache/doris/task/PublishVersionTask.java | 4 ++-- .../transaction/PublishVersionDaemon.java | 4 +++- 5 files changed, 43 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 44dfe123b50a1c..49d8981ca497f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1117,5 +1117,13 @@ public class Config extends ConfigBase { */ @ConfField public static String thrift_server_type = ThriftServer.THREAD_POOL; + + /** + * This will decide whether to resend agent task when create_time for agent_task is set, + * only when now - create_time > agent_task_resend_wait_time_ms can ReportHandler do resend agent task ; + */ + @ConfField + public static long agent_task_resend_wait_time_ms = 3000; + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 8d4081260206d7..1d46e77e89739b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -328,17 +328,20 @@ private static void taskReport(long backendId, Map> running LOG.info("begin to handle task report from backend {}", backendId); long start = System.currentTimeMillis(); - for (TTaskType type : runningTasks.keySet()) { - Set taskSet = runningTasks.get(type); - if (!taskSet.isEmpty()) { - String signatures = StringUtils.join(taskSet, ", "); - LOG.debug("backend task[{}]: {}", type.name(), signatures); + if (LOG.isDebugEnabled()) { + for (TTaskType type : runningTasks.keySet()) { + Set taskSet = runningTasks.get(type); + if (!taskSet.isEmpty()) { + String signatures = StringUtils.join(taskSet, ", "); + LOG.debug("backend task[{}]: {}", type.name(), signatures); + } } } List diffTasks = AgentTaskQueue.getDiffTasks(backendId, runningTasks); AgentBatchTask batchTask = new AgentBatchTask(); + long now = System.currentTimeMillis(); for (AgentTask task : diffTasks) { // these tasks no need to do diff // 1. CREATE @@ -350,7 +353,12 @@ private static void taskReport(long backendId, Map> running || task.getTaskType() == TTaskType.CHECK_CONSISTENCY) { continue; } - batchTask.addTask(task); + + // to escape to send duplicate agent task to be + if (task.shouldResend(now)) { + batchTask.addTask(task); + } + } LOG.debug("get {} diff task(s) to resend", batchTask.getTaskNum()); @@ -742,10 +750,11 @@ private static void handleMigration(ListMultimap tabletMet private static void handleRepublishVersionInfo(Map> transactionsToPublish, long backendId) { AgentBatchTask batchTask = new AgentBatchTask(); + long createPublishVersionTaskTime = System.currentTimeMillis(); for (Long dbId : transactionsToPublish.keySet()) { ListMultimap map = transactionsToPublish.get(dbId); for (long txnId : map.keySet()) { - PublishVersionTask task = new PublishVersionTask(backendId, txnId, dbId, map.get(txnId)); + PublishVersionTask task = new PublishVersionTask(backendId, txnId, dbId, map.get(txnId), createPublishVersionTaskTime); batchTask.addTask(task); // add to AgentTaskQueue for handling finish report. AgentTaskQueue.addTask(task); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java index 9ce7557afa807c..77d38073ffa755 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java @@ -17,6 +17,7 @@ package org.apache.doris.task; +import org.apache.doris.common.Config; import org.apache.doris.thrift.TResourceInfo; import org.apache.doris.thrift.TTaskType; @@ -39,9 +40,10 @@ public abstract class AgentTask { // some of are not. // so whether the task is finished depends on caller's logic, not the value of this member. protected boolean isFinished = false; + protected long createTime; public AgentTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType, - long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature) { + long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature, long createTime) { this.backendId = backendId; this.signature = signature; this.taskType = taskType; @@ -55,11 +57,17 @@ public AgentTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType, this.resourceInfo = resourceInfo; this.failedTimes = 0; + this.createTime = createTime; } public AgentTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType, long dbId, long tableId, long partitionId, long indexId, long tabletId) { - this(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, tabletId); + this(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, tabletId, -1); + } + + public AgentTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType, + long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature) { + this(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, signature, -1); } public long getSignature() { @@ -122,6 +130,10 @@ public boolean isFinished() { return isFinished; } + public boolean shouldResend(long currentTimeMillis) { + return createTime == -1 || currentTimeMillis - createTime > Config.agent_task_resend_wait_time_ms; + } + @Override public String toString() { return "[" + taskType + "], signature: " + signature + ", backendId: " + backendId + ", tablet id: " + tabletId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java index 9dbb47b0ffaa19..7323bd834f6af6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java @@ -36,8 +36,8 @@ public class PublishVersionTask extends AgentTask { private boolean isFinished; public PublishVersionTask(long backendId, long transactionId, long dbId, - List partitionVersionInfos) { - super(null, backendId, TTaskType.PUBLISH_VERSION, dbId, -1L, -1L, -1L, -1L, transactionId); + List partitionVersionInfos, long createTime) { + super(null, backendId, TTaskType.PUBLISH_VERSION, dbId, -1L, -1L, -1L, -1L, transactionId, createTime); this.transactionId = transactionId; this.partitionVersionInfos = partitionVersionInfos; this.errorTablets = new ArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index 2371feebf401b6..9e68beeaf2212e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -76,6 +76,7 @@ private void publishVersion() throws UserException { LOG.warn("some transaction state need to publish, but no backend exists"); return; } + long createPublishVersionTaskTime = System.currentTimeMillis(); // every backend-transaction identified a single task AgentBatchTask batchTask = new AgentBatchTask(); // traverse all ready transactions and dispatch the publish version task to all backends @@ -113,7 +114,8 @@ private void publishVersion() throws UserException { PublishVersionTask task = new PublishVersionTask(backendId, transactionState.getTransactionId(), transactionState.getDbId(), - partitionVersionInfos); + partitionVersionInfos, + createPublishVersionTaskTime); // add to AgentTaskQueue for handling finish report. // not check return value, because the add will success AgentTaskQueue.addTask(task); From 34bbf99774a889fef267789df5eba338fe0d2734 Mon Sep 17 00:00:00 2001 From: caiconghui Date: Mon, 13 Jul 2020 22:38:38 +0800 Subject: [PATCH 4/9] Add doc content for config agent_task_resend_wait_time_ms --- docs/en/administrator-guide/config/fe_config.md | 4 ++++ docs/zh-CN/administrator-guide/config/fe_config.md | 4 ++++ .../main/java/org/apache/doris/catalog/Replica.java | 12 +++++++++--- .../main/java/org/apache/doris/common/Config.java | 6 +++--- 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md index 4f81d179cdf8c5..fe6b313c2a28a0 100644 --- a/docs/en/administrator-guide/config/fe_config.md +++ b/docs/en/administrator-guide/config/fe_config.md @@ -110,6 +110,10 @@ There are two ways to configure FE configuration items: ## Configurations +### `agent_task_resend_wait_time_ms` + +当代理任务的创建时间被设置的时候,此配置将决定是否重新发送代理任务, 当且仅当当前时间减去创建时间大于agent_task_task_resend_wait_time_ms时,ReportHandler可以重新发送代理任务。 + ### `alter_table_timeout_second` ### `async_load_task_pool_size` diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md b/docs/zh-CN/administrator-guide/config/fe_config.md index dcc70c2b6f9beb..9b31f0432afe8b 100644 --- a/docs/zh-CN/administrator-guide/config/fe_config.md +++ b/docs/zh-CN/administrator-guide/config/fe_config.md @@ -110,6 +110,10 @@ FE 的配置项有两种方式进行配置: ## 配置项列表 +### `agent_task_resend_wait_time_ms` + +This config will decide whether to resend agent task when create_time for agent_task is set, only when current_time - create_time > agent_task_resend_wait_time_ms can ReportHandler do resend agent task. + ### `alter_table_timeout_second` ### `async_load_task_pool_size` diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index 44947f10ababb4..09a22738ddeba7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -318,12 +318,16 @@ private void updateReplicaInfo(long newVersion, long newVersionHash, long lastFailedVersion, long lastFailedVersionHash, long lastSuccessVersion, long lastSuccessVersionHash, long newDataSize, long newRowCount) { - LOG.debug("before update: {}", this.toString()); + if (LOG.isDebugEnabled()) { + LOG.debug("before update: {}", this.toString()); + } if (newVersion < this.version) { // This case is that replica meta version has been updated by ReportHandler - LOG.debug("replica {} on backend {}'s new version {} is lower than meta version {}," + if (LOG.isDebugEnabled()) { + LOG.debug("replica {} on backend {}'s new version {} is lower than meta version {}," + "not to continue to update replica", id, backendId, newVersion, this.version); + } return; } @@ -384,7 +388,9 @@ private void updateReplicaInfo(long newVersion, long newVersionHash, } } - LOG.debug("after update {}", this.toString()); + if (LOG.isDebugEnabled()) { + LOG.debug("after update {}", this.toString()); + } } public synchronized void updateLastFailedVersion(long lastFailedVersion, long lastFailedVersionHash) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 49d8981ca497f7..6a0ed72dd1bcf3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1119,11 +1119,11 @@ public class Config extends ConfigBase { public static String thrift_server_type = ThriftServer.THREAD_POOL; /** - * This will decide whether to resend agent task when create_time for agent_task is set, - * only when now - create_time > agent_task_resend_wait_time_ms can ReportHandler do resend agent task ; + * This config will decide whether to resend agent task when create_time for agent_task is set, + * only when current_time - create_time > agent_task_resend_wait_time_ms can ReportHandler do resend agent task */ @ConfField - public static long agent_task_resend_wait_time_ms = 3000; + public static long agent_task_resend_wait_time_ms = 5000; } From 23f0f1e2b9338b732ec5dfa2cf2111ad71463f89 Mon Sep 17 00:00:00 2001 From: caiconghui Date: Mon, 13 Jul 2020 22:49:13 +0800 Subject: [PATCH 5/9] fix typo --- .../main/java/org/apache/doris/master/ReportHandler.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 1d46e77e89739b..3b826eebd9b890 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -341,7 +341,7 @@ private static void taskReport(long backendId, Map> running List diffTasks = AgentTaskQueue.getDiffTasks(backendId, runningTasks); AgentBatchTask batchTask = new AgentBatchTask(); - long now = System.currentTimeMillis(); + long taskReportTime = System.currentTimeMillis(); for (AgentTask task : diffTasks) { // these tasks no need to do diff // 1. CREATE @@ -354,8 +354,8 @@ private static void taskReport(long backendId, Map> running continue; } - // to escape to send duplicate agent task to be - if (task.shouldResend(now)) { + // to escape sending duplicate agent task to be + if (task.shouldResend(taskReportTime)) { batchTask.addTask(task); } From 4e734cc2982cdae4c0b0b71701957a347bd9d910 Mon Sep 17 00:00:00 2001 From: caiconghui Date: Mon, 13 Jul 2020 23:04:07 +0800 Subject: [PATCH 6/9] fix comment --- fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index 09a22738ddeba7..0d83bf7a34f03a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -323,7 +323,7 @@ private void updateReplicaInfo(long newVersion, long newVersionHash, } if (newVersion < this.version) { - // This case is that replica meta version has been updated by ReportHandler + // This case means that replica meta version has been updated by ReportHandler before if (LOG.isDebugEnabled()) { LOG.debug("replica {} on backend {}'s new version {} is lower than meta version {}," + "not to continue to update replica", id, backendId, newVersion, this.version); From aef9ac152d3b88502eff36e47ad6b97c21266a1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?caiconghui=20=5B=E8=94=A1=E8=81=AA=E8=BE=89=5D?= Date: Thu, 16 Jul 2020 13:15:10 +0800 Subject: [PATCH 7/9] fix by review --- docs/en/administrator-guide/config/fe_config.md | 8 +++++++- docs/zh-CN/administrator-guide/config/fe_config.md | 6 +++++- .../src/main/java/org/apache/doris/catalog/Replica.java | 6 ++++++ .../src/main/java/org/apache/doris/common/Config.java | 2 +- 4 files changed, 19 insertions(+), 3 deletions(-) diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md index fe6b313c2a28a0..7612067f389dfd 100644 --- a/docs/en/administrator-guide/config/fe_config.md +++ b/docs/en/administrator-guide/config/fe_config.md @@ -112,7 +112,13 @@ There are two ways to configure FE configuration items: ### `agent_task_resend_wait_time_ms` -当代理任务的创建时间被设置的时候,此配置将决定是否重新发送代理任务, 当且仅当当前时间减去创建时间大于agent_task_task_resend_wait_time_ms时,ReportHandler可以重新发送代理任务。 +This configuration will decide whether to resend agent task when create_time for agent_task is set, only when current_time - create_time > agent_task_resend_wait_time_ms can ReportHandler do resend agent task. + +This configuration is currently mainly used to solve the problem of repeated sending of `PUBLISH_VERSION` agent tasks. The current default value of this configuration is 5000, which is an experimental value. + +Because there is a certain time delay between submitting agent tasks to AgentTaskQueue and submitting to be, Increasing the value of this configuration can effectively solve the problem of repeated sending of agent tasks, + +But at the same time, it will cause the submission of failed or failed execution of the agent task to be executed again for an extended period of time. ### `alter_table_timeout_second` diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md b/docs/zh-CN/administrator-guide/config/fe_config.md index 9b31f0432afe8b..ec4149097dce5f 100644 --- a/docs/zh-CN/administrator-guide/config/fe_config.md +++ b/docs/zh-CN/administrator-guide/config/fe_config.md @@ -112,7 +112,11 @@ FE 的配置项有两种方式进行配置: ### `agent_task_resend_wait_time_ms` -This config will decide whether to resend agent task when create_time for agent_task is set, only when current_time - create_time > agent_task_resend_wait_time_ms can ReportHandler do resend agent task. +当代理任务的创建时间被设置的时候,此配置将决定是否重新发送代理任务, 当且仅当当前时间减去创建时间大于 `agent_task_task_resend_wait_time_ms` 时,ReportHandler可以重新发送代理任务。 + +该配置目前主要用来解决`PUBLISH_VERSION`代理任务的重复发送问题, 目前该配置的默认值是5000,是个实验值,由于把代理任务提交到代理任务队列和提交到be存在一定的时间延迟,所以调大该配置的值可以有效解决代理任务的重复发送问题, + +但同时会导致提交失败或者执行失败的代理任务再次被执行的时间延长。 ### `alter_table_timeout_second` diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index 0d83bf7a34f03a..36666e9eee63e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -324,6 +324,12 @@ private void updateReplicaInfo(long newVersion, long newVersionHash, if (newVersion < this.version) { // This case means that replica meta version has been updated by ReportHandler before + // For example, the publish version daemon has already sent some publish verison tasks to one be to publish version 2, 3, 4, 5, 6, + // and the be finish all publish version tasks, the be's replica version is 6 now, but publish version daemon need to wait + // for other be to finish most of publish version tasks to update replica version in fe. + // At the moment, the replica version in fe is 4, when ReportHandler sync tablet, it find reported replica version in be is 6 and then + // set version to 6 for replica in fe. And then publish version daemon try to finish txn, and use visible version(5) + // to update replica. Finally, it find the newer version(5) is lower than replica version(6) in fe. if (LOG.isDebugEnabled()) { LOG.debug("replica {} on backend {}'s new version {} is lower than meta version {}," + "not to continue to update replica", id, backendId, newVersion, this.version); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 6a0ed72dd1bcf3..714c4ef427ee7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1122,7 +1122,7 @@ public class Config extends ConfigBase { * This config will decide whether to resend agent task when create_time for agent_task is set, * only when current_time - create_time > agent_task_resend_wait_time_ms can ReportHandler do resend agent task */ - @ConfField + @ConfField (mutable = true, masterOnly = true) public static long agent_task_resend_wait_time_ms = 5000; } From 2c16498ecac08936141d1635b44991db548e88fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?caiconghui=20=5B=E8=94=A1=E8=81=AA=E8=BE=89=5D?= Date: Thu, 16 Jul 2020 15:46:19 +0800 Subject: [PATCH 8/9] small fix in finishTransaction method of DatabaseTransactionMgr --- .../src/main/java/org/apache/doris/catalog/Replica.java | 8 ++++---- .../apache/doris/transaction/DatabaseTransactionMgr.java | 5 ++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index 36666e9eee63e4..0e813f888987c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -73,16 +73,16 @@ public enum ReplicaStatus { private long backendId; // the version could be queried @SerializedName(value = "version") - private long version; + private volatile long version; @SerializedName(value = "versionHash") private long versionHash; private int schemaHash = -1; @SerializedName(value = "dataSize") - private long dataSize = 0; + private volatile long dataSize = 0; @SerializedName(value = "rowCount") - private long rowCount = 0; + private volatile long rowCount = 0; @SerializedName(value = "state") - private ReplicaState state; + private volatile ReplicaState state; // the last load failed version @SerializedName(value = "lastFailedVersion") diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index aa15bc0169a710..15f5de48b61c88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -721,9 +721,8 @@ public void finishTransaction(long transactionId, Set errorReplicaIds) thr + " and its version not equal to partition commit version or commit version - 1" + " if its not a upgrate stage, its a fatal error. ", transactionState, replica); } - } else if (replica.getVersion() == partitionCommitInfo.getVersion() - && replica.getVersionHash() == partitionCommitInfo.getVersionHash()) { - // the replica's version and version hash is equal to current transaction partition's version and version hash + } else if (replica.getVersion() >= partitionCommitInfo.getVersion()) { + // the replica's version is larger than or equal to current transaction partition's version and version hash // the replica is normal, then remove it from error replica ids errorReplicaIds.remove(replica.getId()); ++healthReplicaNum; From aedbeb9af7c856f1a5fb88588c2cd998036c82f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?caiconghui=20=5B=E8=94=A1=E8=81=AA=E8=BE=89=5D?= Date: Thu, 16 Jul 2020 15:46:58 +0800 Subject: [PATCH 9/9] fix comment --- .../org/apache/doris/transaction/DatabaseTransactionMgr.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 15f5de48b61c88..4aad965df4049c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -722,7 +722,7 @@ public void finishTransaction(long transactionId, Set errorReplicaIds) thr + " if its not a upgrate stage, its a fatal error. ", transactionState, replica); } } else if (replica.getVersion() >= partitionCommitInfo.getVersion()) { - // the replica's version is larger than or equal to current transaction partition's version and version hash + // the replica's version is larger than or equal to current transaction partition's version // the replica is normal, then remove it from error replica ids errorReplicaIds.remove(replica.getId()); ++healthReplicaNum;