Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cpp/.clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ ConstructorInitializerIndentWidth: 4
ContinuationIndentWidth: 4
Cpp11BracedListStyle: true
DeriveLineEnding: true
DerivePointerAlignment: true
DisableFormat: false
EmptyLineAfterAccessModifier: Never
EmptyLineBeforeAccessModifier: LogicalBlock
Expand Down Expand Up @@ -207,7 +206,7 @@ SpacesInParentheses: false
SpacesInSquareBrackets: false
SpaceBeforeSquareBrackets: false
BitFieldColonSpacing: Both
Standard: Auto
Standard: Cpp11
StatementAttributeLikeMacros:
- Q_EMIT
StatementMacros:
Expand Down
33 changes: 33 additions & 0 deletions cpp/src/common/device_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ class IDeviceID {
virtual bool operator<(const IDeviceID& other) { return false; }
virtual bool operator==(const IDeviceID& other) { return false; }
virtual bool operator!=(const IDeviceID& other) { return false; }
virtual std::string* get_split_segname_at(int pos) { return nullptr; }
virtual int get_split_seg_num() { return 0; }
virtual void split_table_name() {}

protected:
IDeviceID() : empty_segments_() {}
Expand Down Expand Up @@ -90,6 +93,9 @@ class StringArrayDeviceID : public IDeviceID {
for (const auto& segment : segments_) {
delete segment;
}
for (const auto& prefix_segments : prefix_segments_) {
delete prefix_segments;
}
}

std::string get_device_name() const override {
Expand Down Expand Up @@ -192,9 +198,36 @@ class StringArrayDeviceID : public IDeviceID {
return !(*this == other);
}

void split_table_name() override { init_prefix_segments(); }

std::string* get_split_segname_at(int pos) override {
if (prefix_segments_.size() == 0 || prefix_segments_.size() == 1) {
return segments_[pos];
} else {
if (pos < prefix_segments_.size()) {
return prefix_segments_[pos];
} else {
return segments_[pos - prefix_segments_.size() + 1];
}
}
}

int get_split_seg_num() override {
return prefix_segments_.size() == 0
? segments_.size()
: segments_.size() + prefix_segments_.size() - 1;
}

private:
std::vector<std::string*> segments_;
std::vector<std::string*> prefix_segments_;

void init_prefix_segments() {
auto splits = storage::PathNodesGenerator::invokeParser(*segments_[0]);
for (int i = 0; i < splits.size(); ++i) {
prefix_segments_.push_back(new std::string(splits[i]));
}
}
static std::vector<std::string*> formalize(
const std::vector<std::string>& segments) {
auto it =
Expand Down
83 changes: 46 additions & 37 deletions cpp/src/common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ struct MeasurementSchema {
common::TSDataType data_type_;
common::TSEncoding encoding_;
common::CompressionType compression_type_;
storage::ChunkWriter *chunk_writer_;
ValueChunkWriter *value_chunk_writer_;
storage::ChunkWriter* chunk_writer_;
ValueChunkWriter* value_chunk_writer_;
std::map<std::string, std::string> props_;

MeasurementSchema()
Expand All @@ -58,7 +58,7 @@ struct MeasurementSchema {
chunk_writer_(nullptr),
value_chunk_writer_(nullptr) {}

MeasurementSchema(const std::string &measurement_name,
MeasurementSchema(const std::string& measurement_name,
common::TSDataType data_type)
: measurement_name_(measurement_name),
data_type_(data_type),
Expand All @@ -67,7 +67,7 @@ struct MeasurementSchema {
chunk_writer_(nullptr),
value_chunk_writer_(nullptr) {}

MeasurementSchema(const std::string &measurement_name,
MeasurementSchema(const std::string& measurement_name,
common::TSDataType data_type, common::TSEncoding encoding,
common::CompressionType compression_type)
: measurement_name_(measurement_name),
Expand All @@ -88,7 +88,7 @@ struct MeasurementSchema {
}
}

int serialize_to(common::ByteStream &out) {
int serialize_to(common::ByteStream& out) {
int ret = common::E_OK;
if (RET_FAIL(
common::SerializationUtil::write_str(measurement_name_, out))) {
Expand All @@ -102,7 +102,7 @@ struct MeasurementSchema {
if (ret == common::E_OK) {
if (RET_FAIL(common::SerializationUtil::write_ui32(props_.size(),
out))) {
for (const auto &prop : props_) {
for (const auto& prop : props_) {
if (RET_FAIL(common::SerializationUtil::write_str(
prop.first, out))) {
} else if (RET_FAIL(common::SerializationUtil::write_str(
Expand All @@ -115,7 +115,7 @@ struct MeasurementSchema {
return ret;
}

int deserialize_from(common::ByteStream &in) {
int deserialize_from(common::ByteStream& in) {
int ret = common::E_OK;
uint8_t data_type = common::TSDataType::INVALID_DATATYPE,
encoding = common::TSEncoding::INVALID_ENCODING,
Expand Down Expand Up @@ -153,8 +153,8 @@ struct MeasurementSchema {
}
};

typedef std::map<std::string, MeasurementSchema *> MeasurementSchemaMap;
typedef std::map<std::string, MeasurementSchema *>::iterator
typedef std::map<std::string, MeasurementSchema*> MeasurementSchemaMap;
typedef std::map<std::string, MeasurementSchema*>::iterator
MeasurementSchemaMapIter;
typedef std::pair<MeasurementSchemaMapIter, bool>
MeasurementSchemaMapInsertResult;
Expand All @@ -164,7 +164,7 @@ struct MeasurementSchemaGroup {
// measurement_name -> MeasurementSchema
MeasurementSchemaMap measurement_schema_map_;
bool is_aligned_ = false;
TimeChunkWriter *time_chunk_writer_ = nullptr;
TimeChunkWriter* time_chunk_writer_ = nullptr;

~MeasurementSchemaGroup() {
if (time_chunk_writer_ != nullptr) {
Expand Down Expand Up @@ -195,28 +195,28 @@ class TableSchema {
* Each ColumnSchema defines the schema for one column
* in the table.
*/
TableSchema(const std::string &table_name,
const std::vector<common::ColumnSchema> &column_schemas)
TableSchema(const std::string& table_name,
const std::vector<common::ColumnSchema>& column_schemas)
: table_name_(table_name) {
to_lowercase_inplace(table_name_);
for (const common::ColumnSchema &column_schema : column_schemas) {
for (const common::ColumnSchema& column_schema : column_schemas) {
column_schemas_.emplace_back(std::make_shared<MeasurementSchema>(
column_schema.get_column_name(),
column_schema.get_data_type()));
column_categories_.emplace_back(
column_schema.get_column_category());
}
int idx = 0;
for (const auto &measurement_schema : column_schemas_) {
for (const auto& measurement_schema : column_schemas_) {
to_lowercase_inplace(measurement_schema->measurement_name_);
column_pos_index_.insert(
std::make_pair(measurement_schema->measurement_name_, idx++));
}
}

TableSchema(const std::string &table_name,
const std::vector<MeasurementSchema *> &column_schemas,
const std::vector<common::ColumnCategory> &column_categories)
TableSchema(const std::string& table_name,
const std::vector<MeasurementSchema*>& column_schemas,
const std::vector<common::ColumnCategory>& column_categories)
: table_name_(table_name), column_categories_(column_categories) {
to_lowercase_inplace(table_name_);
for (const auto column_schema : column_schemas) {
Expand All @@ -226,34 +226,42 @@ class TableSchema {
}
}
int idx = 0;
for (const auto &measurement_schema : column_schemas_) {
for (const auto& measurement_schema : column_schemas_) {
to_lowercase_inplace(measurement_schema->measurement_name_);
column_pos_index_.insert(
std::make_pair(measurement_schema->measurement_name_, idx++));
}
}

TableSchema(TableSchema &&other) noexcept
TableSchema(TableSchema&& other) noexcept
: table_name_(std::move(other.table_name_)),
column_schemas_(std::move(other.column_schemas_)),
column_categories_(std::move(other.column_categories_)) {}

TableSchema(const TableSchema &other) noexcept
TableSchema(const TableSchema& other) noexcept
: table_name_(other.table_name_),
column_categories_(other.column_categories_) {
for (const auto &column_schema : other.column_schemas_) {
for (const auto& column_schema : other.column_schemas_) {
// Just call default construction
column_schemas_.emplace_back(
std::make_shared<MeasurementSchema>(*column_schema));
}
int idx = 0;
for (const auto &measurement_schema : column_schemas_) {
for (const auto& measurement_schema : column_schemas_) {
column_pos_index_.insert(
std::make_pair(measurement_schema->measurement_name_, idx++));
}
}

int serialize_to(common::ByteStream &out) {
// In cases where data is retrieved from a tree to form the table,
// there is no table name in the tree path, so adjustments are needed for
// this scenario. This flag is used specifically for such cases.
// TODO(Colin): remove this.
void set_virtual_table() { is_virtual_table_ = true; }

bool is_virtual_table() { return is_virtual_table_; }

int serialize_to(common::ByteStream& out) {
int ret = common::E_OK;
if (RET_FAIL(common::SerializationUtil::write_var_uint(
column_schemas_.size(), out))) {
Expand All @@ -271,7 +279,7 @@ class TableSchema {
return ret;
}

int deserialize(common::ByteStream &in) {
int deserialize(common::ByteStream& in) {
int ret = common::E_OK;
uint32_t num_columns;
if (RET_FAIL(
Expand All @@ -294,9 +302,9 @@ class TableSchema {

~TableSchema() { column_schemas_.clear(); }

const std::string &get_table_name() { return table_name_; }
const std::string& get_table_name() { return table_name_; }

void set_table_name(const std::string &table_name) {
void set_table_name(const std::string& table_name) {
table_name_ = table_name;
}

Expand All @@ -310,7 +318,7 @@ class TableSchema {

int32_t get_columns_num() const { return column_schemas_.size(); }

int find_column_index(const std::string &column_name) {
int find_column_index(const std::string& column_name) {
std::string lower_case_column_name = to_lower(column_name);
auto it = column_pos_index_.find(lower_case_column_name);
if (it != column_pos_index_.end()) {
Expand All @@ -333,10 +341,10 @@ class TableSchema {

size_t get_column_pos_index_num() const { return column_pos_index_.size(); }

void update(ChunkGroupMeta *chunk_group_meta) {
void update(ChunkGroupMeta* chunk_group_meta) {
for (auto iter = chunk_group_meta->chunk_meta_list_.begin();
iter != chunk_group_meta->chunk_meta_list_.end(); iter++) {
auto &chunk_meta = iter.get();
auto& chunk_meta = iter.get();
if (chunk_meta->data_type_ == common::VECTOR) {
continue;
}
Expand Down Expand Up @@ -365,7 +373,7 @@ class TableSchema {

std::vector<common::TSDataType> get_data_types() const {
std::vector<common::TSDataType> ret;
for (const auto &measurement_schema : column_schemas_) {
for (const auto& measurement_schema : column_schemas_) {
ret.emplace_back(measurement_schema->data_type_);
}
return ret;
Expand All @@ -375,12 +383,12 @@ class TableSchema {
return column_categories_;
}

std::vector<std::shared_ptr<MeasurementSchema> > get_measurement_schemas()
std::vector<std::shared_ptr<MeasurementSchema>> get_measurement_schemas()
const {
return column_schemas_;
}

common::ColumnSchema get_column_schema(const std::string &column_name) {
common::ColumnSchema get_column_schema(const std::string& column_name) {
int column_idx = find_column_index(column_name);
if (column_idx == -1) {
return common::ColumnSchema();
Expand All @@ -394,7 +402,7 @@ class TableSchema {
}
}

int32_t find_id_column_order(const std::string &column_name) {
int32_t find_id_column_order(const std::string& column_name) {
std::string lower_case_column_name = to_lower(column_name);

int column_order = 0;
Expand All @@ -412,17 +420,18 @@ class TableSchema {

private:
std::string table_name_;
std::vector<std::shared_ptr<MeasurementSchema> > column_schemas_;
std::vector<std::shared_ptr<MeasurementSchema>> column_schemas_;
std::vector<common::ColumnCategory> column_categories_;
std::map<std::string, int> column_pos_index_;
bool is_virtual_table_ = false;
};

struct Schema {
typedef std::unordered_map<std::string, std::shared_ptr<TableSchema> >
typedef std::unordered_map<std::string, std::shared_ptr<TableSchema>>
TableSchemasMap;
TableSchemasMap table_schema_map_;

void update_table_schema(ChunkGroupMeta *chunk_group_meta) {
void update_table_schema(ChunkGroupMeta* chunk_group_meta) {
std::shared_ptr<IDeviceID> device_id = chunk_group_meta->device_id_;
auto table_name = device_id->get_table_name();
if (table_schema_map_.find(table_name) == table_schema_map_.end()) {
Expand All @@ -431,7 +440,7 @@ struct Schema {
table_schema_map_[table_name]->update(chunk_group_meta);
}
void register_table_schema(
const std::shared_ptr<TableSchema> &table_schema) {
const std::shared_ptr<TableSchema>& table_schema) {
table_schema_map_[table_schema->get_table_name()] = table_schema;
}
};
Expand Down
Loading
Loading