Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions be/src/vec/aggregate_functions/aggregate_function_collect.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
#include "vec/columns/column_string.h"
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/common/hash_table/hash_set.h"
#include "vec/common/pod_array_fwd.h"
#include "vec/common/string_buffer.hpp"
#include "vec/common/string_ref.h"
Expand All @@ -62,7 +61,7 @@ struct AggregateFunctionCollectSetData {
using ColVecType = ColumnVectorOrDecimal<ElementType>;
using ElementNativeType = typename NativeType<T>::Type;
using SelfType = AggregateFunctionCollectSetData;
using Set = HashSetWithStackMemory<ElementNativeType, DefaultHash<ElementNativeType>, 4>;
using Set = phmap::flat_hash_set<ElementNativeType>;
Set data_set;
Int64 max_size = -1;

Expand All @@ -83,28 +82,37 @@ struct AggregateFunctionCollectSetData {
if (size() >= max_size) {
return;
}
data_set.insert(rhs_elem.get_value());
data_set.insert(rhs_elem);
}
} else {
data_set.merge(rhs.data_set);
data_set.merge(Set(rhs.data_set));
}
}

void write(BufferWritable& buf) const {
data_set.write(buf);
write_var_uint(data_set.size(), buf);
for (const auto& value : data_set) {
write_binary(value, buf);
}
write_var_int(max_size, buf);
}

void read(BufferReadable& buf) {
data_set.read(buf);
size_t new_size = 0;
read_var_uint(new_size, buf);
ElementNativeType x;
for (size_t i = 0; i < new_size; ++i) {
read_binary(x, buf);
data_set.insert(x);
}
read_var_int(max_size, buf);
}

void insert_result_into(IColumn& to) const {
auto& vec = assert_cast<ColVecType&>(to).get_data();
vec.reserve(size());
for (const auto& item : data_set) {
vec.push_back(item.key);
vec.push_back(item);
}
}

Expand All @@ -116,23 +124,19 @@ struct AggregateFunctionCollectSetData<StringRef, HasLimit> {
using ElementType = StringRef;
using ColVecType = ColumnString;
using SelfType = AggregateFunctionCollectSetData<ElementType, HasLimit>;
using Set = HashSetWithStackMemory<ElementType, DefaultHash<ElementType>, 4>;
using Set = phmap::flat_hash_set<ElementType>;
Set data_set;
Int64 max_size = -1;

size_t size() const { return data_set.size(); }

void add(const IColumn& column, size_t row_num, Arena* arena) {
Set::LookupResult it;
bool inserted;
auto key = column.get_data_at(row_num);
key.data = arena->insert(key.data, key.size);
data_set.emplace(key, it, inserted);
data_set.insert(key);
}

void merge(const SelfType& rhs, Arena* arena) {
bool inserted;
Set::LookupResult it;
if (max_size == -1) {
max_size = rhs.max_size;
}
Expand All @@ -145,16 +149,16 @@ struct AggregateFunctionCollectSetData<StringRef, HasLimit> {
}
}
assert(arena != nullptr);
StringRef key = rhs_elem.get_value();
StringRef key = rhs_elem;
key.data = arena->insert(key.data, key.size);
data_set.emplace(key, it, inserted);
data_set.insert(key);
}
}

void write(BufferWritable& buf) const {
write_var_uint(size(), buf);
for (const auto& elem : data_set) {
write_string_binary(elem.get_value(), buf);
write_string_binary(elem, buf);
}
write_var_int(max_size, buf);
}
Expand All @@ -174,7 +178,7 @@ struct AggregateFunctionCollectSetData<StringRef, HasLimit> {
auto& vec = assert_cast<ColVecType&>(to);
vec.reserve(size());
for (const auto& item : data_set) {
vec.insert_data(item.key.data, item.key.size);
vec.insert_data(item.data, item.size);
}
}

Expand Down
44 changes: 23 additions & 21 deletions be/src/vec/aggregate_functions/aggregate_function_distinct.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/common/assert_cast.h"
#include "vec/common/hash_table/hash_set.h"
#include "vec/common/string_ref.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
Expand All @@ -59,8 +58,8 @@ namespace doris::vectorized {
template <typename T, bool stable>
struct AggregateFunctionDistinctSingleNumericData {
/// When creating, the hash table must be small.
using Container = std::conditional_t<stable, phmap::flat_hash_map<T, uint32_t>,
HashSetWithStackMemory<T, DefaultHash<T>, 4>>;
using Container =
std::conditional_t<stable, phmap::flat_hash_map<T, uint32_t>, phmap::flat_hash_set<T>>;
using Self = AggregateFunctionDistinctSingleNumericData<T, stable>;
Container data;

Expand All @@ -78,21 +77,30 @@ struct AggregateFunctionDistinctSingleNumericData {
void merge(const Self& rhs, Arena*) {
DCHECK(!stable);
if constexpr (!stable) {
data.merge(rhs.data);
data.merge(Container(rhs.data));
}
}

void serialize(BufferWritable& buf) const {
DCHECK(!stable);
if constexpr (!stable) {
data.write(buf);
write_var_uint(data.size(), buf);
for (const auto& value : data) {
write_binary(value, buf);
}
}
}

void deserialize(BufferReadable& buf, Arena*) {
DCHECK(!stable);
if constexpr (!stable) {
data.read(buf);
size_t new_size = 0;
read_var_uint(new_size, buf);
T x;
for (size_t i = 0; i < new_size; ++i) {
read_binary(x, buf);
data.insert(x);
}
}
}

Expand All @@ -108,7 +116,7 @@ struct AggregateFunctionDistinctSingleNumericData {
}
} else {
for (const auto& elem : data) {
argument_columns[0]->insert(elem.get_value());
argument_columns[0]->insert(elem);
}
}

Expand All @@ -120,19 +128,17 @@ template <bool stable>
struct AggregateFunctionDistinctGenericData {
/// When creating, the hash table must be small.
using Container = std::conditional_t<stable, phmap::flat_hash_map<StringRef, uint32_t>,
HashSetWithStackMemory<StringRef, StringRefHash, 4>>;
phmap::flat_hash_set<StringRef, StringRefHash>>;
using Self = AggregateFunctionDistinctGenericData;
Container data;

void merge(const Self& rhs, Arena* arena) {
DCHECK(!stable);
if constexpr (!stable) {
typename Container::LookupResult it;
bool inserted;
for (const auto& elem : rhs.data) {
StringRef key = elem.get_value();
StringRef key = elem;
key.data = arena->insert(key.data, key.size);
data.emplace(key, it, inserted);
data.emplace(key);
}
}
}
Expand All @@ -142,7 +148,7 @@ struct AggregateFunctionDistinctGenericData {
if constexpr (!stable) {
write_var_uint(data.size(), buf);
for (const auto& elem : data) {
write_string_binary(elem.get_value(), buf);
write_string_binary(elem, buf);
}
}
}
Expand Down Expand Up @@ -174,9 +180,7 @@ struct AggregateFunctionDistinctSingleGenericData
if constexpr (stable) {
data.emplace(key, data.size());
} else {
typename Base::Container::LookupResult it;
bool inserted;
data.emplace(key, it, inserted);
data.insert(key);
}
}

Expand All @@ -193,7 +197,7 @@ struct AggregateFunctionDistinctSingleGenericData
}
} else {
for (const auto& elem : data) {
argument_columns[0]->insert_data(elem.get_value().data, elem.get_value().size);
argument_columns[0]->insert_data(elem.data, elem.size);
}
}

Expand All @@ -218,9 +222,7 @@ struct AggregateFunctionDistinctMultipleGenericData
if constexpr (stable) {
data.emplace(key, data.size());
} else {
typename Base::Container::LookupResult it;
bool inserted;
data.emplace(key, it, inserted);
data.emplace(key);
}
}

Expand All @@ -243,7 +245,7 @@ struct AggregateFunctionDistinctMultipleGenericData
}
} else {
for (const auto& elem : data) {
const char* begin = elem.get_value().data;
const char* begin = elem.data;
for (auto& column : argument_columns) {
begin = column->deserialize_and_insert_from_arena(begin);
}
Expand Down
10 changes: 4 additions & 6 deletions be/src/vec/functions/array/function_array_distinct.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/common/hash_table/hash.h"
#include "vec/common/hash_table/hash_set.h"
#include "vec/common/pod_array_fwd.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
Expand Down Expand Up @@ -146,8 +145,7 @@ class FunctionArrayDistinct : public IFunction {
auto& dest_data_concrete = reinterpret_cast<ColumnType&>(dest_column);
PaddedPODArray<NestType>& dest_datas = dest_data_concrete.get_data();

using Set = HashSetWithStackMemory<ElementNativeType, DefaultHash<ElementNativeType>,
INITIAL_SIZE_DEGREE>;
using Set = phmap::flat_hash_set<ElementNativeType, DefaultHash<ElementNativeType>>;
Set set;

size_t prev_src_offset = 0;
Expand All @@ -171,7 +169,7 @@ class FunctionArrayDistinct : public IFunction {
continue;
}

if (!set.find(src_datas[j])) {
if (!set.contains(src_datas[j])) {
set.insert(src_datas[j]);
dest_datas.push_back(src_datas[j]);
if (dest_null_map) {
Expand Down Expand Up @@ -201,7 +199,7 @@ class FunctionArrayDistinct : public IFunction {
ColumnString::Offsets& column_string_offsets = dest_column_string.get_offsets();
column_string_chars.reserve(src_column.size());

using Set = HashSetWithStackMemory<StringRef, DefaultHash<StringRef>, INITIAL_SIZE_DEGREE>;
using Set = phmap::flat_hash_set<StringRef, DefaultHash<StringRef>>;
Set set;

size_t prev_src_offset = 0;
Expand All @@ -225,7 +223,7 @@ class FunctionArrayDistinct : public IFunction {
}

StringRef src_str_ref = src_data_concrete->get_data_at(j);
if (!set.find(src_str_ref)) {
if (!set.contains(src_str_ref)) {
set.insert(src_str_ref);
// copy the src data to column_string_chars
const size_t old_size = column_string_chars.size();
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/functions/array/function_array_except.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ struct ExceptAction {
template <bool is_left>
bool apply(Set& set, Set& result_set, const Element& elem) {
if constexpr (is_left) {
if (!set.find(elem)) {
if (!set.contains(elem)) {
set.insert(elem);
return true;
}
} else {
if (!set.find(elem)) {
if (!set.contains(elem)) {
set.insert(elem);
}
}
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/functions/array/function_array_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "vec/columns/column_array.h"
#include "vec/columns/column_string.h"
#include "vec/common/hash_table/hash_map.h"
#include "vec/common/hash_table/hash_set.h"
#include "vec/data_types/data_type_array.h"
#include "vec/functions/array/function_array_utils.h"
#include "vec/functions/function_helpers.h"
Expand Down
5 changes: 2 additions & 3 deletions be/src/vec/functions/array/function_array_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

#include "vec/columns/column_array.h"
#include "vec/columns/column_string.h"
#include "vec/common/hash_table/hash_set.h"
#include "vec/data_types/data_type_array.h"
#include "vec/functions/array/function_array_utils.h"
#include "vec/functions/function_helpers.h"
Expand All @@ -48,7 +47,7 @@ template <SetOperation operation, typename ColumnType>
struct OpenSetImpl {
using Element = typename ColumnType::value_type;
using ElementNativeType = typename NativeType<Element>::Type;
using Set = HashSetWithStackMemory<ElementNativeType, DefaultHash<ElementNativeType>, 4>;
using Set = phmap::flat_hash_set<ElementNativeType>;
using Action = typename ActionImpl<Set, Element, operation>::Action;
Action action;
Set set;
Expand Down Expand Up @@ -85,7 +84,7 @@ struct OpenSetImpl {

template <SetOperation operation>
struct OpenSetImpl<operation, ColumnString> {
using Set = HashSetWithStackMemory<StringRef, DefaultHash<StringRef>, 4>;
using Set = phmap::flat_hash_set<StringRef>;
using Action = typename ActionImpl<Set, StringRef, operation>::Action;
Action action;
Set set;
Expand Down
9 changes: 4 additions & 5 deletions be/src/vec/functions/array/function_arrays_overlap.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/common/hash_table/hash.h"
#include "vec/common/hash_table/hash_set.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
#include "vec/core/column_numbers.h"
Expand Down Expand Up @@ -62,7 +61,7 @@ using ColumnString = ColumnStr<UInt32>;
template <typename T>
struct OverlapSetImpl {
using ElementNativeType = typename NativeType<typename T::value_type>::Type;
using Set = HashSetWithStackMemory<ElementNativeType, DefaultHash<ElementNativeType>, 4>;
using Set = phmap::flat_hash_set<ElementNativeType, DefaultHash<ElementNativeType>>;
Set set;
void insert_array(const IColumn* column, size_t start, size_t size) {
const auto& vec = assert_cast<const T&>(*column).get_data();
Expand All @@ -73,7 +72,7 @@ struct OverlapSetImpl {
bool find_any(const IColumn* column, size_t start, size_t size) {
const auto& vec = assert_cast<const T&>(*column).get_data();
for (size_t i = start; i < start + size; ++i) {
if (set.find(vec[i])) {
if (set.contains(vec[i])) {
return true;
}
}
Expand All @@ -83,7 +82,7 @@ struct OverlapSetImpl {

template <>
struct OverlapSetImpl<ColumnString> {
using Set = HashSetWithStackMemory<StringRef, DefaultHash<StringRef>, 4>;
using Set = phmap::flat_hash_set<StringRef, DefaultHash<StringRef>>;
Set set;
void insert_array(const IColumn* column, size_t start, size_t size) {
for (size_t i = start; i < start + size; ++i) {
Expand All @@ -92,7 +91,7 @@ struct OverlapSetImpl<ColumnString> {
}
bool find_any(const IColumn* column, size_t start, size_t size) {
for (size_t i = start; i < start + size; ++i) {
if (set.find(column->get_data_at(i))) {
if (set.contains(column->get_data_at(i))) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
a ["aa", "a"]

-- !select_default --
a ["aaa", "aa", "a"]
a ["a", "aa", "aaa"]
b ["b"]

-- !select_default --
a ["aaa", "aa", "a"]
a ["a", "aa", "aaa"]
b ["b"]

Loading
Loading