Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,11 @@ class RuntimeState {
return _query_options.__isset.enable_parallel_scan && _query_options.enable_parallel_scan;
}

// Whether empty lines in CSV input should be read as NULL rows.
// Yields true only when the query option is both present (__isset) and enabled;
// the option value is not consulted when it was never set.
bool is_read_csv_empty_line_as_null() const {
    const bool option_present = _query_options.__isset.read_csv_empty_line_as_null;
    return option_present ? _query_options.read_csv_empty_line_as_null : false;
}

int parallel_scan_max_scanners_count() const {
return _query_options.__isset.parallel_scan_max_scanners_count
? _query_options.parallel_scan_max_scanners_count
Expand Down
25 changes: 23 additions & 2 deletions be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,10 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
continue;
}
if (size == 0) {
// Read empty row, just continue
if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) {
++rows;
}
// Read empty line, continue
continue;
}

Expand Down Expand Up @@ -516,7 +519,10 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
continue;
}
if (size == 0) {
// Read empty row, just continue
if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) {
RETURN_IF_ERROR(_fill_empty_line(block, columns, &rows));
}
// Read empty line, continue
continue;
}

Expand Down Expand Up @@ -659,6 +665,21 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block,
return Status::OK();
}

// Emits one row for an empty CSV line.
//
// For every file slot a value is inserted into the destination column through
// ColumnNullable::insert_data(nullptr, 0), after which `*rows` is incremented
// by one. The destination column comes from `columns` when `_is_load` is true,
// and from `block` (looked up through `_file_slot_idx_map`) otherwise.
//
// @param block    block whose columns are written to when not loading
// @param columns  mutable destination columns, indexed by file slot
// @param rows     in/out row counter; incremented once per call
// @return         always Status::OK()
//
// NOTE(review): every destination column is cast to ColumnNullable without a
// check — this assumes all file slot columns are nullable on this path; confirm
// against the callers.
Status CsvReader::_fill_empty_line(Block* block, std::vector<MutableColumnPtr>& columns,
                                   size_t* rows) {
    // Unsigned index: avoids the signed/unsigned comparison against size().
    for (size_t i = 0; i < _file_slot_descs.size(); ++i) {
        IColumn* col_ptr = columns[i];
        if (!_is_load) {
            // Not a load: write into the column held by the block instead, so
            // the const qualifier on the block's column pointer is cast away.
            col_ptr = const_cast<IColumn*>(
                    block->get_by_position(_file_slot_idx_map[i]).column.get());
        }
        auto& null_column = assert_cast<ColumnNullable&>(*col_ptr);
        // Null data pointer with zero length — presumably appends a NULL entry
        // (the regression output shows such rows as \N); verify in ColumnNullable.
        null_column.insert_data(nullptr, 0);
    }
    ++(*rows);
    return Status::OK();
}

Status CsvReader::_validate_line(const Slice& line, bool* success) {
if (!_is_proto_format && !validate_utf8(line.data, line.size)) {
if (!_is_load) {
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/format/csv/csv_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ class CsvReader : public GenericReader {
Status _create_decompressor();
Status _fill_dest_columns(const Slice& line, Block* block,
std::vector<MutableColumnPtr>& columns, size_t* rows);
// Handles an empty CSV line: inserts a null value into each file slot column
// (from `columns` when loading, from `block` otherwise) and increments *rows by one.
Status _fill_empty_line(Block* block, std::vector<MutableColumnPtr>& columns, size_t* rows);
Status _line_split_to_values(const Slice& line, bool* success);
void _split_line(const Slice& line);
Status _check_array_format(std::vector<Slice>& split_values, bool* is_success);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_PARALLEL_RESULT_SINK = "enable_parallel_result_sink";

public static final String READ_CSV_EMPTY_LINE_AS_NULL = "read_csv_empty_line_as_null";

public static final String BE_NUMBER_FOR_TEST = "be_number_for_test";

// max ms to wait transaction publish finish when exec insert stmt.
Expand Down Expand Up @@ -1062,6 +1064,11 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_PARALLEL_RESULT_SINK, needForward = true, fuzzy = true)
private boolean enableParallelResultSink = true;

// Session switch for CSV reading: when true, empty lines in a CSV file are read
// as NULL rows. Defaults to false and is copied into TQueryOptions by toThrift().
@VariableMgr.VarAttr(name = READ_CSV_EMPTY_LINE_AS_NULL, needForward = true,
description = {"在读取csv文件时是否读取csv的空行为null",
"Determine whether to read empty rows in CSV files as NULL when reading CSV files."})
public boolean readCsvEmptyLineAsNull = false;

@VariableMgr.VarAttr(name = USE_RF_DEFAULT)
public boolean useRuntimeFilterDefaultSize = false;

Expand Down Expand Up @@ -3444,6 +3451,7 @@ public TQueryOptions toThrift() {
tResult.setEnableLocalMergeSort(enableLocalMergeSort);
tResult.setEnableParallelResultSink(enableParallelResultSink);
tResult.setEnableShortCircuitQueryAccessColumnStore(enableShortCircuitQueryAcessColumnStore);
tResult.setReadCsvEmptyLineAsNull(readCsvEmptyLineAsNull);
return tResult;
}

Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ struct TQueryOptions {

116: optional bool enable_no_need_read_data_opt = true;

// Whether empty lines in CSV files are read as NULL rows; off by default.
117: optional bool read_csv_empty_line_as_null = false
// For cloud, to control if the content would be written into file cache
1000: optional bool disable_file_cache = false
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_base --
1,doris1,16
2,doris2,18

3,doris3,19




4,doris4,20


-- !select_1 --
1 doris1 16
2 doris2 18
3 doris3 19
4 doris4 20

-- !select_1 --
\N \N \N
\N \N \N
\N \N \N
\N \N \N
\N \N \N
\N \N \N
1 doris1 16
2 doris2 18
3 doris3 19
4 doris4 20

Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression case for the `read_csv_empty_line_as_null` session variable.
// Flow: fill a table whose `content` column holds CSV-looking text mixed with
// empty strings, export that column to S3 with SELECT ... INTO OUTFILE, then
// read the exported file back through the S3 table function twice — first
// without touching the variable, then after setting it to true.
suite("test_read_csv_empty_line_as_null", "p0") {
// open nereids
sql """ set enable_nereids_planner=true """
sql """ set enable_fallback_to_original_planner=false """

// S3 credentials and location; the bucket name comes from the suite's otherConfigs.
String ak = getS3AK()
String sk = getS3SK()
String s3_endpoint = getS3Endpoint()
String region = getS3Region()
String bucket = context.config.otherConfigs.get("s3BucketName");


// Source table name and the S3 path prefix handed to INTO OUTFILE.
def export_table_name = "test_read_csv_empty_line"
def outFilePath = "${bucket}/test_read_csv_empty_line/exp_"


// Drops and recreates a two-column table (id INT, content varchar(32)).
def create_table = {table_name ->
sql """ DROP TABLE IF EXISTS ${table_name} """
sql """
CREATE TABLE IF NOT EXISTS ${table_name} (
`id` INT NULL,
`content` varchar(32) NULL
)
DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
"""
}

// Exports the `content` column, ordered by id, as CSV to S3 and returns
// one cell of the statement's result.
def outfile_to_S3 = {
// select ... into outfile ...
def res = sql """
SELECT content FROM ${export_table_name} t ORDER BY id
INTO OUTFILE "s3://${outFilePath}"
FORMAT AS csv
PROPERTIES (
"s3.endpoint" = "${s3_endpoint}",
"s3.region" = "${region}",
"s3.secret_key"="${sk}",
"s3.access_key" = "${ak}"
);
"""

// Fourth column of the first result row; used below as the URL prefix of the
// exported file (assumed — confirm against INTO OUTFILE's result schema).
return res[0][3]
}

// create table to export data
create_table(export_table_name)

// insert data
// Six of the ten rows (ids 3, 5, 6, 7, 8, 10) carry an empty `content`; they are
// expected to show up as empty lines in the exported CSV.
sql """ insert into ${export_table_name} values (1, "1,doris1,16"); """
sql """ insert into ${export_table_name} values (2, "2,doris2,18"); """
sql """ insert into ${export_table_name} values (3, ""); """
sql """ insert into ${export_table_name} values (4, "3,doris3,19"); """
sql """ insert into ${export_table_name} values (5, ""); """
sql """ insert into ${export_table_name} values (6, ""); """
sql """ insert into ${export_table_name} values (7, ""); """
sql """ insert into ${export_table_name} values (8, ""); """
sql """ insert into ${export_table_name} values (9, "4,doris4,20"); """
sql """ insert into ${export_table_name} values (10, ""); """

// test base data
qt_select_base """ SELECT content FROM ${export_table_name} t ORDER BY id; """

// test outfile to s3
def outfile_url = outfile_to_S3()

// test read_csv_empty_line_as_null = false
// The URI is rebuilt as an HTTP URL: the first (5 + bucket.length()) characters of
// outfile_url (presumably the "s3://" scheme plus the bucket name — confirm) and its
// last character are dropped, then "0.csv" is appended.
// NOTE(review): this query and the one further down share the tag `select_1`.
try {
order_qt_select_1 """ SELECT * FROM S3 (
"uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.csv",
"ACCESS_KEY"= "${ak}",
"SECRET_KEY" = "${sk}",
"format" = "csv",
"column_separator" = ",",
"region" = "${region}"
);
"""
} finally {
// nothing to clean up
}

// test read_csv_empty_line_as_null = true
try {
// Turn the feature on for the remainder of the suite; it is not reset afterwards.
sql """ set read_csv_empty_line_as_null=true; """
// Same read-back query as above, now expected to return the empty lines as NULL rows.
order_qt_select_1 """ SELECT * FROM S3 (
"uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.csv",
"ACCESS_KEY"= "${ak}",
"SECRET_KEY" = "${sk}",
"format" = "csv",
"column_separator" = ",",
"region" = "${region}"
);
"""
} finally {
// nothing to clean up
}
}