diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java
index 70f44da7d83a4c..32d8262a0f39f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadScanProvider.java
@@ -62,6 +62,7 @@
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -164,11 +165,20 @@ private void fillContextExprMap(List<NereidsImportColumnDesc> columnDescList, Ne
         // (k1, k2, tmpk3 = k1 + k2, k3 = k1 + k2)
         // so "tmpk3 = k1 + k2" is not needed anymore, we can skip it.
         List<NereidsImportColumnDesc> copiedColumnExprs = new ArrayList<>(columnDescs.size());
+        Set<String> constantMappingColumns = new HashSet<>();
         for (NereidsImportColumnDesc importColumnDesc : columnDescs) {
             String mappingColumnName = importColumnDesc.getColumnName();
-            if (importColumnDesc.isColumn() || tbl.getColumn(mappingColumnName) != null) {
+            if (importColumnDesc.isColumn()) {
                 copiedColumnExprs.add(importColumnDesc);
+            } else if (tbl.getColumn(mappingColumnName) != null) {
+                copiedColumnExprs.add(importColumnDesc);
+                // Only track columns with constant mapping expressions (e.g., "k1 = 'constant'").
+                // Non-constant expressions (e.g., "k1 = k1 + 1") still read the column from the file.
+                if (importColumnDesc.getExpr().isConstant()) {
+                    constantMappingColumns.add(mappingColumnName);
+                }
             }
+            // Skip mapping columns that don't exist in the table schema.
         }
 
         // check whether the OlapTable has sequenceCol and skipBitmapCol
@@ -188,6 +198,11 @@ private void fillContextExprMap(List<NereidsImportColumnDesc> columnDescList, Ne
         if (!specifyFileFieldNames) {
             List<Column> columns = tbl.getBaseSchema(false);
             for (Column column : columns) {
+                if (constantMappingColumns.contains(column.getName())) {
+                    // Skip this column: the user already specified a constant mapping expression
+                    // for it in the COLUMNS parameter (e.g., "column_name = 'constant_value'").
+                    continue;
+                }
                 NereidsImportColumnDesc columnDesc;
                 if (fileGroup.getFileFormatProperties().getFileFormatType() == TFileFormatType.FORMAT_JSON) {
                     columnDesc = new NereidsImportColumnDesc(column.getName());
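In load-syntax terms, the core change means a constant mapping expression in SET now removes its target column from the implicit column list derived from the base schema, while a non-constant mapping keeps the column in that list because its expression still reads the column from the file. A minimal sketch of the two cases (table t, columns k1/k2/kd, and the bucket path are hypothetical):

    -- Constant mapping: kd is filled from the literal, so with no explicit
    -- column list only the remaining schema columns are expected in the file.
    LOAD LABEL constant_mapping_example (
        DATA INFILE("s3://bucket/data.csv")
        INTO TABLE t
        COLUMNS TERMINATED BY "|"
        SET (kd = '2024-01-23')
    )
    WITH S3 ("AWS_ACCESS_KEY" = "...", "AWS_SECRET_KEY" = "...");

    -- Non-constant mapping: the expression references k1, so k1 stays in the
    -- derived column list and is still read from the file before being mapped.
    LOAD LABEL expr_mapping_example (
        DATA INFILE("s3://bucket/data.csv")
        INTO TABLE t
        COLUMNS TERMINATED BY "|"
        SET (k1 = k1 + 1)
    )
    WITH S3 ("AWS_ACCESS_KEY" = "...", "AWS_SECRET_KEY" = "...");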
diff --git a/regression-test/data/load_p0/broker_load/test_s3_load_with_set.out b/regression-test/data/load_p0/broker_load/test_s3_load_with_set.out
new file mode 100644
index 00000000000000..61ae94b779e642
--- /dev/null
+++ b/regression-test/data/load_p0/broker_load/test_s3_load_with_set.out
@@ -0,0 +1,22 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select --
+20
+
+-- !select --
+40
+
+-- !select --
+60
+
+-- !select --
+80
+
+-- !select --
+100
+
+-- !select --
+120
+
+-- !select --
+120
+
diff --git a/regression-test/data/load_p0/stream_load/test_bitmap.csv b/regression-test/data/load_p0/stream_load/test_bitmap.csv
new file mode 100644
index 00000000000000..671731ed874253
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_bitmap.csv
@@ -0,0 +1 @@
+b,2,AA==
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_bitmap.out b/regression-test/data/load_p0/stream_load/test_stream_load_bitmap.out
new file mode 100644
index 00000000000000..5e7cb4ad9f647f
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load_bitmap.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql2 --
+b	2	\N
+
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_with_set.out b/regression-test/data/load_p0/stream_load/test_stream_load_with_set.out
new file mode 100644
index 00000000000000..70ffe11e1ede54
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load_with_set.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select --
+40
+
LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.orc", + "${table}", "", "", "FORMAT AS \"orc\"", "(K00,K01,K02,K03,K04,K05,K06,K07,K08,K09,K10,K11,K12,K13,K14,K15,K16,K17,K18)", + "set(kd01=20240123)", "", "", "", "")) + + attributesList.add(new LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.orc", + "${table}", "", "", "FORMAT AS \"orc\"", "", + "set(kd01=20240123)", "", "", "", "")) + + def ak = getS3AK() + def sk = getS3SK() + + def i = 0 + for (LoadAttributes attributes : attributesList) { + def label = "test_s3_load_" + UUID.randomUUID().toString().replace("-", "_") + "_" + i + attributes.label = label + def prop = attributes.getPropertiesStr() + + def sql_str = """ + LOAD LABEL $label ( + $attributes.dataDesc.mergeType + DATA INFILE("$attributes.dataDesc.path") + INTO TABLE $attributes.dataDesc.tableName + $attributes.dataDesc.columnTermClause + $attributes.dataDesc.lineTermClause + $attributes.dataDesc.formatClause + $attributes.dataDesc.columns + $attributes.dataDesc.columnsFromPathClause + $attributes.dataDesc.columnMappingClause + $attributes.dataDesc.precedingFilterClause + $attributes.dataDesc.orderByClause + $attributes.dataDesc.whereExpr + ) + WITH S3 ( + "AWS_ACCESS_KEY" = "$ak", + "AWS_SECRET_KEY" = "$sk", + "AWS_ENDPOINT" = "${s3Endpoint}", + "AWS_REGION" = "${s3Region}", + "use_path_style" = "$attributes.usePathStyle" + ) + ${prop} + """ + logger.info("submit sql: ${sql_str}"); + sql """${sql_str}""" + logger.info("Submit load with lable: $label, table: $attributes.dataDesc.tableName, path: $attributes.dataDesc.path") + + def max_try_milli_secs = 600000 + while (max_try_milli_secs > 0) { + String[][] result = sql """ show load where label="$attributes.label" order by createtime desc limit 1; """ + if (result[0][2].equals("FINISHED")) { + if (attributes.isExceptFailed) { + assertTrue(false, "load should be failed but was success: $result") + } + logger.info("Load FINISHED " + attributes.label + ": $result") + break + } + if (result[0][2].equals("CANCELLED")) { + if (attributes.isExceptFailed) { + logger.info("Load FINISHED " + attributes.label) + break + } + assertTrue(false, "load failed: $result") + break + } + Thread.sleep(1000) + max_try_milli_secs -= 1000 + if (max_try_milli_secs <= 0) { + assertTrue(false, "load Timeout: $attributes.label") + } + } + qt_select """ select count(*) from $attributes.dataDesc.tableName """ + ++i + } + + qt_select """ select count(*) from ${table} """ +} + +class DataDesc { + public String mergeType = "" + public String path + public String tableName + public String lineTermClause + public String columnTermClause + public String formatClause + public String columns + public String columnsFromPathClause + public String precedingFilterClause + public String columnMappingClause + public String whereExpr + public String orderByClause +} + +class LoadAttributes { + LoadAttributes(String path, String tableName, String lineTermClause, String columnTermClause, String formatClause, + String columns, String columnsFromPathClause, String precedingFilterClause, String columnMappingClause, String whereExpr, String orderByClause, boolean isExceptFailed = false) { + this.dataDesc = new DataDesc() + this.dataDesc.path = path + this.dataDesc.tableName = tableName + this.dataDesc.lineTermClause = lineTermClause + this.dataDesc.columnTermClause = columnTermClause + this.dataDesc.formatClause = formatClause + this.dataDesc.columns = columns + this.dataDesc.columnsFromPathClause = columnsFromPathClause + 
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_bitmap.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_bitmap.groovy
new file mode 100644
index 00000000000000..0c22acc14c299b
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_bitmap.groovy
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_bitmap", "p0") {
+    def tableName = "test_stream_load_bitmap"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE ${tableName} (
+            `cache_key` varchar(20) NOT NULL,
+            `result_cnt` int NULL,
+            `result` bitmap NOT NULL
+        ) ENGINE=OLAP
+        UNIQUE KEY(`cache_key`)
+        DISTRIBUTED BY HASH(`cache_key`) BUCKETS 1
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    // load a base64-encoded bitmap column via a same-name mapping expression
+    streamLoad {
+        table "${tableName}"
+
+        file 'test_bitmap.csv'
+        set "column_separator", ","
+        set "columns", "cache_key,result_cnt,result,result=bitmap_from_base64(result)"
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+        }
+        time 10000 // limit inflight 10s
+    }
+
+    sql "sync"
+    qt_sql2 "select * from ${tableName}"
+}
\ No newline at end of file
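This suite covers the non-constant branch of the core change: result appears both as a file column and as the target of bitmap_from_base64(result), so it must still be read from the file rather than skipped. A quick sanity check of the base64 round-trip, assuming the bitmap_to_base64/bitmap_from_base64 pair is available (the value 2 is illustrative):

    SELECT bitmap_to_string(bitmap_from_base64(bitmap_to_base64(to_bitmap(2))));
    -- expected output: 2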
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_with_set.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_with_set.groovy
new file mode 100644
index 00000000000000..0783d65c27d226
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_with_set.groovy
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_with_set", "load_p0") {
+    def tableName = "stream_load_with_set"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    sql """
+        CREATE TABLE ${tableName}
+        (
+            k00 INT NOT NULL,
+            k01 DATE NOT NULL,
+            k02 BOOLEAN NULL,
+            k03 TINYINT NULL,
+            k04 SMALLINT NULL,
+            k05 INT NULL,
+            k06 BIGINT NULL,
+            k07 LARGEINT NULL,
+            k08 FLOAT NULL,
+            k09 DOUBLE NULL,
+            k10 DECIMAL(9,1) NULL,
+            k11 DECIMALV3(9,1) NULL,
+            k12 DATETIME NULL,
+            k13 DATEV2 NULL,
+            k14 DATETIMEV2 NULL,
+            k15 CHAR NULL,
+            k16 VARCHAR NULL,
+            kd01 DATE NOT NULL,
+            k17 STRING NULL,
+            k18 JSON NULL
+        )
+        DUPLICATE KEY(k00)
+        DISTRIBUTED BY HASH(k00) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        );
+    """
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'columns', "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18,kd01=20240123"
+        file "basic_data.csv"
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(20, json.NumberTotalRows)
+            assertEquals(20, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'columns', "kd01=20240123"
+        file "basic_data.csv"
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(20, json.NumberTotalRows)
+            assertEquals(20, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+
+    qt_select """ select count(*) from ${tableName} """
+}
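The second streamLoad above is the case fixed by the core change: only the constant mapping kd01=20240123 is supplied, so the remaining columns are derived from the base schema with kd01 skipped. Outside the test harness it corresponds to roughly the following raw stream load request (user, host, port, and database are placeholders):

    curl --location-trusted -u <user>:<password> \
        -H "column_separator:|" \
        -H "columns: kd01=20240123" \
        -T basic_data.csv \
        http://<fe_host>:<http_port>/api/<db>/stream_load_with_set/_stream_load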