From 57558a8417e891c361d08b3d00371148dcf76b54 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 7 Aug 2019 09:44:26 +0800 Subject: [PATCH 01/11] fix bugs --- .../doris/load/loadv2/InsertLoadJob.java | 5 ++- .../apache/doris/load/loadv2/LoadManager.java | 4 +- .../apache/doris/mysql/MysqlErrPacket.java | 6 +++ .../org/apache/doris/qe/StmtExecutor.java | 37 +++++++++++++------ .../doris/load/loadv2/InsertLoadJobTest.java | 2 +- .../doris/load/loadv2/LoadManagerTest.java | 4 +- 6 files changed, 39 insertions(+), 19 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java index 3cb5f3dd00329d..2dbd66fbb7ee14 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java @@ -50,8 +50,8 @@ public InsertLoadJob() { this.jobType = EtlJobType.INSERT; } - public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp, String failMsg) - throws MetaNotFoundException { + public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp, String failMsg, + String trackingUrl) throws MetaNotFoundException { super(dbId, label); this.tableId = tableId; this.createTimestamp = createTimestamp; @@ -68,6 +68,7 @@ public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp this.jobType = EtlJobType.INSERT; this.timeoutSecond = Config.insert_load_default_timeout_second; this.authorizationInfo = gatherAuthInfo(); + this.loadingStatus.setTrackingUrl(trackingUrl); } public AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException { diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index c6022a514dbd74..a2ed3748f3ff8f 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ 
b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -256,7 +256,7 @@ private void addLoadJob(LoadJob loadJob) { } public void recordFinishedLoadJob(String label, String dbName, long tableId, EtlJobType jobType, - long createTimestamp, String failMsg) throws MetaNotFoundException { + long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException { // get db id Database db = Catalog.getCurrentCatalog().getDb(dbName); @@ -267,7 +267,7 @@ public void recordFinishedLoadJob(String label, String dbName, long tableId, Etl LoadJob loadJob; switch (jobType) { case INSERT: - loadJob = new InsertLoadJob(label, db.getId(), tableId, createTimestamp, failMsg); + loadJob = new InsertLoadJob(label, db.getId(), tableId, createTimestamp, failMsg, trackingUrl); break; default: return; diff --git a/fe/src/main/java/org/apache/doris/mysql/MysqlErrPacket.java b/fe/src/main/java/org/apache/doris/mysql/MysqlErrPacket.java index 946024e9e69519..571a5a464b9d6e 100644 --- a/fe/src/main/java/org/apache/doris/mysql/MysqlErrPacket.java +++ b/fe/src/main/java/org/apache/doris/mysql/MysqlErrPacket.java @@ -20,8 +20,13 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.qe.QueryState; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + // MySQL protocol error packet public class MysqlErrPacket extends MysqlPacket { + private static final Logger LOG = LogManager.getLogger(MysqlErrPacket.class); + private static final int ERROR_PACKET_INDICATOR = 0XFF; // only first FIVE char is useful in SQL STATE private byte[] sqlState = {'H', 'Y', '0', '0', '0'}; @@ -52,6 +57,7 @@ public void writeTo(MysqlSerializer serializer) { if (errorMessage == null || errorMessage.isEmpty()) { // NOTICE: if write "" or "\0", the client will be show "Query OK" // SO we need write no-empty string + LOG.info("get empty error msg"); serializer.writeEofString(" "); } else { serializer.writeEofString(errorMessage); diff --git 
a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index ede891439b0e1f..8aba8bed6c2f1f 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -74,6 +74,8 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -612,12 +614,6 @@ private void handleInsertStmt() throws Exception { if (!coord.getExecStatus().ok()) { String errMsg = coord.getExecStatus().getErrorMsg(); LOG.warn("insert failed: {}", errMsg); - - // hide host info - int hostIndex = errMsg.indexOf("host"); - if (hostIndex != -1) { - errMsg = errMsg.substring(0, hostIndex); - } ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); } @@ -627,8 +623,11 @@ private void handleInsertStmt() throws Exception { Map counters = coord.getLoadCounters(); String strValue = counters.get(LoadEtlTask.DPP_ABNORMAL_ALL); if (strValue != null && Long.valueOf(strValue) > 0) { - throw new UserException("Insert has filtered data in strict mode, tracking_url=" + context.getState().setError("Insert has filtered data in strict mode, tracking_url=" + coord.getTrackingUrl()); + return; + // throw new UserException("Insert has filtered data in strict mode, tracking_url=" + // + coord.getTrackingUrl()); } } @@ -663,9 +662,13 @@ private void handleInsertStmt() throws Exception { } if (!Config.using_old_load_usage_pattern) { - // if not using old usage pattern, the exception will be thrown to user directly without - // a label - throw t; + // if not using old usage pattern, the exception will be thrown to user directly without a label + StringBuilder sb = new StringBuilder(t.getMessage()); + if (!Strings.isNullOrEmpty(coord.getTrackingUrl())) { + sb.append(". 
url: " + coord.getTrackingUrl()); + } + context.getState().setError(sb.toString()); + return; } /* @@ -685,7 +688,8 @@ private void handleInsertStmt() throws Exception { insertStmt.getTargetTable().getId(), EtlJobType.INSERT, createTime, - throwable == null ? "" : throwable.getMessage() + throwable == null ? "" : throwable.getMessage(), + coord.getTrackingUrl() ); } catch (MetaNotFoundException e) { LOG.warn("Record info of insert load with error {}", e.getMessage(), e); @@ -696,7 +700,16 @@ private void handleInsertStmt() throws Exception { // set to OK, which means the insert load job is successfully submitted. // and user can check the job's status by label. - context.getState().setOk("{'label':'" + uuid.toString() + "'}"); + Map res = Maps.newHashMap(); + res.put("label", uuid.toString()); + if (!Strings.isNullOrEmpty(coord.getTrackingUrl())) { + res.put("url", coord.getTrackingUrl()); + } + GsonBuilder gsonBuilder = new GsonBuilder(); + gsonBuilder.disableHtmlEscaping(); + Gson gson = gsonBuilder.create(); + + context.getState().setOk(gson.toJson(res)); } else { // just return OK without label, which means this job is successfully done context.getState().setOk(); diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java index acb1cacaf4793f..22df1fb6607cde 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java @@ -38,7 +38,7 @@ public class InsertLoadJobTest { public void testGetTableNames(@Mocked Catalog catalog, @Injectable Database database, @Injectable Table table) throws MetaNotFoundException { - InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1000, ""); + InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1000, "", ""); String tableName = "table1"; new Expectations() { { diff --git 
a/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java index 6c5d67597caa8c..8aacef818382a0 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java @@ -116,7 +116,7 @@ public void testSerializationNormal(@Mocked Catalog catalog, }; loadManager = new LoadManager(new LoadJobScheduler()); - LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, System.currentTimeMillis(), ""); + LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, System.currentTimeMillis(), "", ""); Deencapsulation.invoke(loadManager, "addLoadJob", job1); File file = serializeToFile(loadManager); @@ -145,7 +145,7 @@ public void testSerializationWithJobRemoved(@Mocked MetaContext metaContext, }; loadManager = new LoadManager(new LoadJobScheduler()); - LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, System.currentTimeMillis(), ""); + LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, System.currentTimeMillis(), "", ""); Deencapsulation.invoke(loadManager, "addLoadJob", job1); //make job1 don't serialize From e576e6bdfa91003d935d9f8716022efaf847cde4 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 7 Aug 2019 16:22:44 +0800 Subject: [PATCH 02/11] add doc --- .../load-data/insert-into-manual.md | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md b/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md index c80b4f87cf728b..2b8b81fd1448e7 100644 --- a/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md @@ -54,7 +54,17 @@ INSERT INTO tbl1 VALUES ("qweasdzxcqweasdzxc"), ("a"); Insert Into 本身就是一个 SQL 命令,所以返回的行为同 SQL 命令的返回行为。 -如果导入失败,则返回语句执行失败。如果导入成功,则返回语句执行成功,还会附加返回一个 Label 字段。 +如果导入失败,则返回语句执行失败。示例如下: + +```ERROR 
1064 (HY000): all partitions have no load data. url: http://ip:port/api/_load_error_log?file=__shard_14/error_log_insert_stmt_f435264d82f342e4-a33764f5f0dfbf00_f435264d82f342e4_a33764f5f0dfbf00``` + +其中 url 可以用于查询错误的数据,具体见后面 **查看错误行** 小结。 + +如果导入成功,则返回语句执行成功,还会附加返回一个 Label 字段。 + +导入可能部分成功,则在返回结果中还会有一个 url,用户查看错误行。示例如下: + +```{"label":"d2cac0a0-a16d-482d-9041-c949a4b71604","url":"http://ip:port/api/_load_error_log?file=__shard_13/error_log_insert_stmt_d2cac0a0a16d482d-9041c949a4b71605_d2cac0a0a16d482d_9041c949a4b71605"}``` Label 是该 Insert Into 导入作业的标识。每个导入作业,都有一个在单 database 内部唯一的 Label。Insert Into 的 Label 则是由系统生成的,用户可以拿着这个 Label 通过查询导入命令异步获取导入状态。 @@ -132,6 +142,14 @@ bj_store_sales schema: * 查看错误行 - 由于 Insert Into 无法控制错误率,只能通过 `enable_insert_strict` 设置为完全容忍错误数据或完全忽略错误数据。因此如果 `enable_insert_strict` 设为 true,则 Insert Into 可能会失败。而如果 `enable_insert_strict` 设为 false,则可能出现仅导入了部分合格数据的情况。但无论以上哪种情况,Doris 目前无法提供查看不合格数据行的功能。因此用户无法通过 Insert Into 语句来查看具体的导入错误。 + 由于 Insert Into 无法控制错误率,只能通过 `enable_insert_strict` 设置为完全容忍错误数据或完全忽略错误数据。因此如果 `enable_insert_strict` 设为 true,则 Insert Into 可能会失败。而如果 `enable_insert_strict` 设为 false,则可能出现仅导入了部分合格数据的情况。 + + 当返回结果中提供了 url 字段时,可以通过以下命令查看错误行: + + ```SHOW LOAD WARNINGS ON "url";``` + + 示例: + + ```SHOW LOAD WARNINGS ON "http://ip:port/api/_load_error_log?file=__shard_13/error_log_insert_stmt_d2cac0a0a16d482d-9041c949a4b71605_d2cac0a0a16d482d_9041c949a4b71605";``` - 错误的原因通常如:源数据列长度超过目的数据列长度、列类型不匹配、分区不匹配、列顺序不匹配等等。当依然无法检查出问题时。目前只能建议先运行 Insert Into 语句中的 SELECT 命令将数据导出到一个文件中,然后在通过 Stream load 的方式导入这个文件,来查看具体的错误。 + 错误的原因通常如:源数据列长度超过目的数据列长度、列类型不匹配、分区不匹配、列顺序不匹配等等。 From 9d5661323c9ca828e93b07a018c3e7ca384df92a Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 7 Aug 2019 16:28:10 +0800 Subject: [PATCH 03/11] Support checking error data row when doing INSERT. If INSERT operation failed on some error data rows, a url will be provided for user to checking the error data rows. 
eg: {"label":"d2cac0a0-a16d-482d-9041-c949a4b71604","url":"xxx"} or ERROR 1064 (HY000): all partitions have no load data. url:xxx --- fe/src/main/java/org/apache/doris/mysql/MysqlErrPacket.java | 3 +-- fe/src/main/java/org/apache/doris/qe/StmtExecutor.java | 2 -- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/mysql/MysqlErrPacket.java b/fe/src/main/java/org/apache/doris/mysql/MysqlErrPacket.java index 571a5a464b9d6e..000cc0442d2f79 100644 --- a/fe/src/main/java/org/apache/doris/mysql/MysqlErrPacket.java +++ b/fe/src/main/java/org/apache/doris/mysql/MysqlErrPacket.java @@ -57,8 +57,7 @@ public void writeTo(MysqlSerializer serializer) { if (errorMessage == null || errorMessage.isEmpty()) { // NOTICE: if write "" or "\0", the client will be show "Query OK" // SO we need write no-empty string - LOG.info("get empty error msg"); - serializer.writeEofString(" "); + serializer.writeEofString("Unknown error"); } else { serializer.writeEofString(errorMessage); } diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index 8aba8bed6c2f1f..1584e9ba88c543 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -626,8 +626,6 @@ private void handleInsertStmt() throws Exception { context.getState().setError("Insert has filtered data in strict mode, tracking_url=" + coord.getTrackingUrl()); return; - // throw new UserException("Insert has filtered data in strict mode, tracking_url=" - // + coord.getTrackingUrl()); } } From 6673d3fc62a2f8a602fe13d601de9cdf765602a8 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 7 Aug 2019 16:47:47 +0800 Subject: [PATCH 04/11] fix ut --- fe/src/test/java/org/apache/doris/mysql/MysqlErrPacketTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/test/java/org/apache/doris/mysql/MysqlErrPacketTest.java 
b/fe/src/test/java/org/apache/doris/mysql/MysqlErrPacketTest.java index 8354f2c67ced83..640fde8240a127 100644 --- a/fe/src/test/java/org/apache/doris/mysql/MysqlErrPacketTest.java +++ b/fe/src/test/java/org/apache/doris/mysql/MysqlErrPacketTest.java @@ -76,7 +76,7 @@ public void testWriteNullMsg() { Assert.assertEquals("HY000", new String(MysqlProto.readFixedString(buffer, 5))); // sql state // NOTE: we put one space if MysqlErrPacket's errorMessage is null or empty - Assert.assertEquals(" ", new String(MysqlProto.readEofString(buffer))); + Assert.assertEquals("Unknown error", new String(MysqlProto.readEofString(buffer))); Assert.assertEquals(0, buffer.remaining()); } From 161bd34793878993f63c03de924969de54160118 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 7 Aug 2019 18:30:47 +0800 Subject: [PATCH 05/11] fix nullpointer exception --- .../org/apache/doris/transaction/GlobalTransactionMgr.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index b024ccc1268c11..d88b79deeee3cb 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -251,7 +251,8 @@ public void commitTransaction(long dbId, long transactionId, List Date: Thu, 15 Aug 2019 09:48:33 +0800 Subject: [PATCH 06/11] add affected rows --- .../load-data/insert-into-manual.md | 6 ++-- .../org/apache/doris/mysql/MysqlOkPacket.java | 10 +++--- .../java/org/apache/doris/qe/QueryState.java | 18 +++++++++-- .../org/apache/doris/qe/StmtExecutor.java | 31 +++++++++---------- 4 files changed, 38 insertions(+), 27 deletions(-) diff --git a/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md b/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md index 2b8b81fd1448e7..35685fb08c3d92 100644 --- 
a/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md @@ -60,11 +60,11 @@ Insert Into 本身就是一个 SQL 命令,所以返回的行为同 SQL 命令 其中 url 可以用于查询错误的数据,具体见后面 **查看错误行** 小结。 -如果导入成功,则返回语句执行成功,还会附加返回一个 Label 字段。 +如果导入成功,则返回语句执行成功,还会附加返回一个 Label 字段。示例如下: -导入可能部分成功,则在返回结果中还会有一个 url,用户查看错误行。示例如下: +```{"label":"d2cac0a0-a16d-482d-9041-c949a4b71604"}``` -```{"label":"d2cac0a0-a16d-482d-9041-c949a4b71604","url":"http://ip:port/api/_load_error_log?file=__shard_13/error_log_insert_stmt_d2cac0a0a16d482d-9041c949a4b71605_d2cac0a0a16d482d_9041c949a4b71605"}``` +导入可能部分成功,用户需要通过 `SHOW LOAD WHERE LABEL="xxx";` 命令,获取 url 查看错误行。 Label 是该 Insert Into 导入作业的标识。每个导入作业,都有一个在单 database 内部唯一的 Label。Insert Into 的 Label 则是由系统生成的,用户可以拿着这个 Label 通过查询导入命令异步获取导入状态。 diff --git a/fe/src/main/java/org/apache/doris/mysql/MysqlOkPacket.java b/fe/src/main/java/org/apache/doris/mysql/MysqlOkPacket.java index 5a70e6eec93bf8..3614a0175fbf59 100644 --- a/fe/src/main/java/org/apache/doris/mysql/MysqlOkPacket.java +++ b/fe/src/main/java/org/apache/doris/mysql/MysqlOkPacket.java @@ -25,14 +25,16 @@ public class MysqlOkPacket extends MysqlPacket { private static final int PACKET_OK_INDICATOR = 0X00; // TODO(zhaochun): following are not used in palo - private static final long AFFECT_ROWS = 0; private static final long LAST_INSERT_ID = 0; private static final int STATUS_FLAGS = 0; - private static final int WARNINGS = 0; private final String infoMessage; + private long affectedRows = 0; + private int warningRows = 0; public MysqlOkPacket(QueryState state) { infoMessage = state.getInfoMessage(); + affectedRows = state.getAffectedRows(); + warningRows = state.getWarningRows(); } @Override @@ -41,11 +43,11 @@ public void writeTo(MysqlSerializer serializer) { MysqlCapability capability = serializer.getCapability(); serializer.writeInt1(PACKET_OK_INDICATOR); - serializer.writeVInt(AFFECT_ROWS); + serializer.writeVInt(affectedRows); 
serializer.writeVInt(LAST_INSERT_ID); if (capability.isProtocol41()) { serializer.writeInt2(STATUS_FLAGS); - serializer.writeInt2(WARNINGS); + serializer.writeInt2(warningRows); } else if (capability.isTransactions()) { serializer.writeInt2(STATUS_FLAGS); } diff --git a/fe/src/main/java/org/apache/doris/qe/QueryState.java b/fe/src/main/java/org/apache/doris/qe/QueryState.java index b1ddfda0e2a4f9..871407742a2579 100644 --- a/fe/src/main/java/org/apache/doris/qe/QueryState.java +++ b/fe/src/main/java/org/apache/doris/qe/QueryState.java @@ -43,6 +43,8 @@ public enum ErrType { private String infoMessage; private ErrType errType = ErrType.OTHER_ERR; private boolean isQuery = false; + private long affectedRows = 0; + private int warningRows = 0; public QueryState() { } @@ -62,12 +64,14 @@ public void setEof() { } public void setOk() { - stateType = MysqlStateType.OK; + setOk(0, 0, null); } - public void setOk(String infoMessage) { - stateType = MysqlStateType.OK; + public void setOk(long affectedRows, int warningRows, String infoMessage) { + this.affectedRows = affectedRows; + this.warningRows = warningRows; this.infoMessage = infoMessage; + stateType = MysqlStateType.OK; } public void setError(String errorMsg) { @@ -109,6 +113,14 @@ public ErrorCode getErrorCode() { return errorCode; } + public long getAffectedRows() { + return affectedRows; + } + + public int getWarningRows() { + return warningRows; + } + public MysqlPacket toResponsePacket() { MysqlPacket packet = null; switch (stateType) { diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index 1584e9ba88c543..a9a86d92da7c1f 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -74,8 +74,6 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.gson.Gson; -import 
com.google.gson.GsonBuilder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -594,6 +592,9 @@ private void handleInsertStmt() throws Exception { long createTime = System.currentTimeMillis(); UUID uuid = UUID.randomUUID(); Throwable throwable = null; + + long loadedRows = Long.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL)); + int filteredRows = Integer.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL)); try { // assign request_id context.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); @@ -619,10 +620,15 @@ private void handleInsertStmt() throws Exception { LOG.info("delta files is {}", coord.getDeltaUrls()); + if (coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) { + loadedRows = Long.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL)); + } + if (coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) { + filteredRows = Integer.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL)); + } + if (context.getSessionVariable().getEnableInsertStrict()) { - Map counters = coord.getLoadCounters(); - String strValue = counters.get(LoadEtlTask.DPP_ABNORMAL_ALL); - if (strValue != null && Long.valueOf(strValue) > 0) { + if (filteredRows > 0) { context.getState().setError("Insert has filtered data in strict mode, tracking_url=" + coord.getTrackingUrl()); return; @@ -632,7 +638,7 @@ private void handleInsertStmt() throws Exception { if (insertStmt.getTargetTable().getType() != TableType.OLAP) { // no need to add load job. // MySQL table is already being inserted. - context.getState().setOk(); + context.getState().setOk(loadedRows, filteredRows, null); return; } @@ -698,19 +704,10 @@ private void handleInsertStmt() throws Exception { // set to OK, which means the insert load job is successfully submitted. // and user can check the job's status by label. 
- Map res = Maps.newHashMap(); - res.put("label", uuid.toString()); - if (!Strings.isNullOrEmpty(coord.getTrackingUrl())) { - res.put("url", coord.getTrackingUrl()); - } - GsonBuilder gsonBuilder = new GsonBuilder(); - gsonBuilder.disableHtmlEscaping(); - Gson gson = gsonBuilder.create(); - - context.getState().setOk(gson.toJson(res)); + context.getState().setOk(loadedRows, filteredRows, "{'label':'" + uuid.toString() + "'}"); } else { // just return OK without label, which means this job is successfully done - context.getState().setOk(); + context.getState().setOk(loadedRows, filteredRows, null); } } From b46103ac80d8a6cf3ed1f5c2e14bd84e39a63832 Mon Sep 17 00:00:00 2001 From: morningman Date: Thu, 15 Aug 2019 11:19:23 +0800 Subject: [PATCH 07/11] add insert load row --- be/src/exec/tablet_sink.cpp | 1 + .../java/org/apache/doris/qe/StmtExecutor.java | 18 ++++++++++++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 732bc40a7236f7..580678d4ea8520 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -618,6 +618,7 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { COUNTER_SET(_validate_data_timer, _validate_data_ns); COUNTER_SET(_wait_in_flight_packet_timer, _wait_in_flight_packet_ns); COUNTER_SET(_serialize_batch_timer, _serialize_batch_ns); + state->update_num_rows_load_total(_number_input_rows); state->update_num_rows_load_filtered(_number_filtered_rows); // print log of add batch time of all node, for tracing load performance easily diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index a9a86d92da7c1f..adde9f6db1d249 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -71,6 +71,7 @@ import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TabletCommitInfo; 
+import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -593,8 +594,8 @@ private void handleInsertStmt() throws Exception { UUID uuid = UUID.randomUUID(); Throwable throwable = null; - long loadedRows = Long.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL)); - int filteredRows = Integer.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL)); + long loadedRows = 0; + int filteredRows = 0; try { // assign request_id context.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); @@ -618,7 +619,7 @@ private void handleInsertStmt() throws Exception { ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); } - LOG.info("delta files is {}", coord.getDeltaUrls()); + LOG.debug("delta files is {}", coord.getDeltaUrls()); if (coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) { loadedRows = Long.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL)); @@ -627,6 +628,7 @@ private void handleInsertStmt() throws Exception { filteredRows = Integer.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL)); } + // if in strict mode, insert will fail if there are filtered rows if (context.getSessionVariable().getEnableInsertStrict()) { if (filteredRows > 0) { context.getState().setError("Insert has filtered data in strict mode, tracking_url=" @@ -683,8 +685,11 @@ private void handleInsertStmt() throws Exception { throwable = t; } - // record insert info for show load stmt - if (!insertStmt.isStreaming() || Config.using_old_load_usage_pattern) { + // record insert info for show load stmt if + // 1. NOT a streaming insert(deprecated) + // 2. using_old_load_usage_pattern is set to true, means a label will be returned for user to show load. + // 3. has filtered rows. 
so a label should be returned for user to show + if (!insertStmt.isStreaming() || Config.using_old_load_usage_pattern || filteredRows > 0) { try { context.getCatalog().getLoadManager().recordFinishedLoadJob( uuid.toString(), @@ -706,7 +711,8 @@ private void handleInsertStmt() throws Exception { // and user can check the job's status by label. context.getState().setOk(loadedRows, filteredRows, "{'label':'" + uuid.toString() + "'}"); } else { - // just return OK without label, which means this job is successfully done + // just return OK without label, which means this job is successfully done without any error + Preconditions.checkState(loadedRows > 0 && filteredRows == 0); context.getState().setOk(loadedRows, filteredRows, null); } } From ccc9de92bae893b11a86421c32cecc80ac821354 Mon Sep 17 00:00:00 2001 From: morningman Date: Thu, 15 Aug 2019 12:14:11 +0800 Subject: [PATCH 08/11] fix bug --- fe/src/main/java/org/apache/doris/qe/StmtExecutor.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index adde9f6db1d249..ee57e56bba4ab4 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -644,12 +644,6 @@ private void handleInsertStmt() throws Exception { return; } - if (insertStmt.getQueryStmt() != null && (coord.getCommitInfos() == null || coord.getCommitInfos().isEmpty())) { - Catalog.getCurrentGlobalTransactionMgr().abortTransaction(insertStmt.getTransactionId(), "select stmt return empty set when insert"); - context.getState().setOk(); - return; - } - Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction( insertStmt.getDbObj(), insertStmt.getTransactionId(), TabletCommitInfo.fromThrift(coord.getCommitInfos()), From 61fe8f321a58ed418f6c93a65844574e22ea2192 Mon Sep 17 00:00:00 2001 From: morningman Date: Thu, 15 Aug 2019 12:19:53 +0800 Subject: [PATCH 09/11] 
add doc --- .../load-data/insert-into-manual.md | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md b/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md index 35685fb08c3d92..cc8a186bd8ef54 100644 --- a/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md @@ -60,11 +60,20 @@ Insert Into 本身就是一个 SQL 命令,所以返回的行为同 SQL 命令 其中 url 可以用于查询错误的数据,具体见后面 **查看错误行** 小结。 -如果导入成功,则返回语句执行成功,还会附加返回一个 Label 字段。示例如下: +如果导入成功,则返回语句执行成功。示例如下: -```{"label":"d2cac0a0-a16d-482d-9041-c949a4b71604"}``` +``` +Query OK, 100 row affected, 0 warning (0.22 sec) +``` + +导入可能部分成功,则还会附加一个 Label 字段。示例如下: -导入可能部分成功,用户需要通过 `SHOW LOAD WHERE LABEL="xxx";` 命令,获取 url 查看错误行。 +``` +Query OK, 100 row affected, 1 warning (0.23 sec) +{'label':'7d66c457-658b-4a3e-bdcf-8beee872ef2c'} +``` + +其中 affected 表示导入的行数。warning 表示失败的行数。用户需要通过 `SHOW LOAD WHERE LABEL="xxx";` 命令,获取 url 查看错误行。 Label 是该 Insert Into 导入作业的标识。每个导入作业,都有一个在单 database 内部唯一的 Label。Insert Into 的 Label 则是由系统生成的,用户可以拿着这个 Label 通过查询导入命令异步获取导入状态。 @@ -84,7 +93,13 @@ Label 是该 Insert Into 导入作业的标识。每个导入作业,都有一 + enable\_insert\_strict - Insert Into 导入本身不能控制导入可容忍的错误率。用户只能通过 `enable_insert_strict` 这个 Session 参数用来控制。当该参数设置为 false 时,表示至少有一条数据被正确导入,则返回成功。当该参数设置为 false 时,表示如果有一条数据错误,则导入失败。默认为 false。可通过 `SET enable_insert_strict = true;` 来设置。 + Insert Into 导入本身不能控制导入可容忍的错误率。用户只能通过 `enable_insert_strict` 这个 Session 参数用来控制。 + + 当该参数设置为 false 时,表示至少有一条数据被正确导入,则返回成功。如果有失败数据,则还会返回一个 Label。 + + 当该参数设置为 true 时,表示如果有一条数据错误,则导入失败。 + + 默认为 false。可通过 `SET enable_insert_strict = true;` 来设置。 + query\_timeout From 02f34a410b63117c4e9096fd38ac0cf51c853dfb Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 16 Aug 2019 16:52:09 +0800 Subject: [PATCH 10/11] fix review --- .../cn/administrator-guide/load-data/insert-into-manual.md | 2 ++
fe/src/main/java/org/apache/doris/qe/StmtExecutor.java | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md b/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md index cc8a186bd8ef54..f1d83f306f0fa9 100644 --- a/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md @@ -75,6 +75,8 @@ Query OK, 100 row affected, 1 warning (0.23 sec) 其中 affected 表示导入的行数。warning 表示失败的行数。用户需要通过 `SHOW LOAD WHERE LABEL="xxx";` 命令,获取 url 查看错误行。 +如果没有任何数据,也会返回成功,且 affected 和 warning 都是 0。 + Label 是该 Insert Into 导入作业的标识。每个导入作业,都有一个在单 database 内部唯一的 Label。Insert Into 的 Label 则是由系统生成的,用户可以拿着这个 Label 通过查询导入命令异步获取导入状态。 ## 相关系统配置 diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index ee57e56bba4ab4..e4fff0c57ed4e5 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -644,6 +644,12 @@ private void handleInsertStmt() throws Exception { return; } + if (loadedRows == 0 && filteredRows == 0) { + // if no data, just return ok + context.getState().setOk(); + return; + } + Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction( insertStmt.getDbObj(), insertStmt.getTransactionId(), TabletCommitInfo.fromThrift(coord.getCommitInfos()), From 02c240a7ef5a0e0adfa2ab97696c167d25fb7035 Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 16 Aug 2019 19:46:23 +0800 Subject: [PATCH 11/11] fix review --- fe/src/main/java/org/apache/doris/qe/StmtExecutor.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index e4fff0c57ed4e5..ac89629411f9da 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ 
b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -70,6 +70,7 @@ import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TabletCommitInfo; +import org.apache.doris.transaction.TransactionCommitFailedException; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -645,7 +646,9 @@ private void handleInsertStmt() throws Exception { } if (loadedRows == 0 && filteredRows == 0) { - // if no data, just return ok + // if no data, just abort txn and return ok + Catalog.getCurrentGlobalTransactionMgr().abortTransaction(insertStmt.getTransactionId(), + TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG); context.getState().setOk(); return; }