diff --git a/be/src/olap/version_graph.cpp b/be/src/olap/version_graph.cpp index 79380f6eb9bd98..ab9c901cba257a 100644 --- a/be/src/olap/version_graph.cpp +++ b/be/src/olap/version_graph.cpp @@ -452,6 +452,13 @@ void VersionGraph::construct_version_graph(const std::vector _version_graph[vertex_idx_b].value; + }); + } } void VersionGraph::reconstruct_version_graph(const std::vector& rs_metas, @@ -476,10 +483,26 @@ void VersionGraph::add_version_to_graph(const Version& version) { // We assume this version is new version, so we just add two edges // into version graph. add one edge from start_version to end_version - _version_graph[start_vertex_index].edges.push_front(end_vertex_index); + // Make sure the vertex's edges are sorted by version in descending order when inserting. + auto end_vertex_it = _version_graph[start_vertex_index].edges.begin(); + while (end_vertex_it != _version_graph[start_vertex_index].edges.end()) { + if (_version_graph[*end_vertex_it].value < _version_graph[end_vertex_index].value) { + break; + } + end_vertex_it++; + } + _version_graph[start_vertex_index].edges.insert(end_vertex_it, end_vertex_index); // We add reverse edge(from end_version to start_version) to graph - _version_graph[end_vertex_index].edges.push_front(start_vertex_index); + // Make sure the vertex's edges are sorted by version in descending order when inserting. + auto start_vertex_it = _version_graph[end_vertex_index].edges.begin(); + while (start_vertex_it != _version_graph[end_vertex_index].edges.end()) { + if (_version_graph[*start_vertex_it].value < _version_graph[start_vertex_index].value) { + break; + } + start_vertex_it++; + } + _version_graph[end_vertex_index].edges.insert(start_vertex_it, start_vertex_index); } OLAPStatus VersionGraph::delete_version_from_graph(const Version& version) { @@ -537,95 +560,58 @@ OLAPStatus VersionGraph::capture_consistent_versions(const Version& spec_version return OLAP_ERR_INPUT_PARAMETER_ERROR; } - // bfs_queue's element is vertex_index. - std::queue bfs_queue; - // predecessor[i] means the predecessor of vertex_index 'i'. - std::vector predecessor(_version_graph.size()); - // visited[int64_t]==true means it had entered bfs_queue. - std::vector visited(_version_graph.size()); - // [start_vertex_value, end_vertex_value) - int64_t start_vertex_value = spec_version.first; - int64_t end_vertex_value = spec_version.second + 1; - // -1 is invalid vertex index. - int64_t start_vertex_index = -1; - // -1 is valid vertex index. - int64_t end_vertex_index = -1; - - for (size_t i = 0; i < _version_graph.size(); ++i) { - if (_version_graph[i].value == start_vertex_value) { - start_vertex_index = i; - } - if (_version_graph[i].value == end_vertex_value) { - end_vertex_index = i; + int64_t cur_idx = -1; + for (size_t i = 0; i < _version_graph.size(); i++) { + if (_version_graph[i].value == spec_version.first) { + cur_idx = i; + break; } } - if (start_vertex_index < 0 || end_vertex_index < 0) { - LOG(WARNING) << "fail to find path in version_graph. " + if (cur_idx < 0) { + LOG(WARNING) << "failed to find path in version_graph. " << "spec_version: " << spec_version.first << "-" << spec_version.second; return OLAP_ERR_VERSION_NOT_EXIST; } - for (size_t i = 0; i < _version_graph.size(); ++i) { - visited[i] = false; - } - - bfs_queue.push(start_vertex_index); - visited[start_vertex_index] = true; - // The predecessor of root is itself. - predecessor[start_vertex_index] = start_vertex_index; - - while (bfs_queue.empty() == false && visited[end_vertex_index] == false) { - int64_t top_vertex_index = bfs_queue.front(); - bfs_queue.pop(); - for (const auto& it : _version_graph[top_vertex_index].edges) { - if (visited[it] == false) { - // If we don't support reverse version in the path, and start vertex - // value is larger than the end vertex value, we skip this edge. - if (_version_graph[top_vertex_index].value > _version_graph[it].value) { - continue; - } - visited[it] = true; - predecessor[it] = top_vertex_index; - bfs_queue.push(it); + int64_t end_value = spec_version.second + 1; + while (_version_graph[cur_idx].value < end_value) { + int64_t next_idx = -1; + for (const auto& it : _version_graph[cur_idx].edges) { + // Only consider incremental versions + if (_version_graph[it].value < _version_graph[cur_idx].value) { + break; } - } - } - if (!visited[end_vertex_index]) { - LOG(WARNING) << "fail to find path in version_graph. " - << "spec_version: " << spec_version.first << "-" << spec_version.second; - return OLAP_ERR_VERSION_NOT_EXIST; - } + if (_version_graph[it].value > end_value) { + continue; + } - std::vector reversed_path; - int64_t tmp_vertex_index = end_vertex_index; - reversed_path.push_back(tmp_vertex_index); + // Considering edges had been sorted by version in descending order, + // This version is the largest version that smaller than end_version. + next_idx = it; + break; + } - // For start_vertex_index, its predecessor must be itself. - while (predecessor[tmp_vertex_index] != tmp_vertex_index) { - tmp_vertex_index = predecessor[tmp_vertex_index]; - reversed_path.push_back(tmp_vertex_index); + if (next_idx > -1) { + if (version_path != nullptr) { + version_path->emplace_back(_version_graph[cur_idx].value, _version_graph[next_idx].value - 1); + } + cur_idx = next_idx; + } else { + LOG(WARNING) << "fail to find path in version_graph. " + << "spec_version: " << spec_version.first << "-" << spec_version.second; + return OLAP_ERR_VERSION_NOT_EXIST; + } } - if (version_path != nullptr) { - // Make version_path from reversed_path. + if (VLOG_TRACE_IS_ON && version_path != nullptr) { std::stringstream shortest_path_for_debug; - for (size_t path_id = reversed_path.size() - 1; path_id > 0; --path_id) { - int64_t tmp_start_vertex_value = _version_graph[reversed_path[path_id]].value; - int64_t tmp_end_vertex_value = _version_graph[reversed_path[path_id - 1]].value; - - // tmp_start_vertex_value mustn't be equal to tmp_end_vertex_value - if (tmp_start_vertex_value <= tmp_end_vertex_value) { - version_path->emplace_back(tmp_start_vertex_value, tmp_end_vertex_value - 1); - } else { - version_path->emplace_back(tmp_end_vertex_value, tmp_start_vertex_value - 1); - } - - shortest_path_for_debug << (*version_path)[version_path->size() - 1] << ' '; + for (const auto& version : *version_path) { + shortest_path_for_debug << version << ' '; } VLOG_TRACE << "success to find path for spec_version. spec_version=" << spec_version - << ", path=" << shortest_path_for_debug.str(); + << ", path=" << shortest_path_for_debug.str(); } return OLAP_SUCCESS; diff --git a/be/src/olap/version_graph.h b/be/src/olap/version_graph.h index d5ef70eabd3f66..4b8c42ee8c7480 100644 --- a/be/src/olap/version_graph.h +++ b/be/src/olap/version_graph.h @@ -60,6 +60,7 @@ class VersionGraph { // vertex's value is version.start_version, the other is // version.end_version + 1. // Use adjacency list to describe version graph. + // In order to speed up the version capture, vertex's edges are sorted by version in descending order. std::vector _version_graph; // vertex value --> vertex_index of _version_graph