Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/vec/common/cow.h
Original file line number Diff line number Diff line change
Expand Up @@ -410,4 +410,4 @@ class COWHelper : public Base {
MutablePtr shallow_mutate() const {
return MutablePtr(static_cast<Derived*>(Base::shallow_mutate().get()));
}
};
};
73 changes: 26 additions & 47 deletions be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,56 +125,35 @@ Status ByteArrayDictDecoder::_decode_values(MutableColumnPtr& doris_column, Data
return _decode_dict_values<has_filter>(doris_column, select_vector, is_dict_filter);
}

TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
switch (logical_type) {
case TypeIndex::String:
[[fallthrough]];
case TypeIndex::FixedString: {
size_t dict_index = 0;
size_t dict_index = 0;

ColumnSelectVector::DataReadType read_type;
while (size_t run_length = select_vector.get_next_run<has_filter>(&read_type)) {
switch (read_type) {
case ColumnSelectVector::CONTENT: {
std::vector<StringRef> string_values;
string_values.reserve(run_length);
for (size_t i = 0; i < run_length; ++i) {
string_values.emplace_back(_dict_items[_indexes[dict_index++]]);
}
doris_column->insert_many_strings_overflow(&string_values[0], run_length,
_max_value_length);
break;
}
case ColumnSelectVector::NULL_DATA: {
doris_column->insert_many_defaults(run_length);
break;
}
case ColumnSelectVector::FILTERED_CONTENT: {
dict_index += run_length;
break;
}
case ColumnSelectVector::FILTERED_NULL: {
// do nothing
break;
}
ColumnSelectVector::DataReadType read_type;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: variable 'read_type' is not initialized [cppcoreguidelines-init-variables]

    ColumnSelectVector::DataReadType read_type;
                                     ^

while (size_t run_length = select_vector.get_next_run<has_filter>(&read_type)) {
switch (read_type) {
case ColumnSelectVector::CONTENT: {
std::vector<StringRef> string_values;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: variable 'string_values' is not initialized [cppcoreguidelines-init-variables]

Suggested change
std::vector<StringRef> string_values;
std::vector<StringRef> string_values = 0;

string_values.reserve(run_length);
for (size_t i = 0; i < run_length; ++i) {
string_values.emplace_back(_dict_items[_indexes[dict_index++]]);
}
doris_column->insert_many_strings_overflow(&string_values[0], run_length,
_max_value_length);
break;
}
case ColumnSelectVector::NULL_DATA: {
doris_column->insert_many_defaults(run_length);
break;
}
case ColumnSelectVector::FILTERED_CONTENT: {
dict_index += run_length;
break;
}
case ColumnSelectVector::FILTERED_NULL: {
// do nothing
break;
}
}
return Status::OK();
}
case TypeIndex::Decimal32:
return _decode_binary_decimal<Int32, has_filter>(doris_column, data_type, select_vector);
case TypeIndex::Decimal64:
return _decode_binary_decimal<Int64, has_filter>(doris_column, data_type, select_vector);
case TypeIndex::Decimal128:
return _decode_binary_decimal<Int128, has_filter>(doris_column, data_type, select_vector);
case TypeIndex::Decimal128I:
return _decode_binary_decimal<Int128, has_filter>(doris_column, data_type, select_vector);
// TODO: decimal256
default:
break;
}
return Status::InvalidArgument(
"Can't decode parquet physical type BYTE_ARRAY to doris logical type {}",
getTypeName(logical_type));
return Status::OK();
}
} // namespace doris::vectorized
87 changes: 0 additions & 87 deletions be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,97 +66,10 @@ class ByteArrayDictDecoder final : public BaseDictDecoder {
MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override;

protected:
template <typename DecimalPrimitiveType, bool has_filter>
Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
ColumnSelectVector& select_vector);

// For dictionary encoding
std::vector<StringRef> _dict_items;
std::vector<uint8_t> _dict_data;
size_t _max_value_length;
std::unordered_map<StringRef, int32_t> _dict_value_to_code;

private:
template <typename DecimalPrimitiveType, bool has_filter,
DecimalScaleParams::ScaleType ScaleType>
Status _decode_binary_decimal_internal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
ColumnSelectVector& select_vector);
};

template <typename DecimalPrimitiveType, bool has_filter>
Status ByteArrayDictDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
DataTypePtr& data_type,
ColumnSelectVector& select_vector) {
init_decimal_converter<DecimalPrimitiveType>(data_type);
DecimalScaleParams& scale_params = _decode_params->decimal_scale;
if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {
return _decode_binary_decimal_internal<DecimalPrimitiveType, has_filter,
DecimalScaleParams::SCALE_UP>(
doris_column, data_type, select_vector);
} else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) {
return _decode_binary_decimal_internal<DecimalPrimitiveType, has_filter,
DecimalScaleParams::SCALE_DOWN>(
doris_column, data_type, select_vector);
} else {
return _decode_binary_decimal_internal<DecimalPrimitiveType, has_filter,
DecimalScaleParams::NO_SCALE>(
doris_column, data_type, select_vector);
}
}

template <typename DecimalPrimitiveType, bool has_filter, DecimalScaleParams::ScaleType ScaleType>
Status ByteArrayDictDecoder::_decode_binary_decimal_internal(MutableColumnPtr& doris_column,
DataTypePtr& data_type,
ColumnSelectVector& select_vector) {
auto& column_data =
static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data();
size_t data_index = column_data.size();
column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
size_t dict_index = 0;
DecimalScaleParams& scale_params = _decode_params->decimal_scale;
ColumnSelectVector::DataReadType read_type;
while (size_t run_length = select_vector.get_next_run<has_filter>(&read_type)) {
switch (read_type) {
case ColumnSelectVector::CONTENT: {
for (size_t i = 0; i < run_length; ++i) {
StringRef& slice = _dict_items[_indexes[dict_index++]];
char* buf_start = const_cast<char*>(slice.data);
uint32_t length = (uint32_t)slice.size;
// When Decimal in parquet is stored in byte arrays, binary and fixed,
// the unscaled number must be encoded as two's complement using big-endian byte order.
DecimalPrimitiveType value = 0;
memcpy(reinterpret_cast<char*>(&value), buf_start, length);
value = BitUtil::big_endian_to_host(value);
value = value >> ((sizeof(value) - length) * 8);
if constexpr (ScaleType == DecimalScaleParams::SCALE_UP) {
value *= scale_params.scale_factor;
} else if constexpr (ScaleType == DecimalScaleParams::SCALE_DOWN) {
value /= scale_params.scale_factor;
} else if constexpr (ScaleType == DecimalScaleParams::NO_SCALE) {
// do nothing
} else {
LOG(FATAL) << "__builtin_unreachable";
__builtin_unreachable();
}
auto& v = reinterpret_cast<DecimalPrimitiveType&>(column_data[data_index++]);
v = (DecimalPrimitiveType)value;
}
break;
}
case ColumnSelectVector::NULL_DATA: {
data_index += run_length;
break;
}
case ColumnSelectVector::FILTERED_CONTENT: {
dict_index += run_length;
break;
}
case ColumnSelectVector::FILTERED_NULL: {
// do nothing
break;
}
}
}
return Status::OK();
}
} // namespace doris::vectorized
105 changes: 42 additions & 63 deletions be/src/vec/exec/format/parquet/byte_array_plain_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,74 +56,53 @@ template <bool has_filter>
Status ByteArrayPlainDecoder::_decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
ColumnSelectVector& select_vector,
bool is_dict_filter) {
TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
switch (logical_type) {
case TypeIndex::String:
[[fallthrough]];
case TypeIndex::FixedString: {
ColumnSelectVector::DataReadType read_type;
while (size_t run_length = select_vector.get_next_run<has_filter>(&read_type)) {
switch (read_type) {
case ColumnSelectVector::CONTENT: {
std::vector<StringRef> string_values;
string_values.reserve(run_length);
for (size_t i = 0; i < run_length; ++i) {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from plain decoder");
}
uint32_t length = decode_fixed32_le(
reinterpret_cast<const uint8_t*>(_data->data) + _offset);
_offset += 4;
if (UNLIKELY(_offset + length) > _data->size) {
return Status::IOError("Can't read enough bytes in plain decoder");
}
string_values.emplace_back(_data->data + _offset, length);
_offset += length;
ColumnSelectVector::DataReadType read_type;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: variable 'read_type' is not initialized [cppcoreguidelines-init-variables]

    ColumnSelectVector::DataReadType read_type;
                                     ^

while (size_t run_length = select_vector.get_next_run<has_filter>(&read_type)) {
switch (read_type) {
case ColumnSelectVector::CONTENT: {
std::vector<StringRef> string_values;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: variable 'string_values' is not initialized [cppcoreguidelines-init-variables]

Suggested change
std::vector<StringRef> string_values;
std::vector<StringRef> string_values = 0;

string_values.reserve(run_length);
for (size_t i = 0; i < run_length; ++i) {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from plain decoder");
}
doris_column->insert_many_strings(&string_values[0], run_length);
break;
}
case ColumnSelectVector::NULL_DATA: {
doris_column->insert_many_defaults(run_length);
break;
}
case ColumnSelectVector::FILTERED_CONTENT: {
for (int i = 0; i < run_length; ++i) {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from plain decoder");
}
uint32_t length = decode_fixed32_le(
reinterpret_cast<const uint8_t*>(_data->data) + _offset);
_offset += 4;
if (UNLIKELY(_offset + length) > _data->size) {
return Status::IOError("Can't read enough bytes in plain decoder");
}
_offset += length;
uint32_t length =
decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset);
_offset += 4;
if (UNLIKELY(_offset + length) > _data->size) {
return Status::IOError("Can't read enough bytes in plain decoder");
}
break;
}
case ColumnSelectVector::FILTERED_NULL: {
// do nothing
break;
string_values.emplace_back(_data->data + _offset, length);
_offset += length;
}
doris_column->insert_many_strings(&string_values[0], run_length);
break;
}
case ColumnSelectVector::NULL_DATA: {
doris_column->insert_many_defaults(run_length);
break;
}
case ColumnSelectVector::FILTERED_CONTENT: {
for (int i = 0; i < run_length; ++i) {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from plain decoder");
}
uint32_t length =
decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset);
_offset += 4;
if (UNLIKELY(_offset + length) > _data->size) {
return Status::IOError("Can't read enough bytes in plain decoder");
}
_offset += length;
}
break;
}
case ColumnSelectVector::FILTERED_NULL: {
// do nothing
break;
}
}
return Status::OK();
}
case TypeIndex::Decimal32:
return _decode_binary_decimal<Int32, has_filter>(doris_column, data_type, select_vector);
case TypeIndex::Decimal64:
return _decode_binary_decimal<Int64, has_filter>(doris_column, data_type, select_vector);
case TypeIndex::Decimal128:
return _decode_binary_decimal<Int128, has_filter>(doris_column, data_type, select_vector);
case TypeIndex::Decimal128I:
return _decode_binary_decimal<Int128, has_filter>(doris_column, data_type, select_vector);
// TODO: decimal256
default:
break;
}
return Status::InvalidArgument(
"Can't decode parquet physical type BYTE_ARRAY to doris logical type {}",
getTypeName(logical_type));
return Status::OK();
}
} // namespace doris::vectorized
Loading