Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,7 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
COUNTER_SET(_validate_data_timer, _validate_data_ns);
COUNTER_SET(_wait_in_flight_packet_timer, _wait_in_flight_packet_ns);
COUNTER_SET(_serialize_batch_timer, _serialize_batch_ns);
state->update_num_rows_load_total(_number_input_rows);
state->update_num_rows_load_filtered(_number_filtered_rows);

// log the add-batch time of every node, so that load performance is easy to trace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,28 @@ INSERT INTO tbl1 VALUES ("qweasdzxcqweasdzxc"), ("a");

Insert Into 本身就是一个 SQL 命令,所以返回的行为同 SQL 命令的返回行为。

如果导入失败,则返回语句执行失败。如果导入成功,则返回语句执行成功,还会附加返回一个 Label 字段。
如果导入失败,则返回语句执行失败。示例如下:

```ERROR 1064 (HY000): all partitions have no load data. url: http://ip:port/api/_load_error_log?file=__shard_14/error_log_insert_stmt_f435264d82f342e4-a33764f5f0dfbf00_f435264d82f342e4_a33764f5f0dfbf00```

其中 url 可以用于查询错误的数据,具体见后面 **查看错误行** 小节。

如果导入成功,则返回语句执行成功。示例如下:

```
Query OK, 100 row affected, 0 warning (0.22 sec)
```

如果导入仅部分成功(即存在被过滤的错误行),则还会附加返回一个 Label 字段。示例如下:

```
Query OK, 100 row affected, 1 warning (0.23 sec)
{'label':'7d66c457-658b-4a3e-bdcf-8beee872ef2c'}
```

其中 affected 表示导入的行数。warning 表示失败的行数。用户需要通过 `SHOW LOAD WHERE LABEL="xxx";` 命令,获取 url 查看错误行。

如果没有任何数据,也会返回成功,且 affected 和 warning 都是 0。

Label 是该 Insert Into 导入作业的标识。每个导入作业,都有一个在单 database 内部唯一的 Label。Insert Into 的 Label 则是由系统生成的,用户可以拿着这个 Label 通过查询导入命令异步获取导入状态。

Expand All @@ -74,7 +95,13 @@ Label 是该 Insert Into 导入作业的标识。每个导入作业,都有一

+ enable\_insert\_strict

Insert Into 导入本身不能控制导入可容忍的错误率。用户只能通过 `enable_insert_strict` 这个 Session 参数用来控制。当该参数设置为 false 时,表示至少有一条数据被正确导入,则返回成功。当该参数设置为 false 时,表示如果有一条数据错误,则导入失败。默认为 false。可通过 `SET enable_insert_strict = true;` 来设置。
Insert Into 导入本身不能控制导入可容忍的错误率。用户只能通过 `enable_insert_strict` 这个 Session 参数用来控制。

当该参数设置为 false 时,表示至少有一条数据被正确导入,则返回成功。如果有失败数据,则还会返回一个 Label。

当该参数设置为 true 时,表示如果有一条数据错误,则导入失败。

默认为 false。可通过 `SET enable_insert_strict = true;` 来设置。

+ query\_timeout

Expand Down Expand Up @@ -132,6 +159,14 @@ bj_store_sales schema:

* 查看错误行

由于 Insert Into 无法控制错误率,只能通过 `enable_insert_strict` 设置为完全容忍错误数据或完全忽略错误数据。因此如果 `enable_insert_strict` 设为 true,则 Insert Into 可能会失败。而如果 `enable_insert_strict` 设为 false,则可能出现仅导入了部分合格数据的情况。但无论以上哪种情况,Doris 目前无法提供查看不合格数据行的功能。因此用户无法通过 Insert Into 语句来查看具体的导入错误。
由于 Insert Into 无法控制错误率,只能通过 `enable_insert_strict` 设置为完全不容忍错误数据或完全忽略错误数据。因此如果 `enable_insert_strict` 设为 true,则 Insert Into 可能会失败。而如果 `enable_insert_strict` 设为 false,则可能出现仅导入了部分合格数据的情况。

当返回结果中提供了 url 字段时,可以通过以下命令查看错误行:

```SHOW LOAD WARNINGS ON "url";```

示例:

```SHOW LOAD WARNINGS ON "http://ip:port/api/_load_error_log?file=__shard_13/error_log_insert_stmt_d2cac0a0a16d482d-9041c949a4b71605_d2cac0a0a16d482d_9041c949a4b71605";```

错误的原因通常如:源数据列长度超过目的数据列长度、列类型不匹配、分区不匹配、列顺序不匹配等等。当依然无法检查出问题时。目前只能建议先运行 Insert Into 语句中的 SELECT 命令将数据导出到一个文件中,然后在通过 Stream load 的方式导入这个文件,来查看具体的错误。
错误的原因通常如:源数据列长度超过目的数据列长度、列类型不匹配、分区不匹配、列顺序不匹配等等。
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public InsertLoadJob() {
this.jobType = EtlJobType.INSERT;
}

public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp, String failMsg)
throws MetaNotFoundException {
public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp, String failMsg,
String trackingUrl) throws MetaNotFoundException {
super(dbId, label);
this.tableId = tableId;
this.createTimestamp = createTimestamp;
Expand All @@ -68,6 +68,7 @@ public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp
this.jobType = EtlJobType.INSERT;
this.timeoutSecond = Config.insert_load_default_timeout_second;
this.authorizationInfo = gatherAuthInfo();
this.loadingStatus.setTrackingUrl(trackingUrl);
}

public AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ private void addLoadJob(LoadJob loadJob) {
}

public void recordFinishedLoadJob(String label, String dbName, long tableId, EtlJobType jobType,
long createTimestamp, String failMsg) throws MetaNotFoundException {
long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException {

// get db id
Database db = Catalog.getCurrentCatalog().getDb(dbName);
Expand All @@ -267,7 +267,7 @@ public void recordFinishedLoadJob(String label, String dbName, long tableId, Etl
LoadJob loadJob;
switch (jobType) {
case INSERT:
loadJob = new InsertLoadJob(label, db.getId(), tableId, createTimestamp, failMsg);
loadJob = new InsertLoadJob(label, db.getId(), tableId, createTimestamp, failMsg, trackingUrl);
break;
default:
return;
Expand Down
7 changes: 6 additions & 1 deletion fe/src/main/java/org/apache/doris/mysql/MysqlErrPacket.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.qe.QueryState;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// MySQL protocol error packet
public class MysqlErrPacket extends MysqlPacket {
private static final Logger LOG = LogManager.getLogger(MysqlErrPacket.class);

private static final int ERROR_PACKET_INDICATOR = 0XFF;
// only first FIVE char is useful in SQL STATE
private byte[] sqlState = {'H', 'Y', '0', '0', '0'};
Expand Down Expand Up @@ -52,7 +57,7 @@ public void writeTo(MysqlSerializer serializer) {
if (errorMessage == null || errorMessage.isEmpty()) {
// NOTICE: if we write "" or "\0", the client will show "Query OK",
// so we need to write a non-empty string
serializer.writeEofString(" ");
serializer.writeEofString("Unknown error");
} else {
serializer.writeEofString(errorMessage);
}
Expand Down
10 changes: 6 additions & 4 deletions fe/src/main/java/org/apache/doris/mysql/MysqlOkPacket.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@
public class MysqlOkPacket extends MysqlPacket {
private static final int PACKET_OK_INDICATOR = 0X00;
// TODO(zhaochun): following are not used in palo
private static final long AFFECT_ROWS = 0;
private static final long LAST_INSERT_ID = 0;
private static final int STATUS_FLAGS = 0;
private static final int WARNINGS = 0;
private final String infoMessage;
private long affectedRows = 0;
private int warningRows = 0;

/**
 * Creates an OK packet from the given query state, copying its info message,
 * affected-row count and warning-row count into this packet.
 *
 * @param state the query state whose result information is carried by this packet
 */
public MysqlOkPacket(QueryState state) {
    this.infoMessage = state.getInfoMessage();
    this.affectedRows = state.getAffectedRows();
    this.warningRows = state.getWarningRows();
}

@Override
Expand All @@ -41,11 +43,11 @@ public void writeTo(MysqlSerializer serializer) {
MysqlCapability capability = serializer.getCapability();

serializer.writeInt1(PACKET_OK_INDICATOR);
serializer.writeVInt(AFFECT_ROWS);
serializer.writeVInt(affectedRows);
serializer.writeVInt(LAST_INSERT_ID);
if (capability.isProtocol41()) {
serializer.writeInt2(STATUS_FLAGS);
serializer.writeInt2(WARNINGS);
serializer.writeInt2(warningRows);
} else if (capability.isTransactions()) {
serializer.writeInt2(STATUS_FLAGS);
}
Expand Down
18 changes: 15 additions & 3 deletions fe/src/main/java/org/apache/doris/qe/QueryState.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public enum ErrType {
private String infoMessage;
private ErrType errType = ErrType.OTHER_ERR;
private boolean isQuery = false;
private long affectedRows = 0;
private int warningRows = 0;

public QueryState() {
}
Expand All @@ -62,12 +64,14 @@ public void setEof() {
}

public void setOk() {
stateType = MysqlStateType.OK;
setOk(0, 0, null);
}

public void setOk(String infoMessage) {
stateType = MysqlStateType.OK;
public void setOk(long affectedRows, int warningRows, String infoMessage) {
this.affectedRows = affectedRows;
this.warningRows = warningRows;
this.infoMessage = infoMessage;
stateType = MysqlStateType.OK;
}

public void setError(String errorMsg) {
Expand Down Expand Up @@ -109,6 +113,14 @@ public ErrorCode getErrorCode() {
return errorCode;
}

/**
 * Returns the number of affected rows stored in this state.
 *
 * @return the current value of the affectedRows field
 */
public long getAffectedRows() {
    return this.affectedRows;
}

/**
 * Returns the number of warning rows stored in this state.
 *
 * @return the current value of the warningRows field
 */
public int getWarningRows() {
    return this.warningRows;
}

public MysqlPacket toResponsePacket() {
MysqlPacket packet = null;
switch (stateType) {
Expand Down
63 changes: 40 additions & 23 deletions fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionCommitFailedException;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -592,6 +594,9 @@ private void handleInsertStmt() throws Exception {
long createTime = System.currentTimeMillis();
UUID uuid = UUID.randomUUID();
Throwable throwable = null;

long loadedRows = 0;
int filteredRows = 0;
try {
// assign request_id
context.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()));
Expand All @@ -612,35 +617,38 @@ private void handleInsertStmt() throws Exception {
if (!coord.getExecStatus().ok()) {
String errMsg = coord.getExecStatus().getErrorMsg();
LOG.warn("insert failed: {}", errMsg);

// hide host info
int hostIndex = errMsg.indexOf("host");
if (hostIndex != -1) {
errMsg = errMsg.substring(0, hostIndex);
}
ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
}

LOG.info("delta files is {}", coord.getDeltaUrls());
LOG.debug("delta files is {}", coord.getDeltaUrls());

if (coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) {
loadedRows = Long.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL));
}
if (coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) {
filteredRows = Integer.valueOf(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
}

// if in strict mode, insert will fail if there are filtered rows
if (context.getSessionVariable().getEnableInsertStrict()) {
Map<String, String> counters = coord.getLoadCounters();
String strValue = counters.get(LoadEtlTask.DPP_ABNORMAL_ALL);
if (strValue != null && Long.valueOf(strValue) > 0) {
throw new UserException("Insert has filtered data in strict mode, tracking_url="
if (filteredRows > 0) {
context.getState().setError("Insert has filtered data in strict mode, tracking_url="
+ coord.getTrackingUrl());
return;
}
}

if (insertStmt.getTargetTable().getType() != TableType.OLAP) {
// no need to add load job.
// MySQL table is already being inserted.
context.getState().setOk();
context.getState().setOk(loadedRows, filteredRows, null);
return;
}

if (insertStmt.getQueryStmt() != null && (coord.getCommitInfos() == null || coord.getCommitInfos().isEmpty())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we keep this when insertedRows and filterRows both are 0

Catalog.getCurrentGlobalTransactionMgr().abortTransaction(insertStmt.getTransactionId(), "select stmt return empty set when insert");
if (loadedRows == 0 && filteredRows == 0) {
// if no data, just abort txn and return ok
Catalog.getCurrentGlobalTransactionMgr().abortTransaction(insertStmt.getTransactionId(),
TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
context.getState().setOk();
return;
}
Expand All @@ -663,9 +671,13 @@ private void handleInsertStmt() throws Exception {
}

if (!Config.using_old_load_usage_pattern) {
// if not using old usage pattern, the exception will be thrown to user directly without
// a label
throw t;
// if not using old usage pattern, the exception will be thrown to user directly without a label
StringBuilder sb = new StringBuilder(t.getMessage());
if (!Strings.isNullOrEmpty(coord.getTrackingUrl())) {
sb.append(". url: " + coord.getTrackingUrl());
}
context.getState().setError(sb.toString());
return;
}

/*
Expand All @@ -676,16 +688,20 @@ private void handleInsertStmt() throws Exception {
throwable = t;
}

// record insert info for show load stmt
if (!insertStmt.isStreaming() || Config.using_old_load_usage_pattern) {
// record insert info for show load stmt if
// 1. NOT a streaming insert(deprecated)
// 2. using_old_load_usage_pattern is set to true, means a label will be returned for user to show load.
// 3. has filtered rows. so a label should be returned for user to show
if (!insertStmt.isStreaming() || Config.using_old_load_usage_pattern || filteredRows > 0) {
try {
context.getCatalog().getLoadManager().recordFinishedLoadJob(
uuid.toString(),
insertStmt.getDb(),
insertStmt.getTargetTable().getId(),
EtlJobType.INSERT,
createTime,
throwable == null ? "" : throwable.getMessage()
throwable == null ? "" : throwable.getMessage(),
coord.getTrackingUrl()
);
} catch (MetaNotFoundException e) {
LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
Expand All @@ -696,10 +712,11 @@ private void handleInsertStmt() throws Exception {

// set to OK, which means the insert load job is successfully submitted.
// and user can check the job's status by label.
context.getState().setOk("{'label':'" + uuid.toString() + "'}");
context.getState().setOk(loadedRows, filteredRows, "{'label':'" + uuid.toString() + "'}");
} else {
// just return OK without label, which means this job is successfully done
context.getState().setOk();
// just return OK without label, which means this job is successfully done without any error
Preconditions.checkState(loadedRows > 0 && filteredRows == 0);
context.getState().setOk(loadedRows, filteredRows, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ public void commitTransaction(long dbId, long transactionId, List<TabletCommitIn
TransactionState transactionState = idToTransactionState.get(transactionId);
if (transactionState == null
|| transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
throw new TransactionCommitFailedException(transactionState.getReason());
throw new TransactionCommitFailedException(
transactionState == null ? "transaction not found" : transactionState.getReason());
}

if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class InsertLoadJobTest {
public void testGetTableNames(@Mocked Catalog catalog,
@Injectable Database database,
@Injectable Table table) throws MetaNotFoundException {
InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1000, "");
InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1000, "", "");
String tableName = "table1";
new Expectations() {
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void testSerializationNormal(@Mocked Catalog catalog,
};

loadManager = new LoadManager(new LoadJobScheduler());
LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, System.currentTimeMillis(), "");
LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, System.currentTimeMillis(), "", "");
Deencapsulation.invoke(loadManager, "addLoadJob", job1);

File file = serializeToFile(loadManager);
Expand Down Expand Up @@ -145,7 +145,7 @@ public void testSerializationWithJobRemoved(@Mocked MetaContext metaContext,
};

loadManager = new LoadManager(new LoadJobScheduler());
LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, System.currentTimeMillis(), "");
LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, System.currentTimeMillis(), "", "");
Deencapsulation.invoke(loadManager, "addLoadJob", job1);

//make job1 don't serialize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void testWriteNullMsg() {
Assert.assertEquals("HY000", new String(MysqlProto.readFixedString(buffer, 5)));
// sql state
// NOTE: we put "Unknown error" if MysqlErrPacket's errorMessage is null or empty
Assert.assertEquals(" ", new String(MysqlProto.readEofString(buffer)));
Assert.assertEquals("Unknown error", new String(MysqlProto.readEofString(buffer)));

Assert.assertEquals(0, buffer.remaining());
}
Expand Down