[Feature] Flink Doris Connector (#5372) #5375
---
{
    "title": "Flink Doris Connector",
    "language": "en"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
# Flink Doris Connector

The Flink Doris Connector supports reading data stored in Doris through Flink.

- You can map a Doris table to a `DataStream` or a `Table`.
## Version Compatibility

| Connector | Flink  | Doris | Java | Scala |
| --------- | ------ | ----- | ---- | ----- |
| 1.0.0     | 1.11.2 | 0.13+ | 8    | 2.12  |
## Build and Install

Execute the following command in the `extension/flink-doris-connector/` directory:

```bash
sh build.sh
```

After successful compilation, the file `doris-flink-1.0.0-SNAPSHOT.jar` is generated in the `output/` directory. Copy this file into Flink's classpath to use Flink-Doris-Connector. For example, for Flink running in Local mode, put this file in the `jars/` folder; for Flink running in a Yarn cluster, put this file in the pre-deployment package.
## How to use

The purpose of this step is to register the Doris data source in Flink. This step is performed on the Flink side. There are two ways to use the connector, SQL and the DataStream API; the following examples illustrate both.

### SQL
```sql
CREATE TABLE flink_doris_source (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
)
WITH (
  'connector' = 'doris',
  'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT',
  'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
  'username' = '$YOUR_DORIS_USERNAME',
  'password' = '$YOUR_DORIS_PASSWORD'
);

CREATE TABLE flink_doris_sink (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
)
WITH (
  'connector' = 'doris',
  'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT',
  'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
  'username' = '$YOUR_DORIS_USERNAME',
  'password' = '$YOUR_DORIS_PASSWORD'
);

INSERT INTO flink_doris_sink SELECT name, age, price, sale FROM flink_doris_source;
```

### DataStream

The snippet below is Java and assumes an existing `StreamExecutionEnvironment` named `env`:

```java
Properties properties = new Properties();
properties.put("fenodes", "FE_IP:8030");
properties.put("username", "root");
properties.put("password", "");
properties.put("table.identifier", "db.table");
env.addSource(new DorisSourceFunction(
        new DorisStreamOptions(properties),
        new SimpleListDeserializationSchema()))
   .print();
```
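The DataStream example passes all connection settings through a plain `java.util.Properties` object, and `DorisStreamOptions` only sees them at job construction time. As a sketch, the keys used above can be validated up front; the `requireDorisProps` helper below is hypothetical, not part of the connector API:

```java
import java.util.List;
import java.util.Properties;

public class DorisPropsCheck {
    // Keys the DataStream snippet above sets; assumed required here.
    static final List<String> REQUIRED =
            List.of("fenodes", "username", "password", "table.identifier");

    // Hypothetical helper: throws if a required key is absent.
    // The password may legitimately be empty, so only presence is checked.
    public static void requireDorisProps(Properties props) {
        for (String key : REQUIRED) {
            if (props.getProperty(key) == null) {
                throw new IllegalArgumentException("missing Doris option: " + key);
            }
        }
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("fenodes", "FE_IP:8030");
        properties.put("username", "root");
        properties.put("password", "");
        properties.put("table.identifier", "db.table");
        requireDorisProps(properties); // all keys present, no exception
        System.out.println("all required Doris options present");
    }
}
```

Failing fast like this surfaces a typo in an option key before the Flink job is submitted.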
### General

| Key                              | Default Value     | Comment                                                      |
| -------------------------------- | ----------------- | ------------------------------------------------------------ |
| fenodes                          | --                | Doris FE http addresses; multiple addresses are supported, separated by commas |
| table.identifier                 | --                | Doris table identifier, e.g. db1.tbl1                        |
| username                         | --                | Doris username                                               |
| password                         | --                | Doris password                                               |
| doris.request.retries            | 3                 | Number of retries when sending requests to Doris             |
| doris.request.connect.timeout.ms | 30000             | Connection timeout for requests sent to Doris                |
| doris.request.read.timeout.ms    | 30000             | Read timeout for requests sent to Doris                      |
| doris.request.query.timeout.s    | 3600              | Query timeout against Doris; the default is 1 hour, and -1 means no timeout limit |
| doris.request.tablet.size        | Integer.MAX_VALUE | The number of Doris tablets corresponding to one read partition. The smaller this value, the more partitions are generated. This increases parallelism on the Flink side, but also puts more pressure on Doris. |
| doris.batch.size                 | 1024              | The maximum number of rows read from BE at one time. Increasing this value reduces the number of connections established between Flink and Doris, thereby reducing the extra overhead caused by network latency. |
| doris.exec.mem.limit             | 2147483648        | Memory limit for a single query, in bytes. The default is 2GB. |
| doris.deserialize.arrow.async    | false             | Whether to asynchronously convert the Arrow format to the RowBatch needed for connector iteration |
| doris.deserialize.queue.size     | 64                | Size of the internal processing queue for asynchronous Arrow conversion; takes effect when doris.deserialize.arrow.async is true |
| doris.read.field                 | --                | List of column names to read from the Doris table, separated by commas |
| doris.filter.query               | --                | Filter expression for the query, passed through to Doris; Doris uses it to filter data on the source side |
| sink.batch.size                  | 100               | Maximum number of rows in a single write to BE               |
| sink.max-retries                 | 1                 | Number of retries after a failed write to BE                 |
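Every option in the table above has a documented default that applies when the user does not set it. A minimal sketch of that resolution, merging user-supplied properties over the defaults listed above (the `effective` helper is illustrative, not connector API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class DorisOptionDefaults {
    // Default values copied from the table above.
    static final Map<String, String> DEFAULTS = new HashMap<>();
    static {
        DEFAULTS.put("doris.request.retries", "3");
        DEFAULTS.put("doris.request.connect.timeout.ms", "30000");
        DEFAULTS.put("doris.request.read.timeout.ms", "30000");
        DEFAULTS.put("doris.request.query.timeout.s", "3600");
        DEFAULTS.put("doris.request.tablet.size", String.valueOf(Integer.MAX_VALUE));
        DEFAULTS.put("doris.batch.size", "1024");
        DEFAULTS.put("doris.exec.mem.limit", "2147483648");
        DEFAULTS.put("doris.deserialize.arrow.async", "false");
        DEFAULTS.put("doris.deserialize.queue.size", "64");
        DEFAULTS.put("sink.batch.size", "100");
        DEFAULTS.put("sink.max-retries", "1");
    }

    // Illustrative helper: a user-supplied value wins over the documented default.
    public static String effective(Properties userProps, String key) {
        return userProps.getProperty(key, DEFAULTS.get(key));
    }

    public static void main(String[] args) {
        Properties user = new Properties();
        user.put("doris.batch.size", "4096"); // explicit override
        System.out.println(effective(user, "doris.batch.size"));  // 4096
        System.out.println(effective(user, "sink.max-retries"));  // 1 (default)
    }
}
```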
## Doris & Flink Column Type Mapping

| Doris Type | Flink Type           |
| ---------- | -------------------- |
| NULL_TYPE  | NULL                 |
| BOOLEAN    | BOOLEAN              |
| TINYINT    | TINYINT              |
| SMALLINT   | SMALLINT             |
| INT        | INT                  |
| BIGINT     | BIGINT               |
| FLOAT      | FLOAT                |
| DOUBLE     | DOUBLE               |
| DATE       | STRING               |
| DATETIME   | STRING               |
| DECIMAL    | DECIMAL              |
| CHAR       | STRING               |
| LARGEINT   | STRING               |
| VARCHAR    | STRING               |
| DECIMALV2  | DECIMAL              |
| TIME       | DOUBLE               |
| HLL        | Unsupported datatype |
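The mapping table above is a plain per-type lookup, with HLL falling outside the supported set. A sketch of that lookup as code (the `toFlinkType` function is illustrative, not part of the connector API):

```java
import java.util.Map;

public class DorisFlinkTypes {
    // Mapping taken directly from the table above.
    static final Map<String, String> MAPPING = Map.ofEntries(
        Map.entry("NULL_TYPE", "NULL"),
        Map.entry("BOOLEAN", "BOOLEAN"),
        Map.entry("TINYINT", "TINYINT"),
        Map.entry("SMALLINT", "SMALLINT"),
        Map.entry("INT", "INT"),
        Map.entry("BIGINT", "BIGINT"),
        Map.entry("FLOAT", "FLOAT"),
        Map.entry("DOUBLE", "DOUBLE"),
        Map.entry("DATE", "STRING"),
        Map.entry("DATETIME", "STRING"),
        Map.entry("DECIMAL", "DECIMAL"),
        Map.entry("CHAR", "STRING"),
        Map.entry("LARGEINT", "STRING"),
        Map.entry("VARCHAR", "STRING"),
        Map.entry("DECIMALV2", "DECIMAL"),
        Map.entry("TIME", "DOUBLE")
    );

    // Illustrative lookup: HLL (and anything else unknown) is unsupported.
    public static String toFlinkType(String dorisType) {
        String flinkType = MAPPING.get(dorisType);
        if (flinkType == null) {
            throw new UnsupportedOperationException("Unsupported datatype: " + dorisType);
        }
        return flinkType;
    }

    public static void main(String[] args) {
        System.out.println(toFlinkType("DATETIME"));  // STRING
        System.out.println(toFlinkType("DECIMALV2")); // DECIMAL
    }
}
```

Note that DATE, DATETIME, LARGEINT, and CHAR all surface as `STRING` on the Flink side, so any date arithmetic must be done after parsing.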
---
{
    "title": "Flink Doris Connector",
    "language": "zh-CN"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
# Flink Doris Connector

The Flink Doris Connector supports reading data stored in Doris through Flink.

- You can map a Doris table to a `DataStream` or a `Table`.
## Version Compatibility

| Connector | Flink  | Doris | Java | Scala |
| --------- | ------ | ----- | ---- | ----- |
| 1.0.0     | 1.11.2 | 0.13+ | 8    | 2.12  |
## Build and Install

Execute the following command in the `extension/flink-doris-connector/` source directory:

```bash
sh build.sh
```

After successful compilation, the file `doris-flink-1.0.0-SNAPSHOT.jar` is generated in the `output/` directory. Copy this file into Flink's classpath to use Flink-Doris-Connector. For example, for Flink running in Local mode, put this file in the `jars/` folder; for Flink running in a Yarn cluster, put this file in the pre-deployment package.
> **Contributor:** I tried building this. After compilation there are two jars, one named doris-flink-1.0-SNAPSHOT.jar and one named original-doris-flink-1.0-SNAPSHOT.jar. What is original-doris-flink-1.0-SNAPSHOT.jar for?

> **Member (Author):** Removed original-doris-flink-1.0-SNAPSHOT.jar.
## Usage Examples

The purpose of this step is to register the Doris data source in Flink. This step is performed on the Flink side. There are two ways to use the connector, SQL and the DataStream API; the following examples illustrate both.

### SQL
```sql
CREATE TABLE flink_doris_source (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
)
WITH (
  'connector' = 'doris',
  'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT',
  'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
  'username' = '$YOUR_DORIS_USERNAME',
  'password' = '$YOUR_DORIS_PASSWORD'
);

CREATE TABLE flink_doris_sink (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
)
WITH (
  'connector' = 'doris',
  'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT',
  'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
  'username' = '$YOUR_DORIS_USERNAME',
  'password' = '$YOUR_DORIS_PASSWORD'
);

INSERT INTO flink_doris_sink SELECT name, age, price, sale FROM flink_doris_source;
```

### DataStream

The snippet below assumes an existing `StreamExecutionEnvironment` named `env`:

```java
Properties properties = new Properties();
properties.put("fenodes", "FE_IP:8030");
properties.put("username", "root");
properties.put("password", "");
properties.put("table.identifier", "db.table");
env.addSource(new DorisSourceFunction(
        new DorisStreamOptions(properties),
        new SimpleListDeserializationSchema()))
   .print();
```

## Configuration

### General Options

| Key                              | Default Value     | Comment                                                      |
| -------------------------------- | ----------------- | ------------------------------------------------------------ |
| fenodes                          | --                | Doris FE http address                                        |
| table.identifier                 | --                | Doris table name, e.g. db1.tbl1                              |
| username                         | --                | Username for accessing Doris                                 |
| password                         | --                | Password for accessing Doris                                 |
| doris.request.retries            | 3                 | Number of retries when sending requests to Doris             |
| doris.request.connect.timeout.ms | 30000             | Connection timeout for requests sent to Doris                |
| doris.request.read.timeout.ms    | 30000             | Read timeout for requests sent to Doris                      |
| doris.request.query.timeout.s    | 3600              | Query timeout against Doris; the default is 1 hour, and -1 means no timeout limit |
| doris.request.tablet.size        | Integer.MAX_VALUE | The number of Doris tablets corresponding to one read partition. The smaller this value, the more partitions are generated. This increases parallelism on the Flink side, but also puts more pressure on Doris. |
| doris.batch.size                 | 1024              | The maximum number of rows read from BE at one time. Increasing this value reduces the number of connections established between Flink and Doris, thereby reducing the extra overhead caused by network latency. |
| doris.exec.mem.limit             | 2147483648        | Memory limit for a single query, in bytes. The default is 2GB. |
| doris.deserialize.arrow.async    | false             | Whether to asynchronously convert the Arrow format to the RowBatch needed for connector iteration |
| doris.deserialize.queue.size     | 64                | Size of the internal processing queue for asynchronous Arrow conversion; takes effect when doris.deserialize.arrow.async is true |
| doris.read.field                 | --                | List of column names to read from the Doris table, separated by commas |
| doris.filter.query               | --                | Filter expression for the query, passed through to Doris; Doris uses it to filter data on the source side |
| sink.batch.size                  | 100               | Maximum number of rows in a single write to BE               |
| sink.max-retries                 | 1                 | Number of retries after a failed write to BE                 |
> **Contributor:** It looks like more parameters are tunable than the ones listed here; the meaning of each parameter could be documented later, e.g. mem limit.

> **Member (Author):** Working on it, following the spark-connector parameters; will submit soon.

## Doris & Flink Column Type Mapping
| Doris Type | Flink Type           |
| ---------- | -------------------- |
| NULL_TYPE  | NULL                 |
| BOOLEAN    | BOOLEAN              |
| TINYINT    | TINYINT              |
| SMALLINT   | SMALLINT             |
| INT        | INT                  |
| BIGINT     | BIGINT               |
| FLOAT      | FLOAT                |
| DOUBLE     | DOUBLE               |
| DATE       | STRING               |
| DATETIME   | STRING               |
| DECIMAL    | DECIMAL              |
| CHAR       | STRING               |
| LARGEINT   | STRING               |
| VARCHAR    | STRING               |
| DECIMALV2  | DECIMAL              |
| TIME       | DOUBLE               |
| HLL        | Unsupported datatype |
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

##############################################################
# This script is used to compile Flink-Doris-Connector
# Usage:
#    sh build.sh
#
##############################################################

set -eo pipefail

ROOT=$(dirname "$0")
ROOT=$(cd "$ROOT" && pwd)

export DORIS_HOME="${ROOT}/../../"

# include custom environment variables
if [[ -f "${DORIS_HOME}/custom_env.sh" ]]; then
    . "${DORIS_HOME}/custom_env.sh"
fi

# check maven
MVN_CMD=mvn
if [[ -n ${CUSTOM_MVN} ]]; then
    MVN_CMD=${CUSTOM_MVN}
fi
if ! ${MVN_CMD} --version; then
    echo "Error: mvn is not found"
    exit 1
fi
export MVN_CMD

${MVN_CMD} clean package

mkdir -p output/
cp target/doris-flink-1.0-SNAPSHOT.jar ./output/

echo "*****************************************"
echo "Successfully built Flink-Doris-Connector"
echo "*****************************************"

exit 0
> **Reviewer:** The review comments for the Chinese document are the same as for the English one.