Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
int64_t tablet_id() const;
int32_t schema_hash() const;
int16_t shard_id();
const int64_t creation_time() const;
void set_creation_time(int64_t creation_time);
bool equal(int64_t tablet_id, int32_t schema_hash);

// properties encapsulated in TabletSchema
Expand Down Expand Up @@ -133,14 +131,6 @@ inline int16_t BaseTablet::shard_id() {
return _tablet_meta->shard_id();
}

inline const int64_t BaseTablet::creation_time() const {
return _tablet_meta->creation_time();
}

inline void BaseTablet::set_creation_time(int64_t creation_time) {
_tablet_meta->set_creation_time(creation_time);
}

inline bool BaseTablet::equal(int64_t id, int32_t hash) {
return (tablet_id() == id) && (schema_hash() == hash);
}
Expand Down
22 changes: 2 additions & 20 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include "olap/compaction.h"

#include "gutil/strings/substitute.h"
#include "olap/rowset/rowset_factory.h"
#include "util/time.h"
#include "util/trace.h"

Expand Down Expand Up @@ -157,25 +156,8 @@ Status Compaction::do_compaction_impl(int64_t permits) {
}

Status Compaction::construct_output_rowset_writer() {
RowsetWriterContext context;
context.rowset_id = StorageEngine::instance()->next_rowset_id();
context.tablet_uid = _tablet->tablet_uid();
context.tablet_id = _tablet->tablet_id();
context.partition_id = _tablet->partition_id();
context.tablet_schema_hash = _tablet->schema_hash();
context.data_dir = _tablet->data_dir();
context.rowset_type = StorageEngine::instance()->default_rowset_type();
if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
context.rowset_type = BETA_ROWSET;
}
context.path_desc = _tablet->tablet_path_desc();
context.tablet_schema = &(_tablet->tablet_schema());
context.rowset_state = VISIBLE;
context.version = _output_version;
context.segments_overlap = NONOVERLAPPING;
// The test results show that one rs writer is low-memory-footprint, there is no need to tracker its mem pool
RETURN_NOT_OK(RowsetFactory::create_rowset_writer(context, &_output_rs_writer));
return Status::OK();
return _tablet->create_rowset_writer(_output_version, VISIBLE, NONOVERLAPPING,
&_output_rs_writer);
}

Status Compaction::construct_input_rowset_readers() {
Expand Down
6 changes: 2 additions & 4 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
#include "olap/file_helper.h"
#include "olap/olap_define.h"
#include "olap/rowset/alpha_rowset_meta.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_meta_manager.h"
#include "olap/storage_engine.h"
#include "olap/tablet_meta_manager.h"
Expand Down Expand Up @@ -469,8 +468,7 @@ Status DataDir::load() {
continue;
}
RowsetSharedPtr rowset;
Status create_status = RowsetFactory::create_rowset(
&tablet->tablet_schema(), tablet->tablet_path_desc(), rowset_meta, &rowset);
Status create_status = tablet->create_rowset(rowset_meta, &rowset);
if (!create_status) {
LOG(WARNING) << "could not create rowset from rowsetmeta: "
<< " rowset_id: " << rowset_meta->rowset_id()
Expand Down Expand Up @@ -498,7 +496,7 @@ Status DataDir::load() {
}
} else if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE &&
rowset_meta->tablet_uid() == tablet->tablet_uid()) {
Status publish_status = tablet->add_rowset(rowset, false);
Status publish_status = tablet->add_rowset(rowset);
if (!publish_status &&
publish_status.precise_code() != OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) {
LOG(WARNING) << "add visible rowset to tablet failed rowset_id:"
Expand Down
25 changes: 3 additions & 22 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include "olap/data_dir.h"
#include "olap/memtable.h"
#include "olap/memtable_flush_executor.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/schema.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
Expand Down Expand Up @@ -115,33 +114,15 @@ Status DeltaWriter::init() {
_req.txn_id, _req.load_id));
}

RowsetWriterContext writer_context;
writer_context.rowset_id = _storage_engine->next_rowset_id();
writer_context.tablet_uid = _tablet->tablet_uid();
writer_context.tablet_id = _req.tablet_id;
writer_context.partition_id = _req.partition_id;
writer_context.tablet_schema_hash = _req.schema_hash;
if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
writer_context.rowset_type = BETA_ROWSET;
} else {
writer_context.rowset_type = ALPHA_ROWSET;
}
writer_context.path_desc = _tablet->tablet_path_desc();
writer_context.tablet_schema = &(_tablet->tablet_schema());
writer_context.rowset_state = PREPARED;
writer_context.txn_id = _req.txn_id;
writer_context.load_id = _req.load_id;
writer_context.segments_overlap = OVERLAPPING;
writer_context.data_dir = _tablet->data_dir();
RETURN_NOT_OK(RowsetFactory::create_rowset_writer(writer_context, &_rowset_writer));

RETURN_NOT_OK(_tablet->create_rowset_writer(_req.txn_id, _req.load_id, PREPARED, OVERLAPPING,
&_rowset_writer));
_tablet_schema = &(_tablet->tablet_schema());
_schema.reset(new Schema(*_tablet_schema));
_reset_mem_table();

// create flush handler
RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_token(
&_flush_token, writer_context.rowset_type, _req.is_high_priority));
&_flush_token, _rowset_writer->type(), _req.is_high_priority));

_is_init = true;
return Status::OK();
Expand Down
47 changes: 4 additions & 43 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include "common/status.h"
#include "exec/parquet_scanner.h"
#include "olap/row.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_id_generator.h"
#include "olap/rowset/rowset_meta_manager.h"
#include "olap/schema_change.h"
Expand Down Expand Up @@ -221,29 +220,12 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_
// 1. init RowsetBuilder of cur_tablet for current push
VLOG_NOTICE << "init rowset builder. tablet=" << cur_tablet->full_name()
<< ", block_row_size=" << cur_tablet->num_rows_per_row_block();
RowsetWriterContext context;
context.rowset_id = StorageEngine::instance()->next_rowset_id();
context.tablet_uid = cur_tablet->tablet_uid();
context.tablet_id = cur_tablet->tablet_id();
context.partition_id = _request.partition_id;
context.tablet_schema_hash = cur_tablet->schema_hash();
context.data_dir = cur_tablet->data_dir();
context.rowset_type = StorageEngine::instance()->default_rowset_type();
if (cur_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
context.rowset_type = BETA_ROWSET;
}
context.path_desc = cur_tablet->tablet_path_desc();
context.tablet_schema = &(cur_tablet->tablet_schema());
context.rowset_state = PREPARED;
context.txn_id = _request.transaction_id;
context.load_id = load_id;
// although the spark load output files are fully sorted,
// but it depends on thirparty implementation, so we conservatively
// set this value to OVERLAP_UNKNOWN
context.segments_overlap = OVERLAP_UNKNOWN;

std::unique_ptr<RowsetWriter> rowset_writer;
res = RowsetFactory::create_rowset_writer(context, &rowset_writer);
res = cur_tablet->create_rowset_writer(_request.transaction_id, load_id, PREPARED,
OVERLAP_UNKNOWN, &rowset_writer);
if (!res.ok()) {
LOG(WARNING) << "failed to init rowset writer, tablet=" << cur_tablet->full_name()
<< ", txn_id=" << _request.transaction_id << ", res=" << res;
Expand Down Expand Up @@ -407,30 +389,9 @@ Status PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tab
}

// 2. init RowsetBuilder of cur_tablet for current push
VLOG_NOTICE << "init RowsetBuilder.";
RowsetWriterContext context;
context.rowset_id = StorageEngine::instance()->next_rowset_id();
context.tablet_uid = cur_tablet->tablet_uid();
context.tablet_id = cur_tablet->tablet_id();
context.partition_id = _request.partition_id;
context.tablet_schema_hash = cur_tablet->schema_hash();
context.data_dir = cur_tablet->data_dir();
context.rowset_type = StorageEngine::instance()->default_rowset_type();
if (cur_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
context.rowset_type = BETA_ROWSET;
}
context.path_desc = cur_tablet->tablet_path_desc();
context.tablet_schema = &(cur_tablet->tablet_schema());
context.rowset_state = PREPARED;
context.txn_id = _request.transaction_id;
context.load_id = load_id;
// although the hadoop load output files are fully sorted,
// but it depends on thirparty implementation, so we conservatively
// set this value to OVERLAP_UNKNOWN
context.segments_overlap = OVERLAP_UNKNOWN;

std::unique_ptr<RowsetWriter> rowset_writer;
res = RowsetFactory::create_rowset_writer(context, &rowset_writer);
res = cur_tablet->create_rowset_writer(_request.transaction_id, load_id, PREPARED,
OVERLAP_UNKNOWN, &rowset_writer);
if (!res.ok()) {
LOG(WARNING) << "failed to init rowset writer, tablet=" << cur_tablet->full_name()
<< ", txn_id=" << _request.transaction_id << ", res=" << res;
Expand Down
88 changes: 15 additions & 73 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include "olap/row.h"
#include "olap/row_block.h"
#include "olap/row_cursor.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_id_generator.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
Expand Down Expand Up @@ -1160,8 +1159,6 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
reset_merged_rows();
reset_filtered_rows();

bool use_beta_rowset = new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET;

SegmentsOverlapPB segments_overlap = rowset->rowset_meta()->segments_overlap();
RowBlock* ref_row_block = nullptr;
rowset_reader->next_block(&ref_row_block);
Expand Down Expand Up @@ -1191,14 +1188,10 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,

// enter here while memory limitation is reached.
RowsetSharedPtr rowset;
RowsetTypePB new_rowset_type = rowset_reader->rowset()->rowset_meta()->rowset_type();
if (use_beta_rowset) {
new_rowset_type = BETA_ROWSET;
}
if (!_internal_sorting(
row_block_arr,
Version(_temp_delta_versions.second, _temp_delta_versions.second),
new_tablet, new_rowset_type, segments_overlap, &rowset)) {
new_tablet, segments_overlap, &rowset)) {
LOG(WARNING) << "failed to sorting internally.";
return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
}
Expand Down Expand Up @@ -1247,13 +1240,9 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
// enter here while memory limitation is reached.
RowsetSharedPtr rowset = nullptr;

RowsetTypePB new_rowset_type = rowset_reader->rowset()->rowset_meta()->rowset_type();
if (use_beta_rowset) {
new_rowset_type = BETA_ROWSET;
}
if (!_internal_sorting(row_block_arr,
Version(_temp_delta_versions.second, _temp_delta_versions.second),
new_tablet, new_rowset_type, segments_overlap, &rowset)) {
new_tablet, segments_overlap, &rowset)) {
LOG(WARNING) << "failed to sorting internally.";
return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
}
Expand Down Expand Up @@ -1305,31 +1294,16 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,

bool SchemaChangeWithSorting::_internal_sorting(const std::vector<RowBlock*>& row_block_arr,
const Version& version, TabletSharedPtr new_tablet,
RowsetTypePB new_rowset_type,
SegmentsOverlapPB segments_overlap,
RowsetSharedPtr* rowset) {
uint64_t merged_rows = 0;
RowBlockMerger merger(new_tablet);

RowsetWriterContext context;
context.rowset_id = StorageEngine::instance()->next_rowset_id();
context.tablet_uid = new_tablet->tablet_uid();
context.tablet_id = new_tablet->tablet_id();
context.partition_id = new_tablet->partition_id();
context.tablet_schema_hash = new_tablet->schema_hash();
context.rowset_type = new_rowset_type;
context.path_desc = new_tablet->tablet_path_desc();
context.tablet_schema = &(new_tablet->tablet_schema());
context.data_dir = new_tablet->data_dir();
context.rowset_state = VISIBLE;
context.version = version;
context.segments_overlap = segments_overlap;

VLOG_NOTICE << "init rowset builder. tablet=" << new_tablet->full_name()
<< ", block_row_size=" << new_tablet->num_rows_per_row_block();

std::unique_ptr<RowsetWriter> rowset_writer;
if (RowsetFactory::create_rowset_writer(context, &rowset_writer) != Status::OK()) {
if (!new_tablet->create_rowset_writer(version, VISIBLE, segments_overlap, &rowset_writer)) {
return false;
}

Expand Down Expand Up @@ -1699,28 +1673,13 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
RowsetReaderSharedPtr rowset_reader;
RETURN_NOT_OK((*base_rowset)->create_reader(&rowset_reader));
RETURN_NOT_OK(rowset_reader->init(&reader_context));

RowsetWriterContext writer_context;
writer_context.rowset_id = StorageEngine::instance()->next_rowset_id();
writer_context.tablet_uid = new_tablet->tablet_uid();
writer_context.tablet_id = new_tablet->tablet_id();
writer_context.partition_id = (*base_rowset)->partition_id();
writer_context.tablet_schema_hash = new_tablet->schema_hash();
writer_context.data_dir = new_tablet->data_dir();
writer_context.rowset_type = (*base_rowset)->rowset_meta()->rowset_type();
if (new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
writer_context.rowset_type = BETA_ROWSET;
}
writer_context.path_desc = new_tablet->tablet_path_desc();
writer_context.tablet_schema = &(new_tablet->tablet_schema());
writer_context.rowset_state = PREPARED;
writer_context.txn_id = (*base_rowset)->txn_id();
writer_context.load_id.set_hi((*base_rowset)->load_id().hi());
writer_context.load_id.set_lo((*base_rowset)->load_id().lo());
writer_context.segments_overlap = (*base_rowset)->rowset_meta()->segments_overlap();

PUniqueId load_id;
load_id.set_hi((*base_rowset)->load_id().hi());
load_id.set_lo((*base_rowset)->load_id().lo());
std::unique_ptr<RowsetWriter> rowset_writer;
RowsetFactory::create_rowset_writer(writer_context, &rowset_writer);
RETURN_NOT_OK(new_tablet->create_rowset_writer(
(*base_rowset)->txn_id(), load_id, PREPARED,
(*base_rowset)->rowset_meta()->segments_overlap(), &rowset_writer));

if ((res = sc_procedure->process(rowset_reader, rowset_writer.get(), new_tablet,
base_tablet)) != Status::OK()) {
Expand Down Expand Up @@ -1842,29 +1801,12 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
// As long as there is a new_table as running, ref table is set as running
// NOTE If the first sub_table fails first, it will continue to go as normal here
TabletSharedPtr new_tablet = sc_params.new_tablet;

RowsetWriterContext writer_context;
writer_context.rowset_id = StorageEngine::instance()->next_rowset_id();
writer_context.tablet_uid = new_tablet->tablet_uid();
writer_context.tablet_id = new_tablet->tablet_id();
writer_context.partition_id = new_tablet->partition_id();
writer_context.tablet_schema_hash = new_tablet->schema_hash();
writer_context.data_dir = new_tablet->data_dir();
// linked schema change can't change rowset type, therefore we preserve rowset type in schema change now
writer_context.rowset_type = rs_reader->rowset()->rowset_meta()->rowset_type();
if (sc_params.new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
// Use beta rowset to do schema change
// And in this case, linked schema change will not be used.
writer_context.rowset_type = BETA_ROWSET;
}
writer_context.path_desc = new_tablet->tablet_path_desc();
writer_context.tablet_schema = &(new_tablet->tablet_schema());
writer_context.rowset_state = VISIBLE;
writer_context.version = rs_reader->version();
writer_context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap();

// When tablet create new rowset writer, it may change rowset type, in this case
// linked schema change will not be used.
std::unique_ptr<RowsetWriter> rowset_writer;
Status status = RowsetFactory::create_rowset_writer(writer_context, &rowset_writer);
Status status = new_tablet->create_rowset_writer(
rs_reader->version(), VISIBLE,
rs_reader->rowset()->rowset_meta()->segments_overlap(), &rowset_writer);
if (!status.ok()) {
res = Status::OLAPInternalError(OLAP_ERR_ROWSET_BUILDER_INIT);
goto PROCESS_ALTER_EXIT;
Expand All @@ -1889,7 +1831,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
LOG(WARNING) << "failed to build rowset, exit alter process";
goto PROCESS_ALTER_EXIT;
}
res = sc_params.new_tablet->add_rowset(new_rowset, false);
res = sc_params.new_tablet->add_rowset(new_rowset);
if (res.precise_code() == OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) {
LOG(WARNING) << "version already exist, version revert occurred. "
<< "tablet=" << sc_params.new_tablet->full_name() << ", version='"
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ class SchemaChangeWithSorting : public SchemaChange {
private:
bool _internal_sorting(const std::vector<RowBlock*>& row_block_arr,
const Version& temp_delta_versions, TabletSharedPtr new_tablet,
RowsetTypePB new_rowset_type, SegmentsOverlapPB segments_overlap,
RowsetSharedPtr* rowset);
SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset);

bool _external_sorting(std::vector<RowsetSharedPtr>& src_rowsets, RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/storage_migration_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ Status StorageMigrationV2Handler::_convert_historical_rowsets(
LOG(WARNING) << "failed to build rowset, exit alter process";
goto PROCESS_ALTER_EXIT;
}
res = sm_params.new_tablet->add_rowset(new_rowset, false);
res = sm_params.new_tablet->add_rowset(new_rowset);
if (res.precise_code() == OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) {
LOG(WARNING) << "version already exist, version revert occurred. "
<< "tablet=" << sm_params.new_tablet->full_name() << ", version='"
Expand Down
Loading