Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
it.rowset_ids.insert(_output_rowset->rowset_id());
StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap(
it.partition_id, it.transaction_id, _tablet->tablet_id(),
_tablet->tablet_uid(), true, it.delete_bitmap, it.rowset_ids);
_tablet->tablet_uid(), true, it.delete_bitmap, it.rowset_ids, nullptr);
}
}

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ Status DeltaWriter::init() {
RETURN_IF_ERROR(_rowset_builder.init());
RETURN_IF_ERROR(
_memtable_writer->init(_rowset_builder.rowset_writer(), _rowset_builder.tablet_schema(),
_rowset_builder.get_partial_update_info(),
_rowset_builder.tablet()->enable_unique_key_merge_on_write()));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
Expand Down
10 changes: 7 additions & 3 deletions be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
#include "service/backend_options.h"
#include "util/brpc_client_cache.h"
Expand Down Expand Up @@ -121,10 +122,11 @@ Status DeltaWriterV2::init() {
context.rowset_type = RowsetTypePB::BETA_ROWSET;
context.rowset_id = StorageEngine::instance()->next_rowset_id();
context.data_dir = nullptr;
context.partial_update_info = _partial_update_info;

_rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
RETURN_IF_ERROR(_rowset_writer->init(context));
RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema,
RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info,
_streams[0]->enable_unique_mow(_req.index_id)));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
Expand Down Expand Up @@ -221,8 +223,10 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,

_tablet_schema->set_table_id(table_schema_param->table_id());
// set partial update columns info
_tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns());
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode());
}

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/olap/delta_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "olap/delta_writer_context.h"
#include "olap/memtable_writer.h"
#include "olap/olap_common.h"
#include "olap/partial_update_info.h"
#include "olap/rowset/rowset.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
Expand Down Expand Up @@ -126,6 +127,8 @@ class DeltaWriterV2 {
MonotonicStopWatch _lock_watch;

std::vector<std::shared_ptr<LoadStreamStub>> _streams;

std::shared_ptr<PartialUpdateInfo> _partial_update_info;
};

} // namespace doris
20 changes: 9 additions & 11 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ using namespace ErrorCode;

MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
bool enable_unique_key_mow,
bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker)
: _tablet_id(tablet_id),
Expand Down Expand Up @@ -77,8 +77,11 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
// TODO: Support ZOrderComparator in the future
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
_num_columns = _tablet_schema->num_columns();
if (_tablet_schema->is_partial_update()) {
_num_columns = _tablet_schema->partial_input_column_size();
if (partial_update_info != nullptr) {
_is_partial_update = partial_update_info->is_partial_update;
if (_is_partial_update) {
_num_columns = partial_update_info->partial_update_input_columns.size();
}
}
}
void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
Expand Down Expand Up @@ -178,7 +181,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<in
_init_agg_functions(&target_block);
}
if (_tablet_schema->has_sequence_col()) {
if (_tablet_schema->is_partial_update()) {
if (_is_partial_update) {
// for unique key partial update, sequence column index in block
// may be different with the index in `_tablet_schema`
for (size_t i = 0; i < cloneBlock.columns(); i++) {
Expand Down Expand Up @@ -417,8 +420,8 @@ void MemTable::shrink_memtable_by_agg() {

bool MemTable::need_flush() const {
auto max_size = config::write_buffer_size;
if (_tablet_schema->is_partial_update()) {
auto update_columns_size = _tablet_schema->partial_input_column_size();
if (_is_partial_update) {
auto update_columns_size = _num_columns;
max_size = max_size * update_columns_size / _tablet_schema->num_columns();
max_size = max_size > 1048576 ? max_size : 1048576;
}
Expand All @@ -428,11 +431,6 @@ bool MemTable::need_flush() const {
bool MemTable::need_agg() const {
if (_keys_type == KeysType::AGG_KEYS) {
auto max_size = config::write_buffer_size_for_agg;
if (_tablet_schema->is_partial_update()) {
auto update_columns_size = _tablet_schema->partial_input_column_size();
max_size = max_size * update_columns_size / _tablet_schema->num_columns();
max_size = max_size > 1048576 ? max_size : 1048576;
}
return memory_usage() >= max_size;
}
return false;
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include "common/status.h"
#include "gutil/integral_types.h"
#include "olap/olap_common.h"
#include "olap/partial_update_info.h"
#include "olap/tablet_schema.h"
#include "runtime/memory/mem_tracker.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/arena.h"
Expand Down Expand Up @@ -167,7 +169,8 @@ class MemTable {
public:
MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
bool enable_unique_key_mow, const std::shared_ptr<MemTracker>& insert_mem_tracker,
bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker);
~MemTable();

Expand Down Expand Up @@ -202,6 +205,7 @@ class MemTable {
private:
int64_t _tablet_id;
bool _enable_unique_key_mow = false;
bool _is_partial_update = false;
const KeysType _keys_type;
const TabletSchema* _tablet_schema;

Expand Down
10 changes: 7 additions & 3 deletions be/src/olap/memtable_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "olap/rowset/rowset_writer.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "service/backend_options.h"
Expand All @@ -63,10 +64,13 @@ MemTableWriter::~MemTableWriter() {
}

Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
TabletSchemaSPtr tablet_schema, bool unique_key_mow) {
TabletSchemaSPtr tablet_schema,
std::shared_ptr<PartialUpdateInfo> partial_update_info,
bool unique_key_mow) {
_rowset_writer = rowset_writer;
_tablet_schema = tablet_schema;
_unique_key_mow = unique_key_mow;
_partial_update_info = partial_update_info;

_reset_mem_table();

Expand Down Expand Up @@ -195,8 +199,8 @@ void MemTableWriter::_reset_mem_table() {
_mem_table_flush_trackers.push_back(mem_table_flush_tracker);
}
_mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(), _req.slots, _req.tuple_desc,
_unique_key_mow, mem_table_insert_tracker,
mem_table_flush_tracker));
_unique_key_mow, _partial_update_info.get(),
mem_table_insert_tracker, mem_table_flush_tracker));

_segment_num++;
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/memtable_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "olap/delta_writer_context.h"
#include "olap/memtable.h"
#include "olap/olap_common.h"
#include "olap/partial_update_info.h"
#include "olap/rowset/rowset.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
Expand Down Expand Up @@ -67,6 +68,7 @@ class MemTableWriter {
~MemTableWriter();

Status init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr tablet_schema,
std::shared_ptr<PartialUpdateInfo> partial_update_info,
bool unique_key_mow = false);

Status write(const vectorized::Block* block, const std::vector<int>& row_idxs,
Expand Down Expand Up @@ -141,6 +143,8 @@ class MemTableWriter {
int64_t _segment_num = 0;

MonotonicStopWatch _lock_watch;

std::shared_ptr<PartialUpdateInfo> _partial_update_info;
};

} // namespace doris
54 changes: 54 additions & 0 deletions be/src/olap/partial_update_info.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "olap/tablet_schema.h"

namespace doris {

// Per-load description of a partial-column update: which columns the input
// provides, which ones are absent from it, and how rows whose key is not found
// should be treated.
struct PartialUpdateInfo {
    // (Re)builds this description from the tablet schema and the load parameters.
    //
    // tablet_schema:       schema whose column ordinals are split into
    //                      `update_cids` / `missing_cids`.
    // partial_update:      stored in `is_partial_update`.
    // partial_update_cols: names of the columns carried by the input; stored in
    //                      `partial_update_input_columns`.
    // is_strict_mode:      stored in `is_strict_mode`.
    //
    // All derived state is reset first, so calling init() again on the same
    // object does not keep results of a previous call.
    void init(const TabletSchema& tablet_schema, bool partial_update,
              const std::set<std::string>& partial_update_cols, bool is_strict_mode) {
        is_partial_update = partial_update;
        partial_update_input_columns = partial_update_cols;
        missing_cids.clear();
        update_cids.clear();
        // Must be reset together with the cid lists: the loop below only ever
        // lowers it to false, so a stale false would survive a re-init.
        can_insert_new_rows_in_partial_update = true;

        const auto num_columns = static_cast<uint32_t>(tablet_schema.num_columns());
        for (uint32_t cid = 0; cid < num_columns; ++cid) {
            // Bind by reference; the column object is only inspected here.
            const auto& tablet_column = tablet_schema.column(cid);
            if (partial_update_input_columns.contains(tablet_column.name())) {
                update_cids.emplace_back(cid);
                continue;
            }
            missing_cids.emplace_back(cid);
            // A column absent from the input that has neither a default value
            // nor nullability cannot be filled in for a brand-new row.
            if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) {
                can_insert_new_rows_in_partial_update = false;
            }
        }
        this->is_strict_mode = is_strict_mode;
    }

    bool is_partial_update {false};
    // names of the columns present in the input
    std::set<std::string> partial_update_input_columns;
    // schema ordinals of the columns NOT present in `partial_update_input_columns`
    std::vector<uint32_t> missing_cids;
    // schema ordinals of the columns present in `partial_update_input_columns`
    std::vector<uint32_t> update_cids;
    // if key not exist in old rowset, use default value or null value for the unmentioned cols
    // to generate a new row, only available in non-strict mode
    bool can_insert_new_rows_in_partial_update {true};
    bool is_strict_mode {false};
};
} // namespace doris
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
SCOPED_RAW_TIMER(&_delete_bitmap_ns);
if (!_context.tablet->enable_unique_key_merge_on_write() ||
_context.tablet_schema->is_partial_update()) {
(_context.partial_update_info && _context.partial_update_info->is_partial_update)) {
return Status::OK();
}
auto rowset = _build_tmp();
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ class BetaRowsetWriter : public RowsetWriter {

int64_t segment_writer_ns() override { return _segment_writer_ns; }

// Returns the partial-update info held by this writer's context.
// The pointer may be null; is_partial_update() checks for that case.
std::shared_ptr<PartialUpdateInfo> get_partial_update_info() override {
return _context.partial_update_info;
}

// Reports whether this writer's context carries partial-update info whose
// `is_partial_update` flag is set. A context without such info yields false.
bool is_partial_update() override {
    const auto& info = _context.partial_update_info;
    if (!info) {
        return false;
    }
    return info->is_partial_update;
}

private:
Status _create_file_writer(std::string path, io::FileWriterPtr& file_writer);
Status _check_segment_number_limit();
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ class BetaRowsetWriterV2 : public RowsetWriter {

int64_t segment_writer_ns() override { return _segment_writer_ns; }

// Returns the partial-update info held by this writer's context.
// The pointer may be null; is_partial_update() checks for that case.
std::shared_ptr<PartialUpdateInfo> get_partial_update_info() override {
return _context.partial_update_info;
}

// Reports whether this writer's context carries partial-update info whose
// `is_partial_update` flag is set. A context without such info yields false.
bool is_partial_update() override {
    const auto& info = _context.partial_update_info;
    if (!info) {
        return false;
    }
    return info->is_partial_update;
}

private:
RowsetWriterContext _context;

Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ class RowsetWriter {

virtual int64_t segment_writer_ns() { return 0; }

virtual std::shared_ptr<PartialUpdateInfo> get_partial_update_info() = 0;

virtual bool is_partial_update() = 0;

private:
DISALLOW_COPY_AND_ASSIGN(RowsetWriter);
};
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "io/fs/file_system.h"
#include "olap/olap_define.h"
#include "olap/partial_update_info.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"

Expand Down Expand Up @@ -105,6 +106,10 @@ struct RowsetWriterContext {

// segcompaction for this RowsetWriter, disable it for some transient writers
bool enable_segcompaction = true;

std::shared_ptr<PartialUpdateInfo> partial_update_info;

bool is_transient_rowset_writer = false;
};

} // namespace doris
20 changes: 12 additions & 8 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,10 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
}
DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write);

DCHECK(_opts.rowset_ctx->partial_update_info);
// find missing column cids
std::vector<uint32_t> missing_cids = _tablet_schema->get_missing_cids();
std::vector<uint32_t> including_cids = _tablet_schema->get_update_cids();
std::vector<uint32_t> missing_cids = _opts.rowset_ctx->partial_update_info->missing_cids;
std::vector<uint32_t> including_cids = _opts.rowset_ctx->partial_update_info->update_cids;

// create full block and fill with input columns
auto full_block = _tablet_schema->create_block();
Expand Down Expand Up @@ -421,15 +422,15 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
auto st = _tablet->lookup_row_key(key, have_input_seq_column, specified_rowsets, &loc,
_mow_context->max_version, segment_caches, &rowset);
if (st.is<KEY_NOT_FOUND>()) {
if (_tablet_schema->is_strict_mode()) {
if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
++num_rows_filtered;
// delete the invalid newly inserted row
_mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id,
DeleteBitmap::TEMP_VERSION_COMMON},
segment_pos);
}

if (!_tablet_schema->can_insert_new_rows_in_partial_update()) {
if (!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update) {
return Status::InternalError(
"the unmentioned columns should have default value or be nullable for "
"newly inserted rows in non-strict mode partial update");
Expand Down Expand Up @@ -492,7 +493,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
}

// convert missing columns and send to column writer
auto cids_missing = _tablet_schema->get_missing_cids();
auto cids_missing = _opts.rowset_ctx->partial_update_info->missing_cids;
_olap_data_convertor->set_source_content_with_specifid_columns(&full_block, row_pos, num_rows,
cids_missing);
for (auto cid : cids_missing) {
Expand Down Expand Up @@ -545,8 +546,8 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
bool has_default_or_nullable,
const size_t& segment_start_pos) {
// create old value columns
auto old_value_block = _tablet_schema->create_missing_columns_block();
std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids();
std::vector<uint32_t> cids_missing = _opts.rowset_ctx->partial_update_info->missing_cids;
auto old_value_block = _tablet_schema->create_block_by_cids(cids_missing);
CHECK(cids_missing.size() == old_value_block.columns());
auto mutable_old_columns = old_value_block.mutate_columns();
bool has_row_column = _tablet_schema->store_row_column();
Expand Down Expand Up @@ -652,7 +653,10 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f

Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_pos,
size_t num_rows) {
if (_tablet_schema->is_partial_update() && _opts.write_type == DataWriteType::TYPE_DIRECT) {
if (_opts.rowset_ctx->partial_update_info &&
_opts.rowset_ctx->partial_update_info->is_partial_update &&
_opts.write_type == DataWriteType::TYPE_DIRECT &&
!_opts.rowset_ctx->is_transient_rowset_writer) {
RETURN_IF_ERROR(append_block_with_partial_content(block, row_pos, num_rows));
return Status::OK();
}
Expand Down
Loading