From 921447fa01179215c655badfca1a6393d838df80 Mon Sep 17 00:00:00 2001
From: meiyi
Date: Wed, 13 Dec 2023 19:14:12 +0800
Subject: [PATCH 01/15] [doc](insert) Add group commit docs

---
 docs/en/docs/admin-manual/config/be-config.md |  14 +
 .../import/import-way/group-commit-manual.md  | 281 +++++++++++++++++
 docs/sidebars.json                            |   3 +-
 .../docs/admin-manual/config/be-config.md     |  13 +
 .../import/import-way/group-commit-manual.md  | 282 ++++++++++++++++++
 5 files changed, 592 insertions(+), 1 deletion(-)
 create mode 100644 docs/en/docs/data-operate/import/import-way/group-commit-manual.md
 create mode 100644 docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md

diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md
index 1dcf7dbb241ad9..4d5fa52e000e08 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -1510,3 +1510,17 @@ Indicates how many tablets failed to load in the data directory. At the same tim
 
 * Description: BE Whether to enable the use of java-jni. When enabled, mutual calls between c++ and java are allowed. Currently supports hudi, java-udf, jdbc, max-compute, paimon, preload, avro
 * Default value: true
+
+#### `group_commit_interval_ms`
+
+* Description: The interval, in milliseconds, after which the internal group commit load job stops and a new internal job is started. See [Group Commit](../../data-operate/import/import-way/group-commit-manual.md) for more details
+* Default: 10000
+
+#### `group_commit_replay_wal_dir`
+
+* Description: The directory in which group commit stores its `WAL` files. See [Group Commit](../../data-operate/import/import-way/group-commit-manual.md) for more details
+* Default: A directory named `wal` is created under each directory of the `storage_root_path`. Configuration example:
+
+  ```
+  group_commit_replay_wal_dir=/data1/storage/wal,/data2/storage/wal,/data3/storage/wal
+  ```
\ No newline at end of file
diff --git a/docs/en/docs/data-operate/import/import-way/group-commit-manual.md b/docs/en/docs/data-operate/import/import-way/group-commit-manual.md
new file mode 100644
index 00000000000000..756a6e7ee774b4
--- /dev/null
+++ b/docs/en/docs/data-operate/import/import-way/group-commit-manual.md
@@ -0,0 +1,281 @@
+---
+{
+    "title": "Group Commit",
+    "language": "en"
+}
+---
+
+
+
+# Group Commit
+
+Group commit load does not introduce a new import method; rather, it is an extension of `INSERT INTO tbl VALUES(...)`, `Stream Load` and `Http Stream`.
+
+In Doris, every data load is an independent job that starts a new transaction and generates a new data version. Under high-frequency writes, this puts significant pressure on both transactions and compactions. Group commit load reduces the number of transactions and compactions by combining multiple small load tasks into one load job, which improves write performance.
+
+The process is roughly as follows:
+1. The user starts a group commit load; the BE puts the data into memory and the WAL, then returns immediately. The data is not visible to users at this point;
+2. The BE periodically (every 10 seconds by default) commits the data in memory; the data becomes visible to users after it is committed;
+3. If the BE restarts, the data is recovered through the WAL.
+
+## Fundamentals
+
+### Write process
+1. The user starts a group commit load, and the FE generates a plan fragment;
+2. The BE executes the plan.
Unlike non group commit load, the processed data is not sent to each tablet, but put into a queue in the memory shared by multiple group commit load; +3. BE starts an internal load, which consumes the data in the queue, writes to WAL, and notifies that the data related load has been finished; +4. After that, the data is processed in the same way as non group commit load, send to each tablet, write memtable, and flushed to segment files; +5. The internal load is finished after a fixed time interval (default is 10 seconds), and the data is visible to users when it is committed. + +### WAL Introduction + +Each group commit load will generate a corresponding WAL file, which is used to recover failed load jobs. If there is a restart be or fail to run the group commit load during the writing process, be will replay WAL file through a stream load in the background to reimport the data, which can make sure that data is not lost. If the group commit load job is completed normally, the WAL will be directly deleted to reduce disk space usage. + +## Basic operations + +If the table schema is: +```sql +CREATE TABLE `dt` ( + `id` int(11) NOT NULL, + `name` varchar(50) NULL, + `score` int(11) NULL +) ENGINE=OLAP +DUPLICATE KEY(`id`) +DISTRIBUTED BY HASH(`id`) BUCKETS 1 +PROPERTIES ( + "replication_num" = "1" +); +``` + +### INSERT INTO VALUES + +```sql +# Config session variable to enable the group commit, the default value is false +mysql> set enable_insert_group_commit = true; + +# The retured label is start with 'group_commit', which is the label of the real load job +mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99); +Query OK, 2 rows affected (0.05 sec) +{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} + +# The returned label and txn_id are the same as the above, which means they are handled in on load job +mysql> insert into dt(id, name) values(3, 'John'); +Query OK, 1 row affected (0.01 sec) +{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} + +# The data is not visible +mysql> select * from dt; +Empty set (0.01 sec) + +# After about 10 seconds, the data is visible +mysql> select * from dt; ++------+-------+-------+ +| id | name | score | ++------+-------+-------+ +| 1 | Bob | 90 | +| 2 | Alice | 99 | +| 3 | John | NULL | ++------+-------+-------+ +3 rows in set (0.02 sec) +``` + +### Stream Load + +If the content of `data.csv` is: +```sql +4,Amy,60 +5,Ross,98 +``` + +```sql +# Add 'group_commit:true' configuration in the http header + +curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:true" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load +{ + "TxnId": 7009, + "Label": "group_commit_c84d2099208436ab_96e33fda01eddba8", + "Comment": "", + "GroupCommit": true, + "Status": "Success", + "Message": "OK", + "NumberTotalRows": 2, + "NumberLoadedRows": 2, + "NumberFilteredRows": 0, + "NumberUnselectedRows": 0, + "LoadBytes": 19, + "LoadTimeMs": 35, + "StreamLoadPutTimeMs": 5, + "ReadDataTimeMs": 0, + "WriteDataTimeMs": 26 +} + +# The returned 'GroupCommit' is 'true', which means this is a group commit load +# The retured label is start with 'group_commit', which is the label of the real load job +``` + +See [Stream Load](stream-load-manual.md) for more detailed syntax used by **Stream Load**. 
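+
+As a quick end-to-end check, the sketch below combines the steps above: it runs the same group commit Stream Load and then verifies that the rows become visible after the default commit interval. This is only an illustrative sketch; the `{user}`, `{passwd}`, `{fe_host}` and `{http_port}` placeholders, the FE MySQL port `9030` and the 11-second wait are assumptions about a typical deployment, not fixed values.
+
+```shell
+# Stream Load with group commit enabled (same request as the example above)
+curl --location-trusted -u {user}:{passwd} -T data.csv \
+    -H "group_commit:true" -H "column_separator:," \
+    http://{fe_host}:{http_port}/api/db/dt/_stream_load
+
+# The rows are not visible immediately; wait slightly longer than the
+# group commit interval (10 seconds by default), then query the table
+sleep 11
+mysql -h {fe_host} -P 9030 -u {user} -p{passwd} -e "SELECT count(*) FROM db.dt"
+```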
+ +### Http Stream + +```sql +# Add 'group_commit:true' configuration in the http header + +curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:true" -H "sql:insert into db.dt select * from http_stream('column_separator'=',', 'format' = 'CSV')" http://{fe_host}:{http_port}/api/_http_stream +{ + "TxnId": 7011, + "Label": "group_commit_3b45c5750d5f15e5_703428e462e1ebb0", + "Comment": "", + "GroupCommit": true, + "Status": "Success", + "Message": "OK", + "NumberTotalRows": 2, + "NumberLoadedRows": 2, + "NumberFilteredRows": 0, + "NumberUnselectedRows": 0, + "LoadBytes": 19, + "LoadTimeMs": 65, + "StreamLoadPutTimeMs": 41, + "ReadDataTimeMs": 47, + "WriteDataTimeMs": 23 +} + +# The returned 'GroupCommit' is 'true', which means this is a group commit load +# The retured label is start with 'group_commit', which is the label of the real load job +``` + +See [Stream Load](stream-load-manual.md) for more detailed syntax used by **Http Stream**. + +### Use `PreparedStatement` + +To reduce the CPU cost of SQL parsing and query planning, we provide the `PreparedStatement` in the FE. When using `PreparedStatement`, the SQL and its plan will be cached in the session level memory cache and will be reused later on, which reduces the CPU cost of FE. The following is an example of using PreparedStatement in JDBC: + +1. Setup JDBC url and enable server side prepared statement + +``` +url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true +``` + +2. Enable `enable_insert_group_commit` session variable, there are two ways to do it: + +* Add `sessionVariables=enable_insert_group_commit=true` in JDBC url + +``` +url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&sessionVariables=enable_insert_group_commit=true +``` + +*Use `SET enable_insert_group_commit = true;` command + +``` +try (Statement statement = conn.createStatement()) { + statement.execute("SET enable_insert_group_commit = true;"); +} +``` + +3. 
Using `PreparedStatement` + +```java +private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver"; +private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true"; +private static final String HOST = "127.0.0.1"; +private static final int PORT = 9087; +private static final String DB = "db"; +private static final String TBL = "dt"; +private static final String USER = "root"; +private static final String PASSWD = ""; +private static final int INSERT_BATCH_SIZE = 10; + +private static void groupCommitInsert() throws Exception { + Class.forName(JDBC_DRIVER); + try (Connection conn = DriverManager.getConnection(String.format(URL_PATTERN, HOST, PORT, DB), USER, PASSWD)) { + // enable session variable 'enable_insert_group_commit' + try (Statement statement = conn.createStatement()) { + statement.execute("SET enable_insert_group_commit = true;"); + } + + String query = "insert into " + TBL + " values(?, ?, ?)"; + try (PreparedStatement stmt = conn.prepareStatement(query)) { + for (int i = 0; i < INSERT_BATCH_SIZE; i++) { + stmt.setInt(1, i); + stmt.setString(2, "name" + i); + stmt.setInt(3, i + 10); + int result = stmt.executeUpdate(); + System.out.println("rows: " + result); + } + } + } catch (Exception e) { + e.printStackTrace(); + } +} + +private static void groupCommitInsertBatch() throws Exception { + Class.forName(JDBC_DRIVER); + // add rewriteBatchedStatements=true and cachePrepStmts=true in JDBC url + // enable session variables by sessionVariables=enable_insert_group_commit=true in JDBC url + try (Connection conn = DriverManager.getConnection( + String.format(URL_PATTERN + "&rewriteBatchedStatements=true&cachePrepStmts=true&sessionVariables=enable_insert_group_commit=true", HOST, PORT, DB), USER, PASSWD)) { + + String query = "insert into " + TBL + " values(?, ?, ?)"; + try (PreparedStatement stmt = conn.prepareStatement(query)) { + for (int j = 0; j < 5; j++) { + // 10 rows per insert + for (int i = 0; i < INSERT_BATCH_SIZE; i++) { + stmt.setInt(1, i); + stmt.setString(2, "name" + i); + stmt.setInt(3, i + 10); + stmt.addBatch(); + } + int[] result = stmt.executeBatch(); + } + } + } catch (Exception e) { + e.printStackTrace(); + } +} +``` + +See [Synchronize Data Using Insert Method](../import-scenes/jdbc-load.md) for more details about **JDBC**. + +## Relevant system configuration + +### Session variable + ++ enable_insert_group_commit + + If this configuration is true, FE will judge whether the `INSERT INTO VALUES` can be group commit, the conditions are as follows: + + Not a transaction insert, as `Begin`; `INSERT INTO VALUES`; `COMMIT` + + Not specifying partition, as `INSERT INTO dt PARTITION()` + + Not specifying label, as `INSERT INTO dt WITH LABEL {label} VALUES` + + VALUES does not contain any expression, as `INSERT INTO dt VALUES (1 + 100)` + + The default value is false, use `SET enable_insert_group_commit = true;` command to enable it. + +### BE configuration + ++ group_commit_interval_ms + + The time interval of the internal group commit load job will stop and start a new internal job, the default value is 10000 milliseconds. + ++ group_commit_replay_wal_dir + + The directory for storing WAL files. By default, a directory named `wal` is created under each directory of the `storage_root_path`. Users don't need to configure this if there is no special requirement. 
Configuration examples: + + ``` + group_commit_replay_wal_dir=/data1/storage/wal,/data2/storage/wal,/data3/storage/wal + ``` diff --git a/docs/sidebars.json b/docs/sidebars.json index da910610f08645..7e5286f7a4c73a 100644 --- a/docs/sidebars.json +++ b/docs/sidebars.json @@ -91,7 +91,8 @@ "data-operate/import/import-way/mysql-load-manual", "data-operate/import/import-way/s3-load-manual", "data-operate/import/import-way/insert-into-manual", - "data-operate/import/import-way/load-json-format" + "data-operate/import/import-way/load-json-format", + "data-operate/import/import-way/group-commit-manual" ] }, { diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index 802279c4cd00c1..2b93c1f2ef15d1 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -1539,3 +1539,16 @@ load tablets from header failed, failed tablets size: xxx, path=xxx * 描述: BE 是否开启使用java-jni,开启后允许 c++ 与 java 之间的相互调用。目前已经支持hudi、java-udf、jdbc、max-compute、paimon、preload、avro * 默认值: true + +#### `group_commit_interval_ms` + +* 描述: 攒批写入开启多久后结束,单位为毫秒,请参考 [Group Commit](../../data-operate/import/import-way/group-commit-manual.md) +* 默认值: 10000,即10秒 + +#### `group_commit_replay_wal_dir` + +* 描述: 攒批写入存放WAL文件的目录,请参考 [Group Commit](../../data-operate/import/import-way/group-commit-manual.md) +* 默认值: 默认在用户配置的`storage_root_path`的各个目录下创建一个名为`wal`的目录。配置示例: + ``` + group_commit_replay_wal_dir=/data1/storage/wal,/data2/storage/wal,/data3/storage/wal + ``` diff --git a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md new file mode 100644 index 00000000000000..b35f64a9eac351 --- /dev/null +++ b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md @@ -0,0 +1,282 @@ +--- +{ + "title": "Group Commit", + "language": "zh-CN" +} +--- + + + +# Group Commit + +攒批写入没有引入一种新的导入方式,而是对`INSERT INTO tbl VALUS(...)`、`Stream Load`、`Http Stream`的扩展。 + +在 Doris 中,所有的数据写入都是一个独立的导入作业,发起一个新的事务,产生一个新的数据版本。在高频写入的场景下,对transaction和compaction都产生了较大的压力。攒批写通过把多个小的写入合成一个写入作业,减少了transaction和compaction的次数,缓解了系统内部的压力,提高了写入的性能。 + +流程大致为: +1. 用户发起的导入,BE把处理后的数据写入内存和WAL中即返回,此时不能查询到数据; +2. 正常情况下,BE内部周期性(默认为10秒间隔)将内存中的数据提交,提交之后数据对用户可见; +3. 如果发生BE重启等,通过WAL走写入流程恢复数据。 + +## 原理介绍 + +### 写入流程 + +1. 用户发起攒批写入,FE生成执行计划; +2. BE执行规划,与非攒批导入不同,处理后的数据不是发给各个tablet,而是放到一个内存中的队列中,多个攒批共享这个队列; +3. BE内部发起一个导入规划,消费队列中的数据,写入WAL,并通知该数据对应的写入已完成; +4. 之后,消费后的数据和普通写入的处理流程一样,发给各个tablet,写入memtable,下刷为segment文件等; +5. 
BE内部发起的导入在达到固定的攒批时间(默认为10秒)后,开始提交,提交完成后,数据对用户可见。 + +### WAL介绍 + +每一次攒批会生成一个对应的WAL文件,其作用是用于恢复失败的攒批作业,在写入过程中如果发生了be重启或者攒批作业运行失败,be可以通过relay WAL文件,在后台发起一个stream load重新导入数据,保证攒批数据不丢失。如果攒批作业正常执行完成,WAL会被直接删掉。 + +## 基本操作 + +假如表的结构为: +```sql +CREATE TABLE `dt` ( + `id` int(11) NOT NULL, + `name` varchar(50) NULL, + `score` int(11) NULL +) ENGINE=OLAP +DUPLICATE KEY(`id`) +DISTRIBUTED BY HASH(`id`) BUCKETS 1 +PROPERTIES ( + "replication_num" = "1" +); +``` + +### INSERT INTO VALUES + +```sql +# 配置session变量开启攒批,默认为false +mysql> set enable_insert_group_commit = true; + +# 这里返回的label是group_commit开头的,是真正消费数据的导入关联的label,可以区分出是否攒批了 +mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99); +Query OK, 2 rows affected (0.05 sec) +{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} + +# 可以看出这个label, txn_id和上一个相同,说明是攒到了同一个导入任务中 +mysql> insert into dt(id, name) values(3, 'John'); +Query OK, 1 row affected (0.01 sec) +{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} + +# 不能立刻查询到 +mysql> select * from dt; +Empty set (0.01 sec) + +# 10秒后可以查询到 +mysql> select * from dt; ++------+-------+-------+ +| id | name | score | ++------+-------+-------+ +| 1 | Bob | 90 | +| 2 | Alice | 99 | +| 3 | John | NULL | ++------+-------+-------+ +3 rows in set (0.02 sec) +``` + +### Stream Load + +假如`data.csv`的内容为: +```sql +4,Amy,60 +5,Ross,98 +``` + +```sql +# 导入时在header中增加"group_commit:true"配置 + +curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:true" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load +{ + "TxnId": 7009, + "Label": "group_commit_c84d2099208436ab_96e33fda01eddba8", + "Comment": "", + "GroupCommit": true, + "Status": "Success", + "Message": "OK", + "NumberTotalRows": 2, + "NumberLoadedRows": 2, + "NumberFilteredRows": 0, + "NumberUnselectedRows": 0, + "LoadBytes": 19, + "LoadTimeMs": 35, + "StreamLoadPutTimeMs": 5, + "ReadDataTimeMs": 0, + "WriteDataTimeMs": 26 +} + +# 返回的GroupCommit为true,说明进入了攒批的流程 +# 返回的Label是group_commit开头的,是真正消费数据的导入关联的label +``` + +关于 Stream Load 使用的更多详细语法及最佳实践,请参阅 [Stream Load](stream-load-manual.md)。 + +### Http Stream + +```sql +# 导入时在header中增加"group_commit:true"配置 + +curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:true" -H "sql:insert into db.dt select * from http_stream('column_separator'=',', 'format' = 'CSV')" http://{fe_host}:{http_port}/api/_http_stream +{ + "TxnId": 7011, + "Label": "group_commit_3b45c5750d5f15e5_703428e462e1ebb0", + "Comment": "", + "GroupCommit": true, + "Status": "Success", + "Message": "OK", + "NumberTotalRows": 2, + "NumberLoadedRows": 2, + "NumberFilteredRows": 0, + "NumberUnselectedRows": 0, + "LoadBytes": 19, + "LoadTimeMs": 65, + "StreamLoadPutTimeMs": 41, + "ReadDataTimeMs": 47, + "WriteDataTimeMs": 23 +} + +# 返回的GroupCommit为true,说明进入了攒批的流程 +# 返回的Label是group_commit开头的,是真正消费数据的导入关联的label +``` + +关于 Http Stream 使用的更多详细语法及最佳实践,请参阅 [Stream Load](stream-load-manual.md)。 + +### 使用`PreparedStatement` + +为了减少 SQL 解析和生成规划的开销, 我们在 FE 端支持了 MySQL 协议的`PreparedStatement`特性。当使用`PreparedStatement`时,SQL 和其导入规划将被缓存到 Session 级别的内存缓存中,后续的导入直接使用缓存对象,降低了 FE 的 CPU 压力。下面是在 JDBC 中使用 PreparedStatement 的例子: + +1. 设置 JDBC url 并在 Server 端开启 prepared statement + +``` +url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true +``` + +2. 
开启 `enable_insert_group_commit` session变量,有如下两种方式: + +* 通过JDBC url设置,增加`sessionVariables=enable_insert_group_commit=true` + +``` +url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&sessionVariables=enable_insert_group_commit=true +``` + +* 通过执行SQL设置 + +``` +try (Statement statement = conn.createStatement()) { + statement.execute("SET enable_insert_group_commit = true;"); +} +``` + +3. 使用 `PreparedStatement` + +```java +private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver"; +private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true"; +private static final String HOST = "127.0.0.1"; +private static final int PORT = 9087; +private static final String DB = "db"; +private static final String TBL = "dt"; +private static final String USER = "root"; +private static final String PASSWD = ""; +private static final int INSERT_BATCH_SIZE = 10; + +private static void groupCommitInsert() throws Exception { + Class.forName(JDBC_DRIVER); + try (Connection conn = DriverManager.getConnection(String.format(URL_PATTERN, HOST, PORT, DB), USER, PASSWD)) { + // enable session variable 'enable_insert_group_commit' + try (Statement statement = conn.createStatement()) { + statement.execute("SET enable_insert_group_commit = true;"); + } + + String query = "insert into " + TBL + " values(?, ?, ?)"; + try (PreparedStatement stmt = conn.prepareStatement(query)) { + for (int i = 0; i < INSERT_BATCH_SIZE; i++) { + stmt.setInt(1, i); + stmt.setString(2, "name" + i); + stmt.setInt(3, i + 10); + int result = stmt.executeUpdate(); + System.out.println("rows: " + result); + } + } + } catch (Exception e) { + e.printStackTrace(); + } +} + +private static void groupCommitInsertBatch() throws Exception { + Class.forName(JDBC_DRIVER); + // add rewriteBatchedStatements=true and cachePrepStmts=true in JDBC url + // enable session variables by sessionVariables=enable_insert_group_commit=true in JDBC url + try (Connection conn = DriverManager.getConnection( + String.format(URL_PATTERN + "&rewriteBatchedStatements=true&cachePrepStmts=true&sessionVariables=enable_insert_group_commit=true", HOST, PORT, DB), USER, PASSWD)) { + + String query = "insert into " + TBL + " values(?, ?, ?)"; + try (PreparedStatement stmt = conn.prepareStatement(query)) { + for (int j = 0; j < 5; j++) { + // 10 rows per insert + for (int i = 0; i < INSERT_BATCH_SIZE; i++) { + stmt.setInt(1, i); + stmt.setString(2, "name" + i); + stmt.setInt(3, i + 10); + stmt.addBatch(); + } + int[] result = stmt.executeBatch(); + } + } + } catch (Exception e) { + e.printStackTrace(); + } +} +``` + +关于**JDBC**的更多用法,参考[使用Insert方式同步数据](../import-scenes/jdbc-load.md)。 + +## 相关系统配置 + +### Session变量 + ++ enable_insert_group_commit + + 当该参数设置为 true 时,会判断用户发起的`INSERT INTO VALUES`语句是否符合攒批的条件,如果符合,该语句的执行会进入到攒批写入中。主要的判断逻辑包括: + + 不是事务写入,即`Begin`; `INSERT INTO VALUES`; `COMMIT`方式 + + 不指定partition,即`INSERT INTO dt PARTITION()`等指定partition的语句 + + 不指定label,即`INSERT INTO dt WITH LABEL {label} VALUES` + + VALUES中不能包含表达式,即`INSERT INTO dt VALUES (1 + 100)` + + 默认为 false。可通过 `SET enable_insert_group_commit = true;` 来设置。 + +### BE 配置 + ++ group_commit_interval_ms + + 攒批写入开启多久后结束,单位为毫秒,默认为10000,即10秒。 + ++ group_commit_replay_wal_dir + + 存放WAL文件的目录,默认在用户配置的`storage_root_path`的各个目录下创建一个名为`wal`的目录,如无特殊要求,不需要修改。配置示例: + + ``` + group_commit_replay_wal_dir=/data1/storage/wal,/data2/storage/wal,/data3/storage/wal + ``` \ No newline at end of file From ef7f9b0bea3888a7fdd7f02751ae7e8878078d89 Mon Sep 17 00:00:00 2001 From: meiyi Date: Tue, 19 Dec 2023 
10:42:18 +0800 Subject: [PATCH 02/15] some modify --- .../docs/admin-manual/config/be-config.md | 7 +- .../import/import-way/group-commit-manual.md | 217 ++++++++++++++---- 2 files changed, 177 insertions(+), 47 deletions(-) diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index 2b93c1f2ef15d1..ac8899db4f6e8e 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -1540,15 +1540,10 @@ load tablets from header failed, failed tablets size: xxx, path=xxx * 描述: BE 是否开启使用java-jni,开启后允许 c++ 与 java 之间的相互调用。目前已经支持hudi、java-udf、jdbc、max-compute、paimon、preload、avro * 默认值: true -#### `group_commit_interval_ms` - -* 描述: 攒批写入开启多久后结束,单位为毫秒,请参考 [Group Commit](../../data-operate/import/import-way/group-commit-manual.md) -* 默认值: 10000,即10秒 - #### `group_commit_replay_wal_dir` * 描述: 攒批写入存放WAL文件的目录,请参考 [Group Commit](../../data-operate/import/import-way/group-commit-manual.md) * 默认值: 默认在用户配置的`storage_root_path`的各个目录下创建一个名为`wal`的目录。配置示例: ``` - group_commit_replay_wal_dir=/data1/storage/wal,/data2/storage/wal,/data3/storage/wal + group_commit_replay_wal_dir=/data1/storage/wal;/data2/storage/wal;/data3/storage/wal ``` diff --git a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md index b35f64a9eac351..65c546c2165d88 100644 --- a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md +++ b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md @@ -26,30 +26,33 @@ under the License. # Group Commit -攒批写入没有引入一种新的导入方式,而是对`INSERT INTO tbl VALUS(...)`、`Stream Load`、`Http Stream`的扩展。 +攒批写入没有引入一种新的导入方式,而是对`INSERT INTO tbl VALUES(...)`、`Stream Load`、`Http Stream`的扩展。 在 Doris 中,所有的数据写入都是一个独立的导入作业,发起一个新的事务,产生一个新的数据版本。在高频写入的场景下,对transaction和compaction都产生了较大的压力。攒批写通过把多个小的写入合成一个写入作业,减少了transaction和compaction的次数,缓解了系统内部的压力,提高了写入的性能。 -流程大致为: -1. 用户发起的导入,BE把处理后的数据写入内存和WAL中即返回,此时不能查询到数据; -2. 正常情况下,BE内部周期性(默认为10秒间隔)将内存中的数据提交,提交之后数据对用户可见; -3. 如果发生BE重启等,通过WAL走写入流程恢复数据。 +## 攒批模式 -## 原理介绍 +攒批写入有三种模式,分别是: -### 写入流程 +* `off_mode` -1. 用户发起攒批写入,FE生成执行计划; -2. BE执行规划,与非攒批导入不同,处理后的数据不是发给各个tablet,而是放到一个内存中的队列中,多个攒批共享这个队列; -3. BE内部发起一个导入规划,消费队列中的数据,写入WAL,并通知该数据对应的写入已完成; -4. 之后,消费后的数据和普通写入的处理流程一样,发给各个tablet,写入memtable,下刷为segment文件等; -5. 
BE内部发起的导入在达到固定的攒批时间(默认为10秒)后,开始提交,提交完成后,数据对用户可见。 +不开启攒批,保持以上三种导入方式的默认行为。 -### WAL介绍 +* `sync_mode` -每一次攒批会生成一个对应的WAL文件,其作用是用于恢复失败的攒批作业,在写入过程中如果发生了be重启或者攒批作业运行失败,be可以通过relay WAL文件,在后台发起一个stream load重新导入数据,保证攒批数据不丢失。如果攒批作业正常执行完成,WAL会被直接删掉。 +多个客户端发起的导入复用一个内部导入,等内部导入成功或失败后,外部导入才会返回。 -## 基本操作 +如果内部导入成功,数据可以立即查出。 + +* `async_mode` + +多个客户端发起的导入复用一个内部导入,内部导入将处理后的数据写入Write Ahead Log(WAL)后,立即返回。 + +此时,数据不能立即读出。内部导入默认开启10秒后自动提交,等成功后,数据才能读出。 + +当内部导入因为BE节点重启或内存不足等原因导入失败后,BE会通过WAL重放机制重新导入数据。 + +## 攒批使用方式 假如表的结构为: ```sql @@ -67,9 +70,10 @@ PROPERTIES ( ### INSERT INTO VALUES +* 异步模式 ```sql -# 配置session变量开启攒批,默认为false -mysql> set enable_insert_group_commit = true; +# 配置session变量开启攒批(默认为off_mode),开启异步模式 +mysql> set group_commit = async_mode; # 这里返回的label是group_commit开头的,是真正消费数据的导入关联的label,可以区分出是否攒批了 mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99); @@ -97,18 +101,48 @@ mysql> select * from dt; 3 rows in set (0.02 sec) ``` +* 同步模式 +```sql +# 配置session变量开启攒批(默认为off_mode),开启同步模式 +mysql> set group_commit = sync_mode; + +# 这里返回的label是group_commit开头的,是真正消费数据的导入关联的label,可以区分出是否攒批了 +mysql> insert into dt values(4, 'Bob', 90), (5, 'Alice', 99); +Query OK, 2 rows affected (10.06 sec) +{'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'} + +# 数据可以立刻读出 +mysql> select * from dt; ++------+-------+-------+ +| id | name | score | ++------+-------+-------+ +| 1 | Bob | 90 | +| 2 | Alice | 99 | +| 3 | John | NULL | +| 4 | Bob | 90 | +| 5 | Alice | 99 | ++------+-------+-------+ +5 rows in set (0.03 sec) +``` + +* 关闭攒批 +```sql +mysql> set group_commit = off_mode; +``` + ### Stream Load 假如`data.csv`的内容为: ```sql -4,Amy,60 -5,Ross,98 +6,Amy,60 +7,Ross,98 ``` +* 异步模式 ```sql -# 导入时在header中增加"group_commit:true"配置 +# 导入时在header中增加"group_commit:async_mode"配置 -curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:true" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load +curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load { "TxnId": 7009, "Label": "group_commit_c84d2099208436ab_96e33fda01eddba8", @@ -131,14 +165,42 @@ curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:true" - # 返回的Label是group_commit开头的,是真正消费数据的导入关联的label ``` +* 同步模式 +```sql +# 导入时在header中增加"group_commit:sync_mode"配置 + +curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load +{ + "TxnId": 3009, + "Label": "group_commit_d941bf17f6efcc80_ccf4afdde9881293", + "Comment": "", + "GroupCommit": true, + "Status": "Success", + "Message": "OK", + "NumberTotalRows": 2, + "NumberLoadedRows": 2, + "NumberFilteredRows": 0, + "NumberUnselectedRows": 0, + "LoadBytes": 19, + "LoadTimeMs": 10044, + "StreamLoadPutTimeMs": 4, + "ReadDataTimeMs": 0, + "WriteDataTimeMs": 10038 +} + +# 返回的GroupCommit为true,说明进入了攒批的流程 +# 返回的Label是group_commit开头的,是真正消费数据的导入关联的label +``` + 关于 Stream Load 使用的更多详细语法及最佳实践,请参阅 [Stream Load](stream-load-manual.md)。 ### Http Stream +* 异步模式 ```sql -# 导入时在header中增加"group_commit:true"配置 +# 导入时在header中增加"group_commit:async_mode"配置 -curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:true" -H "sql:insert into db.dt select * from http_stream('column_separator'=',', 'format' = 'CSV')" http://{fe_host}:{http_port}/api/_http_stream +curl --location-trusted -u {user}:{passwd} -T 
data.csv -H "group_commit:async_mode" -H "sql:insert into db.dt select * from http_stream('column_separator'=',', 'format' = 'CSV')" http://{fe_host}:{http_port}/api/_http_stream { "TxnId": 7011, "Label": "group_commit_3b45c5750d5f15e5_703428e462e1ebb0", @@ -161,11 +223,38 @@ curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:true" - # 返回的Label是group_commit开头的,是真正消费数据的导入关联的label ``` +* 同步模式 +```sql +# 导入时在header中增加"group_commit:sync_mode"配置 + +curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode" -H "sql:insert into db.dt select * from http_stream('column_separator'=',', 'format' = 'CSV')" http://{fe_host}:{http_port}/api/_http_stream +{ + "TxnId": 3011, + "Label": "group_commit_fe470e6752aadbe6_a8f3ac328b02ea91", + "Comment": "", + "GroupCommit": true, + "Status": "Success", + "Message": "OK", + "NumberTotalRows": 2, + "NumberLoadedRows": 2, + "NumberFilteredRows": 0, + "NumberUnselectedRows": 0, + "LoadBytes": 19, + "LoadTimeMs": 10066, + "StreamLoadPutTimeMs": 31, + "ReadDataTimeMs": 32, + "WriteDataTimeMs": 10034 +} + +# 返回的GroupCommit为true,说明进入了攒批的流程 +# 返回的Label是group_commit开头的,是真正消费数据的导入关联的label +``` + 关于 Http Stream 使用的更多详细语法及最佳实践,请参阅 [Stream Load](stream-load-manual.md)。 ### 使用`PreparedStatement` -为了减少 SQL 解析和生成规划的开销, 我们在 FE 端支持了 MySQL 协议的`PreparedStatement`特性。当使用`PreparedStatement`时,SQL 和其导入规划将被缓存到 Session 级别的内存缓存中,后续的导入直接使用缓存对象,降低了 FE 的 CPU 压力。下面是在 JDBC 中使用 PreparedStatement 的例子: +当用户使用JDBC `insert into values`方式写入时,为了减少 SQL 解析和生成规划的开销, 我们在 FE 端支持了 MySQL 协议的`PreparedStatement`特性。当使用`PreparedStatement`时,SQL 和其导入规划将被缓存到 Session 级别的内存缓存中,后续的导入直接使用缓存对象,降低了 FE 的 CPU 压力。下面是在 JDBC 中使用 PreparedStatement 的例子: 1. 设置 JDBC url 并在 Server 端开启 prepared statement @@ -173,19 +262,19 @@ curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:true" - url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true ``` -2. 开启 `enable_insert_group_commit` session变量,有如下两种方式: +2. 
开启 `group_commit` session变量,有如下两种方式: -* 通过JDBC url设置,增加`sessionVariables=enable_insert_group_commit=true` +* 通过JDBC url设置,增加`sessionVariables=group_commit=async_mode` ``` -url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&sessionVariables=enable_insert_group_commit=true +url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&sessionVariables=group_commit=async_mode ``` * 通过执行SQL设置 ``` try (Statement statement = conn.createStatement()) { - statement.execute("SET enable_insert_group_commit = true;"); + statement.execute("SET group_commit = async_mode;"); } ``` @@ -205,9 +294,9 @@ private static final int INSERT_BATCH_SIZE = 10; private static void groupCommitInsert() throws Exception { Class.forName(JDBC_DRIVER); try (Connection conn = DriverManager.getConnection(String.format(URL_PATTERN, HOST, PORT, DB), USER, PASSWD)) { - // enable session variable 'enable_insert_group_commit' + // set session variable 'group_commit' try (Statement statement = conn.createStatement()) { - statement.execute("SET enable_insert_group_commit = true;"); + statement.execute("SET group_commit = async_mode;"); } String query = "insert into " + TBL + " values(?, ?, ?)"; @@ -228,7 +317,7 @@ private static void groupCommitInsert() throws Exception { private static void groupCommitInsertBatch() throws Exception { Class.forName(JDBC_DRIVER); // add rewriteBatchedStatements=true and cachePrepStmts=true in JDBC url - // enable session variables by sessionVariables=enable_insert_group_commit=true in JDBC url + // set session variables by sessionVariables=group_commit=async_mode in JDBC url try (Connection conn = DriverManager.getConnection( String.format(URL_PATTERN + "&rewriteBatchedStatements=true&cachePrepStmts=true&sessionVariables=enable_insert_group_commit=true", HOST, PORT, DB), USER, PASSWD)) { @@ -253,30 +342,76 @@ private static void groupCommitInsertBatch() throws Exception { 关于**JDBC**的更多用法,参考[使用Insert方式同步数据](../import-scenes/jdbc-load.md)。 -## 相关系统配置 +## 修改攒批默认提交间隔 -### Session变量 +攒批的默认提交间隔为10秒,用户可以通过修改表的配置,调整攒批的提交间隔: -+ enable_insert_group_commit +```sql +# 修改提交间隔为2秒 +ALTER TABLE dt SET ("group_commit_interval_ms"="2000"); +``` + +## 使用限制 + +* 当开启了攒批模式,系统会判断用户发起的`INSERT INTO VALUES`语句是否符合攒批的条件,如果符合,该语句的执行会进入到攒批写入中。主要的判断逻辑包括: - 当该参数设置为 true 时,会判断用户发起的`INSERT INTO VALUES`语句是否符合攒批的条件,如果符合,该语句的执行会进入到攒批写入中。主要的判断逻辑包括: + 不是事务写入,即`Begin`; `INSERT INTO VALUES`; `COMMIT`方式 - + 不指定partition,即`INSERT INTO dt PARTITION()`等指定partition的语句 + + 不指定label,即`INSERT INTO dt WITH LABEL {label} VALUES` + + VALUES中不能包含表达式,即`INSERT INTO dt VALUES (1 + 100)` - 默认为 false。可通过 `SET enable_insert_group_commit = true;` 来设置。 + + 不是列更新写入 + + +* 当开启了攒批模式,系统会判断用户发起的`Stream Load`和`Http Stream`是否符合攒批的条件,如果符合,该导入的执行会进入到攒批写入中。主要的判断逻辑包括: + + + 不是两阶段提交 + + + 不指定label + + + 不是列更新写入 + + +* 对`max_filter_ratio`语义的支持 + + * 在默认的导入中,`filter_ratio`是导入完成后,通过失败的行数和总行数计算,决定是否commit transaction。 + + * 在攒批模式下,由于多个用户发起的导入会被一个内部导入执行,虽然可以计算出每个导入的`filter_ratio`,但是数据一旦进入内部导入,就只能commit transaction + + * 但攒批模式支持了一定程度的`max_filter_ratio`语义,当导入的总行数不高于`group_commit_memory_rows_for_max_filter_ratio`(配置在be.conf中,默认为10000行),会把数据缓存起来,计算出真正的`filter_ratio`,如果超过了`max_filter_ratio`,会把数据丢弃,用户导入失败 + + +* WAL限制 + + * 对于`async_mode`的攒批写入,会把数据写入WAL。如果内部写入成功,则WAL被立刻删除;如果内部导入失败,通过导入WAL的方法来恢复数据 + + * 目前WAL文件只存储在一个BE上,如果这个BE磁盘损坏或文件误删等,可能导入丢失部分数据。 + + * 当下线BE节点时,请使用[`DECOMMISSION`](../../../sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-DECOMMISSION-BACKEND.md)命令,安全下线节点,防止该节点下线前WAL文件还没有全部处理完成,导致部分数据丢失 + + * 
对于`async_mode`的攒批写入,如果导入数据过大(超过WAL单目录的80%),或不知道数据量的chunked stream load,为了防止生成的WAL占用太多的磁盘空间,会退化成`sync_mode` + + * 为了防止多个小的导入攒到一个内部导入中,导致WAL占用过多的磁盘空间的问题,当总WAL文件大小超过配置阈值(参考相关系统配置中的`group_commit_wal_max_disk_limit`)时,会阻塞攒批写入,直到磁盘空间释放或超时报错 + + * 当发生重量级schema change时,为了保证WAL能够适配表的schema,在schema change最后的fe修改元数据阶段,会拒绝攒批写入,客户端收到`insert table ${table_name} is blocked on schema change`异常,客户端重试即可 + +## 相关系统配置 ### BE 配置 -+ group_commit_interval_ms ++ group_commit_memory_rows_for_max_filter_ratio - 攒批写入开启多久后结束,单位为毫秒,默认为10000,即10秒。 + 当导入的总行数不高于该值,会把数据缓存起来,计算出真正的`filter_ratio`,如果超过了`max_filter_ratio`,会把数据丢弃,用户导入失败。默认为10000行。 + group_commit_replay_wal_dir 存放WAL文件的目录,默认在用户配置的`storage_root_path`的各个目录下创建一个名为`wal`的目录,如无特殊要求,不需要修改。配置示例: ``` - group_commit_replay_wal_dir=/data1/storage/wal,/data2/storage/wal,/data3/storage/wal - ``` \ No newline at end of file + group_commit_replay_wal_dir=/data1/storage/wal;/data2/storage/wal;/data3/storage/wal + ``` + ++ group_commit_wal_max_disk_limit + + WAL文件的最大磁盘占用,当总WAL文件大小超过该值时,会阻塞攒批写入,直到磁盘空间释放或超时报错。默认为10%。 \ No newline at end of file From 0a62c85fa40b3bfa1217f1cae96e8a3dd77f84a0 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com> Date: Wed, 20 Dec 2023 12:32:16 +0800 Subject: [PATCH 03/15] Update group-commit-manual.md --- .../import/import-way/group-commit-manual.md | 24 +++++++------------ 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md index 65c546c2165d88..1e0c8cdfee3376 100644 --- a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md +++ b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md @@ -26,33 +26,25 @@ under the License. 
# Group Commit -攒批写入没有引入一种新的导入方式,而是对`INSERT INTO tbl VALUES(...)`、`Stream Load`、`Http Stream`的扩展。 +Group Commit 不是一种新的导入方式,而是对`INSERT INTO tbl VALUES(...)`、`Stream Load`、`Http Stream`的扩展,大幅提升了高并发小写入的性能。您可以直接使用 `INSERT INTO tbl VALUES(...)` 高频写入数据到 Doris 中,同时使用 PreparedStatement 可以获得更高的性能。您也可以使用`Stream Load`或者`Http Stream`高频写入数据到 Doris 中。 -在 Doris 中,所有的数据写入都是一个独立的导入作业,发起一个新的事务,产生一个新的数据版本。在高频写入的场景下,对transaction和compaction都产生了较大的压力。攒批写通过把多个小的写入合成一个写入作业,减少了transaction和compaction的次数,缓解了系统内部的压力,提高了写入的性能。 +## Group Commit 模式 -## 攒批模式 - -攒批写入有三种模式,分别是: +Group Commit 写入有三种模式,分别是: * `off_mode` -不开启攒批,保持以上三种导入方式的默认行为。 +不开启 Group Commit,保持以上三种导入方式的默认行为。 * `sync_mode` -多个客户端发起的导入复用一个内部导入,等内部导入成功或失败后,外部导入才会返回。 - -如果内部导入成功,数据可以立即查出。 +Doris 根据负载、表的 `group_commit_interval`属性将多个导入在一个事务提交,事务提交后导入返回。 * `async_mode` -多个客户端发起的导入复用一个内部导入,内部导入将处理后的数据写入Write Ahead Log(WAL)后,立即返回。 - -此时,数据不能立即读出。内部导入默认开启10秒后自动提交,等成功后,数据才能读出。 - -当内部导入因为BE节点重启或内存不足等原因导入失败后,BE会通过WAL重放机制重新导入数据。 +Doris 首先将数据写入 WAL,然后导入立即返回。Doris 会根据负载和表的`group_commit_interval`属性异步提交数据,提交之后数据可见。 -## 攒批使用方式 +## Group Commit 使用方式 假如表的结构为: ```sql @@ -414,4 +406,4 @@ ALTER TABLE dt SET ("group_commit_interval_ms"="2000"); + group_commit_wal_max_disk_limit - WAL文件的最大磁盘占用,当总WAL文件大小超过该值时,会阻塞攒批写入,直到磁盘空间释放或超时报错。默认为10%。 \ No newline at end of file + WAL文件的最大磁盘占用,当总WAL文件大小超过该值时,会阻塞攒批写入,直到磁盘空间释放或超时报错。默认为10%。 From a0482d4cb623afc3eaeacee373d1bd1e2334ffb7 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com> Date: Wed, 20 Dec 2023 12:34:47 +0800 Subject: [PATCH 04/15] Update group-commit-manual.md --- .../data-operate/import/import-way/group-commit-manual.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md index 1e0c8cdfee3376..309df283cd1189 100644 --- a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md +++ b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md @@ -38,11 +38,11 @@ Group Commit 写入有三种模式,分别是: * `sync_mode` -Doris 根据负载、表的 `group_commit_interval`属性将多个导入在一个事务提交,事务提交后导入返回。 +Doris 根据负载和表的 `group_commit_interval`属性将多个导入在一个事务提交,事务提交后导入返回。 * `async_mode` -Doris 首先将数据写入 WAL,然后导入立即返回。Doris 会根据负载和表的`group_commit_interval`属性异步提交数据,提交之后数据可见。 +Doris 首先将数据写入 WAL,然后导入立即返回。Doris 会根据负载和表的`group_commit_interval`属性异步提交数据,提交之后数据可见。单次导入大于 TODO 时,会自动切换为`sync_mode`。 ## Group Commit 使用方式 From 519a6b2038181a8e15164e0c57d4fdee8819d713 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com> Date: Wed, 20 Dec 2023 17:01:01 +0800 Subject: [PATCH 05/15] Update group-commit-manual.md --- .../docs/data-operate/import/import-way/group-commit-manual.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md index 309df283cd1189..de43852ae2f474 100644 --- a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md +++ b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md @@ -26,7 +26,7 @@ under the License. 
# Group Commit -Group Commit 不是一种新的导入方式,而是对`INSERT INTO tbl VALUES(...)`、`Stream Load`、`Http Stream`的扩展,大幅提升了高并发小写入的性能。您可以直接使用 `INSERT INTO tbl VALUES(...)` 高频写入数据到 Doris 中,同时使用 PreparedStatement 可以获得更高的性能。您也可以使用`Stream Load`或者`Http Stream`高频写入数据到 Doris 中。 +Group Commit 不是一种新的导入方式,而是对`INSERT INTO tbl VALUES(...)`、`Stream Load`、`Http Stream`的扩展,大幅提升了高并发小写入的性能。您的应用程序可以直接使用 JDBC 高频写入数据到 Doris 中,同时使用 PreparedStatement 可以获得更高的性能。日志场景下,您也可以使用`Stream Load`或者`Http Stream`高频写入数据到 Doris 中。 ## Group Commit 模式 From b65a794a0c206f7fd23358c3242e26575d464a60 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com> Date: Wed, 20 Dec 2023 17:08:03 +0800 Subject: [PATCH 06/15] Update group-commit-manual.md --- .../import/import-way/group-commit-manual.md | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md index de43852ae2f474..b00fbe5594ba3e 100644 --- a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md +++ b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md @@ -32,17 +32,17 @@ Group Commit 不是一种新的导入方式,而是对`INSERT INTO tbl VALUES(. Group Commit 写入有三种模式,分别是: -* `off_mode` +* 关闭模式 `off_mode` 不开启 Group Commit,保持以上三种导入方式的默认行为。 -* `sync_mode` +* 同步模式 `sync_mode` -Doris 根据负载和表的 `group_commit_interval`属性将多个导入在一个事务提交,事务提交后导入返回。 +Doris 根据负载和表的 `group_commit_interval`属性将多个导入在一个事务提交,事务提交后导入返回。适合高并发且导入完成后要求数据立即可见的场景。 -* `async_mode` +* 异步模式 `async_mode` -Doris 首先将数据写入 WAL,然后导入立即返回。Doris 会根据负载和表的`group_commit_interval`属性异步提交数据,提交之后数据可见。单次导入大于 TODO 时,会自动切换为`sync_mode`。 +Doris 首先将数据写入 WAL,然后导入立即返回。Doris 会根据负载和表的`group_commit_interval`属性异步提交数据,提交之后数据可见。单次导入大于 TODO 时,会自动切换为`sync_mode`。适合对写入延迟敏感以及高频写入的场景。 ## Group Commit 使用方式 @@ -64,15 +64,15 @@ PROPERTIES ( * 异步模式 ```sql -# 配置session变量开启攒批(默认为off_mode),开启异步模式 +# 配置session变量开启 group commit (默认为off_mode),开启异步模式 mysql> set group_commit = async_mode; -# 这里返回的label是group_commit开头的,是真正消费数据的导入关联的label,可以区分出是否攒批了 +# 这里返回的label是 group_commit 开头的,可以区分出是否使用了 group commit mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99); Query OK, 2 rows affected (0.05 sec) {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} -# 可以看出这个label, txn_id和上一个相同,说明是攒到了同一个导入任务中 +# 可以看出这个 label, txn_id 和上一个相同,说明是攒到了同一个导入任务中 mysql> insert into dt(id, name) values(3, 'John'); Query OK, 1 row affected (0.01 sec) {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} @@ -81,7 +81,7 @@ Query OK, 1 row affected (0.01 sec) mysql> select * from dt; Empty set (0.01 sec) -# 10秒后可以查询到 +# 10秒后可以查询到,可以通过表属性 group_commit_interval 控制数据可见延迟。 mysql> select * from dt; +------+-------+-------+ | id | name | score | @@ -95,10 +95,10 @@ mysql> select * from dt; * 同步模式 ```sql -# 配置session变量开启攒批(默认为off_mode),开启同步模式 +# 配置session变量开启 group commit (默认为off_mode),开启同步模式 mysql> set group_commit = sync_mode; -# 这里返回的label是group_commit开头的,是真正消费数据的导入关联的label,可以区分出是否攒批了 +# 这里返回的 label 是 group_commit 开头的,可以区分出是否谁用了 group commit,导入耗时至少是表属性 group_commit_interval。 mysql> insert into dt values(4, 'Bob', 90), (5, 'Alice', 99); Query OK, 2 rows affected (10.06 sec) {'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'} @@ -117,7 +117,7 @@ mysql> select * from dt; 5 rows in set (0.03 sec) ``` -* 关闭攒批 +* 关闭模式 ```sql mysql> 
set group_commit = off_mode; ``` From dbd1daee9092e31d14d5cf111d4a44416664fa2f Mon Sep 17 00:00:00 2001 From: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com> Date: Wed, 20 Dec 2023 17:09:32 +0800 Subject: [PATCH 07/15] Update group-commit-manual.md --- .../import/import-way/group-commit-manual.md | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md index b00fbe5594ba3e..daf8e56fd10dc0 100644 --- a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md +++ b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md @@ -153,7 +153,7 @@ curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mo "WriteDataTimeMs": 26 } -# 返回的GroupCommit为true,说明进入了攒批的流程 +# 返回的GroupCommit为true,说明进入了group commit的流程 # 返回的Label是group_commit开头的,是真正消费数据的导入关联的label ``` @@ -180,7 +180,7 @@ curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mod "WriteDataTimeMs": 10038 } -# 返回的GroupCommit为true,说明进入了攒批的流程 +# 返回的GroupCommit为true,说明进入了group commit的流程 # 返回的Label是group_commit开头的,是真正消费数据的导入关联的label ``` @@ -211,7 +211,7 @@ curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_m "WriteDataTimeMs": 23 } -# 返回的GroupCommit为true,说明进入了攒批的流程 +# 返回的GroupCommit为true,说明进入了group commit的流程 # 返回的Label是group_commit开头的,是真正消费数据的导入关联的label ``` @@ -238,7 +238,7 @@ curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mo "WriteDataTimeMs": 10034 } -# 返回的GroupCommit为true,说明进入了攒批的流程 +# 返回的GroupCommit为true,说明进入了group commit的流程 # 返回的Label是group_commit开头的,是真正消费数据的导入关联的label ``` @@ -334,9 +334,9 @@ private static void groupCommitInsertBatch() throws Exception { 关于**JDBC**的更多用法,参考[使用Insert方式同步数据](../import-scenes/jdbc-load.md)。 -## 修改攒批默认提交间隔 +## 修改group commit默认提交间隔 -攒批的默认提交间隔为10秒,用户可以通过修改表的配置,调整攒批的提交间隔: +group commit的默认提交间隔为10秒,用户可以通过修改表的配置,调整group commit的提交间隔: ```sql # 修改提交间隔为2秒 @@ -345,7 +345,7 @@ ALTER TABLE dt SET ("group_commit_interval_ms"="2000"); ## 使用限制 -* 当开启了攒批模式,系统会判断用户发起的`INSERT INTO VALUES`语句是否符合攒批的条件,如果符合,该语句的执行会进入到攒批写入中。主要的判断逻辑包括: +* 当开启了group commit模式,系统会判断用户发起的`INSERT INTO VALUES`语句是否符合group commit的条件,如果符合,该语句的执行会进入到group commit写入中。主要的判断逻辑包括: + 不是事务写入,即`Begin`; `INSERT INTO VALUES`; `COMMIT`方式 @@ -356,7 +356,7 @@ ALTER TABLE dt SET ("group_commit_interval_ms"="2000"); + 不是列更新写入 -* 当开启了攒批模式,系统会判断用户发起的`Stream Load`和`Http Stream`是否符合攒批的条件,如果符合,该导入的执行会进入到攒批写入中。主要的判断逻辑包括: +* 当开启了group commit模式,系统会判断用户发起的`Stream Load`和`Http Stream`是否符合group commit的条件,如果符合,该导入的执行会进入到group commit写入中。主要的判断逻辑包括: + 不是两阶段提交 @@ -369,24 +369,24 @@ ALTER TABLE dt SET ("group_commit_interval_ms"="2000"); * 在默认的导入中,`filter_ratio`是导入完成后,通过失败的行数和总行数计算,决定是否commit transaction。 - * 在攒批模式下,由于多个用户发起的导入会被一个内部导入执行,虽然可以计算出每个导入的`filter_ratio`,但是数据一旦进入内部导入,就只能commit transaction + * 在group commit模式下,由于多个用户发起的导入会被一个内部导入执行,虽然可以计算出每个导入的`filter_ratio`,但是数据一旦进入内部导入,就只能commit transaction - * 但攒批模式支持了一定程度的`max_filter_ratio`语义,当导入的总行数不高于`group_commit_memory_rows_for_max_filter_ratio`(配置在be.conf中,默认为10000行),会把数据缓存起来,计算出真正的`filter_ratio`,如果超过了`max_filter_ratio`,会把数据丢弃,用户导入失败 + * 但group commit模式支持了一定程度的`max_filter_ratio`语义,当导入的总行数不高于`group_commit_memory_rows_for_max_filter_ratio`(配置在be.conf中,默认为10000行),会把数据缓存起来,计算出真正的`filter_ratio`,如果超过了`max_filter_ratio`,会把数据丢弃,用户导入失败 * WAL限制 - * 对于`async_mode`的攒批写入,会把数据写入WAL。如果内部写入成功,则WAL被立刻删除;如果内部导入失败,通过导入WAL的方法来恢复数据 + * 对于`async_mode`的group 
commit写入,会把数据写入WAL。如果内部写入成功,则WAL被立刻删除;如果内部导入失败,通过导入WAL的方法来恢复数据 * 目前WAL文件只存储在一个BE上,如果这个BE磁盘损坏或文件误删等,可能导入丢失部分数据。 * 当下线BE节点时,请使用[`DECOMMISSION`](../../../sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-DECOMMISSION-BACKEND.md)命令,安全下线节点,防止该节点下线前WAL文件还没有全部处理完成,导致部分数据丢失 - * 对于`async_mode`的攒批写入,如果导入数据过大(超过WAL单目录的80%),或不知道数据量的chunked stream load,为了防止生成的WAL占用太多的磁盘空间,会退化成`sync_mode` + * 对于`async_mode`的group commit写入,如果导入数据过大(超过WAL单目录的80%),或不知道数据量的chunked stream load,为了防止生成的WAL占用太多的磁盘空间,会退化成`sync_mode` - * 为了防止多个小的导入攒到一个内部导入中,导致WAL占用过多的磁盘空间的问题,当总WAL文件大小超过配置阈值(参考相关系统配置中的`group_commit_wal_max_disk_limit`)时,会阻塞攒批写入,直到磁盘空间释放或超时报错 + * 为了防止多个小的导入攒到一个内部导入中,导致WAL占用过多的磁盘空间的问题,当总WAL文件大小超过配置阈值(参考相关系统配置中的`group_commit_wal_max_disk_limit`)时,会阻塞group commit写入,直到磁盘空间释放或超时报错 - * 当发生重量级schema change时,为了保证WAL能够适配表的schema,在schema change最后的fe修改元数据阶段,会拒绝攒批写入,客户端收到`insert table ${table_name} is blocked on schema change`异常,客户端重试即可 + * 当发生重量级schema change时,为了保证WAL能够适配表的schema,在schema change最后的fe修改元数据阶段,会拒绝group commit写入,客户端收到`insert table ${table_name} is blocked on schema change`异常,客户端重试即可 ## 相关系统配置 @@ -406,4 +406,4 @@ ALTER TABLE dt SET ("group_commit_interval_ms"="2000"); + group_commit_wal_max_disk_limit - WAL文件的最大磁盘占用,当总WAL文件大小超过该值时,会阻塞攒批写入,直到磁盘空间释放或超时报错。默认为10%。 + WAL文件的最大磁盘占用,当总WAL文件大小超过该值时,会阻塞group commit写入,直到磁盘空间释放或超时报错。默认为10%。 From a6949a03f37c21757138bd3b298e8d1dec2e447b Mon Sep 17 00:00:00 2001 From: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com> Date: Wed, 20 Dec 2023 17:14:19 +0800 Subject: [PATCH 08/15] Update group-commit-manual.md --- .../import/import-way/group-commit-manual.md | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md index daf8e56fd10dc0..86c89f7f6ce829 100644 --- a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md +++ b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md @@ -345,29 +345,29 @@ ALTER TABLE dt SET ("group_commit_interval_ms"="2000"); ## 使用限制 -* 当开启了group commit模式,系统会判断用户发起的`INSERT INTO VALUES`语句是否符合group commit的条件,如果符合,该语句的执行会进入到group commit写入中。主要的判断逻辑包括: +* 当开启了 group commit 模式,系统会判断用户发起的`INSERT INTO VALUES`语句是否符合group commit的条件,如果符合,该语句的执行会进入到group commit写入中。符合以下条件会自动退化为非 group commit 方式: - + 不是事务写入,即`Begin`; `INSERT INTO VALUES`; `COMMIT`方式 + + 事务写入,即`Begin`; `INSERT INTO VALUES`; `COMMIT`方式 - + 不指定label,即`INSERT INTO dt WITH LABEL {label} VALUES` + + 指定label,即`INSERT INTO dt WITH LABEL {label} VALUES` - + VALUES中不能包含表达式,即`INSERT INTO dt VALUES (1 + 100)` + + VALUES中包含表达式,即`INSERT INTO dt VALUES (1 + 100)` - + 不是列更新写入 + + 列更新写入 -* 当开启了group commit模式,系统会判断用户发起的`Stream Load`和`Http Stream`是否符合group commit的条件,如果符合,该导入的执行会进入到group commit写入中。主要的判断逻辑包括: +* 当开启了group commit模式,系统会判断用户发起的`Stream Load`和`Http Stream`是否符合group commit的条件,如果符合,该导入的执行会进入到group commit写入中。符合以下条件的会自动退化为非 group commit 方式: - + 不是两阶段提交 + + 两阶段提交 - + 不指定label + + 指定label - + 不是列更新写入 + + 列更新写入 * 对`max_filter_ratio`语义的支持 - * 在默认的导入中,`filter_ratio`是导入完成后,通过失败的行数和总行数计算,决定是否commit transaction。 + * 在默认的导入中,`filter_ratio`是导入完成后,通过失败的行数和总行数计算,决定是否提交本次写入。 * 在group commit模式下,由于多个用户发起的导入会被一个内部导入执行,虽然可以计算出每个导入的`filter_ratio`,但是数据一旦进入内部导入,就只能commit transaction From 8531a9d300a28ae36f5504d976b8bcceac19b436 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com> Date: Wed, 20 Dec 2023 17:20:08 +0800 Subject: [PATCH 09/15] Update 
group-commit-manual.md --- .../data-operate/import/import-way/group-commit-manual.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md index 86c89f7f6ce829..3d51f9936a9860 100644 --- a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md +++ b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md @@ -371,7 +371,7 @@ ALTER TABLE dt SET ("group_commit_interval_ms"="2000"); * 在group commit模式下,由于多个用户发起的导入会被一个内部导入执行,虽然可以计算出每个导入的`filter_ratio`,但是数据一旦进入内部导入,就只能commit transaction - * 但group commit模式支持了一定程度的`max_filter_ratio`语义,当导入的总行数不高于`group_commit_memory_rows_for_max_filter_ratio`(配置在be.conf中,默认为10000行),会把数据缓存起来,计算出真正的`filter_ratio`,如果超过了`max_filter_ratio`,会把数据丢弃,用户导入失败 + * 但group commit模式支持了一定程度的`max_filter_ratio`语义,当导入的总行数不高于`group_commit_memory_rows_for_max_filter_ratio`(配置在be.conf中,默认为10000行),`max_filter_ratio` 工作。 * WAL限制 @@ -394,7 +394,7 @@ ALTER TABLE dt SET ("group_commit_interval_ms"="2000"); + group_commit_memory_rows_for_max_filter_ratio - 当导入的总行数不高于该值,会把数据缓存起来,计算出真正的`filter_ratio`,如果超过了`max_filter_ratio`,会把数据丢弃,用户导入失败。默认为10000行。 + 当导入的总行数不高于该值,`max_filter_ratio` 正常工作,否则不工作。 + group_commit_replay_wal_dir From 76321c98d621586660f6ec89e200a62bce95d927 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com> Date: Wed, 20 Dec 2023 17:23:30 +0800 Subject: [PATCH 10/15] Update group-commit-manual.md --- .../docs/data-operate/import/import-way/group-commit-manual.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md index 3d51f9936a9860..633ba2bfcba2f8 100644 --- a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md +++ b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md @@ -386,7 +386,7 @@ ALTER TABLE dt SET ("group_commit_interval_ms"="2000"); * 为了防止多个小的导入攒到一个内部导入中,导致WAL占用过多的磁盘空间的问题,当总WAL文件大小超过配置阈值(参考相关系统配置中的`group_commit_wal_max_disk_limit`)时,会阻塞group commit写入,直到磁盘空间释放或超时报错 - * 当发生重量级schema change时,为了保证WAL能够适配表的schema,在schema change最后的fe修改元数据阶段,会拒绝group commit写入,客户端收到`insert table ${table_name} is blocked on schema change`异常,客户端重试即可 + * 当发生重量级 schema change(目前加减列、修改 varchar 长度和重命名列是轻量级 schema change,其它的是重量级 schema change) 时,为了保证WAL能够适配表的 schema,在schema change最后的fe修改元数据阶段,会拒绝group commit写入,客户端收到`insert table ${table_name} is blocked on schema change`异常,客户端重试即可 ## 相关系统配置 From 3fe8161089108f3578320b875f7d8db3d29e7fd7 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com> Date: Wed, 20 Dec 2023 17:27:04 +0800 Subject: [PATCH 11/15] Update group-commit-manual.md --- .../docs/data-operate/import/import-way/group-commit-manual.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md index 633ba2bfcba2f8..32f2c6fcfb6681 100644 --- a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md +++ b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md @@ -26,7 +26,7 @@ under the License. 
# Group Commit -Group Commit 不是一种新的导入方式,而是对`INSERT INTO tbl VALUES(...)`、`Stream Load`、`Http Stream`的扩展,大幅提升了高并发小写入的性能。您的应用程序可以直接使用 JDBC 高频写入数据到 Doris 中,同时使用 PreparedStatement 可以获得更高的性能。日志场景下,您也可以使用`Stream Load`或者`Http Stream`高频写入数据到 Doris 中。 +Group Commit 不是一种新的导入方式,而是对`INSERT INTO tbl VALUES(...)`、`Stream Load`、`Http Stream`的扩展,大幅提升了高并发小写入的性能。您的应用程序可以直接使用 JDBC 将数据高频写入 Doris,同时通过使用 PreparedStatement 可以获得更高的性能。在日志场景下,您也可以利用 Stream Load 或者 Http Stream 将数据高频写入 Doris。 ## Group Commit 模式 From e7ad60ba8257ec76c8d3d897239a589cf40bcfdd Mon Sep 17 00:00:00 2001 From: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com> Date: Wed, 20 Dec 2023 17:30:15 +0800 Subject: [PATCH 12/15] Update group-commit-manual.md --- .../import/import-way/group-commit-manual.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md index 32f2c6fcfb6681..359684b933f5a8 100644 --- a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md +++ b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md @@ -32,17 +32,17 @@ Group Commit 不是一种新的导入方式,而是对`INSERT INTO tbl VALUES(. Group Commit 写入有三种模式,分别是: -* 关闭模式 `off_mode` +* 关闭模式(`off_mode`) 不开启 Group Commit,保持以上三种导入方式的默认行为。 -* 同步模式 `sync_mode` +* 同步模式(`sync_mode`) -Doris 根据负载和表的 `group_commit_interval`属性将多个导入在一个事务提交,事务提交后导入返回。适合高并发且导入完成后要求数据立即可见的场景。 +Doris 根据负载和表的 `group_commit_interval`属性将多个导入在一个事务提交,事务提交后导入返回。这适用于高并发写入场景,且在导入完成后要求数据立即可见。 -* 异步模式 `async_mode` +* 异步模式(`async_mode`) -Doris 首先将数据写入 WAL,然后导入立即返回。Doris 会根据负载和表的`group_commit_interval`属性异步提交数据,提交之后数据可见。单次导入大于 TODO 时,会自动切换为`sync_mode`。适合对写入延迟敏感以及高频写入的场景。 +Doris 首先将数据写入 WAL(Write Ahead Log),然后导入立即返回。Doris 会根据负载和表的`group_commit_interval`属性异步提交数据,提交之后数据可见。单次导入大于 TODO 时,会自动切换为`sync_mode`。这适用于写入延迟敏感以及高频写入的场景。 ## Group Commit 使用方式 From 69aa0e9dd6feabcdd564d1437e3954cd097a4962 Mon Sep 17 00:00:00 2001 From: meiyi Date: Fri, 22 Dec 2023 15:30:23 +0800 Subject: [PATCH 13/15] fix group commit doc zh --- .../docs/admin-manual/config/be-config.md | 11 ++- .../import/import-way/group-commit-manual.md | 69 ++++++++++--------- 2 files changed, 45 insertions(+), 35 deletions(-) diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index ac8899db4f6e8e..a8ea599b06ddcd 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -1540,10 +1540,15 @@ load tablets from header failed, failed tablets size: xxx, path=xxx * 描述: BE 是否开启使用java-jni,开启后允许 c++ 与 java 之间的相互调用。目前已经支持hudi、java-udf、jdbc、max-compute、paimon、preload、avro * 默认值: true -#### `group_commit_replay_wal_dir` +#### `group_commit_wal_path` -* 描述: 攒批写入存放WAL文件的目录,请参考 [Group Commit](../../data-operate/import/import-way/group-commit-manual.md) +* 描述: group commit 存放 WAL 文件的目录,请参考 [Group Commit](../../data-operate/import/import-way/group-commit-manual.md) * 默认值: 默认在用户配置的`storage_root_path`的各个目录下创建一个名为`wal`的目录。配置示例: ``` - group_commit_replay_wal_dir=/data1/storage/wal;/data2/storage/wal;/data3/storage/wal + group_commit_wal_path=/data1/storage/wal;/data2/storage/wal;/data3/storage/wal ``` + +#### `group_commit_memory_rows_for_max_filter_ratio` + +* 描述: 当 group commit 导入的总行数不高于该值,`max_filter_ratio` 正常工作,否则不工作,请参考 [Group Commit](../../data-operate/import/import-way/group-commit-manual.md) +* 默认值: 10000 diff --git 
a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md index 359684b933f5a8..6b10ea20b81a6c 100644 --- a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md +++ b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md @@ -42,7 +42,7 @@ Doris 根据负载和表的 `group_commit_interval`属性将多个导入在一 * 异步模式(`async_mode`) -Doris 首先将数据写入 WAL(Write Ahead Log),然后导入立即返回。Doris 会根据负载和表的`group_commit_interval`属性异步提交数据,提交之后数据可见。单次导入大于 TODO 时,会自动切换为`sync_mode`。这适用于写入延迟敏感以及高频写入的场景。 +Doris 首先将数据写入 WAL (`Write Ahead Log`),然后导入立即返回。Doris 会根据负载和表的`group_commit_interval`属性异步提交数据,提交之后数据可见。为了防止 WAL 占用较大的磁盘空间,单次导入数据量较大时,会自动切换为`sync_mode`。这适用于写入延迟敏感以及高频写入的场景。 ## Group Commit 使用方式 @@ -246,7 +246,7 @@ curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mo ### 使用`PreparedStatement` -当用户使用JDBC `insert into values`方式写入时,为了减少 SQL 解析和生成规划的开销, 我们在 FE 端支持了 MySQL 协议的`PreparedStatement`特性。当使用`PreparedStatement`时,SQL 和其导入规划将被缓存到 Session 级别的内存缓存中,后续的导入直接使用缓存对象,降低了 FE 的 CPU 压力。下面是在 JDBC 中使用 PreparedStatement 的例子: +当用户使用 JDBC `insert into values`方式写入时,为了减少 SQL 解析和生成规划的开销, 我们在 FE 端支持了 MySQL 协议的`PreparedStatement`特性。当使用`PreparedStatement`时,SQL 和其导入规划将被缓存到 Session 级别的内存缓存中,后续的导入直接使用缓存对象,降低了 FE 的 CPU 压力。下面是在 JDBC 中使用 PreparedStatement 的例子: 1. 设置 JDBC url 并在 Server 端开启 prepared statement @@ -256,13 +256,13 @@ url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true 2. 开启 `group_commit` session变量,有如下两种方式: -* 通过JDBC url设置,增加`sessionVariables=group_commit=async_mode` +* 通过 JDBC url 设置,增加`sessionVariables=group_commit=async_mode` ``` url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&sessionVariables=group_commit=async_mode ``` -* 通过执行SQL设置 +* 通过执行 SQL 设置 ``` try (Statement statement = conn.createStatement()) { @@ -311,7 +311,7 @@ private static void groupCommitInsertBatch() throws Exception { // add rewriteBatchedStatements=true and cachePrepStmts=true in JDBC url // set session variables by sessionVariables=group_commit=async_mode in JDBC url try (Connection conn = DriverManager.getConnection( - String.format(URL_PATTERN + "&rewriteBatchedStatements=true&cachePrepStmts=true&sessionVariables=enable_insert_group_commit=true", HOST, PORT, DB), USER, PASSWD)) { + String.format(URL_PATTERN + "&rewriteBatchedStatements=true&cachePrepStmts=true&sessionVariables=group_commit=async_mode", HOST, PORT, DB), USER, PASSWD)) { String query = "insert into " + TBL + " values(?, ?, ?)"; try (PreparedStatement stmt = conn.prepareStatement(query)) { @@ -336,74 +336,79 @@ private static void groupCommitInsertBatch() throws Exception { ## 修改group commit默认提交间隔 -group commit的默认提交间隔为10秒,用户可以通过修改表的配置,调整group commit的提交间隔: +group commit 的默认提交间隔为 10 秒,用户可以通过修改表的配置,调整 group commit 的提交间隔: ```sql -# 修改提交间隔为2秒 +# 修改提交间隔为 2 秒 ALTER TABLE dt SET ("group_commit_interval_ms"="2000"); ``` ## 使用限制 -* 当开启了 group commit 模式,系统会判断用户发起的`INSERT INTO VALUES`语句是否符合group commit的条件,如果符合,该语句的执行会进入到group commit写入中。符合以下条件会自动退化为非 group commit 方式: +* 当开启了 group commit 模式,系统会判断用户发起的`INSERT INTO VALUES`语句是否符合 group commit 的条件,如果符合,该语句的执行会进入到 group commit 写入中。符合以下条件会自动退化为非 group commit 方式: + 事务写入,即`Begin`; `INSERT INTO VALUES`; `COMMIT`方式 - + 指定label,即`INSERT INTO dt WITH LABEL {label} VALUES` + + 指定 label,即`INSERT INTO dt WITH LABEL {label} VALUES` - + VALUES中包含表达式,即`INSERT INTO dt VALUES (1 + 100)` + + VALUES 中包含表达式,即`INSERT INTO dt VALUES (1 + 100)` + 列更新写入 + + 表不支持 light schema change -* 当开启了group commit模式,系统会判断用户发起的`Stream Load`和`Http 
Stream`是否符合group commit的条件,如果符合,该导入的执行会进入到group commit写入中。符合以下条件的会自动退化为非 group commit 方式: +* 当开启了 group commit 模式,系统会判断用户发起的`Stream Load`和`Http Stream`是否符合 group commit 的条件,如果符合,该导入的执行会进入到 group commit 写入中。符合以下条件的会自动退化为非 group commit 方式: + 两阶段提交 - + 指定label + + 指定 label + 列更新写入 + + 表不支持 light schema change -* 对`max_filter_ratio`语义的支持 ++ 对于 unique 模型,由于 group commit 不能保证提交顺序,用户可以配合 sequence 列使用 - * 在默认的导入中,`filter_ratio`是导入完成后,通过失败的行数和总行数计算,决定是否提交本次写入。 +* 对`max_filter_ratio`语义的支持 - * 在group commit模式下,由于多个用户发起的导入会被一个内部导入执行,虽然可以计算出每个导入的`filter_ratio`,但是数据一旦进入内部导入,就只能commit transaction + * 在默认的导入中,`filter_ratio`是导入完成后,通过失败的行数和总行数计算,决定是否提交本次写入 - * 但group commit模式支持了一定程度的`max_filter_ratio`语义,当导入的总行数不高于`group_commit_memory_rows_for_max_filter_ratio`(配置在be.conf中,默认为10000行),`max_filter_ratio` 工作。 + * 在 group commit 模式下,由于多个用户发起的导入会被一个内部导入执行,虽然可以计算出每个导入的`filter_ratio`,但是数据一旦进入内部导入,就只能 commit transaction + * group commit 模式支持了一定程度的`max_filter_ratio`语义,当导入的总行数不高于`group_commit_memory_rows_for_max_filter_ratio`(配置在`be.conf`中,默认为`10000`行),`max_filter_ratio` 工作 -* WAL限制 - * 对于`async_mode`的group commit写入,会把数据写入WAL。如果内部写入成功,则WAL被立刻删除;如果内部导入失败,通过导入WAL的方法来恢复数据 +* WAL 限制 - * 目前WAL文件只存储在一个BE上,如果这个BE磁盘损坏或文件误删等,可能导入丢失部分数据。 + * 对于`async_mode`的 group commit 写入,会把数据写入 WAL。如果内部导入成功,则 WAL 被立刻删除;如果内部导入失败,通过导入 WAL 的方法来恢复数据 - * 当下线BE节点时,请使用[`DECOMMISSION`](../../../sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-DECOMMISSION-BACKEND.md)命令,安全下线节点,防止该节点下线前WAL文件还没有全部处理完成,导致部分数据丢失 + * 目前 WAL 文件只存储在一个 BE 上,如果这个 BE 磁盘损坏或文件误删等,可能导入丢失部分数据 - * 对于`async_mode`的group commit写入,如果导入数据过大(超过WAL单目录的80%),或不知道数据量的chunked stream load,为了防止生成的WAL占用太多的磁盘空间,会退化成`sync_mode` + * 当下线 BE 节点时,请使用[`DECOMMISSION`](../../../sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-DECOMMISSION-BACKEND.md)命令,安全下线节点,防止该节点下线前 WAL 文件还没有全部处理完成,导致部分数据丢失 - * 为了防止多个小的导入攒到一个内部导入中,导致WAL占用过多的磁盘空间的问题,当总WAL文件大小超过配置阈值(参考相关系统配置中的`group_commit_wal_max_disk_limit`)时,会阻塞group commit写入,直到磁盘空间释放或超时报错 + * 对于`async_mode`的 group commit 写入,为了保护磁盘空间,当遇到以下情况时,会切换成`sync_mode` - * 当发生重量级 schema change(目前加减列、修改 varchar 长度和重命名列是轻量级 schema change,其它的是重量级 schema change) 时,为了保证WAL能够适配表的 schema,在schema change最后的fe修改元数据阶段,会拒绝group commit写入,客户端收到`insert table ${table_name} is blocked on schema change`异常,客户端重试即可 + * 导入数据量过大,即超过 WAL 单目录的80%空间 -## 相关系统配置 + * 不知道数据量的 chunked stream load -### BE 配置 + * 导入数据量不大,但磁盘可用空间不足 -+ group_commit_memory_rows_for_max_filter_ratio + * 当发生重量级 schema change(目前加减列、修改 varchar 长度和重命名列是轻量级 schema change,其它的是重量级 schema change) 时,为了保证 WAL 能够适配表的 schema,在 schema change 最后的 fe 修改元数据阶段,会拒绝 group commit 写入,客户端收到`insert table ${table_name} is blocked on schema change`异常,客户端重试即可 - 当导入的总行数不高于该值,`max_filter_ratio` 正常工作,否则不工作。 +## 相关系统配置 -+ group_commit_replay_wal_dir +### BE 配置 - 存放WAL文件的目录,默认在用户配置的`storage_root_path`的各个目录下创建一个名为`wal`的目录,如无特殊要求,不需要修改。配置示例: +#### `group_commit_wal_path` +* 描述: group commit 存放 WAL 文件的目录 +* 默认值: 默认在用户配置的`storage_root_path`的各个目录下创建一个名为`wal`的目录。配置示例: ``` - group_commit_replay_wal_dir=/data1/storage/wal;/data2/storage/wal;/data3/storage/wal + group_commit_wal_path=/data1/storage/wal;/data2/storage/wal;/data3/storage/wal ``` -+ group_commit_wal_max_disk_limit +#### `group_commit_memory_rows_for_max_filter_ratio` - WAL文件的最大磁盘占用,当总WAL文件大小超过该值时,会阻塞group commit写入,直到磁盘空间释放或超时报错。默认为10%。 +* 描述: 当 group commit 导入的总行数不高于该值,`max_filter_ratio` 正常工作,否则不工作 +* 默认值: 10000 From 17866e94a92e91077ca15a9b25d29718e52bed1a Mon Sep 17 00:00:00 2001 From: meiyi Date: Fri, 22 Dec 2023 18:25:49 +0800 Subject: [PATCH 14/15] fix group commit 
doc en --- docs/en/docs/admin-manual/config/be-config.md | 19 +- .../import/import-way/group-commit-manual.md | 228 ++++++++++++++---- .../import/import-way/group-commit-manual.md | 5 +- 3 files changed, 192 insertions(+), 60 deletions(-) diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md index 4d5fa52e000e08..e9c709d6e30126 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -1511,16 +1511,15 @@ Indicates how many tablets failed to load in the data directory. At the same tim * Description: BE Whether to enable the use of java-jni. When enabled, mutual calls between c++ and java are allowed. Currently supports hudi, java-udf, jdbc, max-compute, paimon, preload, avro * Default value: true -#### `group_commit_interval_ms` +#### `group_commit_wal_path` -* Description: The interval in milliseconds of the internal group commit load job will stop and start a new internal job. See [Group Commit](../../data-operate/import/import-way/group-commit-manual.md) for more details -* Default: 10000 - -#### `group_commit_replay_wal_dir` - -* Description: The `WAL` directory of group commit. See [Group Commit](../../data-operate/import/import-way/group-commit-manual.md) for more details +* The `WAL` directory of group commit. * Default: A directory named `wal` is created under each directory of the `storage_root_path`. Configuration examples: - ``` - group_commit_replay_wal_dir=/data1/storage/wal,/data2/storage/wal,/data3/storage/wal - ``` \ No newline at end of file + group_commit_wal_path=/data1/storage/wal;/data2/storage/wal;/data3/storage/wal + ``` + +#### `group_commit_memory_rows_for_max_filter_ratio` + +* Description: The `max_filter_ratio` limit can only work if the total rows of `group commit` is less than this value. See [Group Commit](../../data-operate/import/import-way/group-commit-manual.md) for more details +* Default: 10000 diff --git a/docs/en/docs/data-operate/import/import-way/group-commit-manual.md b/docs/en/docs/data-operate/import/import-way/group-commit-manual.md index 756a6e7ee774b4..5a26f6c23b5f54 100644 --- a/docs/en/docs/data-operate/import/import-way/group-commit-manual.md +++ b/docs/en/docs/data-operate/import/import-way/group-commit-manual.md @@ -26,27 +26,23 @@ under the License. # Group Commit -Group commit load does not introduce a new import method, but an extension of `INSERT INTO tbl VALUS(...)`、`Stream Load`、`Http Stream`. +Group commit load does not introduce a new import method, but an extension of `INSERT INTO tbl VALUS(...)`, `Stream Load` and `Http Stream`. It is a way to improve the write performance of Doris with high-concurrency and small-data writes. Your application can directly use JDBC to perform high-frequency data writes into Doris, at the same time, leveraging PreparedStatement can get even higher performance. In logging scenarios, you can also use Stream Load or Http Stream to perform high-frequency data writes into Doris. -In Doris, all methods of data loading are independent jobs which initiate a new transaction and generate a new data version. In the scenario of high-frequency writes, both transactions and compactions are under great pressure. Group commit load reduces the number of transactions and compactions by combining multiple small load tasks into one load job, and thus improve write performance. +## Group Commit Mode -The process is roughly as follows: -1. 
User starts a group commit load, BE puts the data into the memory and WAL, and returns immediately. The data is not visible to users at this time; -2. BE will periodically (default is 10 seconds) commit the data in the memory, and the data is visible to users after committed; -3. If BE restarts, the data will be recovered through WAL. +Group Commit provides 3 modes: -## Fundamental +* `off_mode` -### Write process -1. User starts a group commit load, FE generates a plan fragment; -2. BE executes the plan. Unlike non group commit load, the processed data is not sent to each tablet, but put into a queue in the memory shared by multiple group commit load; -3. BE starts an internal load, which consumes the data in the queue, writes to WAL, and notifies that the data related load has been finished; -4. After that, the data is processed in the same way as non group commit load, send to each tablet, write memtable, and flushed to segment files; -5. The internal load is finished after a fixed time interval (default is 10 seconds), and the data is visible to users when it is committed. +Disable group commit, keep the original behavior for `INSERT INTO VALUES`, `Stream Load` and `Http Stream`. -### WAL Introduction +* `sync_mode` -Each group commit load will generate a corresponding WAL file, which is used to recover failed load jobs. If there is a restart be or fail to run the group commit load during the writing process, be will replay WAL file through a stream load in the background to reimport the data, which can make sure that data is not lost. If the group commit load job is completed normally, the WAL will be directly deleted to reduce disk space usage. +Doris groups multiple loads into one transaction commit based on the `group_commit_interval` property of the table. The load is returned after the transaction commit. This mode is suitable for high-concurrency writing scenarios and requires immediate data visibility after the load is finished. + +* `async_mode` + +Doris writes data to the Write Ahead Log (WAL) firstly, then the load is returned. Doris groups multiple loads into one transaction commit based on the `group_commit_interval` property of the table, and the data becomes visible after the commit. To prevent excessive disk space usage by the WAL, it automatically switches to `sync_mode` when loading a large amount of data. This is suitable for latency-sensitive and high-frequency writing. ## Basic operations @@ -66,9 +62,10 @@ PROPERTIES ( ### INSERT INTO VALUES +* async_mode ```sql -# Config session variable to enable the group commit, the default value is false -mysql> set enable_insert_group_commit = true; +# Config session variable to enable the async group commit, the default value is off_mode +mysql> set group_commit = async_mode; # The retured label is start with 'group_commit', which is the label of the real load job mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99); @@ -96,18 +93,49 @@ mysql> select * from dt; 3 rows in set (0.02 sec) ``` +* sync_mode +```sql +# Config session variable to enable the sync group commit +mysql> set group_commit = sync_mode; + +# The retured label is start with 'group_commit', which is the label of the real load job. +# The insert costs at least the group_commit_interval_ms of table property. 
+mysql> insert into dt values(4, 'Bob', 90), (5, 'Alice', 99); +Query OK, 2 rows affected (10.06 sec) +{'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'} + +# The data is visible after the insert is returned +mysql> select * from dt; ++------+-------+-------+ +| id | name | score | ++------+-------+-------+ +| 1 | Bob | 90 | +| 2 | Alice | 99 | +| 3 | John | NULL | +| 4 | Bob | 90 | +| 5 | Alice | 99 | ++------+-------+-------+ +5 rows in set (0.03 sec) +``` + +* off_mode +```sql +mysql> set group_commit = off_mode; +``` + ### Stream Load If the content of `data.csv` is: ```sql -4,Amy,60 -5,Ross,98 +6,Amy,60 +7,Ross,98 ``` +* async_mode ```sql -# Add 'group_commit:true' configuration in the http header +# Add 'group_commit:async_mode' configuration in the http header -curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:true" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load +curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load { "TxnId": 7009, "Label": "group_commit_c84d2099208436ab_96e33fda01eddba8", @@ -130,14 +158,42 @@ curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:true" - # The retured label is start with 'group_commit', which is the label of the real load job ``` +* sync_mode +```sql +# Add 'group_commit:sync_mode' configuration in the http header + +curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load +{ + "TxnId": 3009, + "Label": "group_commit_d941bf17f6efcc80_ccf4afdde9881293", + "Comment": "", + "GroupCommit": true, + "Status": "Success", + "Message": "OK", + "NumberTotalRows": 2, + "NumberLoadedRows": 2, + "NumberFilteredRows": 0, + "NumberUnselectedRows": 0, + "LoadBytes": 19, + "LoadTimeMs": 10044, + "StreamLoadPutTimeMs": 4, + "ReadDataTimeMs": 0, + "WriteDataTimeMs": 10038 +} + +# The returned 'GroupCommit' is 'true', which means this is a group commit load +# The retured label is start with 'group_commit', which is the label of the real load job +``` + See [Stream Load](stream-load-manual.md) for more detailed syntax used by **Stream Load**. 
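The same `group_commit` header can also be set from application code with any HTTP client, not only from `curl`. The following Java sketch is illustrative only and makes several assumptions: Java 11+ (`java.net.http`), placeholder host, credential and table values, and that the file is sent directly to a BE HTTP port (for example the BE `webserver_port`) so that no FE redirect handling is needed.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Base64;

public class GroupCommitStreamLoad {
    public static void main(String[] args) throws Exception {
        // Placeholder values -- replace with your own cluster, table and credentials.
        String beHost = "127.0.0.1";   // assumption: loading directly against a BE HTTP port
        int beHttpPort = 8040;         // assumption: the BE webserver_port
        String db = "db";
        String table = "dt";
        String auth = Base64.getEncoder()
                .encodeToString("root:".getBytes(StandardCharsets.UTF_8));

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(String.format("http://%s:%d/api/%s/%s/_stream_load",
                        beHost, beHttpPort, db, table)))
                .header("Authorization", "Basic " + auth)
                .header("column_separator", ",")
                .header("group_commit", "async_mode")   // same header as the curl examples above
                .PUT(HttpRequest.BodyPublishers.ofFile(Path.of("data.csv")))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // A group commit load should report "GroupCommit": true and a label
        // starting with "group_commit" in the returned JSON, as in the curl examples.
        System.out.println(response.body());
    }
}
```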
### Http Stream +* async_mode ```sql -# Add 'group_commit:true' configuration in the http header +# Add 'group_commit:async_mode' configuration in the http header -curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:true" -H "sql:insert into db.dt select * from http_stream('column_separator'=',', 'format' = 'CSV')" http://{fe_host}:{http_port}/api/_http_stream +curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode" -H "sql:insert into db.dt select * from http_stream('column_separator'=',', 'format' = 'CSV')" http://{fe_host}:{http_port}/api/_http_stream { "TxnId": 7011, "Label": "group_commit_3b45c5750d5f15e5_703428e462e1ebb0", @@ -160,6 +216,33 @@ curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:true" - # The retured label is start with 'group_commit', which is the label of the real load job ``` +* sync_mode +```sql +# Add 'group_commit:sync_mode' configuration in the http header + +curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode" -H "sql:insert into db.dt select * from http_stream('column_separator'=',', 'format' = 'CSV')" http://{fe_host}:{http_port}/api/_http_stream +{ + "TxnId": 3011, + "Label": "group_commit_fe470e6752aadbe6_a8f3ac328b02ea91", + "Comment": "", + "GroupCommit": true, + "Status": "Success", + "Message": "OK", + "NumberTotalRows": 2, + "NumberLoadedRows": 2, + "NumberFilteredRows": 0, + "NumberUnselectedRows": 0, + "LoadBytes": 19, + "LoadTimeMs": 10066, + "StreamLoadPutTimeMs": 31, + "ReadDataTimeMs": 32, + "WriteDataTimeMs": 10034 +} + +# The returned 'GroupCommit' is 'true', which means this is a group commit load +# The retured label is start with 'group_commit', which is the label of the real load job +``` + See [Stream Load](stream-load-manual.md) for more detailed syntax used by **Http Stream**. ### Use `PreparedStatement` @@ -172,19 +255,19 @@ To reduce the CPU cost of SQL parsing and query planning, we provide the `Prepar url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true ``` -2. Enable `enable_insert_group_commit` session variable, there are two ways to do it: +2. 
Set `group_commit` session variable, there are two ways to do it: -* Add `sessionVariables=enable_insert_group_commit=true` in JDBC url +* Add `sessionVariables=group_commit=async_mode` in JDBC url ``` -url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&sessionVariables=enable_insert_group_commit=true +url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true&sessionVariables=group_commit=async_mode ``` -*Use `SET enable_insert_group_commit = true;` command +* Use `SET group_commit = async_mode;` command ``` try (Statement statement = conn.createStatement()) { - statement.execute("SET enable_insert_group_commit = true;"); + statement.execute("SET group_commit = async_mode;"); } ``` @@ -204,9 +287,9 @@ private static final int INSERT_BATCH_SIZE = 10; private static void groupCommitInsert() throws Exception { Class.forName(JDBC_DRIVER); try (Connection conn = DriverManager.getConnection(String.format(URL_PATTERN, HOST, PORT, DB), USER, PASSWD)) { - // enable session variable 'enable_insert_group_commit' + // set session variable 'group_commit' try (Statement statement = conn.createStatement()) { - statement.execute("SET enable_insert_group_commit = true;"); + statement.execute("SET group_commit = async_mode;"); } String query = "insert into " + TBL + " values(?, ?, ?)"; @@ -227,9 +310,9 @@ private static void groupCommitInsert() throws Exception { private static void groupCommitInsertBatch() throws Exception { Class.forName(JDBC_DRIVER); // add rewriteBatchedStatements=true and cachePrepStmts=true in JDBC url - // enable session variables by sessionVariables=enable_insert_group_commit=true in JDBC url + // set session variables by sessionVariables=group_commit=async_mode in JDBC url try (Connection conn = DriverManager.getConnection( - String.format(URL_PATTERN + "&rewriteBatchedStatements=true&cachePrepStmts=true&sessionVariables=enable_insert_group_commit=true", HOST, PORT, DB), USER, PASSWD)) { + String.format(URL_PATTERN + "&rewriteBatchedStatements=true&cachePrepStmts=true&sessionVariables=group_commit=async_mode", HOST, PORT, DB), USER, PASSWD)) { String query = "insert into " + TBL + " values(?, ?, ?)"; try (PreparedStatement stmt = conn.prepareStatement(query)) { @@ -252,30 +335,81 @@ private static void groupCommitInsertBatch() throws Exception { See [Synchronize Data Using Insert Method](../import-scenes/jdbc-load.md) for more details about **JDBC**. -## Relevant system configuration +## Modify the group commit interval + +The default group commit interval is 10 seconds. Users can modify the configuration of the table: -### Session variable +```sql +# Modify the group commit interval to 2 seconds +ALTER TABLE dt SET ("group_commit_interval_ms"="2000"); +``` -+ enable_insert_group_commit +## Limitations - If this configuration is true, FE will judge whether the `INSERT INTO VALUES` can be group commit, the conditions are as follows: - + Not a transaction insert, as `Begin`; `INSERT INTO VALUES`; `COMMIT` - + Not specifying partition, as `INSERT INTO dt PARTITION()` - + Not specifying label, as `INSERT INTO dt WITH LABEL {label} VALUES` - + VALUES does not contain any expression, as `INSERT INTO dt VALUES (1 + 100)` +* When the group commit is enabled, some `INSERT INTO VALUES` sqls are not executed in the group commit way if they meet the following conditions: - The default value is false, use `SET enable_insert_group_commit = true;` command to enable it. 
+ * Transaction insert, such as `BEGIN`, `INSERT INTO VALUES`, `COMMIT` -### BE configuration + * Specify the label, such as `INSERT INTO dt WITH LABEL {label} VALUES` + + * Expressions within VALUES, such as `INSERT INTO dt VALUES (1 + 100)` + + * Column update + + * Tables that do not support light schema changes + +* When the group commit is enabled, some `Stream Load` and `Http Stream` are not executed in the group commit way if they meet the following conditions: + + * Two phase commit -+ group_commit_interval_ms + * Specify the label - The time interval of the internal group commit load job will stop and start a new internal job, the default value is 10000 milliseconds. + * Column update -+ group_commit_replay_wal_dir + * Tables that do not support light schema changes - The directory for storing WAL files. By default, a directory named `wal` is created under each directory of the `storage_root_path`. Users don't need to configure this if there is no special requirement. Configuration examples: +* For unique table, because the group commit can not guarantee the commit order, users can use sequence column to ensure the data consistency. +* The limit of `max_filter_ratio` + + * For non group commit load, filter_ratio is calculated by the failed rows and total rows when load is finished, if the filter_ratio does not match, the transaction will not commit + + * In the group commit mode, multiple user loads are executed through a single internal load. The internal load will commit all user loads. + + * Currently, group commit supports a certain degree of max_filter_ratio semantics. When the total number of rows does not exceed group_commit_memory_rows_for_max_filter_ratio (configured in be.conf, defaulting to 10000 rows), max_filter_ratio will work. + +* The limit of WAL + + * For async_mode group commit, data is written to the Write Ahead Log (WAL). If the internal load succeeds, the WAL is immediately deleted. If the internal load fails, data is recovery by importing the WAL. + + * Currently, WAL files are stored only on one Backend (BE). If the BE's disk is damaged or the file is mistakenly deleted, it may result in the loss of data. + + * When decommissioning a BE node, please use the [`DECOMMISSION`](../../../sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-DECOMMISSION-BACKEND.md) command to safely decommission the node. This prevents potential data loss if the WAL files are not processed before the node is taken offline. + + * For async_mode group commit writes, to protect disk space, it switches to sync_mode under the following conditions: + + * Exceeding 80% of the disk space in a single directory of WAL due to large data import. + + * Chunked stream loads with an unknown data amount. + + * Insufficient disk space, even with a relatively small import. + + * During hard weight schema changes (adding or dropping columns, modifying varchar length, and renaming columns are considered lightweight schema changes, others are hard weight), to ensure WAL file is compatibility with the table's schema, the final stage of metadata modification in FE will reject group commit writes. Clients get `insert table ${table_name} is blocked on schema change` exception and can retry the operation + +## Relevant system configuration + +### BE configuration + +#### `group_commit_wal_path` + +* The `WAL` directory of group commit. +* Default: A directory named `wal` is created under each directory of the `storage_root_path`. 
Configuration examples:
+
+  ```
+  group_commit_wal_path=/data1/storage/wal;/data2/storage/wal;/data3/storage/wal
+  ```
+
+#### `group_commit_memory_rows_for_max_filter_ratio`
+
+* Description: The `max_filter_ratio` limit only takes effect when the total number of rows loaded by `group commit` is less than this value.
+* Default: 10000
+
diff --git a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md
index 6b10ea20b81a6c..e657b6a6199ae6 100644
--- a/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md
+++ b/docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md
@@ -254,7 +254,7 @@ curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mo
 url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true
 ```

-2. 开启 `group_commit` session变量,有如下两种方式:
+2. 配置 `group_commit` session变量,有如下两种方式:

 * 通过 JDBC url 设置,增加`sessionVariables=group_commit=async_mode`

@@ -367,7 +367,7 @@ ALTER TABLE dt SET ("group_commit_interval_ms"="2000");

 + 表不支持 light schema change

-+ 对于 unique 模型,由于 group commit 不能保证提交顺序,用户可以配合 sequence 列使用
++ 对于 unique 模型,由于 group commit 不能保证提交顺序,用户可以配合 sequence 列使用来保证数据一致性

 * 对`max_filter_ratio`语义的支持

     * 在默认的导入中,`filter_ratio`是导入完成后,通过失败的行数和总行数计算,决定是否提交本次写入

@@ -377,7 +377,6 @@ ALTER TABLE dt SET ("group_commit_interval_ms"="2000");

     * group commit 模式支持了一定程度的`max_filter_ratio`语义,当导入的总行数不高于`group_commit_memory_rows_for_max_filter_ratio`(配置在`be.conf`中,默认为`10000`行),`max_filter_ratio` 工作

-
 * WAL 限制

     * 对于`async_mode`的 group commit 写入,会把数据写入 WAL。如果内部导入成功,则 WAL 被立刻删除;如果内部导入失败,通过导入 WAL 的方法来恢复数据

From 24bf6a9d00436790f43a771eb698cfc8fe39e5da Mon Sep 17 00:00:00 2001
From: meiyi
Date: Fri, 22 Dec 2023 19:00:20 +0800
Subject: [PATCH 15/15] fix group commit doc en

---
 .../import/import-way/group-commit-manual.md | 22 +++++++++----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/docs/en/docs/data-operate/import/import-way/group-commit-manual.md b/docs/en/docs/data-operate/import/import-way/group-commit-manual.md
index 5a26f6c23b5f54..9b0e007e15b6c0 100644
--- a/docs/en/docs/data-operate/import/import-way/group-commit-manual.md
+++ b/docs/en/docs/data-operate/import/import-way/group-commit-manual.md
@@ -1,7 +1,7 @@
 ---
 {
     "title": "Group Commit",
-    "language": "zh-CN"
+    "language": "en"
 }
 ---

@@ -26,7 +26,7 @@ under the License.

 # Group Commit

-Group commit load does not introduce a new import method, but an extension of `INSERT INTO tbl VALUS(...)`, `Stream Load` and `Http Stream`. It is a way to improve the write performance of Doris with high-concurrency and small-data writes. Your application can directly use JDBC to perform high-frequency data writes into Doris, at the same time, leveraging PreparedStatement can get even higher performance. In logging scenarios, you can also use Stream Load or Http Stream to perform high-frequency data writes into Doris.
+Group commit load does not introduce a new data import method, but an extension of `INSERT INTO tbl VALUES(...)`, `Stream Load` and `Http Stream`. It is a way to improve the write performance of Doris for high-concurrency, small-data writes. Your application can use JDBC to do high-concurrency inserts into Doris directly, and combining this with PreparedStatement can deliver even higher performance. In logging scenarios, you can also run high-concurrency Stream Load or Http Stream loads into Doris.
 ## Group Commit Mode
@@ -38,11 +38,11 @@ Disable group commit, keep the original behavior for `INSERT INTO VALUES`, `Stre

 * `sync_mode`

-Doris groups multiple loads into one transaction commit based on the `group_commit_interval` property of the table. The load is returned after the transaction commit. This mode is suitable for high-concurrency writing scenarios and requires immediate data visibility after the load is finished.
+Doris groups multiple loads into one transaction commit based on the `group_commit_interval` table property. The load is returned after the transaction commit. This mode is suitable for high-concurrency writing scenarios and requires immediate data visibility after the load is finished.

 * `async_mode`

-Doris writes data to the Write Ahead Log (WAL) firstly, then the load is returned. Doris groups multiple loads into one transaction commit based on the `group_commit_interval` property of the table, and the data becomes visible after the commit. To prevent excessive disk space usage by the WAL, it automatically switches to `sync_mode` when loading a large amount of data. This is suitable for latency-sensitive and high-frequency writing.
+Doris writes data to the Write Ahead Log (WAL) first, then the load returns. Doris groups multiple loads into one transaction commit based on the `group_commit_interval` table property, and the data is visible after the commit. To prevent excessive disk space usage by the WAL, it automatically switches to `sync_mode` in the situations listed under the WAL limitations below. This is suitable for latency-sensitive and high-frequency writing.

 ## Basic operations
@@ -372,29 +372,29 @@ ALTER TABLE dt SET ("group_commit_interval_ms"="2000");

 * The limit of `max_filter_ratio`

-    * For non group commit load, filter_ratio is calculated by the failed rows and total rows when load is finished, if the filter_ratio does not match, the transaction will not commit
+    * For a non group commit load, filter_ratio is calculated by the failed rows and total rows when the load is finished. If the filter_ratio does not match, the transaction will not commit

-    * In the group commit mode, multiple user loads are executed through a single internal load. The internal load will commit all user loads.
+    * In the group commit mode, multiple user loads are executed by one internal load. The internal load will commit all user loads.

-    * Currently, group commit supports a certain degree of max_filter_ratio semantics. When the total number of rows does not exceed group_commit_memory_rows_for_max_filter_ratio (configured in be.conf, defaulting to 10000 rows), max_filter_ratio will work.
+    * Currently, group commit supports a certain degree of max_filter_ratio semantics. When the total number of rows does not exceed group_commit_memory_rows_for_max_filter_ratio (configured in `be.conf`, defaulting to `10000` rows), max_filter_ratio will work.

 * The limit of WAL

     * For async_mode group commit, data is written to the Write Ahead Log (WAL). If the internal load succeeds, the WAL is immediately deleted. If the internal load fails, data is recovery by importing the WAL.

-    * Currently, WAL files are stored only on one Backend (BE). If the BE's disk is damaged or the file is mistakenly deleted, it may result in the loss of data.
+    * Currently, WAL files are stored only on one disk of one BE. If the BE's disk is damaged or the file is mistakenly deleted, it may result in data loss.
 * When decommissioning a BE node, please use the [`DECOMMISSION`](../../../sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-DECOMMISSION-BACKEND.md) command to safely decommission the node. This prevents potential data loss if the WAL files are not processed before the node is taken offline.

     * For async_mode group commit writes, to protect disk space, it switches to sync_mode under the following conditions:

-    * Exceeding 80% of the disk space in a single directory of WAL due to large data import.
+    * An import with a large amount of data, i.e. one that would exceed 80% of the disk space of a WAL directory.

     * Chunked stream loads with an unknown data amount.

-    * Insufficient disk space, even with a relatively small import.
+    * Insufficient disk space, even if the import itself is small.

-    * During hard weight schema changes (adding or dropping columns, modifying varchar length, and renaming columns are considered lightweight schema changes, others are hard weight), to ensure WAL file is compatibility with the table's schema, the final stage of metadata modification in FE will reject group commit writes. Clients get `insert table ${table_name} is blocked on schema change` exception and can retry the operation
+    * During heavyweight schema changes (adding or dropping columns, modifying varchar length, and renaming columns are lightweight schema changes; everything else is heavyweight), to ensure the WAL file stays compatible with the table's schema, the final stage of metadata modification in FE will reject group commit writes. Clients get an `insert table ${table_name} is blocked on schema change` exception and can retry the import (see the retry example below).

 ## Relevant system configuration
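Because a group commit insert can be rejected temporarily while a heavyweight schema change finishes, the guidance above is simply to retry. The following Java sketch is illustrative only: the JDBC URL, credentials, table, retry count and back-off are placeholder assumptions, and matching on the error-message text is a simplification rather than an official error-handling API.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class GroupCommitRetry {
    // Placeholder URL -- host, port, database and session variable are assumptions.
    private static final String URL =
            "jdbc:mysql://127.0.0.1:9030/db?sessionVariables=group_commit=async_mode";

    // Retry an insert a few times when group commit is temporarily rejected
    // because a heavyweight schema change is finishing on the target table.
    static void insertWithRetry(String sql, int maxRetries) throws SQLException, InterruptedException {
        for (int attempt = 1; ; attempt++) {
            try (Connection conn = DriverManager.getConnection(URL, "root", "");
                 Statement stmt = conn.createStatement()) {
                stmt.execute(sql);
                return; // success
            } catch (SQLException e) {
                boolean blockedOnSchemaChange = e.getMessage() != null
                        && e.getMessage().contains("is blocked on schema change");
                if (!blockedOnSchemaChange || attempt >= maxRetries) {
                    throw e; // not retryable, or out of attempts
                }
                Thread.sleep(1000L * attempt); // simple linear back-off before retrying
            }
        }
    }

    public static void main(String[] args) throws Exception {
        insertWithRetry("insert into dt values(8, 'Tom', 77)", 3);
    }
}
```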