Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions be/src/runtime/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ void TypeDescriptor::to_thrift(TTypeDesc* thrift_type) const {
}

void TypeDescriptor::to_protobuf(PTypeDesc* ptype) const {
DCHECK(!is_complex_type() || type == TYPE_ARRAY)
DCHECK(!is_complex_type() || type == TYPE_ARRAY || type == TYPE_STRUCT)
<< "Don't support complex type now, type=" << type;
auto node = ptype->add_types();
node->set_type(TTypeNodeType::SCALAR);
Expand All @@ -170,8 +170,18 @@ void TypeDescriptor::to_protobuf(PTypeDesc* ptype) const {
for (const TypeDescriptor& child : children) {
child.to_protobuf(ptype);
}
} else if (type == TYPE_STRUCT) {
node->set_type(TTypeNodeType::STRUCT);
DCHECK_EQ(field_names.size(), contains_nulls.size());
for (size_t i = 0; i < field_names.size(); ++i) {
auto field = node->add_struct_fields();
field->set_name(field_names[i]);
field->set_contains_null(contains_nulls[i]);
}
for (const TypeDescriptor& child : children) {
child.to_protobuf(ptype);
}
}
// TODO(xy): support struct
}

TypeDescriptor::TypeDescriptor(const google::protobuf::RepeatedPtrField<PTypeNode>& types, int* idx)
Expand Down Expand Up @@ -213,7 +223,20 @@ TypeDescriptor::TypeDescriptor(const google::protobuf::RepeatedPtrField<PTypeNod
children.push_back(TypeDescriptor(types, idx));
break;
}
// TODO(xy): support struct
case TTypeNodeType::STRUCT: {
type = TYPE_STRUCT;
size_t children_size = node.struct_fields_size();
for (size_t i = 0; i < children_size; ++i) {
const auto& field = node.struct_fields(i);
field_names.push_back(field.name());
contains_nulls.push_back(field.contains_null());
}
for (size_t i = 0; i < children_size; ++i) {
++(*idx);
children.push_back(TypeDescriptor(types, idx));
}
break;
}
default:
DCHECK(false) << node.type();
}
Expand Down
38 changes: 14 additions & 24 deletions be/src/vec/columns/column_struct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,43 +54,25 @@ ColumnStruct::ColumnStruct(MutableColumns&& mutable_columns) {
}
}

ColumnStruct::ColumnStruct(Columns&& columns) {
columns.reserve(columns.size());
for (auto& column : columns) {
if (is_column_const(*column)) {
LOG(FATAL) << "ColumnStruct cannot have ColumnConst as its element";
}
columns.push_back(std::move(column));
}
}

ColumnStruct::ColumnStruct(TupleColumns&& tuple_columns) {
columns.reserve(tuple_columns.size());
for (auto& column : tuple_columns) {
if (is_column_const(*column)) {
LOG(FATAL) << "ColumnStruct cannot have ColumnConst as its element";
}
columns.push_back(std::move(column));
}
}

ColumnStruct::Ptr ColumnStruct::create(Columns& columns) {
ColumnStruct::Ptr ColumnStruct::create(const Columns& columns) {
for (const auto& column : columns) {
if (is_column_const(*column)) {
LOG(FATAL) << "ColumnStruct cannot have ColumnConst as its element";
}
}
auto column_struct = ColumnStruct::create(columns);
auto column_struct = ColumnStruct::create(MutableColumns());
column_struct->columns.assign(columns.begin(), columns.end());
return column_struct;
}

ColumnStruct::Ptr ColumnStruct::create(TupleColumns& tuple_columns) {
ColumnStruct::Ptr ColumnStruct::create(const TupleColumns& tuple_columns) {
for (const auto& column : tuple_columns) {
if (is_column_const(*column)) {
LOG(FATAL) << "ColumnStruct cannot have ColumnConst as its element";
}
}
auto column_struct = ColumnStruct::create(tuple_columns);
auto column_struct = ColumnStruct::create(MutableColumns());
column_struct->columns = tuple_columns;
return column_struct;
}

Expand Down Expand Up @@ -221,6 +203,14 @@ void ColumnStruct::update_hash_with_value(size_t n, SipHash& hash) const {
// }
// }

void ColumnStruct::insert_indices_from(const IColumn& src, const int* indices_begin,
const int* indices_end) {
const ColumnStruct& src_concrete = assert_cast<const ColumnStruct&>(src);
for (size_t i = 0; i < columns.size(); ++i) {
columns[i]->insert_indices_from(src_concrete.get_column(i), indices_begin, indices_end);
}
}

// const char * ColumnStruct::skip_serialized_in_arena(const char * pos) const {
// for (const auto & column : columns) {
// pos = column->skip_serialized_in_arena(pos);
Expand Down
18 changes: 7 additions & 11 deletions be/src/vec/columns/column_struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ class ColumnStruct final : public COWHelper<IColumn, ColumnStruct> {
template <bool positive>
struct Less;

ColumnStruct(Columns&& columns);
ColumnStruct(TupleColumns&& tuple_columns);
explicit ColumnStruct(MutableColumns&& mutable_columns);
ColumnStruct(const ColumnStruct&) = default;

Expand All @@ -89,14 +87,14 @@ class ColumnStruct final : public COWHelper<IColumn, ColumnStruct> {
* Use IColumn::mutate in order to make mutable column and mutate shared nested columns.
*/
using Base = COWHelper<IColumn, ColumnStruct>;
static Ptr create(Columns& columns);
static Ptr create(MutableColumns& columns);
static Ptr create(TupleColumns& columns);
static Ptr create(const Columns& columns);
static Ptr create(const TupleColumns& columns);
static Ptr create(Columns&& arg) { return create(arg); }

template <typename... Args>
static MutablePtr create(Args&&... args) {
return Base::create(std::forward<Args>(args)...);
template <typename Arg,
typename = typename std::enable_if<std::is_rvalue_reference<Arg&&>::value>::type>
static MutablePtr create(Arg&& arg) {
return Base::create(std::forward<Arg>(arg));
}

std::string get_name() const override;
Expand Down Expand Up @@ -131,9 +129,7 @@ class ColumnStruct final : public COWHelper<IColumn, ColumnStruct> {
// void update_hash_fast(SipHash & hash) const override;

void insert_indices_from(const IColumn& src, const int* indices_begin,
const int* indices_end) override {
LOG(FATAL) << "insert_indices_from not implemented";
}
const int* indices_end) override;

void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
Permutation& res) const override {
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/data_types/data_type.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ PGenericType_TypeId IDataType::get_pdata_type(const IDataType* data_type) {
return PGenericType::HLL;
case TypeIndex::Array:
return PGenericType::LIST;
case TypeIndex::Struct:
return PGenericType::STRUCT;
case TypeIndex::FixedLengthObject:
return PGenericType::FIXEDLENGTHOBJECT;
case TypeIndex::JSONB:
Expand Down
14 changes: 14 additions & 0 deletions be/src/vec/data_types/data_type_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,20 @@ DataTypePtr DataTypeFactory::create_data_type(const PColumnMeta& pcolumn) {
case PGenericType::FIXEDLENGTHOBJECT:
nested = std::make_shared<DataTypeFixedLengthObject>();
break;
case PGenericType::STRUCT: {
size_t col_size = pcolumn.children_size();
DCHECK(col_size >= 1);
DataTypes dataTypes;
Strings names;
dataTypes.reserve(col_size);
names.reserve(col_size);
for (size_t i = 0; i < col_size; i++) {
dataTypes.push_back(create_data_type(pcolumn.children(i)));
names.push_back(pcolumn.name());
}
nested = std::make_shared<DataTypeStruct>(dataTypes, names);
break;
}
default: {
LOG(FATAL) << fmt::format("Unknown data type: {}", pcolumn.type());
return nullptr;
Expand Down
43 changes: 43 additions & 0 deletions be/src/vec/data_types/data_type_struct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,49 @@ String DataTypeStruct::get_name_by_position(size_t i) const {
return names[i - 1];
}

int64_t DataTypeStruct::get_uncompressed_serialized_bytes(const IColumn& column,
int be_exec_version) const {
auto ptr = column.convert_to_full_column_if_const();
const auto& struct_column = assert_cast<const ColumnStruct&>(*ptr.get());
DCHECK(elems.size() == struct_column.tuple_size());

int64_t bytes = 0;
for (size_t i = 0; i < elems.size(); ++i) {
bytes += elems[i]->get_uncompressed_serialized_bytes(struct_column.get_column(i),
be_exec_version);
}
return bytes;
}

char* DataTypeStruct::serialize(const IColumn& column, char* buf, int be_exec_version) const {
auto ptr = column.convert_to_full_column_if_const();
const auto& struct_column = assert_cast<const ColumnStruct&>(*ptr.get());
DCHECK(elems.size() == struct_column.tuple_size());

for (size_t i = 0; i < elems.size(); ++i) {
buf = elems[i]->serialize(struct_column.get_column(i), buf, be_exec_version);
}
return buf;
}

const char* DataTypeStruct::deserialize(const char* buf, IColumn* column,
int be_exec_version) const {
auto* struct_column = assert_cast<ColumnStruct*>(column);
DCHECK(elems.size() == struct_column->tuple_size());

for (size_t i = 0; i < elems.size(); ++i) {
buf = elems[i]->deserialize(buf, &struct_column->get_column(i), be_exec_version);
}
return buf;
}

void DataTypeStruct::to_pb_column_meta(PColumnMeta* col_meta) const {
IDataType::to_pb_column_meta(col_meta);
for (size_t i = 0; i < elems.size(); ++i) {
elems[i]->to_pb_column_meta(col_meta->add_children());
}
}

bool DataTypeStruct::text_can_contain_only_valid_utf8() const {
return std::all_of(elems.begin(), elems.end(),
[](auto&& elem) { return elem->text_can_contain_only_valid_utf8(); });
Expand Down
19 changes: 5 additions & 14 deletions be/src/vec/data_types/data_type_struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,11 @@ class DataTypeStruct final : public IDataType {
std::optional<size_t> try_get_position_by_name(const String& name) const;
String get_name_by_position(size_t i) const;

[[noreturn]] int64_t get_uncompressed_serialized_bytes(const IColumn& column,
int be_exec_version) const override {
LOG(FATAL) << "get_uncompressed_serialized_bytes not implemented";
}

[[noreturn]] char* serialize(const IColumn& column, char* buf,
int be_exec_version) const override {
LOG(FATAL) << "serialize not implemented";
}

[[noreturn]] const char* deserialize(const char* buf, IColumn* column,
int be_exec_version) const override {
LOG(FATAL) << "serialize not implemented";
}
int64_t get_uncompressed_serialized_bytes(const IColumn& column,
int be_exec_version) const override;
char* serialize(const IColumn& column, char* buf, int be_exec_version) const override;
const char* deserialize(const char* buf, IColumn* column, int be_exec_version) const override;
void to_pb_column_meta(PColumnMeta* col_meta) const override;

// bool is_parametric() const { return true; }
// SerializationPtr do_get_default_serialization() const override;
Expand Down