Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 27 additions & 13 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1498,24 +1498,38 @@ bool SegmentIterator::_can_evaluated_by_vectorized(ColumnPredicate* predicate) {
}
}

// Returns true when the field, or any of its nested sub-fields, is a CHAR
// column. CHAR is padded with '\0' to its declared length in the storage
// layer, so callers use this to decide whether a column (possibly nested in
// Array/Map/Struct) must be shrunk before being output to the query engine.
bool SegmentIterator::_has_char_type(const Field& column_desc) {
    switch (column_desc.type()) {
    case FieldType::OLAP_FIELD_TYPE_CHAR:
        return true;
    case FieldType::OLAP_FIELD_TYPE_ARRAY:
        // Array has a single element sub-field; recurse into it.
        return _has_char_type(*column_desc.get_sub_field(0));
    case FieldType::OLAP_FIELD_TYPE_MAP:
        // Map has key (0) and value (1) sub-fields; either may contain CHAR.
        return _has_char_type(*column_desc.get_sub_field(0)) ||
               _has_char_type(*column_desc.get_sub_field(1));
    case FieldType::OLAP_FIELD_TYPE_STRUCT:
        // Struct may contain CHAR in any of its member fields.
        for (int idx = 0; idx < column_desc.get_sub_field_count(); ++idx) {
            if (_has_char_type(*column_desc.get_sub_field(idx))) {
                return true;
            }
        }
        return false;
    default:
        return false;
    }
}

void SegmentIterator::_vec_init_char_column_id() {
for (size_t i = 0; i < _schema->num_column_ids(); i++) {
auto cid = _schema->column_id(i);
auto column_desc = _schema->column(cid);
const Field* column_desc = _schema->column(cid);

do {
if (column_desc->type() == FieldType::OLAP_FIELD_TYPE_CHAR) {
_char_type_idx.emplace_back(i);
if (i != 0) {
_char_type_idx_no_0.emplace_back(i);
}
break;
} else if (column_desc->type() != FieldType::OLAP_FIELD_TYPE_ARRAY) {
break;
if (_has_char_type(*column_desc)) {
_char_type_idx.emplace_back(i);
if (i != 0) {
_char_type_idx_no_0.emplace_back(i);
}
// for Array<Char> or Array<Array<Char>>
column_desc = column_desc->get_sub_field(0);
} while (column_desc != nullptr);
}
}
}

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ class SegmentIterator : public RowwiseIterator {
// CHAR type in storage layer padding the 0 in length. But query engine need ignore the padding 0.
// so segment iterator need to shrink char column before output it. only use in vec query engine.
void _vec_init_char_column_id();
bool _has_char_type(const Field& column_desc);

uint32_t segment_id() const { return _segment->id(); }
uint32_t num_rows() const { return _segment->num_rows(); }
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/columns/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,8 @@ class IColumn : public COW<IColumn> {

virtual bool is_column_map() const { return false; }

virtual bool is_column_struct() const { return false; }

/// If the only value column can contain is NULL.
virtual bool only_null() const { return false; }

Expand Down
21 changes: 21 additions & 0 deletions be/src/vec/columns/column_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,27 @@ void ColumnMap::replicate(const uint32_t* indexs, size_t target_size, IColumn& c
->replicate(indexs, target_size, res.values_column->assume_mutable_ref());
}

// Builds a shrunk copy of this map column. A nested column can carry
// zero-padded CHAR data only when it is (or contains) a string, array, map or
// struct; such columns are shrunk recursively, every other column type is
// passed through unchanged. Offsets are reused as-is.
MutableColumnPtr ColumnMap::get_shrinked_column() {
    auto may_hold_char = [](const IColumn& col) {
        return col.is_column_string() || col.is_column_array() || col.is_column_map() ||
               col.is_column_struct();
    };

    MutableColumns shrinked(2);
    if (may_hold_char(*keys_column)) {
        shrinked[0] = keys_column->get_shrinked_column();
    } else {
        shrinked[0] = keys_column->get_ptr();
    }
    if (may_hold_char(*values_column)) {
        shrinked[1] = values_column->get_shrinked_column();
    } else {
        shrinked[1] = values_column->get_ptr();
    }

    return ColumnMap::create(shrinked[0]->assume_mutable(), shrinked[1]->assume_mutable(),
                             offsets_column->assume_mutable());
}

void ColumnMap::reserve(size_t n) {
get_offsets().reserve(n);
keys_column->reserve(n);
Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/columns/column_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_impl.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_struct.h"
#include "vec/columns/column_vector.h"
#include "vec/common/assert_cast.h"
#include "vec/common/cow.h"
Expand Down Expand Up @@ -110,7 +112,7 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap> {
const char* deserialize_and_insert_from_arena(const char* pos) override;

void update_hash_with_value(size_t n, SipHash& hash) const override;

MutableColumnPtr get_shrinked_column() override;
ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const override;
size_t filter(const Filter& filter) override;
ColumnPtr permute(const Permutation& perm, size_t limit) const override;
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/columns/column_nullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ class ColumnNullable final : public COWHelper<IColumn, ColumnNullable> {
bool is_column_decimal() const override { return get_nested_column().is_column_decimal(); }
bool is_column_string() const override { return get_nested_column().is_column_string(); }
bool is_column_array() const override { return get_nested_column().is_column_array(); }
bool is_column_map() const override { return get_nested_column().is_column_map(); }
bool is_column_struct() const override { return get_nested_column().is_column_struct(); }
bool is_fixed_and_contiguous() const override { return false; }
bool values_have_fixed_size() const override { return nested_column->values_have_fixed_size(); }

Expand Down
15 changes: 15 additions & 0 deletions be/src/vec/columns/column_struct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,21 @@ void ColumnStruct::replicate(const uint32_t* indexs, size_t target_size, IColumn
}
}

// Builds a shrunk copy of this struct column. Each member column that can
// carry zero-padded CHAR data (string/array/map/struct) is shrunk recursively;
// all other member columns are reused unchanged.
MutableColumnPtr ColumnStruct::get_shrinked_column() {
    const size_t field_count = columns.size();
    MutableColumns shrinked_fields(field_count);

    for (size_t idx = 0; idx != field_count; ++idx) {
        const auto& field = columns[idx];
        const bool may_hold_char = field->is_column_string() || field->is_column_array() ||
                                   field->is_column_map() || field->is_column_struct();
        if (may_hold_char) {
            shrinked_fields[idx] = field->get_shrinked_column();
        } else {
            shrinked_fields[idx] = field->get_ptr();
        }
    }
    return ColumnStruct::create(std::move(shrinked_fields));
}

MutableColumns ColumnStruct::scatter(ColumnIndex num_columns, const Selector& selector) const {
const size_t tuple_size = columns.size();
std::vector<MutableColumns> scattered_tuple_elements(tuple_size);
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/columns/column_struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class ColumnStruct final : public COWHelper<IColumn, ColumnStruct> {
}

std::string get_name() const override;
bool is_column_struct() const override { return true; }
const char* get_family_name() const override { return "Struct"; }
bool can_be_inside_nullable() const override { return true; }
MutableColumnPtr clone_empty() const override;
Expand Down Expand Up @@ -160,6 +161,9 @@ class ColumnStruct final : public COWHelper<IColumn, ColumnStruct> {
LOG(FATAL) << "compare_at not implemented";
}
void get_extremes(Field& min, Field& max) const override;

MutableColumnPtr get_shrinked_column() override;

void reserve(size_t n) override;
void resize(size_t n) override;
size_t byte_size() const override;
Expand Down
Binary file modified regression-test/data/export/test_struct_export.out
Binary file not shown.
Binary file modified regression-test/data/insert_p0/test_struct_insert.out
Binary file not shown.
4 changes: 4 additions & 0 deletions regression-test/data/load_p0/stream_load/map_char_test.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
1 {1:"1", 2:"22", 3:"333", 4:"4444", 5:"55555", 6:"666666", 7:"7777777"}
3 {1:"11", 2:"22", 3:"33", 4:"44", 5:"55", 6:"66", 7:"77"}
2 {1:"1", 2:"2", 3:"3", 4:"4", 5:"5", 6:"6", 7:"7"}
4 {1:"111", 2:"22", 3:"333", 4:"444", 5:"55", 6:"66", 7:"777"}
6 changes: 6 additions & 0 deletions regression-test/data/load_p0/stream_load/test_stream_load.out
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
-- !sql1 --
2019 9 9 9 7.700 a 2019-09-09 1970-01-01T08:33:39 k7 9.0 9.0

-- !map11 --
1 {1:"1", 2:"22", 3:"333", 4:"4444", 5:"55555", 6:"666666", 7:"7777777"}
2 {1:"1", 2:"2", 3:"3", 4:"4", 5:"5", 6:"6", 7:"7"}
3 {1:"11", 2:"22", 3:"33", 4:"44", 5:"55", 6:"66", 7:"77"}
4 {1:"111", 2:"22", 3:"333", 4:"444", 5:"55", 6:"66", 7:"777"}

-- !all11 --
2500

Expand Down
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,16 @@ suite("test_stream_load", "p0") {
def tableName7 = "test_unique_key_with_delete"
def tableName8 = "test_array"
def tableName10 = "test_struct"
def tableName11 = "test_map"

sql """ DROP TABLE IF EXISTS ${tableName3} """
sql """ DROP TABLE IF EXISTS ${tableName4} """
sql """ DROP TABLE IF EXISTS ${tableName5} """
sql """ DROP TABLE IF EXISTS ${tableName6} """
sql """ DROP TABLE IF EXISTS ${tableName7} """
sql """ DROP TABLE IF EXISTS ${tableName8} """
sql """ DROP TABLE IF EXISTS ${tableName10} """
sql """ DROP TABLE IF EXISTS ${tableName11} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName3} (
`k1` int(11) NULL,
Expand Down Expand Up @@ -281,7 +284,7 @@ suite("test_stream_load", "p0") {
`k4` ARRAY<BIGINT> NULL COMMENT "",
`k5` ARRAY<CHAR> NULL COMMENT "",
`k6` ARRAY<VARCHAR(20)> NULL COMMENT "",
`k7` ARRAY<DATE> NULL COMMENT "",
`k7` ARRAY<DATE> NULL COMMENT "",
`k8` ARRAY<DATETIME> NULL COMMENT "",
`k9` ARRAY<FLOAT> NULL COMMENT "",
`k10` ARRAY<DOUBLE> NULL COMMENT "",
Expand Down Expand Up @@ -316,6 +319,41 @@ suite("test_stream_load", "p0") {
);
"""

sql """
CREATE TABLE IF NOT EXISTS ${tableName11} (
`k1` int(11) NULL,
`k2` map<int, char(7)> NULL
) ENGINE=OLAP
DUPLICATE KEY(`k1`)
DISTRIBUTED BY HASH(`k1`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""

// load map with specific-length char with non-specific-length data
streamLoad {
table "${tableName11}"

set 'column_separator', '\t'

file 'map_char_test.csv'
time 10000 // limit inflight 10s

check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(4, json.NumberTotalRows)
assertEquals(0, json.NumberFilteredRows)
}
}
sql "sync"
order_qt_map11 "SELECT * FROM ${tableName11} order by k1"

// load all columns
streamLoad {
table "${tableName3}"
Expand Down