Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion be/src/olap/column_mapping.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,7 @@ struct ColumnMapping {
WrapperField* default_value;
};

typedef std::vector<ColumnMapping> SchemaMapping;

} // namespace doris
#endif // DORIS_BE_SRC_COLUMN_MAPPING_H
#endif // DORIS_BE_SRC_COLUMN_MAPPING_H
23 changes: 22 additions & 1 deletion be/src/olap/rowset/alpha_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,28 @@ OLAPStatus AlphaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
_rowset_writer_context.rowset_id));
_cur_segment_group->set_empty(segment_group->empty());
_cur_segment_group->set_num_segments(segment_group->num_segments());
_cur_segment_group->add_zone_maps_for_linked_schema_change(segment_group->get_zone_maps());
_cur_segment_group->add_zone_maps(segment_group->get_zone_maps());
RETURN_NOT_OK(flush());
_num_rows_written += segment_group->num_rows();
}
return OLAP_SUCCESS;
}

OLAPStatus AlphaRowsetWriter::add_rowset_for_linked_schema_change(
RowsetSharedPtr rowset, const SchemaMapping& schema_mapping) {
_need_column_data_writer = false;
// this api is for LinkedSchemaChange
// use create hard link to copy rowset for performance
// this is feasible because LinkedSchemaChange is done on the same disk
AlphaRowsetSharedPtr alpha_rowset = std::dynamic_pointer_cast<AlphaRowset>(rowset);
for (auto& segment_group : alpha_rowset->_segment_groups) {
RETURN_NOT_OK(_init());
RETURN_NOT_OK(segment_group->link_segments_to_path(_rowset_writer_context.rowset_path_prefix,
_rowset_writer_context.rowset_id));
_cur_segment_group->set_empty(segment_group->empty());
_cur_segment_group->set_num_segments(segment_group->num_segments());
_cur_segment_group->add_zone_maps_for_linked_schema_change(segment_group->get_zone_maps(),
schema_mapping);
RETURN_NOT_OK(flush());
_num_rows_written += segment_group->num_rows();
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/alpha_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class AlphaRowsetWriter : public RowsetWriter {

// add rowset by create hard link
OLAPStatus add_rowset(RowsetSharedPtr rowset) override;
OLAPStatus add_rowset_for_linked_schema_change(
RowsetSharedPtr rowset, const SchemaMapping& schema_mapping) override;

OLAPStatus flush() override;

Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class RowsetWriter {
virtual OLAPStatus add_row_block(RowBlock* row_block) = 0;

virtual OLAPStatus add_rowset(RowsetSharedPtr rowset) = 0;
virtual OLAPStatus add_rowset_for_linked_schema_change(
RowsetSharedPtr rowset, const SchemaMapping& schema_mapping) = 0;

virtual OLAPStatus flush() = 0;

Expand Down
34 changes: 25 additions & 9 deletions be/src/olap/rowset/segment_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@
#include <fstream>
#include <sstream>

#include "olap/data_dir.h"
#include "olap/column_mapping.h"
#include "olap/rowset/column_data.h"
#include "olap/row_block.h"
#include "olap/row_cursor.h"
#include "olap/schema.h"
#include "olap/storage_engine.h"
#include "olap/utils.h"
#include "olap/wrapper_field.h"
#include "olap/schema.h"
#include "util/stack_util.h"
#include "olap/storage_engine.h"
#include "olap/data_dir.h"

using std::ifstream;
using std::string;
Expand Down Expand Up @@ -234,27 +235,42 @@ bool SegmentGroup::delete_all_files() {
}

OLAPStatus SegmentGroup::add_zone_maps_for_linked_schema_change(
const std::vector<std::pair<WrapperField*, WrapperField*>>& zone_map_fields) {
const std::vector<std::pair<WrapperField*, WrapperField*>>& zone_map_fields,
const SchemaMapping& schema_mapping) {
//When add rollup tablet, the base tablet index maybe empty
if (zone_map_fields.size() == 0) {
return OLAP_SUCCESS;
}

//Should use _num_key_columns, not zone_map_fields.size()
//as rollup tablet num_key_columns will less than base tablet zone_map_fields.size().
//For LinkedSchemaChange, the rollup tablet keys order is the same as base tablet
// 1. rollup tablet num_key_columns() will less than base tablet zone_map_fields.size().
// For LinkedSchemaChange, the rollup tablet keys order is the same as base tablet
// 2. adding column to existed table, num_key_columns() will larger than
// zone_map_fields.size()

int num_new_keys = 0;
for (size_t i = 0; i < _schema->num_key_columns(); ++i) {
const TabletColumn& column = _schema->column(i);

WrapperField* first = WrapperField::create(column);
DCHECK(first != NULL) << "failed to allocate memory for field: " << i;
first->copy(zone_map_fields[i].first);

WrapperField* second = WrapperField::create(column);
DCHECK(second != NULL) << "failed to allocate memory for field: " << i;
second->copy(zone_map_fields[i].second);

// for new key column, use default value to fill into column_statistics
if (schema_mapping[i].ref_column == -1) {
num_new_keys++;

first->copy(schema_mapping[i].default_value);
second->copy(schema_mapping[i].default_value);
} else {
first->copy(zone_map_fields[i - num_new_keys].first);
second->copy(zone_map_fields[i - num_new_keys].second);
}

_zone_maps.push_back(std::make_pair(first, second));
}

return OLAP_SUCCESS;
}

Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class SegmentGroup {
}

OLAPStatus add_zone_maps_for_linked_schema_change(
const std::vector<std::pair<WrapperField*, WrapperField*>>& zone_map_fields);
const std::vector<std::pair<WrapperField*, WrapperField*>>& zone_map_fields,
const SchemaMapping& schema_mapping);

OLAPStatus add_zone_maps(
const std::vector<std::pair<WrapperField*, WrapperField*>>& zone_map_fields);
Expand Down
7 changes: 4 additions & 3 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,8 @@ bool LinkedSchemaChange::process(
RowsetWriterSharedPtr new_rowset_writer,
TabletSharedPtr new_tablet,
TabletSharedPtr base_tablet) {
OLAPStatus status = new_rowset_writer->add_rowset(rowset_reader->rowset());
OLAPStatus status = new_rowset_writer->add_rowset_for_linked_schema_change(
rowset_reader->rowset(), _row_block_changer.get_schema_mapping());
if (status != OLAP_SUCCESS) {
LOG(WARNING) << "fail to convert rowset."
<< ", new_tablet=" << new_tablet->full_name()
Expand Down Expand Up @@ -1426,7 +1427,7 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(
sc_procedure = new(nothrow) SchemaChangeDirectly(rb_changer);
} else {
LOG(INFO) << "doing linked schema change.";
sc_procedure = new(nothrow) LinkedSchemaChange();
sc_procedure = new(nothrow) LinkedSchemaChange(rb_changer);
}

if (sc_procedure == nullptr) {
Expand Down Expand Up @@ -1652,7 +1653,7 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
sc_procedure = new(nothrow) SchemaChangeDirectly(rb_changer);
} else {
LOG(INFO) << "doing linked schema change.";
sc_procedure = new(nothrow) LinkedSchemaChange();
sc_procedure = new(nothrow) LinkedSchemaChange(rb_changer);
}

if (sc_procedure == nullptr) {
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ class RowCursor;

class RowBlockChanger {
public:
typedef std::vector<ColumnMapping> SchemaMapping;

RowBlockChanger(const TabletSchema& tablet_schema,
const TabletSharedPtr& base_tablet,
const DeleteHandler& delete_handler);
Expand Down Expand Up @@ -177,14 +175,16 @@ class SchemaChange {

class LinkedSchemaChange : public SchemaChange {
public:
explicit LinkedSchemaChange() { }
explicit LinkedSchemaChange(const RowBlockChanger& row_block_changer)
: _row_block_changer(row_block_changer) { }
~LinkedSchemaChange() {}

bool process(RowsetReaderSharedPtr rowset_reader,
RowsetWriterSharedPtr new_rowset_writer,
TabletSharedPtr new_tablet,
TabletSharedPtr base_tablet);
private:
const RowBlockChanger& _row_block_changer;
DISALLOW_COPY_AND_ASSIGN(LinkedSchemaChange);
};

Expand Down