diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 57f01f3d152e14..fe3fbbe8d7fd0f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -464,7 +464,12 @@ public long getDbId() {
     }
 
     public void setOtherMsg(String otherMsg) {
-        this.otherMsg = TimeUtils.getCurrentFormatTime() + ":" + Strings.nullToEmpty(otherMsg);
+        writeLock();
+        try {
+            this.otherMsg = TimeUtils.getCurrentFormatTime() + ":" + Strings.nullToEmpty(otherMsg);
+        } finally {
+            writeUnlock();
+        }
     }
 
     public String getDbFullName() throws MetaNotFoundException {
@@ -856,9 +861,10 @@ private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unse
                             + "when current total rows is more than base or the filter ratio is more than the max")
                     .build());
             }
-            // reset currentTotalNum and currentErrorNum
+            // reset currentTotalNum, currentErrorNum and otherMsg
             this.jobStatistic.currentErrorRows = 0;
             this.jobStatistic.currentTotalRows = 0;
+            this.otherMsg = "";
         } else if (this.jobStatistic.currentErrorRows > maxErrorNum
                 || (this.jobStatistic.currentTotalRows > 0 && ((double) this.jobStatistic.currentErrorRows
                         / this.jobStatistic.currentTotalRows) > maxFilterRatio)) {
@@ -877,9 +883,10 @@ private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unse
                             "current error rows is more than max_error_number "
                                     + "or the max_filter_ratio is more than the value set"), isReplay);
             }
-            // reset currentTotalNum and currentErrorNum
+            // reset currentTotalNum, currentErrorNum and otherMsg
             this.jobStatistic.currentErrorRows = 0;
             this.jobStatistic.currentTotalRows = 0;
+            this.otherMsg = "";
         }
     }
 
@@ -1139,6 +1146,7 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String
         long taskBeId = -1L;
         try {
             this.jobStatistic.runningTxnIds.remove(txnState.getTransactionId());
+            setOtherMsg(txnStatusChangeReasonString);
             if (txnOperated) {
                 // step0: find task in job
                 Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter(
@@ -1454,6 +1462,7 @@ public void update() throws UserException {
             LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
                     .add("msg", "Job need to be rescheduled")
                     .build());
+            this.otherMsg = pauseReason == null ? "" : pauseReason.getMsg();
             unprotectUpdateProgress();
             unprotectUpdateState(JobState.NEED_SCHEDULE, null, false);
         }
diff --git a/regression-test/suites/load_p0/routine_load/data/test_error_info.csv b/regression-test/suites/load_p0/routine_load/data/test_error_info.csv
new file mode 100644
index 00000000000000..bc857cabcfdb5c
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_error_info.csv
@@ -0,0 +1 @@
+57|2023-08-19|TRUE|2|-25462|-74112029|6458082754318544493|-7910671781690629051|-15205.859375|-306870797.484914|759730669.0|-628556336.0|2023-07-10 18:39:10|2023-02-12|2023-01-27 07:26:06|y||Xi9nDVrLv8m6AwEpUxmtzFAuK48sQ|{"name": "John", "age": 25, "city": "New York"}
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
new file mode 100644
index 00000000000000..f05b8af79ee53a
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_error_info","nonConcurrent") {
+    def kafkaCsvTopics = [
+        "test_error_info",
+    ]
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    // send data to kafka
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        def producer = new KafkaProducer<>(props)
+        for (String kafkaCsvTopic in kafkaCsvTopics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    // case 1: task failed
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // create table
+        def jobName = "test_error_info"
+        def tableName = "test_routine_error_info"
+        try {
+            sql """
+                CREATE TABLE IF NOT EXISTS ${tableName}
+                (
+                    k00 INT NOT NULL,
+                    k01 DATE NOT NULL,
+                    k02 BOOLEAN NULL,
+                    k03 TINYINT NULL,
+                    k04 SMALLINT NULL,
+                    k05 INT NULL,
+                    k06 BIGINT NULL,
+                    k07 LARGEINT NULL,
+                    k08 FLOAT NULL,
+                    k09 DOUBLE NULL,
+                    k10 DECIMAL(9,1) NULL,
+                    k11 DECIMALV3(9,1) NULL,
+                    k12 DATETIME NULL,
+                    k13 DATEV2 NULL,
+                    k14 DATETIMEV2 NULL,
+                    k15 CHAR NULL,
+                    k16 VARCHAR NULL,
+                    k17 STRING NULL,
+                    k18 JSON NULL,
+                    kd01 BOOLEAN NOT NULL DEFAULT "TRUE",
+                    kd02 TINYINT NOT NULL DEFAULT "1",
+                    kd03 SMALLINT NOT NULL DEFAULT "2",
+                    kd04 INT NOT NULL DEFAULT "3",
+                    kd05 BIGINT NOT NULL DEFAULT "4",
+                    kd06 LARGEINT NOT NULL DEFAULT "5",
+                    kd07 FLOAT NOT NULL DEFAULT "6.0",
+                    kd08 DOUBLE NOT NULL DEFAULT "7.0",
+                    kd09 DECIMAL NOT NULL DEFAULT "888888888",
+                    kd10 DECIMALV3 NOT NULL DEFAULT "999999999",
+                    kd11 DATE NOT NULL DEFAULT "2023-08-24",
+                    kd12 DATETIME NOT NULL DEFAULT "2023-08-24 12:00:00",
+                    kd13 DATEV2 NOT NULL DEFAULT "2023-08-24",
+                    kd14 DATETIMEV2 NOT NULL DEFAULT "2023-08-24 12:00:00",
+                    kd15 CHAR(255) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                    kd16 VARCHAR(300) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                    kd17 STRING NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                    kd18 JSON NULL,
+
+                    INDEX idx_inverted_k104 (`k05`) USING INVERTED,
+                    INDEX idx_inverted_k110 (`k11`) USING INVERTED,
+                    INDEX idx_inverted_k113 (`k13`) USING INVERTED,
+                    INDEX idx_inverted_k114 (`k14`) USING INVERTED,
+                    INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"),
+                    INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                    INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                    INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+
+                    INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
+                    INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
+
+                )
+                DUPLICATE KEY(k00)
+                PARTITION BY RANGE(k01)
+                (
+                    PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')),
+                    PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
+                    PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
+                )
+                DISTRIBUTED BY HASH(k00) BUCKETS 32
+                PROPERTIES (
+                    "bloom_filter_columns"="k05",
+                    "replication_num" = "1"
+                );
+            """
+            sql "sync"
+
+            // create job
+            GetDebugPoint().enableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments")
+            sql """
+                CREATE ROUTINE LOAD ${jobName} on ${tableName}
+                COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+                COLUMNS TERMINATED BY "|"
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            // check error info
+            def count = 0
+            while (true) {
+                def res = sql "show routine load for ${jobName}"
+                log.info("show routine load: ${res[0].toString()}".toString())
+                log.info("other msg: ${res[0][19].toString()}".toString())
+                if (res[0][19].toString() != "") {
+                    assertTrue(res[0][19].toString().contains("too many segments in rowset"))
+                    break;
+                }
+                count++
+                if (count > 60) {
+                    assertEquals(1, 2)
+                    break;
+                } else {
+                    sleep(1000)
+                    continue;
+                }
+            }
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments")
+            sql "stop routine load for ${jobName}"
+            sql "DROP TABLE IF EXISTS ${tableName}"
+        }
+    }
+
+    // case 2: reschedule job
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def jobName = "test_error_info"
+        def tableName = "test_routine_error_info"
+        try {
+            sql """
+                CREATE TABLE IF NOT EXISTS ${tableName}
+                (
+                    k00 INT NOT NULL,
+                    k01 DATE NOT NULL,
+                    k02 BOOLEAN NULL,
+                    k03 TINYINT NULL,
+                    k04 SMALLINT NULL,
+                    k05 INT NULL,
+                    k06 BIGINT NULL,
+                    k07 LARGEINT NULL,
+                    k08 FLOAT NULL,
+                    k09 DOUBLE NULL,
+                    k10 DECIMAL(9,1) NULL,
+                    k11 DECIMALV3(9,1) NULL,
+                    k12 DATETIME NULL,
+                    k13 DATEV2 NULL,
+                    k14 DATETIMEV2 NULL,
+                    k15 CHAR NULL,
+                    k16 VARCHAR NULL,
+                    k17 STRING NULL,
+                    k18 JSON NULL,
+                    kd01 BOOLEAN NOT NULL DEFAULT "TRUE",
+                    kd02 TINYINT NOT NULL DEFAULT "1",
+                    kd03 SMALLINT NOT NULL DEFAULT "2",
+                    kd04 INT NOT NULL DEFAULT "3",
+                    kd05 BIGINT NOT NULL DEFAULT "4",
+                    kd06 LARGEINT NOT NULL DEFAULT "5",
+                    kd07 FLOAT NOT NULL DEFAULT "6.0",
+                    kd08 DOUBLE NOT NULL DEFAULT "7.0",
+                    kd09 DECIMAL NOT NULL DEFAULT "888888888",
+                    kd10 DECIMALV3 NOT NULL DEFAULT "999999999",
+                    kd11 DATE NOT NULL DEFAULT "2023-08-24",
+                    kd12 DATETIME NOT NULL DEFAULT "2023-08-24 12:00:00",
+                    kd13 DATEV2 NOT NULL DEFAULT "2023-08-24",
+                    kd14 DATETIMEV2 NOT NULL DEFAULT "2023-08-24 12:00:00",
+                    kd15 CHAR(255) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                    kd16 VARCHAR(300) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
"我能吞下玻璃而不伤身体", + kd17 STRING NOT NULL DEFAULT "我能吞下玻璃而不伤身体", + kd18 JSON NULL, + + INDEX idx_inverted_k104 (`k05`) USING INVERTED, + INDEX idx_inverted_k110 (`k11`) USING INVERTED, + INDEX idx_inverted_k113 (`k13`) USING INVERTED, + INDEX idx_inverted_k114 (`k14`) USING INVERTED, + INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"), + INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"), + INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"), + INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"), + + INDEX idx_bitmap_k104 (`k02`) USING BITMAP, + INDEX idx_bitmap_k110 (`kd01`) USING BITMAP + + ) + DUPLICATE KEY(k00) + PARTITION BY RANGE(k01) + ( + PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')), + PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')), + PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01')) + ) + DISTRIBUTED BY HASH(k00) BUCKETS 32 + PROPERTIES ( + "bloom_filter_columns"="k05", + "replication_num" = "1" + ); + """ + sql "sync" + + // create job + sql """ + CREATE ROUTINE LOAD ${jobName} on ${tableName} + COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18), + COLUMNS TERMINATED BY "|" + PROPERTIES + ( + "max_batch_interval" = "5", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", + "kafka_topic" = "invalid_job", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + + // check error info + def count = 0 + while (true) { + def res = sql "show routine load for ${jobName}" + log.info("show routine load: ${res[0].toString()}".toString()) + log.info("other msg: ${res[0][19].toString()}".toString()) + if (res[0][19].toString() != "" && res[0][8].toString() == "NEED_SCHEDULE") { + assertTrue(res[0][19].toString().contains("may be Kafka properties set in job is error or no partition in this topic that should check Kafka")) + break; + } + count++ + if (count > 60) { + assertEquals(1, 2) + break; + } else { + sleep(1000) + continue; + } + } + } finally { + sql "stop routine load for ${jobName}" + sql "DROP TABLE IF EXISTS ${tableName}" + } + } +} \ No newline at end of file