From 6bf59a79c4409b9a4c65ccc9f52dd0089e62709e Mon Sep 17 00:00:00 2001
From: huzekang <1040080742@qq.com>
Date: Mon, 28 Jun 2021 21:10:49 +0800
Subject: [PATCH 1/5] feat: implementation datax doris writer plugin
---
.../datax-doriswriter/doc/doriswriter.md | 147 +++++++++++
extension/datax-doriswriter/pom.xml | 86 +++++++
.../src/main/assembly/package.xml | 35 +++
.../writer/doriswriter/DorisFlushBatch.java | 28 ++
.../writer/doriswriter/DorisJsonCodec.java | 69 +++++
.../writer/doriswriter/DorisWriter.java | 242 ++++++++++++++++++
.../doriswriter/DorisWriterEmitter.java | 158 ++++++++++++
.../datax/plugin/writer/doriswriter/Key.java | 104 ++++++++
.../src/main/resources/plugin.json | 6 +
.../main/resources/plugin_job_template.json | 15 ++
10 files changed, 890 insertions(+)
create mode 100644 extension/datax-doriswriter/doc/doriswriter.md
create mode 100644 extension/datax-doriswriter/pom.xml
create mode 100644 extension/datax-doriswriter/src/main/assembly/package.xml
create mode 100644 extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
create mode 100644 extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
create mode 100644 extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
create mode 100644 extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
create mode 100644 extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
create mode 100644 extension/datax-doriswriter/src/main/resources/plugin.json
create mode 100644 extension/datax-doriswriter/src/main/resources/plugin_job_template.json
diff --git a/extension/datax-doriswriter/doc/doriswriter.md b/extension/datax-doriswriter/doc/doriswriter.md
new file mode 100644
index 00000000000000..e58eff24357efe
--- /dev/null
+++ b/extension/datax-doriswriter/doc/doriswriter.md
@@ -0,0 +1,147 @@
+# DorisWriter 插件文档
+
+## 1 快速介绍
+DorisWriter支持将大批量数据写入Doris中。
+
+## 2 实现原理
+DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter会将`reader`读取的数据进行缓存在内存中,拼接成Json文本,然后批量导入至Doris。
+
+## 3 功能说明
+
+### 3.1 配置样例
+
+这里是一份从Stream读取数据后导入至Doris的配置文件。
+
+```
+{
+ "job": {
+ "setting": {
+ "speed": {
+ "channel": 1
+ },
+ "errorLimit": {
+ "record": 0,
+ "percentage": 0
+ }
+ },
+ "content": [
+ {
+ "reader": {
+ "name": "streamreader",
+ "parameter": {
+ "column": [
+ {
+ "value": "皮蛋1",
+ "type": "string"
+ },
+ {
+ "value": "皮蛋2",
+ "type": "string"
+ },
+ {
+ "value": "111",
+ "type": "long"
+ },
+ {
+ "value": "222",
+ "type": "long"
+ }
+ ],
+ "sliceRecordCount": 100
+ }
+ },
+ "writer": {
+ "name": "doriswriter",
+ "parameter": {
+ "username": "dxx",
+ "password": "123456",
+ "database": "test",
+ "table": "datax_test",
+ "column": [
+ "k1",
+ "k2",
+ "v1",
+ "v2"
+ ],
+ "preSql": [],
+ "postSql": [],
+ "jdbcUrl": "jdbc:mysql://10.93.6.247:9030/",
+ "beLoadUrl": [
+ "10.93.6.167:8041"
+ ],
+ "loadProps": {
+ }
+ }
+ }
+ }
+ ]
+ }
+}
+```
+
+
+
+### 3.2 参数说明
+
+* **username**
+
+ - 描述:访问Doris数据库的用户名
+ - 必选:是
+ - 默认值:无
+
+* **password**
+
+ - 描述:访问Doris数据库的密码
+ - 必选:是
+ - 默认值:无
+
+* **database**
+
+ - 描述:访问Doris表的数据库名称。
+ - 必选:是
+ - 默认值:无
+
+* **table**
+
+ - 描述:访问Doris表的表名称。
+ - 必选:是
+ - 默认值:无
+
+* **beLoadUrl**
+
+ - 描述:Doris BE的地址用于Stream load,可以为多个BE地址,形如`BE_ip:Be_webserver_port`。
+ - 必选:是
+ - 默认值:无
+
+* **column**
+
+ - 描述:目的表**需要写入数据**的字段,字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。
+ - 必选:是
+ - 默认值:否
+
+* **preSql**
+
+ - 描述:写入数据到目的表前,会先执行这里的标准语句。
+ - 必选:否
+ - 默认值:无
+
+* **postSql**
+
+ - 描述:写入数据到目的表后,会执行这里的标准语句。
+ - 必选:否
+ - 默认值:无
+
+* **jdbcUrl**
+
+ - 描述:目的数据库的 JDBC 连接信息,用于执行`preSql`及`postSql`。
+ - 必选:否
+ - 默认值:无
+
+* **loadProps**
+
+ - 描述:StreamLoad 的请求参数,详情参照StreamLoad介绍页面。
+ - 必选:否
+ - 默认值:无
+
+
+
diff --git a/extension/datax-doriswriter/pom.xml b/extension/datax-doriswriter/pom.xml
new file mode 100644
index 00000000000000..ad6c3bd5102131
--- /dev/null
+++ b/extension/datax-doriswriter/pom.xml
@@ -0,0 +1,86 @@
+
+
+
+ datax-all
+ com.alibaba.datax
+ 0.0.1-SNAPSHOT
+
+ 4.0.0
+
+ doriswriter
+
+
+
+ com.alibaba.datax
+ datax-common
+ ${datax-project-version}
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+ ch.qos.logback
+ logback-classic
+
+
+
+ com.alibaba.datax
+ plugin-rdbms-util
+ ${datax-project-version}
+
+
+
+ mysql
+ mysql-connector-java
+ ${mysql.driver.version}
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.3
+
+
+
+
+
+
+ maven-compiler-plugin
+
+ ${jdk-version}
+ ${jdk-version}
+ ${project-sourceEncoding}
+
+
+
+
+
+ maven-assembly-plugin
+
+
+ src/main/assembly/package.xml
+
+ datax
+
+
+
+ dwzip
+ package
+
+ single
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/extension/datax-doriswriter/src/main/assembly/package.xml b/extension/datax-doriswriter/src/main/assembly/package.xml
new file mode 100644
index 00000000000000..4edb15abf8c769
--- /dev/null
+++ b/extension/datax-doriswriter/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+
+
+
+ dir
+
+ false
+
+
+ src/main/resources
+
+ plugin.json
+ plugin_job_template.json
+
+ plugin/writer/doriswriter
+
+
+ target/
+
+ doriswriter-0.0.1-SNAPSHOT.jar
+
+ plugin/writer/doriswriter
+
+
+
+
+
+ false
+ plugin/writer/doriswriter/libs
+ runtime
+
+
+
\ No newline at end of file
diff --git a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
new file mode 100644
index 00000000000000..e5dac0e7d2dfb0
--- /dev/null
+++ b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
@@ -0,0 +1,28 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import java.util.List;
+
+public class DorisFlushBatch
+{
+ private String label;
+ private Long bytes;
+ private List rows;
+
+ public DorisFlushBatch(final String label, final Long bytes, final List rows) {
+ this.label = label;
+ this.bytes = bytes;
+ this.rows = rows;
+ }
+
+ public String getLabel() {
+ return this.label;
+ }
+
+ public Long getBytes() {
+ return this.bytes;
+ }
+
+ public List getRows() {
+ return this.rows;
+ }
+}
diff --git a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
new file mode 100644
index 00000000000000..7e442b22c02303
--- /dev/null
+++ b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
@@ -0,0 +1,69 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.DateColumn;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.lang3.time.DateFormatUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+public class DorisJsonCodec {
+
+ private static String timeZone = "GMT+8";
+ private static TimeZone timeZoner = TimeZone.getTimeZone(timeZone);
+
+ private final List fieldNames;
+
+ public DorisJsonCodec(final List fieldNames) {
+ this.fieldNames = fieldNames;
+ }
+
+ public String serialize(final Record row) {
+ if (null == this.fieldNames) {
+ return "";
+ }
+ final Map rowMap = new HashMap(this.fieldNames.size());
+ int idx = 0;
+ for (final String fieldName : this.fieldNames) {
+ rowMap.put(fieldName, this.columnConvert2String(row.getColumn(idx)));
+ ++idx;
+ }
+ return JSON.toJSONString(rowMap);
+ }
+
+
+ /**
+ * convert datax internal data to string
+ *
+ * @param col
+ * @return
+ */
+ private String columnConvert2String(final Column col) {
+ if (null == col.getRawData()) {
+ return null;
+ }
+ if (Column.Type.BOOL == col.getType()) {
+ return String.valueOf(col.asLong());
+ }
+ if (Column.Type.DATE != col.getType()) {
+ return col.asString();
+ }
+ final DateColumn.DateType type = ((DateColumn) col).getSubType();
+ if (type == DateColumn.DateType.DATE) {
+ return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd", timeZoner);
+ }
+ if (type == DateColumn.DateType.TIME) {
+ return DateFormatUtils.format(col.asDate(), "HH:mm:ss", timeZoner);
+ }
+ if (type == DateColumn.DateType.DATETIME) {
+ return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd HH:mm:ss", timeZoner);
+ }
+ return null;
+ }
+
+
+}
\ No newline at end of file
diff --git a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
new file mode 100644
index 00000000000000..8f271c482198fb
--- /dev/null
+++ b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
@@ -0,0 +1,242 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.rdbms.util.DBUtil;
+import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
+import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
+import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
+import com.alibaba.datax.plugin.rdbms.writer.Constant;
+import com.alibaba.druid.sql.parser.ParserException;
+import com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+public class DorisWriter extends Writer {
+ public DorisWriter() {
+ }
+
+ public static class Task extends com.alibaba.datax.common.spi.Writer.Task {
+ private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.Task.class);
+
+ private DorisWriterEmitter dorisWriterEmitter;
+ private Key keys;
+ private DorisJsonCodec rowCodec;
+
+
+ public Task() {
+ }
+
+ @Override
+ public void init() {
+ this.keys = new Key(super.getPluginJobConf());
+ this.rowCodec = new DorisJsonCodec(this.keys.getColumns());
+ this.dorisWriterEmitter = new DorisWriterEmitter(keys);
+ }
+
+ @Override
+ public void prepare() {
+ }
+
+ @Override
+ public void startWrite(RecordReceiver recordReceiver) {
+ try {
+ List buffer = new ArrayList<>();
+ int batchCount = 0;
+ long batchByteSize = 0L;
+ Record record;
+ // loop to get record from datax
+ while ((record = recordReceiver.getFromReader()) != null) {
+ // check column size
+ if (record.getColumnNumber() != this.keys.getColumns().size()) {
+ throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
+ String.format("config writer column info error. because the column number of reader is :%s and the column number of writer is:%s . please check you datax job config json.", record.getColumnNumber(), this.keys.getColumns().size()));
+ }
+ // codec record
+ final String recordStr = this.rowCodec.serialize(record);
+ // put into buffer
+ buffer.add(recordStr);
+ batchCount += 1;
+ batchByteSize += recordStr.getBytes().length;
+ // trigger buffer
+ if (batchCount >= this.keys.getBatchRows() || batchByteSize >= this.keys.getBatchByteSize()) {
+ // generate doris stream load label
+ final String label = getStreamLoadLabel();
+ LOG.debug(String.format("Doris buffer Sinking triggered: rows[%d] label[%s].", batchCount, label));
+ final DorisFlushBatch flushBatch = new DorisFlushBatch(label, batchByteSize, buffer);
+ dorisWriterEmitter.doStreamLoad(flushBatch);
+ // clear buffer
+ batchCount = 0;
+ batchByteSize = 0L;
+ buffer.clear();
+ }
+ }
+ if (buffer.size() > 0) {
+ final DorisFlushBatch flushBatch = new DorisFlushBatch(getStreamLoadLabel(), batchByteSize, buffer);
+ dorisWriterEmitter.doStreamLoad(flushBatch);
+ }
+
+ } catch (Exception e) {
+ throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
+ }
+ }
+
+ private String getStreamLoadLabel() {
+ return UUID.randomUUID().toString();
+ }
+
+ @Override
+ public void post() {
+
+ }
+
+ @Override
+ public void destroy() {
+ }
+
+ @Override
+ public boolean supportFailOver() {
+ return false;
+ }
+ }
+
+ public static class Job extends com.alibaba.datax.common.spi.Writer.Job {
+ private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.Job.class);
+ private Configuration originalConfig = null;
+ private Key keys;
+
+ public Job() {
+ }
+
+ @Override
+ public void init() {
+ this.originalConfig = super.getPluginJobConf();
+ this.keys = new Key(super.getPluginJobConf());
+ this.keys.doPretreatment();
+ }
+
+ @Override
+ public void preCheck() {
+ this.init();
+ this.preCheckPrePareSQL(this.keys);
+ this.preCheckPostSQL(this.keys);
+ }
+
+ @Override
+ public void prepare() {
+ String username = this.keys.getUsername();
+ String password = this.keys.getPassword();
+ String jdbcUrl = this.keys.getJdbcUrl();
+ List renderedPreSqls = this.renderPreOrPostSqls(this.keys.getPreSqlList(), this.keys.getTable());
+ if (!renderedPreSqls.isEmpty()) {
+ Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
+ LOG.info("prepare execute preSqls:[{}]. doris jdbc url为:{}.", String.join(";", renderedPreSqls), jdbcUrl);
+ this.executeSqls(conn, renderedPreSqls);
+ DBUtil.closeDBResources(null, null, conn);
+ }
+
+ }
+
+ @Override
+ public List split(int mandatoryNumber) {
+ List configurations = new ArrayList<>(mandatoryNumber);
+
+ for(int i = 0; i < mandatoryNumber; ++i) {
+ configurations.add(this.originalConfig);
+ }
+
+ return configurations;
+ }
+
+ @Override
+ public void post() {
+ String username = this.keys.getUsername();
+ String password = this.keys.getPassword();
+ String jdbcUrl = this.keys.getJdbcUrl();
+ List renderedPostSqls = this.renderPreOrPostSqls(this.keys.getPostSqlList(), this.keys.getTable());
+ if (!renderedPostSqls.isEmpty()) {
+ Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
+ LOG.info("prepare execute postSqls:[{}]. doris jdbc url为:{}.", String.join(";", renderedPostSqls), jdbcUrl);
+ this.executeSqls(conn, renderedPostSqls);
+ DBUtil.closeDBResources(null, null, conn);
+ }
+
+ }
+
+ @Override
+ public void destroy() {
+ }
+
+ private List renderPreOrPostSqls(final List preOrPostSqls, final String tableName) {
+ if (null == preOrPostSqls) {
+ return Collections.emptyList();
+ }
+ final List renderedSqls = new ArrayList<>();
+ for (final String sql : preOrPostSqls) {
+ if (!Strings.isNullOrEmpty(sql)) {
+ renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName));
+ }
+ }
+ return renderedSqls;
+ }
+
+ private void executeSqls(final Connection conn, final List sqls) {
+ Statement stmt = null;
+ String currentSql = null;
+ try {
+ stmt = conn.createStatement();
+ for (String s : sqls) {
+ final String sql = currentSql = s;
+ DBUtil.executeSqlWithoutResultSet(stmt, sql);
+ }
+ } catch (Exception e) {
+ throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
+ } finally {
+ DBUtil.closeDBResources(null, stmt, null);
+ }
+ }
+
+ private void preCheckPrePareSQL(final Key keys) {
+ final String table = keys.getTable();
+ final List preSqls = keys.getPreSqlList();
+ final List renderedPreSqls = renderPreOrPostSqls(preSqls, table);
+ if (!renderedPreSqls.isEmpty()) {
+ LOG.info("prepare check preSqls:[{}].", String.join(";", renderedPreSqls));
+ for (final String sql : renderedPreSqls) {
+ try {
+ DBUtil.sqlValid(sql, DataBaseType.MySql);
+ } catch (ParserException e) {
+ throw RdbmsException.asPreSQLParserException(DataBaseType.MySql, e, sql);
+ }
+ }
+ }
+ }
+
+ private void preCheckPostSQL(final Key keys) {
+ final String table = keys.getTable();
+ final List postSqls = keys.getPostSqlList();
+ final List renderedPostSqls = renderPreOrPostSqls(postSqls, table);
+ if (!renderedPostSqls.isEmpty()) {
+ LOG.info("prepare check postSqls:[{}].", String.join(";", renderedPostSqls));
+ for (final String sql : renderedPostSqls) {
+ try {
+ DBUtil.sqlValid(sql, DataBaseType.MySql);
+ } catch (ParserException e) {
+ throw RdbmsException.asPostSQLParserException(DataBaseType.MySql, e, sql);
+ }
+ }
+ }
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
new file mode 100644
index 00000000000000..9fcd0575369594
--- /dev/null
+++ b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
@@ -0,0 +1,158 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.UUID;
+
+public class DorisWriterEmitter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DorisWriterEmitter.class);
+ ;
+ private final Key keys;
+ private int pos;
+
+
+ public DorisWriterEmitter(final Key keys) {
+ this.keys = keys;
+ }
+
+
+ /**
+ * execute doris stream load
+ */
+ public void doStreamLoad(final DorisFlushBatch flushData) throws IOException {
+ final String host = this.getAvailableHost();
+ if (null == host) {
+ throw new IOException("None of the host in `beLoadUrl` could be connected.");
+ }
+ final String loadUrl = host + "/api/" + this.keys.getDatabase() + "/" + this.keys.getTable() + "/_stream_load";
+ LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
+ // do http put request
+ final Map loadResult = this.doHttpPut(loadUrl, flushData.getLabel(), this.mergeRows(flushData.getRows()));
+ // get response
+ final String keyStatus = "Status";
+ if (null == loadResult || !loadResult.containsKey(keyStatus)) {
+ throw new IOException("Unable to flush data to doris: unknown result status.");
+ }
+ LOG.debug("StreamLoad response:\n" + JSON.toJSONString(loadResult));
+ if (loadResult.get(keyStatus).equals("Fail")) {
+ throw new IOException("Failed to flush data to doris.\n" + JSON.toJSONString(loadResult));
+ }
+ }
+
+ /**
+ * loop to get be host
+ * @return
+ */
+ private String getAvailableHost() {
+ final List hostList = this.keys.getBeLoadUrlList();
+ if (this.pos >= hostList.size()) {
+ this.pos = 0;
+ }
+ while (this.pos < hostList.size()) {
+ final String host = "http://" + hostList.get(this.pos);
+ if (this.tryHttpConnection(host)) {
+ return host;
+ }
+ ++this.pos;
+ }
+ return null;
+ }
+
+ private boolean tryHttpConnection(final String host) {
+ try {
+ final URL url = new URL(host);
+ final HttpURLConnection co = (HttpURLConnection) url.openConnection();
+ co.setConnectTimeout(1000);
+ co.connect();
+ co.disconnect();
+ return true;
+ } catch (Exception e) {
+ LOG.warn("Failed to connect to address:{} , Exception ={}", host, e);
+ return false;
+ }
+ }
+
+ private byte[] mergeRows(final List rows) {
+ final StringJoiner stringJoiner = new StringJoiner(",", "[", "]");
+ for (final String row : rows) {
+ stringJoiner.add(row);
+ }
+ return stringJoiner.toString().getBytes(StandardCharsets.UTF_8);
+ }
+
+ private Map doHttpPut(final String loadUrl, final String label, final byte[] data) throws IOException {
+ LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
+ final HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
+ @Override
+ protected boolean isRedirectable(final String method) {
+ return true;
+ }
+ });
+ try (final CloseableHttpClient httpclient = httpClientBuilder.build()) {
+ final HttpPut httpPut = new HttpPut(loadUrl);
+ final List cols = this.keys.getColumns();
+ if (null != cols && !cols.isEmpty()) {
+ httpPut.setHeader("columns", String.join(",", cols));
+ }
+ // put loadProps to http header
+ final Map loadProps = this.keys.getLoadProps();
+ if (null != loadProps) {
+ for (final Map.Entry entry : loadProps.entrySet()) {
+ httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
+ }
+ }
+ httpPut.setHeader("Expect", "100-continue");
+ httpPut.setHeader("label", label);
+ httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded");
+ httpPut.setHeader("Authorization", this.getBasicAuthHeader(this.keys.getUsername(), this.keys.getPassword()));
+ httpPut.setHeader("format", "json");
+ httpPut.setHeader("strip_outer_array", "true");
+ httpPut.setEntity(new ByteArrayEntity(data));
+ httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
+ try (final CloseableHttpResponse resp = httpclient.execute(httpPut)) {
+ final int code = resp.getStatusLine().getStatusCode();
+ if (HttpStatus.SC_OK != code) {
+ LOG.warn("Request failed with code:{}", code);
+ return null;
+ }
+ final HttpEntity respEntity = resp.getEntity();
+ if (null == respEntity) {
+ LOG.warn("Request failed with empty response.");
+ return null;
+ }
+ return (Map) JSON.parse(EntityUtils.toString(respEntity));
+ }
+ }
+ }
+
+ private String getBasicAuthHeader(final String username, final String password) {
+ final String auth = username + ":" + password;
+ final byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes());
+ return "Basic " + new String(encodedAuth);
+ }
+
+
+}
\ No newline at end of file
diff --git a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
new file mode 100644
index 00000000000000..8d8da1ef863399
--- /dev/null
+++ b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
@@ -0,0 +1,104 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+public class Key implements Serializable
+{
+ public static final String JDBC_URL = "jdbcUrl";
+ public static final String DATABASE = "database";
+ public static final String TABLE = "table";
+ public static final String USERNAME = "username";
+ public static final String PASSWORD = "password";
+ public static final String BE_LOAD_URL = "beLoadUrl";
+ public static final String COLUMN = "column";
+ public static final String PRE_SQL = "preSql";
+ public static final String POST_SQL = "postSql";
+ public static final String LOAD_PROPS = "loadProps";
+ public static final String MAX_BATCH_ROWS = "maxBatchRows";
+ public static final String MAX_BATCH_BYTE_SIZE = "maxBatchByteSize";
+ private final Configuration options;
+
+ public Key(final Configuration options) {
+ this.options = options;
+ }
+
+ public void doPretreatment() {
+ this.validateRequired();
+ this.validateStreamLoadUrl();
+ }
+
+ public String getJdbcUrl() {
+ return this.options.getString(JDBC_URL);
+ }
+
+ public String getDatabase() {
+ return this.options.getString(DATABASE);
+ }
+
+ public String getTable() {
+ return this.options.getString(TABLE);
+ }
+
+ public String getUsername() {
+ return this.options.getString(USERNAME);
+ }
+
+ public String getPassword() {
+ return this.options.getString(PASSWORD);
+ }
+
+ public List getBeLoadUrlList() {
+ return this.options.getList(BE_LOAD_URL, String.class);
+ }
+
+ public List getColumns() {
+ return this.options.getList(COLUMN, String.class);
+ }
+
+ public List getPreSqlList() {
+ return this.options.getList(PRE_SQL, String.class);
+ }
+
+ public List getPostSqlList() {
+ return this.options.getList(POST_SQL, String.class);
+ }
+
+ public Map getLoadProps() {
+ return this.options.getMap(LOAD_PROPS);
+ }
+
+ public int getBatchRows() {
+ final Integer rows = this.options.getInt(MAX_BATCH_ROWS);
+ return (null == rows) ? 500000 : rows;
+ }
+
+ public long getBatchByteSize() {
+ final Long size = this.options.getLong(MAX_BATCH_BYTE_SIZE);
+ return (null == size) ? 94371840L : size;
+ }
+
+
+ private void validateStreamLoadUrl() {
+ final List urlList = this.getBeLoadUrlList();
+ for (final String host : urlList) {
+ if (host.split(":").length < 2) {
+ throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, "loadUrl的格式不正确,请输入 `be_ip:be_http_ip;be_ip:be_http_ip`。");
+ }
+ }
+ }
+
+ private void validateRequired() {
+ final String[] requiredOptionKeys = new String[] { USERNAME, PASSWORD, DATABASE, TABLE, COLUMN, BE_LOAD_URL };
+ for (final String optionKey : requiredOptionKeys) {
+ this.options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE);
+ }
+ }
+
+
+}
\ No newline at end of file
diff --git a/extension/datax-doriswriter/src/main/resources/plugin.json b/extension/datax-doriswriter/src/main/resources/plugin.json
new file mode 100644
index 00000000000000..9d2ad49726e68c
--- /dev/null
+++ b/extension/datax-doriswriter/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+ "name": "doriswriter",
+ "class": "com.alibaba.datax.plugin.writer.doriswriter.DorisWriter",
+ "description": "",
+ "developer": ""
+}
diff --git a/extension/datax-doriswriter/src/main/resources/plugin_job_template.json b/extension/datax-doriswriter/src/main/resources/plugin_job_template.json
new file mode 100644
index 00000000000000..152f1eeefdeeb4
--- /dev/null
+++ b/extension/datax-doriswriter/src/main/resources/plugin_job_template.json
@@ -0,0 +1,15 @@
+{
+ "name": "doriswriter",
+ "parameter": {
+ "username": "",
+ "password": "",
+ "database": "",
+ "table": "",
+ "column": [],
+ "preSql": [],
+ "postSql": [],
+ "jdbcUrl": "",
+ "beLoadUrl": [],
+ "loadProps": {}
+ }
+}
\ No newline at end of file
From 164fbd334b39ab183c35e315e1fe82b134127b65 Mon Sep 17 00:00:00 2001
From: huzekang <1040080742@qq.com>
Date: Tue, 29 Jun 2021 09:27:13 +0800
Subject: [PATCH 2/5] style: add apache Licensed
---
extension/datax-doriswriter/pom.xml | 18 ++++++++++++++++++
.../writer/doriswriter/DorisFlushBatch.java | 19 +++++++++++++++++++
.../writer/doriswriter/DorisJsonCodec.java | 19 +++++++++++++++++++
.../writer/doriswriter/DorisWriter.java | 19 +++++++++++++++++++
.../doriswriter/DorisWriterEmitter.java | 19 +++++++++++++++++++
.../datax/plugin/writer/doriswriter/Key.java | 19 +++++++++++++++++++
6 files changed, 113 insertions(+)
diff --git a/extension/datax-doriswriter/pom.xml b/extension/datax-doriswriter/pom.xml
index ad6c3bd5102131..f4b4ea53e349de 100644
--- a/extension/datax-doriswriter/pom.xml
+++ b/extension/datax-doriswriter/pom.xml
@@ -1,4 +1,22 @@
+
diff --git a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
index e5dac0e7d2dfb0..7a9638d937a16e 100644
--- a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
+++ b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
@@ -1,3 +1,22 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+ */
package com.alibaba.datax.plugin.writer.doriswriter;
import java.util.List;
diff --git a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
index 7e442b22c02303..c73a8d59c5d74d 100644
--- a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
+++ b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
@@ -1,3 +1,22 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+ */
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.element.Column;
diff --git a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
index 8f271c482198fb..74720f86c6622e 100644
--- a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
+++ b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
@@ -1,3 +1,22 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+ */
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.element.Record;
diff --git a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
index 9fcd0575369594..e98214986010c5 100644
--- a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
+++ b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
@@ -1,3 +1,22 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+ */
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.fastjson.JSON;
diff --git a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
index 8d8da1ef863399..f670e5919f0f6a 100644
--- a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
+++ b/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
@@ -1,3 +1,22 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+ */
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.exception.DataXException;
From b07b38df8a39eab39f24fb6e04742f5e5612ba4d Mon Sep 17 00:00:00 2001
From: huzekang <1040080742@qq.com>
Date: Tue, 29 Jun 2021 09:31:52 +0800
Subject: [PATCH 3/5] style: change dir
---
.../{datax-doriswriter => Datax/doriswriter}/doc/doriswriter.md | 0
extension/{datax-doriswriter => Datax/doriswriter}/pom.xml | 0
.../doriswriter}/src/main/assembly/package.xml | 0
.../alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java | 0
.../alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java | 0
.../com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java | 0
.../datax/plugin/writer/doriswriter/DorisWriterEmitter.java | 0
.../java/com/alibaba/datax/plugin/writer/doriswriter/Key.java | 0
.../doriswriter}/src/main/resources/plugin.json | 0
.../doriswriter}/src/main/resources/plugin_job_template.json | 0
10 files changed, 0 insertions(+), 0 deletions(-)
rename extension/{datax-doriswriter => Datax/doriswriter}/doc/doriswriter.md (100%)
rename extension/{datax-doriswriter => Datax/doriswriter}/pom.xml (100%)
rename extension/{datax-doriswriter => Datax/doriswriter}/src/main/assembly/package.xml (100%)
rename extension/{datax-doriswriter => Datax/doriswriter}/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java (100%)
rename extension/{datax-doriswriter => Datax/doriswriter}/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java (100%)
rename extension/{datax-doriswriter => Datax/doriswriter}/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java (100%)
rename extension/{datax-doriswriter => Datax/doriswriter}/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java (100%)
rename extension/{datax-doriswriter => Datax/doriswriter}/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java (100%)
rename extension/{datax-doriswriter => Datax/doriswriter}/src/main/resources/plugin.json (100%)
rename extension/{datax-doriswriter => Datax/doriswriter}/src/main/resources/plugin_job_template.json (100%)
diff --git a/extension/datax-doriswriter/doc/doriswriter.md b/extension/Datax/doriswriter/doc/doriswriter.md
similarity index 100%
rename from extension/datax-doriswriter/doc/doriswriter.md
rename to extension/Datax/doriswriter/doc/doriswriter.md
diff --git a/extension/datax-doriswriter/pom.xml b/extension/Datax/doriswriter/pom.xml
similarity index 100%
rename from extension/datax-doriswriter/pom.xml
rename to extension/Datax/doriswriter/pom.xml
diff --git a/extension/datax-doriswriter/src/main/assembly/package.xml b/extension/Datax/doriswriter/src/main/assembly/package.xml
similarity index 100%
rename from extension/datax-doriswriter/src/main/assembly/package.xml
rename to extension/Datax/doriswriter/src/main/assembly/package.xml
diff --git a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java b/extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
similarity index 100%
rename from extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
rename to extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
diff --git a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java b/extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
similarity index 100%
rename from extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
rename to extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
diff --git a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java b/extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
similarity index 100%
rename from extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
rename to extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
diff --git a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java b/extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
similarity index 100%
rename from extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
rename to extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
diff --git a/extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java b/extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
similarity index 100%
rename from extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
rename to extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
diff --git a/extension/datax-doriswriter/src/main/resources/plugin.json b/extension/Datax/doriswriter/src/main/resources/plugin.json
similarity index 100%
rename from extension/datax-doriswriter/src/main/resources/plugin.json
rename to extension/Datax/doriswriter/src/main/resources/plugin.json
diff --git a/extension/datax-doriswriter/src/main/resources/plugin_job_template.json b/extension/Datax/doriswriter/src/main/resources/plugin_job_template.json
similarity index 100%
rename from extension/datax-doriswriter/src/main/resources/plugin_job_template.json
rename to extension/Datax/doriswriter/src/main/resources/plugin_job_template.json
From 7a2a904e8a6b2c381c9d143e2d304c2f88c6d05e Mon Sep 17 00:00:00 2001
From: huzekang <1040080742@qq.com>
Date: Thu, 1 Jul 2021 09:31:39 +0800
Subject: [PATCH 4/5] style: change datax dir name
---
extension/{Datax => DataX}/doriswriter/doc/doriswriter.md | 0
extension/{Datax => DataX}/doriswriter/pom.xml | 0
.../{Datax => DataX}/doriswriter/src/main/assembly/package.xml | 0
.../alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java | 0
.../alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java | 0
.../com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java | 0
.../datax/plugin/writer/doriswriter/DorisWriterEmitter.java | 0
.../java/com/alibaba/datax/plugin/writer/doriswriter/Key.java | 0
.../{Datax => DataX}/doriswriter/src/main/resources/plugin.json | 0
.../doriswriter/src/main/resources/plugin_job_template.json | 0
10 files changed, 0 insertions(+), 0 deletions(-)
rename extension/{Datax => DataX}/doriswriter/doc/doriswriter.md (100%)
rename extension/{Datax => DataX}/doriswriter/pom.xml (100%)
rename extension/{Datax => DataX}/doriswriter/src/main/assembly/package.xml (100%)
rename extension/{Datax => DataX}/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java (100%)
rename extension/{Datax => DataX}/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java (100%)
rename extension/{Datax => DataX}/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java (100%)
rename extension/{Datax => DataX}/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java (100%)
rename extension/{Datax => DataX}/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java (100%)
rename extension/{Datax => DataX}/doriswriter/src/main/resources/plugin.json (100%)
rename extension/{Datax => DataX}/doriswriter/src/main/resources/plugin_job_template.json (100%)
diff --git a/extension/Datax/doriswriter/doc/doriswriter.md b/extension/DataX/doriswriter/doc/doriswriter.md
similarity index 100%
rename from extension/Datax/doriswriter/doc/doriswriter.md
rename to extension/DataX/doriswriter/doc/doriswriter.md
diff --git a/extension/Datax/doriswriter/pom.xml b/extension/DataX/doriswriter/pom.xml
similarity index 100%
rename from extension/Datax/doriswriter/pom.xml
rename to extension/DataX/doriswriter/pom.xml
diff --git a/extension/Datax/doriswriter/src/main/assembly/package.xml b/extension/DataX/doriswriter/src/main/assembly/package.xml
similarity index 100%
rename from extension/Datax/doriswriter/src/main/assembly/package.xml
rename to extension/DataX/doriswriter/src/main/assembly/package.xml
diff --git a/extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
similarity index 100%
rename from extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
rename to extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
diff --git a/extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
similarity index 100%
rename from extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
rename to extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
diff --git a/extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
similarity index 100%
rename from extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
rename to extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
diff --git a/extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
similarity index 100%
rename from extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
rename to extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
diff --git a/extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
similarity index 100%
rename from extension/Datax/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
rename to extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
diff --git a/extension/Datax/doriswriter/src/main/resources/plugin.json b/extension/DataX/doriswriter/src/main/resources/plugin.json
similarity index 100%
rename from extension/Datax/doriswriter/src/main/resources/plugin.json
rename to extension/DataX/doriswriter/src/main/resources/plugin.json
diff --git a/extension/Datax/doriswriter/src/main/resources/plugin_job_template.json b/extension/DataX/doriswriter/src/main/resources/plugin_job_template.json
similarity index 100%
rename from extension/Datax/doriswriter/src/main/resources/plugin_job_template.json
rename to extension/DataX/doriswriter/src/main/resources/plugin_job_template.json
From 0ff6102aeeb711e2d9eefd1d87ad6eb454089587 Mon Sep 17 00:00:00 2001
From: huzekang <1040080742@qq.com>
Date: Fri, 2 Jul 2021 17:01:36 +0800
Subject: [PATCH 5/5] style: improve code style
---
.../alibaba/datax/plugin/writer/doriswriter/DorisWriter.java | 4 ++--
.../datax/plugin/writer/doriswriter/DorisWriterEmitter.java | 4 ++--
2 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
index 74720f86c6622e..5148dcdee51a24 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
@@ -111,7 +111,7 @@ public void startWrite(RecordReceiver recordReceiver) {
}
private String getStreamLoadLabel() {
- return UUID.randomUUID().toString();
+ return "datax_doris_writer_" + UUID.randomUUID().toString();
}
@Override
@@ -170,7 +170,7 @@ public void prepare() {
public List split(int mandatoryNumber) {
List configurations = new ArrayList<>(mandatoryNumber);
- for(int i = 0; i < mandatoryNumber; ++i) {
+ for (int i = 0; i < mandatoryNumber; ++i) {
configurations.add(this.originalConfig);
}
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
index e98214986010c5..70792e3dffbe55 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
@@ -67,7 +67,7 @@ public void doStreamLoad(final DorisFlushBatch flushData) throws IOException {
throw new IOException("None of the host in `beLoadUrl` could be connected.");
}
final String loadUrl = host + "/api/" + this.keys.getDatabase() + "/" + this.keys.getTable() + "/_stream_load";
- LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
+ LOG.info(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
// do http put request
final Map loadResult = this.doHttpPut(loadUrl, flushData.getLabel(), this.mergeRows(flushData.getRows()));
// get response
@@ -75,7 +75,7 @@ public void doStreamLoad(final DorisFlushBatch flushData) throws IOException {
if (null == loadResult || !loadResult.containsKey(keyStatus)) {
throw new IOException("Unable to flush data to doris: unknown result status.");
}
- LOG.debug("StreamLoad response:\n" + JSON.toJSONString(loadResult));
+ LOG.info("StreamLoad response:\n" + JSON.toJSONString(loadResult));
if (loadResult.get(keyStatus).equals("Fail")) {
throw new IOException("Failed to flush data to doris.\n" + JSON.toJSONString(loadResult));
}