Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions be/src/vec/columns/column_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,7 @@ class ColumnVector final : public COWHelper<ColumnVectorHelper, ColumnVector<T>>
use by date, datetime, basic type
*/
void insert_many_fix_len_data(const char* data_ptr, size_t num) override {
if constexpr (!std::is_same_v<T, vectorized::Int64>) {
insert_many_in_copy_way(data_ptr, num);
} else if (IColumn::is_date) {
if (IColumn::is_date) {
insert_date_column(data_ptr, num);
} else if (IColumn::is_date_time) {
insert_datetime_column(data_ptr, num);
Expand Down
24 changes: 13 additions & 11 deletions be/src/vec/common/aggregation_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,46 +166,43 @@ T pack_fixed(size_t i, size_t keys_size, const ColumnRawPtrs& key_columns, const
}

for (size_t j = 0; j < keys_size; ++j) {
bool is_null;
bool is_null = false;

if (!has_bitmap)
is_null = false;
else {
if (has_bitmap) {
size_t bucket = j / 8;
size_t off = j % 8;
is_null = ((bitmap[bucket] >> off) & 1) == 1;
}

if (is_null) continue;
if (is_null) {
offset += key_sizes[j];
continue;
}

switch (key_sizes[j]) {
case 1:
memcpy(bytes + offset,
static_cast<const ColumnVectorHelper*>(key_columns[j])->get_raw_data_begin<1>() +
i,
1);
offset += 1;
break;
case 2:
memcpy(bytes + offset,
static_cast<const ColumnVectorHelper*>(key_columns[j])->get_raw_data_begin<2>() +
i * 2,
2);
offset += 2;
break;
case 4:
memcpy(bytes + offset,
static_cast<const ColumnVectorHelper*>(key_columns[j])->get_raw_data_begin<4>() +
i * 4,
4);
offset += 4;
break;
case 8:
memcpy(bytes + offset,
static_cast<const ColumnVectorHelper*>(key_columns[j])->get_raw_data_begin<8>() +
i * 8,
8);
offset += 8;
break;
default:
memcpy(bytes + offset,
Expand All @@ -214,6 +211,8 @@ T pack_fixed(size_t i, size_t keys_size, const ColumnRawPtrs& key_columns, const
key_sizes[j]);
offset += key_sizes[j];
}

offset += key_sizes[j];
}

return key;
Expand All @@ -224,7 +223,9 @@ inline UInt128 hash128(size_t i, size_t keys_size, const ColumnRawPtrs& key_colu
UInt128 key;
SipHash hash;

for (size_t j = 0; j < keys_size; ++j) key_columns[j]->update_hash_with_value(i, hash);
for (size_t j = 0; j < keys_size; ++j) {
key_columns[j]->update_hash_with_value(i, hash);
}

hash.get128(key.low, key.high);

Expand Down Expand Up @@ -253,8 +254,9 @@ inline StringRef serialize_keys_to_pool_contiguous(size_t i, size_t keys_size,
const char* begin = nullptr;

size_t sum_size = 0;
for (size_t j = 0; j < keys_size; ++j)
for (size_t j = 0; j < keys_size; ++j) {
sum_size += key_columns[j]->serialize_value_into_arena(i, pool, begin).size;
}

return {begin, sum_size};
}
Expand Down
111 changes: 32 additions & 79 deletions be/src/vec/exec/vaggregation_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,6 @@ struct AggregationMethodSerialized {
return max_one_row_byte_size;
}

static void insert_key_into_columns(const StringRef& key, MutableColumns& key_columns,
const Sizes&) {
auto pos = key.data;
for (auto& column : key_columns) {
pos = column->deserialize_and_insert_from_arena(pos);
}
}

static void insert_keys_into_columns(std::vector<StringRef>& keys, MutableColumns& key_columns,
const size_t num_rows, const Sizes&) {
for (auto& column : key_columns) {
Expand Down Expand Up @@ -215,11 +207,6 @@ struct AggregationMethodStringNoCache {

static const bool low_cardinality_optimization = false;

static void insert_key_into_columns(const StringRef& key, MutableColumns& key_columns,
const Sizes&) {
key_columns[0]->insert_data(key.data, key.size);
}

static void insert_keys_into_columns(std::vector<StringRef>& keys, MutableColumns& key_columns,
const size_t num_rows, const Sizes&) {
key_columns[0]->reserve(num_rows);
Expand Down Expand Up @@ -256,14 +243,6 @@ struct AggregationMethodOneNumber {
using State = ColumnsHashing::HashMethodOneNumber<typename Data::value_type, Mapped, FieldType,
consecutive_keys_optimization>;

// Insert the key from the hash table into columns.
static void insert_key_into_columns(const Key& key, MutableColumns& key_columns,
const Sizes& /*key_sizes*/) {
const auto* key_holder = reinterpret_cast<const char*>(&key);
auto* column = static_cast<ColumnVectorHelper*>(key_columns[0].get());
column->insert_raw_data<sizeof(FieldType)>(key_holder);
}

static void insert_keys_into_columns(std::vector<Key>& keys, MutableColumns& key_columns,
const size_t num_rows, const Sizes&) {
key_columns[0]->reserve(num_rows);
Expand Down Expand Up @@ -328,59 +307,44 @@ struct AggregationMethodKeysFixed {
using State = ColumnsHashing::HashMethodKeysFixed<typename Data::value_type, Key, Mapped,
has_nullable_keys, false>;

static void insert_key_into_columns(const Key& key, MutableColumns& key_columns,
const Sizes& key_sizes) {
size_t keys_size = key_columns.size();

static constexpr auto bitmap_size =
has_nullable_keys ? std::tuple_size<KeysNullMap<Key>>::value : 0;
/// In any hash key value, column values to be read start just after the bitmap, if it exists.
size_t pos = bitmap_size;

for (size_t i = 0; i < keys_size; ++i) {
IColumn* observed_column;
ColumnUInt8* null_map;

bool column_nullable = false;
if constexpr (has_nullable_keys) {
column_nullable = is_column_nullable(*key_columns[i]);
}

/// If we have a nullable column, get its nested column and its null map.
if (column_nullable) {
static void insert_keys_into_columns(std::vector<Key>& keys, MutableColumns& key_columns,
const size_t num_rows, const Sizes& key_sizes) {
// In any hash key value, column values to be read start just after the bitmap, if it exists.
size_t pos = has_nullable_keys ? std::tuple_size<KeysNullMap<Key>>::value : 0;

for (size_t i = 0; i < key_columns.size(); ++i) {
size_t size = key_sizes[i];
key_columns[i]->resize(num_rows);
// If we have a nullable column, get its nested column and its null map.
if (is_column_nullable(*key_columns[i])) {
ColumnNullable& nullable_col = assert_cast<ColumnNullable&>(*key_columns[i]);
observed_column = &nullable_col.get_nested_column();
null_map = assert_cast<ColumnUInt8*>(&nullable_col.get_null_map_column());
} else {
observed_column = key_columns[i].get();
null_map = nullptr;
}

bool is_null = false;
if (column_nullable) {
/// The current column is nullable. Check if the value of the
/// corresponding key is nullable. Update the null map accordingly.
char* data =
const_cast<char*>(nullable_col.get_nested_column().get_raw_data().data);
UInt8* nullmap = assert_cast<ColumnUInt8*>(&nullable_col.get_null_map_column())
->get_data()
.data();

// The current column is nullable. Check if the value of the
// corresponding key is nullable. Update the null map accordingly.
size_t bucket = i / 8;
size_t offset = i % 8;
UInt8 val = (reinterpret_cast<const UInt8*>(&key)[bucket] >> offset) & 1;
null_map->insert_value(val);
is_null = val == 1;
}

if (has_nullable_keys && is_null) {
observed_column->insert_default();
for (size_t j = 0; j < num_rows; j++) {
const Key& key = keys[j];
UInt8 val = (reinterpret_cast<const UInt8*>(&key)[bucket] >> offset) & 1;
nullmap[j] = val;
if (!val) {
memcpy(data + j * size, reinterpret_cast<const char*>(&key) + pos, size);
}
}
} else {
size_t size = key_sizes[i];
observed_column->insert_data(reinterpret_cast<const char*>(&key) + pos, size);
pos += size;
char* data = const_cast<char*>(key_columns[i]->get_raw_data().data);
for (size_t j = 0; j < num_rows; j++) {
const Key& key = keys[j];
memcpy(data + j * size, reinterpret_cast<const char*>(&key) + pos, size);
}
}
}
}

static void insert_keys_into_columns(std::vector<Key>& keys, MutableColumns& key_columns,
const size_t num_rows, const Sizes& key_sizes) {
for (size_t i = 0; i != num_rows; ++i) {
insert_key_into_columns(keys[i], key_columns, key_sizes);
pos += size;
}
}

Expand Down Expand Up @@ -411,17 +375,6 @@ struct AggregationMethodSingleNullableColumn : public SingleColumnMethod {

using State = ColumnsHashing::HashMethodSingleLowNullableColumn<BaseState, Mapped, true>;

static void insert_key_into_columns(const Key& key, MutableColumns& key_columns,
const Sizes& /*key_sizes*/) {
auto col = key_columns[0].get();

if constexpr (std::is_same_v<Key, StringRef>) {
col->insert_data(key.data, key.size);
} else {
col->insert_data(reinterpret_cast<const char*>(&key), sizeof(key));
}
}

static void insert_keys_into_columns(std::vector<Key>& keys, MutableColumns& key_columns,
const size_t num_rows, const Sizes&) {
auto col = key_columns[0].get();
Expand Down