Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@
*.thrift text eol=lf
*.proto text eol=lf
*.conf text eol=lf
*.out text eol=lf -diff
40 changes: 0 additions & 40 deletions be/src/olap/hll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,44 +407,4 @@ void HllSetResolver::parse() {
}
}

void HllSetHelper::set_sparse(char* result, const std::map<int, uint8_t>& index_to_value,
int& len) {
result[0] = HLL_DATA_SPARSE;
len = sizeof(HllSetResolver::SetTypeValueType) + sizeof(HllSetResolver::SparseLengthValueType);
char* write_value_pos = result + len;
for (auto iter = index_to_value.begin(); iter != index_to_value.end(); ++iter) {
write_value_pos[0] = (char)(iter->first & 0xff);
write_value_pos[1] = (char)(iter->first >> 8 & 0xff);
write_value_pos[2] = iter->second;
write_value_pos += 3;
}
int registers_count = index_to_value.size();
len += registers_count *
(sizeof(HllSetResolver::SparseIndexType) + sizeof(HllSetResolver::SparseValueType));
*(int*)(result + 1) = registers_count;
}

void HllSetHelper::set_explicit(char* result, const std::set<uint64_t>& hash_value_set, int& len) {
result[0] = HLL_DATA_EXPLICIT;
result[1] = (HllSetResolver::ExplicitLengthValueType)(hash_value_set.size());
len = sizeof(HllSetResolver::SetTypeValueType) +
sizeof(HllSetResolver::ExplicitLengthValueType);
char* write_pos = result + len;
for (auto iter = hash_value_set.begin(); iter != hash_value_set.end(); ++iter) {
uint64_t hash_value = *iter;
*(uint64_t*)write_pos = hash_value;
write_pos += 8;
}
len += sizeof(uint64_t) * hash_value_set.size();
}

void HllSetHelper::set_full(char* result, const std::map<int, uint8_t>& index_to_value,
const int registers_len, int& len) {
result[0] = HLL_DATA_FULL;
for (auto iter = index_to_value.begin(); iter != index_to_value.end(); ++iter) {
result[1 + iter->first] = iter->second;
}
len = registers_len + sizeof(HllSetResolver::SetTypeValueType);
}

} // namespace doris
9 changes: 0 additions & 9 deletions be/src/olap/hll.h
Original file line number Diff line number Diff line change
Expand Up @@ -370,13 +370,4 @@ class HllSetResolver {
SparseLengthValueType* _sparse_count;
};

// todo(kks): remove this when dpp_sink class was removed
class HllSetHelper {
public:
static void set_sparse(char* result, const std::map<int, uint8_t>& index_to_value, int& len);
static void set_explicit(char* result, const std::set<uint64_t>& hash_value_set, int& len);
static void set_full(char* result, const std::map<int, uint8_t>& index_to_value,
const int set_len, int& len);
};

} // namespace doris
10 changes: 4 additions & 6 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1001,16 +1001,15 @@ uint16_t SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_ro

Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_column_ids,
std::vector<rowid_t>& rowid_vector,
uint16_t* sel_rowid_idx, size_t select_size,
vectorized::MutableColumns* mutable_columns) {
uint16_t* sel_rowid_idx, size_t select_size) {
SCOPED_RAW_TIMER(&_opts.stats->lazy_read_ns);
std::vector<rowid_t> rowids(select_size);
for (size_t i = 0; i < select_size; ++i) {
rowids[i] = rowid_vector[sel_rowid_idx[i]];
}
for (auto cid : read_column_ids) {
auto& column = (*mutable_columns)[cid];
RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(rowids.data(), select_size, column));
RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(rowids.data(), select_size,
_current_return_columns[cid]));
}

return Status::OK();
Expand Down Expand Up @@ -1117,8 +1116,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {

// step3: read non_predicate column
RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns, _block_rowids,
sel_rowid_idx, selected_size,
&_current_return_columns));
sel_rowid_idx, selected_size));

// step4: output columns
// 4.1 output non-predicate column
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class SegmentIterator : public RowwiseIterator {
void _output_non_pred_columns(vectorized::Block* block);
Status _read_columns_by_rowids(std::vector<ColumnId>& read_column_ids,
std::vector<rowid_t>& rowid_vector, uint16_t* sel_rowid_idx,
size_t select_size, vectorized::MutableColumns* mutable_columns);
size_t select_size);

template <class Container>
Status _output_column_by_sel_idx(vectorized::Block* block, const Container& column_ids,
Expand Down
48 changes: 22 additions & 26 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -785,22 +785,11 @@ Status RowBlockChanger::change_block(vectorized::Block* ref_block,
return Status::OLAPInternalError(OLAP_ERR_NOT_INITED);
}

// material-view or rollup task will fail now
if (_desc_tbl.get_row_tuples().size() != ref_block->columns()) {
return Status::NotSupported(
"_desc_tbl.get_row_tuples().size() != ref_block->columns(), maybe because rollup "
"not supported now. ");
}

std::vector<bool> nullable_tuples;
for (int i = 0; i < ref_block->columns(); i++) {
nullable_tuples.emplace_back(ref_block->get_by_position(i).column->is_nullable());
}

ObjectPool pool;
RuntimeState* state = pool.add(new RuntimeState());
state->set_desc_tbl(&_desc_tbl);
RowDescriptor row_desc = RowDescriptor::create_default(_desc_tbl, nullable_tuples);
RowDescriptor row_desc =
RowDescriptor(_desc_tbl.get_tuple_descriptor(_desc_tbl.get_row_tuples()[0]), false);

const int row_size = ref_block->rows();
const int column_size = new_block->columns();
Expand All @@ -811,10 +800,6 @@ Status RowBlockChanger::change_block(vectorized::Block* ref_block,
for (int idx = 0; idx < column_size; idx++) {
int ref_idx = _schema_mapping[idx].ref_column;

if (!_schema_mapping[idx].materialized_function.empty()) {
return Status::NotSupported("Materialized function not supported now. ");
}

if (ref_idx < 0) {
// new column, write default value
auto value = _schema_mapping[idx].default_value;
Expand Down Expand Up @@ -1547,15 +1532,14 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
rowset_reader->next_block(ref_block.get());
while (ref_block->rows()) {
RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get()));
if (!_mem_tracker->check_limit(config::memory_limitation_per_thread_for_schema_change_bytes,
new_block->allocated_bytes())) {
if (!_mem_tracker->check_limit(_memory_limitation, new_block->allocated_bytes())) {
RETURN_IF_ERROR(create_rowset());

if (!_mem_tracker->check_limit(
config::memory_limitation_per_thread_for_schema_change_bytes,
new_block->allocated_bytes())) {
if (!_mem_tracker->check_limit(_memory_limitation, new_block->allocated_bytes())) {
LOG(WARNING) << "Memory limitation is too small for Schema Change."
<< "memory_limitation=" << _memory_limitation;
<< " _memory_limitation=" << _memory_limitation
<< ", new_block->allocated_bytes()=" << new_block->allocated_bytes()
<< ", consumption=" << _mem_tracker->consumption();
return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
}
}
Expand Down Expand Up @@ -1649,9 +1633,8 @@ bool SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_row
rs_readers.push_back(rs_reader);
}
// get cur schema if rowset schema exist, rowset schema must be newer than tablet schema
auto max_version_rowset = src_rowsets.back();
const TabletSchema* cur_tablet_schema =
max_version_rowset->rowset_meta()->tablet_schema().get();
src_rowsets.back()->rowset_meta()->tablet_schema().get();
if (cur_tablet_schema == nullptr) {
cur_tablet_schema = new_tablet->tablet_schema().get();
}
Expand Down Expand Up @@ -1680,6 +1663,12 @@ Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_
rs_readers.push_back(rs_reader);
}

// get cur schema if rowset schema exist, rowset schema must be newer than tablet schema
auto cur_tablet_schema = src_rowsets.back()->rowset_meta()->tablet_schema();
if (cur_tablet_schema == nullptr) {
cur_tablet_schema = new_tablet->tablet_schema();
}

Merger::Statistics stats;
RETURN_IF_ERROR(Merger::vmerge_rowsets(new_tablet, READER_ALTER_TABLE,
new_tablet->tablet_schema().get(), rs_readers,
Expand Down Expand Up @@ -1717,6 +1706,7 @@ Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& req

std::shared_mutex SchemaChangeHandler::_mutex;
std::unordered_set<int64_t> SchemaChangeHandler::_tablet_ids_in_converting;
std::set<std::string> SchemaChangeHandler::_supported_functions = {"hll_hash", "to_bitmap"};

// In the past schema change and rollup will create new tablet and will wait for txns starting before the task to finished
// It will cost a lot of time to wait and the task is very difficult to understand.
Expand Down Expand Up @@ -1848,7 +1838,7 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
LOG(WARNING) << "New tablet has a version " << pair.first
<< " crossing base tablet's max_version="
<< max_rowset->end_version();
Status::OLAPInternalError(OLAP_ERR_VERSION_ALREADY_MERGED);
return Status::OLAPInternalError(OLAP_ERR_VERSION_ALREADY_MERGED);
}
}
std::vector<RowsetSharedPtr> empty_vec;
Expand Down Expand Up @@ -1949,9 +1939,14 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
if (item.__isset.mv_expr) {
if (item.mv_expr.nodes[0].node_type == TExprNodeType::FUNCTION_CALL) {
mv_param.mv_expr = item.mv_expr.nodes[0].fn.name.function_name;
if (!_supported_functions.count(mv_param.mv_expr)) {
return Status::NotSupported("Unknow materialized view expr " +
mv_param.mv_expr);
}
} else if (item.mv_expr.nodes[0].node_type == TExprNodeType::CASE_EXPR) {
mv_param.mv_expr = "count_field";
}

mv_param.expr = std::make_shared<TExpr>(item.mv_expr);
}
sc_params.materialized_params_map.insert(
Expand Down Expand Up @@ -2152,6 +2147,7 @@ Status SchemaChangeHandler::_parse_request(
const TabletColumn& new_column = new_tablet->tablet_schema()->column(i);
const string& column_name = new_column.name();
ColumnMapping* column_mapping = rb_changer->get_mutable_column_mapping(i);
column_mapping->new_column = &new_column;

if (new_column.has_reference_column()) {
int32_t column_index = base_tablet_schema->field_index(new_column.referenced_column());
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ class SchemaChangeHandler {

static std::shared_mutex _mutex;
static std::unordered_set<int64_t> _tablet_ids_in_converting;
static std::set<std::string> _supported_functions;
};

using RowBlockDeleter = std::function<void(RowBlock*)>;
Expand Down
5 changes: 0 additions & 5 deletions be/src/runtime/descriptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,6 @@ class RowDescriptor {
RowDescriptor(const DescriptorTbl& desc_tbl, const std::vector<TTupleId>& row_tuples,
const std::vector<bool>& nullable_tuples);

static RowDescriptor create_default(const DescriptorTbl& desc_tbl,
const std::vector<bool>& nullable_tuples) {
return RowDescriptor(desc_tbl, desc_tbl.get_row_tuples(), nullable_tuples);
}

// standard copy c'tor, made explicit here
RowDescriptor(const RowDescriptor& desc)
: _tuple_desc_map(desc._tuple_desc_map),
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/primitive_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ struct PrimitiveTypeTraits<TYPE_HLL> {
using ColumnType = vectorized::ColumnString;
};

// only for adapt get_predicate_column_ptr
template <PrimitiveType type>
struct PredicatePrimitiveTypeTraits {
using PredicateFieldType = typename PrimitiveTypeTraits<type>::CppType;
Expand Down
50 changes: 18 additions & 32 deletions be/src/vec/columns/column_complex.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,41 +62,30 @@ class ColumnComplexType final : public COWHelper<IColumn, ColumnComplexType<T>>
data.push_back(*reinterpret_cast<const T*>(pos));
}

void insert_many_binary_data(char* data_array, uint32_t* len_array,
uint32_t* start_offset_array, size_t num) override {
void insert_binary_data(const char* pos, size_t length) {
insert_default();
T* pvalue = &get_element(size() - 1);
if (!length) {
*pvalue = *reinterpret_cast<const T*>(pos);
return;
}

if constexpr (std::is_same_v<T, BitmapValue>) {
for (size_t i = 0; i < num; i++) {
uint32_t len = len_array[i];
uint32_t start_offset = start_offset_array[i];
insert_default();
BitmapValue* pvalue = &get_element(size() - 1);
if (len != 0) {
BitmapValue value;
value.deserialize(data_array + start_offset);
*pvalue = std::move(value);
} else {
*pvalue = std::move(*reinterpret_cast<BitmapValue*>(data_array + start_offset));
}
}
pvalue->deserialize(pos);
} else if constexpr (std::is_same_v<T, HyperLogLog>) {
for (size_t i = 0; i < num; i++) {
uint32_t len = len_array[i];
uint32_t start_offset = start_offset_array[i];
insert_default();
HyperLogLog* pvalue = &get_element(size() - 1);
if (len != 0) {
HyperLogLog value;
value.deserialize(Slice(data_array + start_offset, len));
*pvalue = std::move(value);
} else {
*pvalue = std::move(*reinterpret_cast<HyperLogLog*>(data_array + start_offset));
}
}
pvalue->deserialize(Slice(pos, length));
} else {
LOG(FATAL) << "Unexpected type in column complex";
}
}

void insert_many_binary_data(char* data_array, uint32_t* len_array,
uint32_t* start_offset_array, size_t num) override {
for (size_t i = 0; i < num; i++) {
insert_binary_data(data_array + start_offset_array[i], len_array[i]);
}
}

void insert_default() override { data.push_back(T()); }

void insert_many_defaults(size_t length) override {
Expand Down Expand Up @@ -299,10 +288,7 @@ template <typename T>
ColumnPtr ColumnComplexType<T>::permute(const IColumn::Permutation& perm, size_t limit) const {
size_t size = data.size();

if (limit == 0)
limit = size;
else
limit = std::min(size, limit);
limit = limit ? std::min(size, limit) : size;

if (perm.size() < limit) {
LOG(FATAL) << "Size of permutation is less than required.";
Expand Down
13 changes: 9 additions & 4 deletions be/src/vec/exec/volap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,18 +252,23 @@ Status VOlapScanner::_init_return_columns(bool need_seq_col) {
if (!slot->is_materialized()) {
continue;
}
int32_t index = slot->col_unique_id() >= 0
? _tablet_schema.field_index(slot->col_unique_id())
: _tablet_schema.field_index(slot->col_name());

int32_t index = _tablet_schema.field_index(slot->col_unique_id());
if (index < 0) {
// rollup/materialized view should use col_name to find index
index = _tablet_schema.field_index(slot->col_name());
}

if (index < 0) {
std::stringstream ss;
ss << "field name is invalid. field=" << slot->col_name();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
_return_columns.push_back(index);
if (slot->is_nullable() && !_tablet_schema.column(index).is_nullable())
if (slot->is_nullable() && !_tablet_schema.column(index).is_nullable()) {
_tablet_columns_convert_to_null_set.emplace(index);
}
}

// expand the sequence column
Expand Down
11 changes: 6 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,10 @@ private RollupJobV2() {
super(JobType.ROLLUP);
}

public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs,
long baseIndexId, long rollupIndexId, String baseIndexName, String rollupIndexName,
List<Column> rollupSchema, int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType,
short rollupShortKeyColumnCount, OriginStatement origStmt) {
public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs, long baseIndexId,
long rollupIndexId, String baseIndexName, String rollupIndexName, List<Column> rollupSchema,
int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType, short rollupShortKeyColumnCount,
OriginStatement origStmt) {
super(jobId, JobType.ROLLUP, dbId, tableId, tableName, timeoutMs);

this.baseIndexId = baseIndexId;
Expand All @@ -150,6 +150,7 @@ public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long t
this.rollupIndexName = rollupIndexName;

this.rollupSchema = rollupSchema;

this.baseSchemaHash = baseSchemaHash;
this.rollupSchemaHash = rollupSchemaHash;
this.rollupKeysType = rollupKeysType;
Expand Down Expand Up @@ -376,8 +377,8 @@ protected void runWaitingTxnJob() throws AlterCancelException {

List<Column> fullSchema = tbl.getBaseSchema(true);
DescriptorTable descTable = new DescriptorTable();
TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
for (Column column : fullSchema) {
TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc);
destSlotDesc.setIsMaterialized(true);
destSlotDesc.setColumn(column);
Expand Down
Loading