be/src/vec/sink/vrow_distribution.cpp (17 changes: 9 additions & 8 deletions)

@@ -443,22 +443,23 @@ Status VRowDistribution::generate_rows_distribution(
         _vpartition->set_transformed_slots(partition_cols_idx);
     }
 
+    Status st = Status::OK();
     if (_vpartition->is_auto_detect_overwrite()) {
         // when overwrite, no auto create partition allowed.
-        RETURN_IF_ERROR(_generate_rows_distribution_for_auto_overwrite(
-                block.get(), has_filtered_rows, row_part_tablet_ids));
+        st = _generate_rows_distribution_for_auto_overwrite(block.get(), has_filtered_rows,
+                                                            row_part_tablet_ids);
     } else if (_vpartition->is_auto_partition() && !_deal_batched) {
-        RETURN_IF_ERROR(_generate_rows_distribution_for_auto_partition(
-                block.get(), partition_cols_idx, has_filtered_rows, row_part_tablet_ids,
-                rows_stat_val));
+        st = _generate_rows_distribution_for_auto_partition(block.get(), partition_cols_idx,
+                                                            has_filtered_rows, row_part_tablet_ids,
+                                                            rows_stat_val);
     } else { // not auto partition
-        RETURN_IF_ERROR(_generate_rows_distribution_for_non_auto_partition(
-                block.get(), has_filtered_rows, row_part_tablet_ids));
+        st = _generate_rows_distribution_for_non_auto_partition(block.get(), has_filtered_rows,
+                                                                row_part_tablet_ids);
     }
 
     filtered_rows = _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows() -
                     prev_filtered_rows;
-    return Status::OK();
+    return st;
 }
 
 // reuse vars for find_tablets
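Why the change matters: with `RETURN_IF_ERROR`, a failure in any of the three `_generate_rows_distribution_for_*` helpers returned early and skipped the `filtered_rows` update at the end of the function. Capturing the status in `st` and returning it after that bookkeeping keeps the filtered-row count correct on the failure path as well, which the new regression test below relies on when it inspects the error URL. A minimal, self-contained sketch of the pattern, using a simplified stand-in `Status` and a hypothetical helper rather than Doris's actual types:

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <utility>

// Simplified stand-in for doris::Status (not the real class).
struct Status {
    bool ok = true;
    std::string msg;
    static Status OK() { return {}; }
    static Status Error(std::string m) { return {false, std::move(m)}; }
};

// Hypothetical helper that fails the way row distribution does when a
// tuple matches no partition, filtering the row before the error surfaces.
Status distribute_rows(bool has_partition, int64_t* filtered) {
    if (!has_partition) {
        ++*filtered;
        return Status::Error("no partition for this tuple");
    }
    return Status::OK();
}

// The patched pattern: capture the status instead of returning early, so
// the filtered-row bookkeeping runs on the failure path too.
Status generate(bool has_partition, int64_t& filtered_rows) {
    int64_t filtered_in_call = 0;
    Status st = distribute_rows(has_partition, &filtered_in_call);
    // With RETURN_IF_ERROR this line was skipped whenever st was an error.
    filtered_rows += filtered_in_call;
    return st;
}

int main() {
    int64_t filtered = 0;
    Status st = generate(/*has_partition=*/false, filtered);
    std::cout << "status: " << (st.ok ? "OK" : st.msg)
              << ", filtered_rows: " << filtered << '\n';
    return 0;
}
```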
test_error_url_1.csv (new file; the one-line fixture loaded by the test below)

@@ -0,0 +1 @@
2024-01-09 15:40:46|lhCH2|ctxoxUuJnspAIJAmMuVJKh1B2sLXAwr9XWqcKYuH2ygU71QfcCfB1r8kOBFtllQewP3Hqw3dRq0zv0Bnlwm6Q20yPSQkU4gDmXAbEzxVBCSwU|3roU9YLxSVUVS5vIce1RA9wzNxBEcXGXEAonAGORyVwf6JMYYTlHb58we0|WWrrXKsWzMGAaJfw1veudp5sxSd5FjSzFSqfDWUwVGe96SAikxQhZKt4kpwSiINkJRNkUBf|neq__service_li|kodo-testing-kodoasynq.redis|92xlFBZ1FpQFfnt1ysS71TmVxcekRmCqesO2fBUxV2hWK7LeMbAFoGry6IbqbV2c5hKi9l8NuTG35mXnot9QTvjHKqdWkdy86fMn6|4XBHWHKxZOwh5kVsLz3xLy59W2NbwNDmpWpMQBci6h34uiUknTnseKy2F7dAYr6jjzq5VG7ulv2gwHor6V4HnA3U1XyQNoA9wJ8MFT9wt|Bv0Et5tUNM2JjFz0hS3cCBPX2S9begi8p1lv48M6W0PZQxfB3UoTvavaeyxwxhpfYkOsfqGQTCOMQt7Wwyn3dl2MvFqclt80kNYp7D25ZqREAYskBxWPhOSUaXWDTc|nBj3PlVV|uxoqGdn2SJlIaSEdMlb8hKW2XXgQcL4Tcz9rRqkv9O9rIIqYQ|RrHHtJ5BMmSiBGJsY3cgA4JqoI5l1otuYQoiXE70QQ2X17dOiF3t5umoK8pFEPk|x8zRUfaRJFFkdgBYQyzHrp3bhWIboTv8s2TuLxQ7jbAwduLEI0MFZLgxZHYSge8627ZPgiu96Qwqt9hKK7ZHQpYMEjQTlp2GvZwJiiuxDTYefAbrl6P|R9hEZdbWERAggflyCWNbfqBjNLNWSc0Xyu02Zl8OKpSVPrSIbI|KGHWT4C2GQyOXY2PkWsulr0gvBbfYHVMlKgYU6CEkywO8DQqvzPZkL7axRuVXTq6smU9NiT3KMirozpkTkvPL5MlwBYzsj|9CYIs6GGwpeRov|YNTDSLVHGRrevdOPBewx76od1htIsfcM6c8aYNTLrjoQ1jI9N6M2T5c7zsYQ7vZVobDnD3tqYCpxbjJNrI6LyqkKPxXsOQWTXnmhU2MLygRQrBPyPYkNseTIIve9|I5VKX1noAIeW7EgWM4lpEFvmUy7WgullmYtdNWBBkjhTrz7eetkTm6RYh0eH1lC69xfL2NaIcXbzTkwNPPAug1qnsYrfPQ4xdVvSgjJDropG4SMmT6y134AbnBun|k6wuJs0QDf4YmumR0hOoCDVwf18OpL3q1NoyfLZbt2mhY0YrMH7A8JqdTwvvdjMrYLyMCW1zCK8png4|Q7xOuIxRNVQt6wBntMq6nCauyORBXh7BV0Ac7z6SEReRWYBwyr|iZTSBUU7zukS0z9iPO9IfAJu|GXJRuS5mEUB|OTf42X35rt4T5dtmTO7377i0lphYj2uHb7ZUfJ|1117637773.099065|4614638931763895839|5341939051272067353|3954792818428854896|1967|-6700276424423912861|EPaT0V9uLfy3TrB6St44a9MRtxUuHPbg66JH0mCT4UXEcUAYQv2sXO8VNDP7K
test_stream_load_error_url (regression suite, 94 additions)

@@ -73,4 +73,98 @@ suite("test_stream_load_error_url", "p0") {
} finally {
sql """ DROP TABLE IF EXISTS ${tableName} """
}

try {
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`time` DATETIME(6) NULL,
`__docid` VARCHAR(64) NULL,
`__source` TEXT NULL COMMENT 'hidden',
`message` TEXT NULL,
`__namespace` TEXT NULL COMMENT 'hidden',
`source` TEXT NULL,
`service` TEXT NULL,
`container_host` TEXT NULL,
`endpoint` TEXT NULL,
`env` TEXT NULL,
`http_host` TEXT NULL,
`http_method` TEXT NULL,
`http_route` TEXT NULL,
`http_status_code` TEXT NULL,
`http_url` TEXT NULL,
`operation` TEXT NULL,
`project` TEXT NULL,
`source_type` TEXT NULL,
`status` TEXT NULL,
`span_type` TEXT NULL,
`parent_id` TEXT NULL,
`resource` TEXT NULL,
`span_id` TEXT NULL,
`trace_id` TEXT NULL,
`sample_rate` DOUBLE NULL,
`date` BIGINT NULL,
`create_time` BIGINT NULL,
`priority` BIGINT NULL,
`duration` BIGINT NULL,
`start` BIGINT NULL,
`var` TEXT NULL
) ENGINE=OLAP
DUPLICATE KEY(`time`, `__docid`)
COMMENT 'default'
PARTITION BY RANGE(`time`)
(PARTITION p20240625 VALUES [('2024-06-25 00:00:00'), ('2024-06-26 00:00:00')),
PARTITION p20240626 VALUES [('2024-06-26 00:00:00'), ('2024-06-27 00:00:00')))
DISTRIBUTED BY RANDOM BUCKETS AUTO
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"min_load_replica_num" = "-1",
"is_being_synced" = "false",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-100000",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.replication_allocation" = "tag.location.default: 1",
"dynamic_partition.buckets" = "10",
"dynamic_partition.create_history_partition" = "false",
"dynamic_partition.history_partition_num" = "16",
"dynamic_partition.hot_partition_num" = "0",
"dynamic_partition.reserved_history_periods" = "NULL",
"dynamic_partition.storage_policy" = "",
"storage_medium" = "hdd",
"storage_format" = "V2",
"inverted_index_storage_format" = "V2",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false",
"group_commit_interval_ms" = "10000",
"group_commit_data_bytes" = "134217728"
);
"""

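// The fixture row in test_error_url_1.csv carries time = 2024-01-09 15:40:46,
// which falls outside both partitions defined above (they only cover
// 2024-06-25 to 2024-06-27), so every row is rejected and the load fails
// with DATA_QUALITY_ERROR.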
streamLoad {
table "${tableName}"
set 'column_separator', '|'
set 'columns', '`time`,`__docid`,`__source`,`message`,`__namespace`,`source`,`service`,`container_host`,`endpoint`,`env`,`http_host`,`http_method`,`http_route`,`http_status_code`,`http_url`,`operation`,`project`,`source_type`,`status`,`span_type`,`parent_id`,`resource`,`span_id`,`trace_id`,`sample_rate`,`date`,`create_time`,`priority`,`duration`,`start`,`var`'
file 'test_error_url_1.csv'

check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("fail", json.Status.toLowerCase())
assertTrue(json.Message.contains("[DATA_QUALITY_ERROR]too many filtered rows"))
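// Fetch the error log behind ErrorURL and verify it reports the real
// per-row reason for the filtered rows.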
def (code, out, err) = curl("GET", json.ErrorURL)
log.info("error result: " + out)
assertTrue(out.contains("no partition for this tuple"))
log.info("url: " + json.ErrorURL)
}
}
} finally {
sql """ DROP TABLE IF EXISTS ${tableName} """
}
}