[Feature] Flink Doris Connector (#5372)#5375
Conversation
| this.connectTimeout, this.socketTimeout, this.retries); | ||
| open(); | ||
| } | ||
|
|
There was a problem hiding this comment.
You need to write some annotation
There was a problem hiding this comment.
Modification, submitted soon, and added flink SQL to create doris table
| logger.warn(ErrorMessages.CONNECT_FAILED_MESSAGE, routing, e); | ||
| ex = e; | ||
| } | ||
| if (isConnected) { |
There was a problem hiding this comment.
This is better to be in try {}(just below isConnected = true;)
There was a problem hiding this comment.
Modification, submitted soon, and added flink SQL to create doris table
| logger.info("Success connect to {}.", routing); | ||
| break; | ||
| } | ||
| } |
There was a problem hiding this comment.
it is better to return just after isConnected = true
There was a problem hiding this comment.
Modification, submitted soon, and added flink SQL to create doris table
| private void close() { | ||
| logger.trace("Connect status before close with '{}' is '{}'.", routing, isConnected); | ||
| isConnected = false; | ||
| if (null != client) { |
There was a problem hiding this comment.
You should close it before set client null
| logger.debug("CloseScanner to '{}', parameter is '{}'.", routing, closeParams); | ||
| if (!isConnected) { | ||
| try { | ||
| open(); |
There was a problem hiding this comment.
Why open here?this will create a new client, do nothing, then close it?
| } | ||
| return false; | ||
| } | ||
|
|
|
Modification, submitted soon, and added flink SQL to create doris table |
…nnector-jdbc, add FlinkSql to create Doris table, and write to Doris table through streamload[Fix]
|
Refer to User-defined Sources & Sinks And flink-connector-jdbc, add flink sql to create doris table, and write doris table through streamload。 eg: EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.getConfig().getConfiguration().setString("job.name","test");
tEnv.executeSql(
"CREATE TABLE doris_test (" +
"name STRING," +
"age INT," +
"price DECIMAL(5,2)," +
"sale DOUBLE" +
") " +
"WITH (\n" +
" 'connector' = 'doris',\n" +
" 'fenodes' = 'FE_IP:8030',\n" +
" 'table.identifier' = 'demo.doris_test_source_2',\n" +
" 'username' = 'root',\n" +
" 'password' = ''" +
")");
tEnv.executeSql(
"CREATE TABLE doris_test_sink (" +
"name STRING," +
"age INT," +
"price DECIMAL(5,2)," +
"sale DOUBLE" +
") " +
"WITH (\n" +
" 'connector' = 'doris',\n" +
" 'fenodes' = 'FE_IP:8030',\n" +
" 'table.identifier' = 'demo.doris_test_sink_2',\n" +
" 'username' = 'root',\n" +
" 'password' = ''\n" +
")");
tEnv.executeSql("INSERT INTO doris_test_sink select name,age,price,sale from doris_test"); |
| } | ||
|
|
||
| @Override | ||
| public void writeRecord(RowData row) throws IOException { |
There was a problem hiding this comment.
We need to buffer a batch of rows before triggering stream load
|
Added batch.size and max-retries configuration items, can set the batch size and the number of retries |
|
add build script |
|
It is recommended that you add usage methods and usage examples to the document later. |
already add. |
| --> | ||
|
|
||
| # Flink Doris Connector | ||
|
|
| sh build.sh | ||
| ``` | ||
|
|
||
| 编译成功后,会在 `output/` 目录下生成文件 `doris-flink-1.0.0-SNAPSHOT.jar`。将此文件复制到 `Flink` 的 `ClassPath` 中即可使用 `Flink-Doris-Connector`。例如,`Local` 模式运行的 `Flink`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Flink`,则将此文件放入预部署包中。 |
There was a problem hiding this comment.
我尝试编译了一下,编译完成后发现有两个jar包。一个叫 doris-flink-1.0-SNAPSHOT.jar, 一个叫original-doris-flink-1.0-SNAPSHOT.jar。这个 original-doris-flink-1.0-SNAPSHOT.jar 是干啥用的吗?
There was a problem hiding this comment.
已删除 original-doris-flink-1.0-SNAPSHOT.jar
| | username | -- | 访问Doris的用户名 | | ||
| | password | -- | 访问Doris的密码 | | ||
| | sink.batch.size | 100 | 单次写BE的最大行数 | | ||
| | sink.max-retries | 1 | 写BE失败之后的重试次数 | |
There was a problem hiding this comment.
看起来可以调整的参数好像不止这些,可以后续把参数的含义都补充一下。比如mem limit这种。
There was a problem hiding this comment.
参考spark-connector的参数,修改中,最近就会提交
|
|
||
| @Override | ||
| public String toString() { | ||
| return "RespContent{" + |
There was a problem hiding this comment.
Json tostring is better?
There was a problem hiding this comment.
Modification, submitted soon
| DataStreamSource<String> source = env.fromElements("[{\"name\":\"doris\"}]\t1"); | ||
| tEnv.createTemporaryView("doris_test",source,$("name")); | ||
|
|
||
| // Table wordWithCount = tEnv.sqlQuery("select name FROM doris_test "); |
There was a problem hiding this comment.
Modification, submitted soon
| } | ||
|
|
||
| public void convertArrowToRowBatch() throws DorisException { | ||
| try { |
There was a problem hiding this comment.
This method is worth having a unit test ~
There was a problem hiding this comment.
Modification, submitted soon
| new ByteArrayInputStream(nextResult.getRows()), | ||
| rootAllocator | ||
| ); | ||
| this.offsetInRowBatch = 0; |
There was a problem hiding this comment.
Although I don’t know whether it’s appropriate to put this while and exception logic in the constructor,
but could you please changed to a more elegant way of writing.
There was a problem hiding this comment.
Ok, it will be modified soon
Co-authored-by: EmmyMiao87 <522274284@qq.com>
Co-authored-by: EmmyMiao87 <522274284@qq.com>
Co-authored-by: EmmyMiao87 <522274284@qq.com>
…is modified to BE, and re-acquire other surviving BE nodes after failure
|
1.Merge DorisOptions and Settings configuration ; |
|
add pom profile patch |
|
Failed to get response from Doris http://10.0.10.28:8030/rest/v1/system?path=//backends, http code is 404 |
enable http v2 |
|
enable_http_server_v2 ? |
yes,enable_http_server_v2= true |
|
my doris version 0.15.0 rc02,enable_http_server_v2 is true default?but failed to get response from doris http://10.0.10.28:8030/api/backends?is_alive=true |
Proposed changes
Refer to Spark connector Doris and make an example of Flink Connector Doris
1.Customize DorisSource to implement interface org.apache.flink.streaming.api.functions.source.RichSourceFunction
2.Get dorisPartitions by calling the FE execution plan Api during initialization
3.Traverse dorisPartitions, read Doris data through scalaValueReader.class
Types of changes
What types of changes does your code introduce to Doris?
Put an
xin the boxes that applyChecklist
Put an
xin the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code.I have created an issue on (Fix #5371) and described the bug/feature there in detail
Compiling and unit tests pass locally with my changes
I have added tests that prove my fix is effective or that my feature works
If these changes need document changes, I have updated the document
Any dependent changes have been merged
Further comments
If this is a relatively large or complex change, kick off the discussion at dev@doris.apache.org by explaining why you chose the solution you did and what alternatives you considered, etc...