Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 61 additions & 75 deletions be/src/olap/version_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,13 @@ void VersionGraph::construct_version_graph(const std::vector<RowsetMetaSharedPtr
// Add reverse edge from end_version to start_version.
_version_graph[end_vertex_index].edges.push_front(start_vertex_index);
}

// Sort edges by version in descending order.
for (auto& vertex : _version_graph) {
vertex.edges.sort([this](const int& vertex_idx_a, const int& vertex_idx_b) {
return _version_graph[vertex_idx_a].value > _version_graph[vertex_idx_b].value;
});
}
}

void VersionGraph::reconstruct_version_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas,
Expand All @@ -476,10 +483,26 @@ void VersionGraph::add_version_to_graph(const Version& version) {

// We assume this version is new version, so we just add two edges
// into version graph. add one edge from start_version to end_version
_version_graph[start_vertex_index].edges.push_front(end_vertex_index);
// Make sure the vertex's edges are sorted by version in descending order when inserting.
auto end_vertex_it = _version_graph[start_vertex_index].edges.begin();
while (end_vertex_it != _version_graph[start_vertex_index].edges.end()) {
if (_version_graph[*end_vertex_it].value < _version_graph[end_vertex_index].value) {
break;
}
end_vertex_it++;
}
_version_graph[start_vertex_index].edges.insert(end_vertex_it, end_vertex_index);

// We add reverse edge(from end_version to start_version) to graph
_version_graph[end_vertex_index].edges.push_front(start_vertex_index);
// Make sure the vertex's edges are sorted by version in descending order when inserting.
auto start_vertex_it = _version_graph[end_vertex_index].edges.begin();
while (start_vertex_it != _version_graph[end_vertex_index].edges.end()) {
if (_version_graph[*start_vertex_it].value < _version_graph[start_vertex_index].value) {
break;
}
start_vertex_it++;
}
_version_graph[end_vertex_index].edges.insert(start_vertex_it, start_vertex_index);
}

OLAPStatus VersionGraph::delete_version_from_graph(const Version& version) {
Expand Down Expand Up @@ -537,95 +560,58 @@ OLAPStatus VersionGraph::capture_consistent_versions(const Version& spec_version
return OLAP_ERR_INPUT_PARAMETER_ERROR;
}

// bfs_queue's element is vertex_index.
std::queue<int64_t> bfs_queue;
// predecessor[i] means the predecessor of vertex_index 'i'.
std::vector<int64_t> predecessor(_version_graph.size());
// visited[int64_t]==true means it had entered bfs_queue.
std::vector<bool> visited(_version_graph.size());
// [start_vertex_value, end_vertex_value)
int64_t start_vertex_value = spec_version.first;
int64_t end_vertex_value = spec_version.second + 1;
// -1 is invalid vertex index.
int64_t start_vertex_index = -1;
// -1 is valid vertex index.
int64_t end_vertex_index = -1;

for (size_t i = 0; i < _version_graph.size(); ++i) {
if (_version_graph[i].value == start_vertex_value) {
start_vertex_index = i;
}
if (_version_graph[i].value == end_vertex_value) {
end_vertex_index = i;
int64_t cur_idx = -1;
for (size_t i = 0; i < _version_graph.size(); i++) {
if (_version_graph[i].value == spec_version.first) {
cur_idx = i;
break;
}
}

if (start_vertex_index < 0 || end_vertex_index < 0) {
LOG(WARNING) << "fail to find path in version_graph. "
if (cur_idx < 0) {
LOG(WARNING) << "failed to find path in version_graph. "
<< "spec_version: " << spec_version.first << "-" << spec_version.second;
return OLAP_ERR_VERSION_NOT_EXIST;
}

for (size_t i = 0; i < _version_graph.size(); ++i) {
visited[i] = false;
}

bfs_queue.push(start_vertex_index);
visited[start_vertex_index] = true;
// The predecessor of root is itself.
predecessor[start_vertex_index] = start_vertex_index;

while (bfs_queue.empty() == false && visited[end_vertex_index] == false) {
int64_t top_vertex_index = bfs_queue.front();
bfs_queue.pop();
for (const auto& it : _version_graph[top_vertex_index].edges) {
if (visited[it] == false) {
// If we don't support reverse version in the path, and start vertex
// value is larger than the end vertex value, we skip this edge.
if (_version_graph[top_vertex_index].value > _version_graph[it].value) {
continue;
}
visited[it] = true;
predecessor[it] = top_vertex_index;
bfs_queue.push(it);
int64_t end_value = spec_version.second + 1;
while (_version_graph[cur_idx].value < end_value) {
int64_t next_idx = -1;
for (const auto& it : _version_graph[cur_idx].edges) {
// Only consider incremental versions
if (_version_graph[it].value < _version_graph[cur_idx].value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we already sort edges in descending order, can we "break" here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolved~

break;
}
}
}

if (!visited[end_vertex_index]) {
LOG(WARNING) << "fail to find path in version_graph. "
<< "spec_version: " << spec_version.first << "-" << spec_version.second;
return OLAP_ERR_VERSION_NOT_EXIST;
}
if (_version_graph[it].value > end_value) {
continue;
}

std::vector<int64_t> reversed_path;
int64_t tmp_vertex_index = end_vertex_index;
reversed_path.push_back(tmp_vertex_index);
// Considering edges had been sorted by version in descending order,
// This version is the largest version that smaller than end_version.
next_idx = it;
break;
}

// For start_vertex_index, its predecessor must be itself.
while (predecessor[tmp_vertex_index] != tmp_vertex_index) {
tmp_vertex_index = predecessor[tmp_vertex_index];
reversed_path.push_back(tmp_vertex_index);
if (next_idx > -1) {
if (version_path != nullptr) {
version_path->emplace_back(_version_graph[cur_idx].value, _version_graph[next_idx].value - 1);
}
cur_idx = next_idx;
} else {
LOG(WARNING) << "fail to find path in version_graph. "
<< "spec_version: " << spec_version.first << "-" << spec_version.second;
return OLAP_ERR_VERSION_NOT_EXIST;
}
}

if (version_path != nullptr) {
// Make version_path from reversed_path.
if (VLOG_TRACE_IS_ON && version_path != nullptr) {
std::stringstream shortest_path_for_debug;
for (size_t path_id = reversed_path.size() - 1; path_id > 0; --path_id) {
int64_t tmp_start_vertex_value = _version_graph[reversed_path[path_id]].value;
int64_t tmp_end_vertex_value = _version_graph[reversed_path[path_id - 1]].value;

// tmp_start_vertex_value mustn't be equal to tmp_end_vertex_value
if (tmp_start_vertex_value <= tmp_end_vertex_value) {
version_path->emplace_back(tmp_start_vertex_value, tmp_end_vertex_value - 1);
} else {
version_path->emplace_back(tmp_end_vertex_value, tmp_start_vertex_value - 1);
}

shortest_path_for_debug << (*version_path)[version_path->size() - 1] << ' ';
for (const auto& version : *version_path) {
shortest_path_for_debug << version << ' ';
}
VLOG_TRACE << "success to find path for spec_version. spec_version=" << spec_version
<< ", path=" << shortest_path_for_debug.str();
<< ", path=" << shortest_path_for_debug.str();
}

return OLAP_SUCCESS;
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/version_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class VersionGraph {
// vertex's value is version.start_version, the other is
// version.end_version + 1.
// Use adjacency list to describe version graph.
// In order to speed up the version capture, vertex's edges are sorted by version in descending order.
std::vector<Vertex> _version_graph;

// vertex value --> vertex_index of _version_graph
Expand Down