Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 46 additions & 13 deletions be/src/vec/columns/column_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "vec/columns/column_object.h"

#include <assert.h>
#include <fmt/core.h>
#include <fmt/format.h>
#include <glog/logging.h>
#include <parallel_hashmap/phmap.h>
Expand All @@ -34,6 +35,7 @@
#include <map>
#include <memory>
#include <optional>
#include <sstream>
#include <vector>

#include "common/compiler_util.h" // IWYU pragma: keep
Expand Down Expand Up @@ -677,8 +679,6 @@ void ColumnObject::check_consistency() const {
}
for (const auto& leaf : subcolumns) {
if (num_rows != leaf->data.size()) {
// LOG(FATAL) << "unmatched column:" << leaf->path.get_path()
// << ", expeted rows:" << num_rows << ", but meet:" << leaf->data.size();
throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
"unmatched column: {}, expeted rows: {}, but meet: {}",
leaf->path.get_path(), num_rows, leaf->data.size());
Expand Down Expand Up @@ -1545,21 +1545,58 @@ void ColumnObject::insert_indices_from(const IColumn& src, const uint32_t* indic
}
}

void ColumnObject::update_hash_with_value(size_t n, SipHash& hash) const {
if (!is_finalized()) {
// finalize has no side effect and can be safely used in const functions
const_cast<ColumnObject*>(this)->finalize();
// Finalizes this ColumnObject on demand from inside a const member function.
// finalize has no side effect and can be safely used in const functions
// (hence the const_cast below).
// The body is wrapped in do { ... } while (0) so the macro expands to exactly
// one statement: `if (c) ENSURE_FINALIZED(); else ...` stays well-formed and
// the trailing semicolon at the call site is consumed by the macro.
#define ENSURE_FINALIZED()                               \
    do {                                                 \
        if (!is_finalized()) {                           \
            const_cast<ColumnObject*>(this)->finalize(); \
        }                                                \
    } while (0)

// Feeds row `n` of every subcolumn part into `hash`.
// The column is finalized first (ENSURE_FINALIZED), then each part visited by
// for_each_imutable_subcolumn contributes its value at index `n`.
// Throws doris::Exception(INTERNAL_ERROR) when `n` is not a valid row of a part.
void ColumnObject::update_hash_with_value(size_t n, SipHash& hash) const {
    ENSURE_FINALIZED();
    for_each_imutable_subcolumn([&](const auto& subcolumn) {
        if (n >= subcolumn.size()) {
            // Report the bad index through an exception only. A LOG(FATAL) in front of
            // the throw would terminate the process and make the throw unreachable.
            throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                                   "{} greater than column size {}, sub_column_info:{}, total "
                                   "lines of this column:{}",
                                   n, subcolumn.size(), subcolumn.dump_structure(), num_rows);
        }
        return subcolumn.update_hash_with_value(n, hash);
    });
}

// Folds every subcolumn part of this column into `hashes`.
// The column is finalized first; each part is then hashed with a null map of nullptr.
// NOTE(review): the incoming `null_data` is not forwarded to the parts - confirm intended.
void ColumnObject::update_hashes_with_value(uint64_t* __restrict hashes,
                                            const uint8_t* __restrict null_data) const {
    ENSURE_FINALIZED();
    auto hash_part = [&](const auto& part) {
        part.update_hashes_with_value(hashes, nullptr);
    };
    for_each_imutable_subcolumn(hash_part);
}

// Folds the range given by `start` and `end` of every subcolumn part into `hash`.
// The column is finalized first; each part is then hashed with a null map of nullptr.
// NOTE(review): the incoming `null_data` is not forwarded to the parts - confirm intended.
void ColumnObject::update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
                                            const uint8_t* __restrict null_data) const {
    ENSURE_FINALIZED();
    auto hash_part = [&](const auto& part) {
        part.update_xxHash_with_value(start, end, hash, nullptr);
    };
    for_each_imutable_subcolumn(hash_part);
}

// Folds every subcolumn part of this column into the `hash` array.
// `type`, `rows` and `offset` are passed through unchanged to each part.
// The column is finalized first; each part is then hashed with a null map of nullptr.
// NOTE(review): the incoming `null_data` is not forwarded to the parts - confirm intended.
void ColumnObject::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveType type,
                                          uint32_t rows, uint32_t offset,
                                          const uint8_t* __restrict null_data) const {
    ENSURE_FINALIZED();
    auto crc_part = [&](const auto& part) {
        part.update_crcs_with_value(hash, type, rows, offset, nullptr);
    };
    for_each_imutable_subcolumn(crc_part);
}

// Folds the range given by `start` and `end` of every subcolumn part into `hash`.
// The column is finalized first; each part is then hashed with a null map of nullptr.
// NOTE(review): the incoming `null_data` is not forwarded to the parts - confirm intended.
void ColumnObject::update_crc_with_value(size_t start, size_t end, uint32_t& hash,
                                         const uint8_t* __restrict null_data) const {
    ENSURE_FINALIZED();
    auto crc_part = [&](const auto& part) {
        part.update_crc_with_value(start, end, hash, nullptr);
    };
    for_each_imutable_subcolumn(crc_part);
}

void ColumnObject::for_each_imutable_subcolumn(ImutableColumnCallback callback) const {
for (const auto& entry : subcolumns) {
for (auto& part : entry->data.data) {
Expand Down Expand Up @@ -1600,8 +1637,4 @@ Status ColumnObject::sanitize() const {
return Status::OK();
}

void ColumnObject::replace_column_data(const IColumn& col, size_t row, size_t self_row) {
LOG(FATAL) << "Method replace_column_data is not supported for " << get_name();
}

} // namespace doris::vectorized
178 changes: 144 additions & 34 deletions be/src/vec/columns/column_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,16 +295,6 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {
// return null if not found
const Subcolumn* get_subcolumn(const PathInData& key, size_t index_hint) const;

/** More efficient methods of manipulation */
/// Not implemented for this column type: always throws
/// doris::Exception(NOT_IMPLEMENTED_ERROR) instead of aborting the process.
[[noreturn]] IColumn& get_data() {
    throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                           "Not implemented method get_data()");
}
/// Not implemented for this column type (const overload): always throws
/// doris::Exception(NOT_IMPLEMENTED_ERROR) instead of aborting the process.
[[noreturn]] const IColumn& get_data() const {
    throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                           "Not implemented method get_data()");
}

// return null if not found
Subcolumn* get_subcolumn(const PathInData& key);

Expand Down Expand Up @@ -429,35 +419,13 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {

void get(size_t n, Field& res) const override;

/// All other methods throw exception.
/// Unsupported for this column type: always throws
/// doris::Exception(NOT_IMPLEMENTED_ERROR) rather than terminating via LOG(FATAL).
StringRef get_data_at(size_t) const override {
    throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                           "get_data_at is not supported for " + std::string(get_family_name()));
}

Status try_insert_indices_from(const IColumn& src, const int* indices_begin,
const int* indices_end);

/// Unsupported for this column type: always throws
/// doris::Exception(NOT_IMPLEMENTED_ERROR) rather than terminating via LOG(FATAL).
StringRef serialize_value_into_arena(size_t n, Arena& arena,
                                     char const*& begin) const override {
    throw doris::Exception(
            ErrorCode::NOT_IMPLEMENTED_ERROR,
            "serialize_value_into_arena is not supported for " + std::string(get_family_name()));
}

void for_each_imutable_subcolumn(ImutableColumnCallback callback) const;

const char* deserialize_and_insert_from_arena(const char* pos) override {
LOG(FATAL) << "should not call the method in column object";
return nullptr;
}

void update_hash_with_value(size_t n, SipHash& hash) const override;

void insert_data(const char* pos, size_t length) override {
LOG(FATAL) << "should not call the method in column object";
__builtin_unreachable();
}

ColumnPtr filter(const Filter&, ssize_t) const override;

Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) override;
Expand All @@ -468,8 +436,6 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {

/// Always reports true for this column type.
bool is_variable_length() const override { return true; }

void replace_column_data(const IColumn&, size_t row, size_t self_row) override;

template <typename Func>
MutableColumnPtr apply_for_subcolumns(Func&& func) const;

Expand All @@ -488,6 +454,150 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {
Status sanitize() const;

std::string debug_string() const;

void update_hashes_with_value(uint64_t* __restrict hashes,
const uint8_t* __restrict null_data = nullptr) const override;

void update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
const uint8_t* __restrict null_data) const override;

void update_crcs_with_value(uint32_t* __restrict hash, PrimitiveType type, uint32_t rows,
uint32_t offset = 0,
const uint8_t* __restrict null_data = nullptr) const override;

void update_crc_with_value(size_t start, size_t end, uint32_t& hash,
const uint8_t* __restrict null_data) const override;

// Not implemented
/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The message names the method and the column family as separate words.
MutableColumnPtr get_shrinked_column() override {
    throw doris::Exception(
            ErrorCode::NOT_IMPLEMENTED_ERROR,
            "get_shrinked_column is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The message names the method and the column family as separate words.
Int64 get_int(size_t /*n*/) const override {
    throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                           "get_int is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The message names the method and the column family as separate words.
bool get_bool(size_t /*n*/) const override {
    throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                           "get_bool is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The arguments are ignored.
void insert_many_fix_len_data(const char* pos, size_t num) override {
    throw doris::Exception(
            ErrorCode::NOT_IMPLEMENTED_ERROR,
            "insert_many_fix_len_data is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The arguments are ignored.
void insert_many_dict_data(const int32_t* data_array, size_t start_index, const StringRef* dict,
                           size_t data_num, uint32_t dict_num = 0) override {
    throw doris::Exception(
            ErrorCode::NOT_IMPLEMENTED_ERROR,
            "insert_many_dict_data is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The arguments are ignored.
void insert_many_binary_data(char* data_array, uint32_t* len_array,
                             uint32_t* start_offset_array, size_t num) override {
    throw doris::Exception(
            ErrorCode::NOT_IMPLEMENTED_ERROR,
            "insert_many_binary_data is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The arguments are ignored.
void insert_many_continuous_binary_data(const char* data, const uint32_t* offsets,
                                        const size_t num) override {
    throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                           "insert_many_continuous_binary_data is not supported for " +
                                   std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The arguments are ignored.
void insert_many_strings(const StringRef* strings, size_t num) override {
    throw doris::Exception(
            ErrorCode::NOT_IMPLEMENTED_ERROR,
            "insert_many_strings is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The arguments are ignored.
void insert_many_strings_overflow(const StringRef* strings, size_t num,
                                  size_t max_length) override {
    throw doris::Exception(
            ErrorCode::NOT_IMPLEMENTED_ERROR,
            "insert_many_strings_overflow is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The arguments are ignored.
void insert_many_raw_data(const char* pos, size_t num) override {
    throw doris::Exception(
            ErrorCode::NOT_IMPLEMENTED_ERROR,
            "insert_many_raw_data is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The message names the method and the column family as separate words.
size_t get_max_row_byte_size() const override {
    throw doris::Exception(
            ErrorCode::NOT_IMPLEMENTED_ERROR,
            "get_max_row_byte_size is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The arguments are ignored.
void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
                   size_t max_row_byte_size) const override {
    throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                           "serialize_vec is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The arguments are ignored.
void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
                                 const uint8_t* null_map) const override {
    throw doris::Exception(
            ErrorCode::NOT_IMPLEMENTED_ERROR,
            "serialize_vec_with_null_map is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The arguments are ignored.
void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) override {
    throw doris::Exception(
            ErrorCode::NOT_IMPLEMENTED_ERROR,
            "deserialize_vec is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The arguments are ignored.
void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t num_rows,
                                   const uint8_t* null_map) override {
    throw doris::Exception(
            ErrorCode::NOT_IMPLEMENTED_ERROR,
            "deserialize_vec_with_null_map is not supported for " + std::string(get_family_name()));
}

/// Const overload that is not supported: always throws
/// doris::Exception(NOT_IMPLEMENTED_ERROR). The arguments are ignored.
/// NOTE(review): this overload is not marked `override` and differs from the non-const
/// `filter_by_selector(...) override` declared in this class only by constness -
/// confirm that a separate const overload is intended.
Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) const {
    throw doris::Exception(
            ErrorCode::NOT_IMPLEMENTED_ERROR,
            "filter_by_selector is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The argument is ignored.
bool structure_equals(const IColumn&) const override {
    throw doris::Exception(
            ErrorCode::NOT_IMPLEMENTED_ERROR,
            "structure_equals is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The message names the method and the column family as separate words.
StringRef get_raw_data() const override {
    throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                           "get_raw_data is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The message names the method and the column family as separate words.
size_t size_of_value_if_fixed() const override {
    throw doris::Exception(
            ErrorCode::NOT_IMPLEMENTED_ERROR,
            "size_of_value_if_fixed is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The argument is ignored.
StringRef get_data_at(size_t) const override {
    throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                           "get_data_at is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The arguments are ignored.
StringRef serialize_value_into_arena(size_t n, Arena& arena,
                                     char const*& begin) const override {
    throw doris::Exception(
            ErrorCode::NOT_IMPLEMENTED_ERROR,
            "serialize_value_into_arena is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The argument is ignored.
const char* deserialize_and_insert_from_arena(const char* pos) override {
    throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                           "deserialize_and_insert_from_arena is not supported for " +
                                   std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The arguments are ignored.
void insert_data(const char* pos, size_t length) override {
    throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                           "insert_data is not supported for " + std::string(get_family_name()));
}

/// Unsupported for this column type: always throws doris::Exception(NOT_IMPLEMENTED_ERROR).
/// The arguments are ignored.
void replace_column_data(const IColumn&, size_t row, size_t self_row) override {
    throw doris::Exception(
            ErrorCode::NOT_IMPLEMENTED_ERROR,
            "replace_column_data is not supported for " + std::string(get_family_name()));
}
};

} // namespace doris::vectorized