Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 44 additions & 110 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,15 @@ Status PushHandler::process_streaming_ingestion(TabletSharedPtr tablet, const TP

DescriptorTbl::create(&_pool, _request.desc_tbl, &_desc_tbl);

std::vector<TabletVars> tablet_vars(1);
tablet_vars[0].tablet = tablet;
res = _do_streaming_ingestion(tablet, request, push_type, &tablet_vars, tablet_info_vec);
res = _do_streaming_ingestion(tablet, request, push_type, tablet_info_vec);

if (res.ok()) {
if (tablet_info_vec != nullptr) {
_get_tablet_infos(tablet_vars, tablet_info_vec);
TTabletInfo tablet_info;
tablet_info.tablet_id = tablet->tablet_id();
tablet_info.schema_hash = tablet->schema_hash();
StorageEngine::instance()->tablet_manager()->report_tablet_info(&tablet_info);
tablet_info_vec->push_back(tablet_info);
}
LOG(INFO) << "process realtime push successfully. "
<< "tablet=" << tablet->full_name() << ", partition_id=" << request.partition_id
Expand All @@ -78,7 +80,6 @@ Status PushHandler::process_streaming_ingestion(TabletSharedPtr tablet, const TP

Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushReq& request,
PushType push_type,
std::vector<TabletVars>* tablet_vars,
std::vector<TTabletInfo>* tablet_info_vec) {
// add transaction in engine, then check sc status
// lock, prevent sc handler checking transaction concurrently
Expand All @@ -99,128 +100,86 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
request.partition_id, tablet, request.transaction_id, load_id));
}

if (tablet_vars->size() == 1) {
tablet_vars->resize(2);
}

// not call validate request here, because realtime load does not
// contain version info

Status res;
// check delete condition if push for delete
std::queue<DeletePredicatePB> del_preds;
if (push_type == PUSH_FOR_DELETE) {
for (TabletVars& tablet_var : *tablet_vars) {
if (tablet_var.tablet == nullptr) {
continue;
}

DeletePredicatePB del_pred;
TabletSchema tablet_schema;
tablet_schema.copy_from(*tablet_var.tablet->tablet_schema());
if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) {
tablet_schema.clear_columns();
for (const auto& column_desc : request.columns_desc) {
tablet_schema.append_column(TabletColumn(column_desc));
}
}
res = DeleteHandler::generate_delete_predicate(tablet_schema, request.delete_conditions,
&del_pred);
del_preds.push(del_pred);
if (!res.ok()) {
LOG(WARNING) << "fail to generate delete condition. res=" << res
<< ", tablet=" << tablet_var.tablet->full_name();
return res;
DeletePredicatePB del_pred;
TabletSchema tablet_schema;
tablet_schema.copy_from(*tablet->tablet_schema());
if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) {
tablet_schema.clear_columns();
for (const auto& column_desc : request.columns_desc) {
tablet_schema.append_column(TabletColumn(column_desc));
}
}
res = DeleteHandler::generate_delete_predicate(tablet_schema, request.delete_conditions,
&del_pred);
del_preds.push(del_pred);
if (!res.ok()) {
LOG(WARNING) << "fail to generate delete condition. res=" << res
<< ", tablet=" << tablet->full_name();
return res;
}
}

// check if version number exceed limit
if (tablet_vars->at(0).tablet->version_count() > config::max_tablet_version_num) {
LOG(WARNING) << "failed to push data. version count: "
<< tablet_vars->at(0).tablet->version_count()
if (tablet->version_count() > config::max_tablet_version_num) {
LOG(WARNING) << "failed to push data. version count: " << tablet->version_count()
<< ", exceed limit: " << config::max_tablet_version_num
<< ". tablet: " << tablet_vars->at(0).tablet->full_name();
<< ". tablet: " << tablet->full_name();
return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_VERSION);
}
auto tablet_schema = std::make_shared<TabletSchema>();
tablet_schema->copy_from(*tablet_vars->at(0).tablet->tablet_schema());
tablet_schema->copy_from(*tablet->tablet_schema());
if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) {
tablet_schema->clear_columns();
for (const auto& column_desc : request.columns_desc) {
tablet_schema->append_column(TabletColumn(column_desc));
}
}

RowsetSharedPtr rowset_to_add;
// writes
if (push_type == PUSH_NORMAL_V2) {
res = _convert_v2(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet,
&(tablet_vars->at(0).rowset_to_add), &(tablet_vars->at(1).rowset_to_add),
tablet_schema);
res = _convert_v2(tablet, &rowset_to_add, tablet_schema);

} else {
res = _convert(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet,
&(tablet_vars->at(0).rowset_to_add), &(tablet_vars->at(1).rowset_to_add),
tablet_schema);
res = _convert(tablet, &rowset_to_add, tablet_schema);
}
if (!res.ok()) {
LOG(WARNING) << "fail to convert tmp file when realtime push. res=" << res
<< ", failed to process realtime push."
<< ", tablet=" << tablet->full_name()
<< ", transaction_id=" << request.transaction_id;
for (TabletVars& tablet_var : *tablet_vars) {
if (tablet_var.tablet == nullptr) {
continue;
}

Status rollback_status = StorageEngine::instance()->txn_manager()->rollback_txn(
request.partition_id, tablet_var.tablet, request.transaction_id);
// has to check rollback status to ensure not delete a committed rowset
if (rollback_status.ok()) {
StorageEngine::instance()->add_unused_rowset(tablet_var.rowset_to_add);
}
Status rollback_status = StorageEngine::instance()->txn_manager()->rollback_txn(
request.partition_id, tablet, request.transaction_id);
// has to check rollback status to ensure not delete a committed rowset
if (rollback_status.ok()) {
StorageEngine::instance()->add_unused_rowset(rowset_to_add);
}
return res;
}

// add pending data to tablet
for (TabletVars& tablet_var : *tablet_vars) {
if (tablet_var.tablet == nullptr) {
continue;
}

if (push_type == PUSH_FOR_DELETE) {
tablet_var.rowset_to_add->rowset_meta()->set_delete_predicate(del_preds.front());
del_preds.pop();
}
Status commit_status = StorageEngine::instance()->txn_manager()->commit_txn(
request.partition_id, tablet_var.tablet, request.transaction_id, load_id,
tablet_var.rowset_to_add, false);
if (commit_status != Status::OK() &&
commit_status.precise_code() != OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) {
res = commit_status;
}
if (push_type == PUSH_FOR_DELETE) {
rowset_to_add->rowset_meta()->set_delete_predicate(del_preds.front());
del_preds.pop();
}
return res;
}

void PushHandler::_get_tablet_infos(const std::vector<TabletVars>& tablet_vars,
std::vector<TTabletInfo>* tablet_info_vec) {
for (const TabletVars& tablet_var : tablet_vars) {
if (tablet_var.tablet.get() == nullptr) {
continue;
}

TTabletInfo tablet_info;
tablet_info.tablet_id = tablet_var.tablet->tablet_id();
tablet_info.schema_hash = tablet_var.tablet->schema_hash();
StorageEngine::instance()->tablet_manager()->report_tablet_info(&tablet_info);
tablet_info_vec->push_back(tablet_info);
Status commit_status = StorageEngine::instance()->txn_manager()->commit_txn(
request.partition_id, tablet, request.transaction_id, load_id, rowset_to_add, false);
if (commit_status != Status::OK() &&
commit_status.precise_code() != OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) {
res = commit_status;
}
return res;
}

Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet,
RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset,
Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur_rowset,
TabletSchemaSPtr tablet_schema) {
Status res = Status::OK();
uint32_t num_rows = 0;
Expand Down Expand Up @@ -325,27 +284,14 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_

_write_bytes += (*cur_rowset)->data_disk_size();
_write_rows += (*cur_rowset)->num_rows();

// 5. Convert data for schema change tables
VLOG_TRACE << "load to related tables of schema_change if possible.";
if (new_tablet != nullptr) {
res = SchemaChangeHandler::schema_version_convert(
cur_tablet, new_tablet, cur_rowset, new_rowset, *_desc_tbl, tablet_schema);
if (!res.ok()) {
LOG(WARNING) << "failed to change schema version for delta."
<< "[res=" << res << " new_tablet='" << new_tablet->full_name()
<< "']";
}
}
} while (false);

VLOG_TRACE << "convert delta file end. res=" << res << ", tablet=" << cur_tablet->full_name()
<< ", processed_rows" << num_rows;
return res;
}

Status PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet,
RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset,
Status PushHandler::_convert(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur_rowset,
TabletSchemaSPtr tablet_schema) {
Status res = Status::OK();
RowCursor row;
Expand Down Expand Up @@ -466,18 +412,6 @@ Status PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tab

_write_bytes += (*cur_rowset)->data_disk_size();
_write_rows += (*cur_rowset)->num_rows();

// 7. Convert data for schema change tables
VLOG_TRACE << "load to related tables of schema_change if possible.";
if (new_tablet != nullptr) {
res = SchemaChangeHandler::schema_version_convert(
cur_tablet, new_tablet, cur_rowset, new_rowset, *_desc_tbl, tablet_schema);
if (!res.ok()) {
LOG(WARNING) << "failed to change schema version for delta."
<< "[res=" << res << " new_tablet='" << new_tablet->full_name()
<< "']";
}
}
} while (false);

SAFE_DELETE(reader);
Expand Down
17 changes: 3 additions & 14 deletions be/src/olap/push_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ class BinaryReader;
struct ColumnMapping;
class RowCursor;

struct TabletVars {
TabletSharedPtr tablet;
RowsetSharedPtr rowset_to_add;
};

class PushHandler {
public:
using SchemaMapping = std::vector<ColumnMapping>;
Expand All @@ -60,24 +55,18 @@ class PushHandler {
int64_t write_rows() const { return _write_rows; }

private:
Status _convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet_vec,
RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset,
Status _convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur_rowset,
TabletSchemaSPtr tablet_schema);
// Convert local data file to internal formatted delta,
// return new delta's SegmentGroup
Status _convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet_vec,
RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset,
Status _convert(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur_rowset,
TabletSchemaSPtr tablet_schema);

// Only for debug
std::string _debug_version_list(const Versions& versions) const;

void _get_tablet_infos(const std::vector<TabletVars>& tablet_infos,
std::vector<TTabletInfo>* tablet_info_vec);

Status _do_streaming_ingestion(TabletSharedPtr tablet, const TPushReq& request,
PushType push_type, vector<TabletVars>* tablet_vars,
std::vector<TTabletInfo>* tablet_info_vec);
PushType push_type, std::vector<TTabletInfo>* tablet_info_vec);

private:
// mainly tablet_id, version and delta file path
Expand Down
99 changes: 0 additions & 99 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2039,105 +2039,6 @@ bool SchemaChangeHandler::tablet_in_converting(int64_t tablet_id) {
return _tablet_ids_in_converting.find(tablet_id) != _tablet_ids_in_converting.end();
}

Status SchemaChangeHandler::schema_version_convert(
TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, RowsetSharedPtr* base_rowset,
RowsetSharedPtr* new_rowset, DescriptorTbl desc_tbl, TabletSchemaSPtr base_schema_change) {
Status res = Status::OK();
LOG(INFO) << "begin to convert delta version for schema changing. "
<< "base_tablet=" << base_tablet->full_name()
<< ", new_tablet=" << new_tablet->full_name();

// a. Parse the Alter request and convert it into an internal representation
// Do not use the delete condition specified by the DELETE_DATA command
RowBlockChanger rb_changer(*new_tablet->tablet_schema(), desc_tbl);
bool sc_sorting = false;
bool sc_directly = false;

const std::unordered_map<std::string, AlterMaterializedViewParam> materialized_function_map;
if (res = _parse_request(base_tablet, new_tablet, &rb_changer, &sc_sorting, &sc_directly,
materialized_function_map, desc_tbl, base_schema_change.get());
!res) {
LOG(WARNING) << "failed to parse the request. res=" << res;
return res;
}

// NOTE split_table if row_block is used, the original block will become smaller
// But since the historical data will become normal after the subsequent base/cumulative, it is also possible to use directly
// b. Generate historical data converter
auto sc_procedure = get_sc_procedure(rb_changer, sc_sorting, sc_directly);

// c. Convert data
DeleteHandler delete_handler;
std::vector<ColumnId> return_columns;
size_t num_cols = base_schema_change->num_columns();
return_columns.resize(num_cols);
for (int i = 0; i < num_cols; ++i) {
return_columns[i] = i;
}

RowsetReaderContext reader_context;
reader_context.reader_type = READER_ALTER_TABLE;
reader_context.tablet_schema = base_schema_change.get();
reader_context.need_ordered_result = true;
reader_context.delete_handler = &delete_handler;
reader_context.return_columns = &return_columns;
reader_context.seek_columns = &return_columns;
reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS;
reader_context.is_vec = config::enable_vectorized_alter_table;

RowsetReaderSharedPtr rowset_reader;
RETURN_NOT_OK((*base_rowset)->create_reader(&rowset_reader));
RETURN_NOT_OK(rowset_reader->init(&reader_context));
PUniqueId load_id;
load_id.set_hi((*base_rowset)->load_id().hi());
load_id.set_lo((*base_rowset)->load_id().lo());
std::unique_ptr<RowsetWriter> rowset_writer;
RETURN_NOT_OK(new_tablet->create_rowset_writer(
(*base_rowset)->txn_id(), load_id, PREPARED,
(*base_rowset)->rowset_meta()->segments_overlap(), base_schema_change, &rowset_writer));

auto schema_version_convert_error = [&]() -> Status {
if (*new_rowset != nullptr) {
StorageEngine::instance()->add_unused_rowset(*new_rowset);
}

LOG(WARNING) << "failed to convert rowsets. "
<< " base_tablet=" << base_tablet->full_name()
<< ", new_tablet=" << new_tablet->full_name() << " res = " << res;
return res;
};

if (res = sc_procedure->process(rowset_reader, rowset_writer.get(), new_tablet, base_tablet);
!res) {
if ((*base_rowset)->is_pending()) {
LOG(WARNING) << "failed to process the transaction when schema change. "
<< "tablet=" << new_tablet->full_name() << "'"
<< ", transaction=" << (*base_rowset)->txn_id();
} else {
LOG(WARNING) << "failed to process the version. "
<< "version=" << (*base_rowset)->version().first << "-"
<< (*base_rowset)->version().second;
}
new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
rowset_writer->rowset_id().to_string());
return schema_version_convert_error();
}
*new_rowset = rowset_writer->build();
new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
rowset_writer->rowset_id().to_string());
if (*new_rowset == nullptr) {
LOG(WARNING) << "build rowset failed.";
res = Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
return schema_version_convert_error();
}

LOG(INFO) << "successfully convert rowsets. "
<< " base_tablet=" << base_tablet->full_name()
<< ", new_tablet=" << new_tablet->full_name();
return res;
}

Status SchemaChangeHandler::_get_versions_to_be_changed(
TabletSharedPtr base_tablet, std::vector<Version>* versions_to_be_changed,
RowsetSharedPtr* max_rowset) {
Expand Down
5 changes: 0 additions & 5 deletions be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,6 @@ class VSchemaChangeWithSorting : public SchemaChange {

class SchemaChangeHandler {
public:
static Status schema_version_convert(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet,
RowsetSharedPtr* base_rowset, RowsetSharedPtr* new_rowset,
DescriptorTbl desc_tbl,
TabletSchemaSPtr base_schema_change);

// schema change v2, it will not set alter task in base tablet
static Status process_alter_tablet_v2(const TAlterTabletReqV2& request);

Expand Down