diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 732bc40a7236f7..580678d4ea8520 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -618,6 +618,7 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { COUNTER_SET(_validate_data_timer, _validate_data_ns); COUNTER_SET(_wait_in_flight_packet_timer, _wait_in_flight_packet_ns); COUNTER_SET(_serialize_batch_timer, _serialize_batch_ns); + state->update_num_rows_load_total(_number_input_rows); state->update_num_rows_load_filtered(_number_filtered_rows); // print log of add batch time of all node, for tracing load performance easily diff --git a/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md b/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md index c80b4f87cf728b..f1d83f306f0fa9 100644 --- a/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/insert-into-manual.md @@ -54,7 +54,28 @@ INSERT INTO tbl1 VALUES ("qweasdzxcqweasdzxc"), ("a"); Insert Into 本身就是一个 SQL 命令,所以返回的行为同 SQL 命令的返回行为。 -如果导入失败,则返回语句执行失败。如果导入成功,则返回语句执行成功,还会附加返回一个 Label 字段。 +如果导入失败,则返回语句执行失败。示例如下: + +```ERROR 1064 (HY000): all partitions have no load data. url: http://ip:port/api/_load_error_log?file=__shard_14/error_log_insert_stmt_f435264d82f342e4-a33764f5f0dfbf00_f435264d82f342e4_a33764f5f0dfbf00``` + +其中 url 可以用于查询错误的数据,具体见后面 **查看错误行** 小结。 + +如果导入成功,则返回语句执行成功。示例如下: + +``` +Query OK, 100 row affected, 0 warning (0.22 sec) +``` + +导入可能部分成功,则还会附加一个 Label 字段。示例如下: + +``` +Query OK, 100 row affected, 1 warning (0.23 sec) +{'label':'7d66c457-658b-4a3e-bdcf-8beee872ef2c'} +``` + +其中 affected 表示导入的行数。warning 表示失败的行数。用户需要通过 `SHOW LOAD WHERE LABEL="xxx";` 命令,获取 url 查看错误行。 + +如果没有任何数据,也会返回成功,且 affected 和 warning 都是 0。 Label 是该 Insert Into 导入作业的标识。每个导入作业,都有一个在单 database 内部唯一的 Label。Insert Into 的 Label 则是由系统生成的,用户可以拿着这个 Label 通过查询导入命令异步获取导入状态。 @@ -74,7 +95,13 @@ Label 是该 Insert Into 导入作业的标识。每个导入作业,都有一 + enable\_insert\_strict - Insert Into 导入本身不能控制导入可容忍的错误率。用户只能通过 `enable_insert_strict` 这个 Session 参数用来控制。当该参数设置为 false 时,表示至少有一条数据被正确导入,则返回成功。当该参数设置为 false 时,表示如果有一条数据错误,则导入失败。默认为 false。可通过 `SET enable_insert_strict = true;` 来设置。 + Insert Into 导入本身不能控制导入可容忍的错误率。用户只能通过 `enable_insert_strict` 这个 Session 参数用来控制。 + + 当该参数设置为 false 时,表示至少有一条数据被正确导入,则返回成功。如果有失败数据,则还会返回一个 Label。 + + 当该参数设置为 false 时,表示如果有一条数据错误,则导入失败。 + + 默认为 false。可通过 `SET enable_insert_strict = true;` 来设置。 + query\_timeout @@ -132,6 +159,14 @@ bj_store_sales schema: * 查看错误行 - 由于 Insert Into 无法控制错误率,只能通过 `enable_insert_strict` 设置为完全容忍错误数据或完全忽略错误数据。因此如果 `enable_insert_strict` 设为 true,则 Insert Into 可能会失败。而如果 `enable_insert_strict` 设为 false,则可能出现仅导入了部分合格数据的情况。但无论以上哪种情况,Doris 目前无法提供查看不合格数据行的功能。因此用户无法通过 Insert Into 语句来查看具体的导入错误。 + 由于 Insert Into 无法控制错误率,只能通过 `enable_insert_strict` 设置为完全容忍错误数据或完全忽略错误数据。因此如果 `enable_insert_strict` 设为 true,则 Insert Into 可能会失败。而如果 `enable_insert_strict` 设为 false,则可能出现仅导入了部分合格数据的情况。 + + 当返回结果中提供了 url 字段时,可以通过以下命令查看错误行: + + ```SHOW LOAD WARNINGS ON "url";``` + + 示例: + + ```SHOW LOAD WARNINGS ON "http://ip:port/api/_load_error_log?file=__shard_13/error_log_insert_stmt_d2cac0a0a16d482d-9041c949a4b71605_d2cac0a0a16d482d_9041c949a4b71605";``` - 错误的原因通常如:源数据列长度超过目的数据列长度、列类型不匹配、分区不匹配、列顺序不匹配等等。当依然无法检查出问题时。目前只能建议先运行 Insert Into 语句中的 SELECT 命令将数据导出到一个文件中,然后在通过 Stream load 的方式导入这个文件,来查看具体的错误。 + 错误的原因通常如:源数据列长度超过目的数据列长度、列类型不匹配、分区不匹配、列顺序不匹配等等。 diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java index 3cb5f3dd00329d..2dbd66fbb7ee14 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java @@ -50,8 +50,8 @@ public InsertLoadJob() { this.jobType = EtlJobType.INSERT; } - public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp, String failMsg) - throws MetaNotFoundException { + public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp, String failMsg, + String trackingUrl) throws MetaNotFoundException { super(dbId, label); this.tableId = tableId; this.createTimestamp = createTimestamp; @@ -68,6 +68,7 @@ public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp this.jobType = EtlJobType.INSERT; this.timeoutSecond = Config.insert_load_default_timeout_second; this.authorizationInfo = gatherAuthInfo(); + this.loadingStatus.setTrackingUrl(trackingUrl); } public AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException { diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index c6022a514dbd74..a2ed3748f3ff8f 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -256,7 +256,7 @@ private void addLoadJob(LoadJob loadJob) { } public void recordFinishedLoadJob(String label, String dbName, long tableId, EtlJobType jobType, - long createTimestamp, String failMsg) throws MetaNotFoundException { + long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException { // get db id Database db = Catalog.getCurrentCatalog().getDb(dbName); @@ -267,7 +267,7 @@ public void recordFinishedLoadJob(String label, String dbName, long tableId, Etl LoadJob loadJob; switch (jobType) { case INSERT: - loadJob = new InsertLoadJob(label, db.getId(), tableId, createTimestamp, failMsg); + loadJob = new InsertLoadJob(label, db.getId(), tableId, createTimestamp, failMsg, trackingUrl); break; default: return; diff --git a/fe/src/main/java/org/apache/doris/mysql/MysqlErrPacket.java b/fe/src/main/java/org/apache/doris/mysql/MysqlErrPacket.java index 946024e9e69519..000cc0442d2f79 100644 --- a/fe/src/main/java/org/apache/doris/mysql/MysqlErrPacket.java +++ b/fe/src/main/java/org/apache/doris/mysql/MysqlErrPacket.java @@ -20,8 +20,13 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.qe.QueryState; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + // MySQL protocol error packet public class MysqlErrPacket extends MysqlPacket { + private static final Logger LOG = LogManager.getLogger(MysqlErrPacket.class); + private static final int ERROR_PACKET_INDICATOR = 0XFF; // only first FIVE char is useful in SQL STATE private byte[] sqlState = {'H', 'Y', '0', '0', '0'}; @@ -52,7 +57,7 @@ public void writeTo(MysqlSerializer serializer) { if (errorMessage == null || errorMessage.isEmpty()) { // NOTICE: if write "" or "\0", the client will be show "Query OK" // SO we need write no-empty string - serializer.writeEofString(" "); + serializer.writeEofString("Unknown error"); } else { serializer.writeEofString(errorMessage); } diff --git a/fe/src/main/java/org/apache/doris/mysql/MysqlOkPacket.java b/fe/src/main/java/org/apache/doris/mysql/MysqlOkPacket.java index 5a70e6eec93bf8..3614a0175fbf59 100644 --- a/fe/src/main/java/org/apache/doris/mysql/MysqlOkPacket.java +++ b/fe/src/main/java/org/apache/doris/mysql/MysqlOkPacket.java @@ -25,14 +25,16 @@ public class MysqlOkPacket extends MysqlPacket { private static final int PACKET_OK_INDICATOR = 0X00; // TODO(zhaochun): following are not used in palo - private static final long AFFECT_ROWS = 0; private static final long LAST_INSERT_ID = 0; private static final int STATUS_FLAGS = 0; - private static final int WARNINGS = 0; private final String infoMessage; + private long affectedRows = 0; + private int warningRows = 0; public MysqlOkPacket(QueryState state) { infoMessage = state.getInfoMessage(); + affectedRows = state.getAffectedRows(); + warningRows = state.getWarningRows(); } @Override @@ -41,11 +43,11 @@ public void writeTo(MysqlSerializer serializer) { MysqlCapability capability = serializer.getCapability(); serializer.writeInt1(PACKET_OK_INDICATOR); - serializer.writeVInt(AFFECT_ROWS); + serializer.writeVInt(affectedRows); serializer.writeVInt(LAST_INSERT_ID); if (capability.isProtocol41()) { serializer.writeInt2(STATUS_FLAGS); - serializer.writeInt2(WARNINGS); + serializer.writeInt2(warningRows); } else if (capability.isTransactions()) { serializer.writeInt2(STATUS_FLAGS); } diff --git a/fe/src/main/java/org/apache/doris/qe/QueryState.java b/fe/src/main/java/org/apache/doris/qe/QueryState.java index b1ddfda0e2a4f9..871407742a2579 100644 --- a/fe/src/main/java/org/apache/doris/qe/QueryState.java +++ b/fe/src/main/java/org/apache/doris/qe/QueryState.java @@ -43,6 +43,8 @@ public enum ErrType { private String infoMessage; private ErrType errType = ErrType.OTHER_ERR; private boolean isQuery = false; + private long affectedRows = 0; + private int warningRows = 0; public QueryState() { } @@ -62,12 +64,14 @@ public void setEof() { } public void setOk() { - stateType = MysqlStateType.OK; + setOk(0, 0, null); } - public void setOk(String infoMessage) { - stateType = MysqlStateType.OK; + public void setOk(long affectedRows, int warningRows, String infoMessage) { + this.affectedRows = affectedRows; + this.warningRows = warningRows; this.infoMessage = infoMessage; + stateType = MysqlStateType.OK; } public void setError(String errorMsg) { @@ -109,6 +113,14 @@ public ErrorCode getErrorCode() { return errorCode; } + public long getAffectedRows() { + return affectedRows; + } + + public int getWarningRows() { + return warningRows; + } + public MysqlPacket toResponsePacket() { MysqlPacket packet = null; switch (stateType) { diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index ede891439b0e1f..ac89629411f9da 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -70,7 +70,9 @@ import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TabletCommitInfo; +import org.apache.doris.transaction.TransactionCommitFailedException; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -592,6 +594,9 @@ private void handleInsertStmt() throws Exception { long createTime = System.currentTimeMillis(); UUID uuid = UUID.randomUUID(); Throwable throwable = null; + + long loadedRows = 0; + int filteredRows = 0; try { // assign request_id context.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); @@ -612,35 +617,38 @@ private void handleInsertStmt() throws Exception { if (!coord.getExecStatus().ok()) { String errMsg = coord.getExecStatus().getErrorMsg(); LOG.warn("insert failed: {}", errMsg); - - // hide host info - int hostIndex = errMsg.indexOf("host"); - if (hostIndex != -1) { - errMsg = errMsg.substring(0, hostIndex); - } ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); } - LOG.info("delta files is {}", coord.getDeltaUrls()); + LOG.debug("delta files is {}", coord.getDeltaUrls()); + + if (coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) { + loadedRows = Long.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL)); + } + if (coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) { + filteredRows = Integer.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL)); + } + // if in strict mode, insert will fail if there are filtered rows if (context.getSessionVariable().getEnableInsertStrict()) { - Map counters = coord.getLoadCounters(); - String strValue = counters.get(LoadEtlTask.DPP_ABNORMAL_ALL); - if (strValue != null && Long.valueOf(strValue) > 0) { - throw new UserException("Insert has filtered data in strict mode, tracking_url=" + if (filteredRows > 0) { + context.getState().setError("Insert has filtered data in strict mode, tracking_url=" + coord.getTrackingUrl()); + return; } } if (insertStmt.getTargetTable().getType() != TableType.OLAP) { // no need to add load job. // MySQL table is already being inserted. - context.getState().setOk(); + context.getState().setOk(loadedRows, filteredRows, null); return; } - if (insertStmt.getQueryStmt() != null && (coord.getCommitInfos() == null || coord.getCommitInfos().isEmpty())) { - Catalog.getCurrentGlobalTransactionMgr().abortTransaction(insertStmt.getTransactionId(), "select stmt return empty set when insert"); + if (loadedRows == 0 && filteredRows == 0) { + // if no data, just abort txn and return ok + Catalog.getCurrentGlobalTransactionMgr().abortTransaction(insertStmt.getTransactionId(), + TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG); context.getState().setOk(); return; } @@ -663,9 +671,13 @@ private void handleInsertStmt() throws Exception { } if (!Config.using_old_load_usage_pattern) { - // if not using old usage pattern, the exception will be thrown to user directly without - // a label - throw t; + // if not using old usage pattern, the exception will be thrown to user directly without a label + StringBuilder sb = new StringBuilder(t.getMessage()); + if (!Strings.isNullOrEmpty(coord.getTrackingUrl())) { + sb.append(". url: " + coord.getTrackingUrl()); + } + context.getState().setError(sb.toString()); + return; } /* @@ -676,8 +688,11 @@ private void handleInsertStmt() throws Exception { throwable = t; } - // record insert info for show load stmt - if (!insertStmt.isStreaming() || Config.using_old_load_usage_pattern) { + // record insert info for show load stmt if + // 1. NOT a streaming insert(deprecated) + // 2. using_old_load_usage_pattern is set to true, means a label will be returned for user to show load. + // 3. has filtered rows. so a label should be returned for user to show + if (!insertStmt.isStreaming() || Config.using_old_load_usage_pattern || filteredRows > 0) { try { context.getCatalog().getLoadManager().recordFinishedLoadJob( uuid.toString(), @@ -685,7 +700,8 @@ private void handleInsertStmt() throws Exception { insertStmt.getTargetTable().getId(), EtlJobType.INSERT, createTime, - throwable == null ? "" : throwable.getMessage() + throwable == null ? "" : throwable.getMessage(), + coord.getTrackingUrl() ); } catch (MetaNotFoundException e) { LOG.warn("Record info of insert load with error {}", e.getMessage(), e); @@ -696,10 +712,11 @@ private void handleInsertStmt() throws Exception { // set to OK, which means the insert load job is successfully submitted. // and user can check the job's status by label. - context.getState().setOk("{'label':'" + uuid.toString() + "'}"); + context.getState().setOk(loadedRows, filteredRows, "{'label':'" + uuid.toString() + "'}"); } else { - // just return OK without label, which means this job is successfully done - context.getState().setOk(); + // just return OK without label, which means this job is successfully done without any error + Preconditions.checkState(loadedRows > 0 && filteredRows == 0); + context.getState().setOk(loadedRows, filteredRows, null); } } diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index b024ccc1268c11..d88b79deeee3cb 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -251,7 +251,8 @@ public void commitTransaction(long dbId, long transactionId, List