From 3ae1c4feffaa6ac0752821b692100a3d698671e8 Mon Sep 17 00:00:00 2001 From: Maxim Sukharev Date: Mon, 11 Feb 2019 13:18:43 +0100 Subject: [PATCH 01/10] remove unnecessary transformation in report.py - makes it faster - allows to pass number of buckets that doesn't much number of cc Signed-off-by: Maxim Sukharev --- .../community-detector/community_detector.py | 37 ++----------------- src/main/python/community-detector/report.py | 17 ++------- .../test_community_detector.py | 6 ++- 3 files changed, 13 insertions(+), 47 deletions(-) diff --git a/src/main/python/community-detector/community_detector.py b/src/main/python/community-detector/community_detector.py index 04b97328..0b4e5db8 100644 --- a/src/main/python/community-detector/community_detector.py +++ b/src/main/python/community-detector/community_detector.py @@ -1,4 +1,3 @@ -from collections import defaultdict from itertools import chain import logging @@ -34,30 +33,7 @@ def build_matrix(id_to_buckets): return csr_matrix((data, indices, indptr)) -def build_id_to_cc(connected_components, length): - """Builds a ndarray that associates element id to a connected component - - Same code as in Apollo ConnectedComponentsModel. - https://github.com/src-d/apollo/blob/f51c5a92c24cbedd54b9b30bab02f03e51fd27b3/apollo/graph.py#L28 - - Args: - connected_components: list of tuples (connected-component, element ids) - length: number of elements - - Returns: - A 1 dimension ndarray. The index will be the element id, and the - value is the connected component - """ - - id_to_cc = numpy.zeros(length, dtype=numpy.uint32) - for cc, ids in connected_components: - for id_ in ids: - id_to_cc[id_] = cc - - return id_to_cc - - -def detect_communities(cc, +def detect_communities(ccs, buckets_matrix, edges="linear", algorithm="walktrap", @@ -68,8 +44,8 @@ def detect_communities(cc, https://github.com/src-d/apollo/blob/6b370b5f34ba9e31cf3310e70a2eff35dd978faa/apollo/graph.py#L191 Args: - cc: list with the connected components. Index is the element id, the - value is the connected component + ccs: list with the connected components. Index is the connected component, the + value is the list of element ids buckets_matrix: scipy.sparse.csr_matrix with the buckets. One row for each element, with a column for each bucket. If the element is in a bucket, the corresponding row,column (element id, bucket id) is 1, @@ -80,7 +56,7 @@ def detect_communities(cc, - quadratic: slow, but surely fits all the algorithms. algorithm: The community detection algorithm to apply. algorithm_params: Parameters for the algorithm (**kwargs, JSON format). - + Returns: A list of communities. Each community is a list of element-ids """ @@ -93,11 +69,6 @@ def detect_communities(cc, log = logging.getLogger("community-detector") log.debug("Building the connected components") - ccs = defaultdict(list) - - for i, c in enumerate(cc): - ccs[c].append(i) - buckindices = buckets_matrix.indices buckindptr = buckets_matrix.indptr total_nvertices = buckets_matrix.shape[0] diff --git a/src/main/python/community-detector/report.py b/src/main/python/community-detector/report.py index 60ffde0c..9ec95cde 100644 --- a/src/main/python/community-detector/report.py +++ b/src/main/python/community-detector/report.py @@ -8,11 +8,8 @@ def read_connected_components(filepath): - dict = pq.read_table(filepath).to_pydict() - - ccs = dict['cc'] - ids = dict['element_ids'] - return list(zip(ccs, ids)) + d = pq.read_table(filepath).to_pydict() + return dict(zip(d['cc'], d['element_ids'])) def read_buckets_matrix(filepath): @@ -25,17 +22,11 @@ def read_buckets_matrix(filepath): def main(dirpath): connected_components = read_connected_components('%s/cc.parquet' % dirpath) - buckets_matrix = read_buckets_matrix('%s/buckets.parquet' % dirpath) - n_ids = buckets_matrix.shape[0] - - # TODO (carlosms): Scala produces a map of cc->element-id, - # the lib requires element-id->cc, but only to convert it - # to cc->element-id. Easy change once everything is working. - id_to_cc = community_detector.build_id_to_cc(connected_components, n_ids) # The result is a list of communities. Each community is a list of element-ids - coms = community_detector.detect_communities(id_to_cc, buckets_matrix) + coms = community_detector.detect_communities(connected_components, + buckets_matrix) com_ids = list(range(len(coms))) data = [pa.array(com_ids), pa.array(coms)] diff --git a/src/main/python/community-detector/test_community_detector.py b/src/main/python/community-detector/test_community_detector.py index 93466f7c..08c16179 100644 --- a/src/main/python/community-detector/test_community_detector.py +++ b/src/main/python/community-detector/test_community_detector.py @@ -1,3 +1,4 @@ +from collections import defaultdict import unittest import os @@ -23,9 +24,12 @@ def test_detect_communities(self): with numpy.load("%s/fixtures/input.npz" % (dirname)) as input_npz: buckets = build_csr_matrix(input_npz) cc = input_npz['id_to_cc'] + ccs = defaultdict(list) + for i, c in enumerate(cc): + ccs[c].append(i) # Call community_detector - communities = detect_communities(cc.tolist(), buckets) + communities = detect_communities(ccs, buckets) # Replaces CommunitiesModel().construct(communities, ccsmodel.id_to_element).save(output) size = sum(map(len, communities)) From acca2c99cd8f38cf3a7094daa2136f429c2341da Mon Sep 17 00:00:00 2001 From: Maxim Sukharev Date: Mon, 11 Feb 2019 13:23:15 +0100 Subject: [PATCH 02/10] exclude buckets with only 1 element Bucket with only 1 element means that element isn't connected to anything. Currently such elements are filtered only when we build graph but we can remove it much earlier which would improve performance a lot. Signed-off-by: Maxim Sukharev --- .../scala/tech/sourced/gemini/ConnectedComponents.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/scala/tech/sourced/gemini/ConnectedComponents.scala b/src/main/scala/tech/sourced/gemini/ConnectedComponents.scala index 1ecf63dc..162a573e 100644 --- a/src/main/scala/tech/sourced/gemini/ConnectedComponents.scala +++ b/src/main/scala/tech/sourced/gemini/ConnectedComponents.scala @@ -64,7 +64,9 @@ abstract class ConnectedComponents(log: Slf4jLogger) { val elId = elementIds.getOrElseUpdate(sha1, elementIds.size) if (!band.contains(value)) { if (band.isDefined) { - buckets = buckets :+ bucket + if (bucket.size > 1) { + buckets = buckets :+ bucket + } bucket = List[Int]() } band = Some(value) @@ -72,7 +74,7 @@ abstract class ConnectedComponents(log: Slf4jLogger) { bucket = bucket :+ elId } - if (bucket.nonEmpty) { + if (bucket.size > 1) { buckets = buckets :+ bucket } From 9be307934f6bcb649617d13c8c3d551ee945e86b Mon Sep 17 00:00:00 2001 From: Maxim Sukharev Date: Mon, 11 Feb 2019 14:19:03 +0100 Subject: [PATCH 03/10] use multiple cores to calculate buckets It repeats a little bit of code for the first hashtable but more performant because it loops only once both for building elementsIds map and for buckets generation. Signed-off-by: Maxim Sukharev --- .../sourced/gemini/ConnectedComponents.scala | 84 +++++++++++++------ 1 file changed, 58 insertions(+), 26 deletions(-) diff --git a/src/main/scala/tech/sourced/gemini/ConnectedComponents.scala b/src/main/scala/tech/sourced/gemini/ConnectedComponents.scala index 162a573e..830b3413 100644 --- a/src/main/scala/tech/sourced/gemini/ConnectedComponents.scala +++ b/src/main/scala/tech/sourced/gemini/ConnectedComponents.scala @@ -52,42 +52,74 @@ abstract class ConnectedComponents(log: Slf4jLogger) { * @return Buckets, Element-to-ID */ def makeBuckets(): (List[List[Int]], Map[String, Int]) = { - val (buckets, elementIds) = getHashtables() - .foldLeft(List[List[Int]](), mutable.Map[String, Int]()) { (result, hashtable) => - - var (buckets, elementIds) = result - val prevBucketsSize = buckets.size - var band: Option[ByteBuffer] = None - var bucket = List[Int]() - - getHashValues(hashtable).foreach { case FileHash(sha1, value) => - val elId = elementIds.getOrElseUpdate(sha1, elementIds.size) - if (!band.contains(value)) { - if (band.isDefined) { - if (bucket.size > 1) { - buckets = buckets :+ bucket - } - bucket = List[Int]() - } - band = Some(value) - } - bucket = bucket :+ elId - } + val allHashTables = getHashtables() + if (allHashTables.isEmpty) { + return (List[List[Int]](), Map[String, Int]()) + } + + val firstHashTable::hashTables = allHashTables + + var buckets = List[List[Int]]() + var band: Option[ByteBuffer] = None + var bucket = List[Int]() + + // code for the first hashTable uses the algorithm as for others (bucketsForHashTable methods) + // but also builds elementIds map + val elementIds = getHashValues(firstHashTable) + .zipWithIndex. + foldLeft(Map[String, Int]()) { (idsMap, item) => + val (hash, elId) = item + val FileHash(sha1, value) = hash + if (!band.contains(value)) { if (bucket.size > 1) { buckets = buckets :+ bucket } - val bucketsSize = buckets.size - prevBucketsSize - log.debug(s"Fetched $hashtable, $bucketsSize buckets") - - (buckets, elementIds) + bucket = List[Int]() + band = Some(value) } + bucket = bucket :+ elId + + idsMap + (sha1 -> elId) + } + + if (bucket.size > 1) { + buckets = buckets :+ bucket + } + + buckets ++= hashTables.par.flatMap(bucketsForHashTable(_, elementIds)) log.info(s"Number of buckets: ${buckets.size}") log.info(s"Number of elements: ${elementIds.size}") - (buckets, elementIds.toMap) + (buckets, elementIds) + } + + private def bucketsForHashTable(hashTable: Byte, elementIds: Map[String, Int]): List[List[Int]] = { + var buckets = List[List[Int]]() + var band: Option[ByteBuffer] = None + var bucket = List[Int]() + + getHashValues(hashTable).foreach { case FileHash(sha1, value) => + val elId = elementIds(sha1) + + if (!band.contains(value)) { + if (bucket.size > 1) { + buckets = buckets :+ bucket + } + + bucket = List[Int]() + band = Some(value) + } + bucket = bucket :+ elId + } + + if (bucket.size > 1) { + buckets = buckets :+ bucket + } + + buckets } /** From ff63d392de6a8e5cc53476e456d0f3593dc1e61c Mon Sep 17 00:00:00 2001 From: Maxim Sukharev Date: Mon, 11 Feb 2019 15:50:32 +0100 Subject: [PATCH 04/10] update report tests Signed-off-by: Maxim Sukharev --- .../gemini/ConnectedComponentsSpec.scala | 77 ++++++++++--------- .../tech/sourced/gemini/ReportSpec.scala | 4 +- 2 files changed, 43 insertions(+), 38 deletions(-) diff --git a/src/test/scala/tech/sourced/gemini/ConnectedComponentsSpec.scala b/src/test/scala/tech/sourced/gemini/ConnectedComponentsSpec.scala index f8e3c754..09b34d8f 100644 --- a/src/test/scala/tech/sourced/gemini/ConnectedComponentsSpec.scala +++ b/src/test/scala/tech/sourced/gemini/ConnectedComponentsSpec.scala @@ -11,20 +11,38 @@ import scala.collection.mutable.ListBuffer class TestConnectedComponents(log: Slf4jLogger) extends ConnectedComponents(log) { def getHashtables(): List[Byte] = List(0, 1, 2) - def getHashValues(hashtable: Byte): Iterable[FileHash] = { - val uniqVal = hashtable + 10 - val intersectVal = hashtable + 1 + def intToHash(x: Byte): ByteBuffer = ByteBuffer.wrap(Array[Byte](x)) - List( - // same elem in all buckets - FileHash("a", ByteBuffer.wrap(Array[Byte](1))), - FileHash("b" + hashtable.toString, ByteBuffer.wrap(Array[Byte](1))), - // different elem in each bucket - FileHash("c" + hashtable.toString, ByteBuffer.wrap(Array[Byte](uniqVal.toByte))), - // appear in 2 buckets - FileHash("d", ByteBuffer.wrap(Array[Byte](intersectVal.toByte))), - FileHash("d" + hashtable.toString, ByteBuffer.wrap(Array[Byte](intersectVal.toByte))) - ) + // emulate database, restrictions: + // - each hashtable must have all elements + // - results must be sorted by (hash values, key) + def getHashValues(hashtable: Byte): Iterable[FileHash] = { + hashtable match { + // bucket for a&b and d&3 + case 0 => List( + FileHash("a", intToHash(1)), + FileHash("b", intToHash(1)), + FileHash("c", intToHash(2)), + FileHash("d", intToHash(3)), + FileHash("e", intToHash(3)) + ) + // bucket for b&c + case 1 => List( + FileHash("a", intToHash(1)), + FileHash("b", intToHash(2)), + FileHash("c", intToHash(2)), + FileHash("d", intToHash(3)), + FileHash("e", intToHash(4)) + ) + // no bucket + case 2 => List( + FileHash("a", intToHash(1)), + FileHash("b", intToHash(2)), + FileHash("c", intToHash(3)), + FileHash("d", intToHash(4)), + FileHash("e", intToHash(5)) + ) + } } } @@ -36,32 +54,22 @@ class ConnectedComponentsSpec extends FlatSpec "makeBuckets" should "correctly create buckets" in { cc.makeBuckets()._1 shouldEqual List[List[Int]]( + // buckets from hashtable 0 List(0, 1), - List(2), List(3, 4), - List(0, 5), - List(6), - List(3, 7), - List(0, 8), - List(9), - List(3, 10) + // bucket from hashtable 1 + List(1, 2) ) } "elementsToBuckets" should "create correct map" in { val (buckets, _) = cc.makeBuckets() cc.elementsToBuckets(buckets) shouldEqual Map[Int, List[Int]]( - 0 -> List(0, 3, 6), - 1 -> List(0), - 2 -> List(1), - 3 -> List(2, 5, 8), - 4 -> List(2), - 5 -> List(3), - 6 -> List(4), - 7 -> List(5), - 8 -> List(6), - 9 -> List(7), - 10 -> List(8) + 0 -> List(0), + 1 -> List(0, 2), + 2 -> List(2), + 3 -> List(1), + 4 -> List(1) ) } @@ -69,11 +77,8 @@ class ConnectedComponentsSpec extends FlatSpec val (buckets, _) = cc.makeBuckets() val elementToBuckets = cc.elementsToBuckets(buckets) cc.findInBuckets(buckets, elementToBuckets) shouldEqual Map[Int, Set[Int]]( - 0 -> Set(3, 10, 7, 4), - 1 -> Set(9), - 2 -> Set(0, 8, 5, 1), - 3 -> Set(6), - 4 -> Set(2) + 0 -> Set(1, 2, 0), + 1 -> Set(3, 4) ) } } diff --git a/src/test/scala/tech/sourced/gemini/ReportSpec.scala b/src/test/scala/tech/sourced/gemini/ReportSpec.scala index c1402a97..e453604b 100644 --- a/src/test/scala/tech/sourced/gemini/ReportSpec.scala +++ b/src/test/scala/tech/sourced/gemini/ReportSpec.scala @@ -108,8 +108,8 @@ class ReportSpec extends FlatSpec val files = similarGroups.head.map(_.toString) files.toSeq should contain theSameElementsAs Seq( - "https://github.com/src-d/borges/blob/e784f9d5f59d5c081c5f8f71b6c517918b899df0/consumer_test.go", - "https://github.com/erizocosmico/borges/blob/b1fcd3bf0ba810c05cb418babc09cc7f7783cc03/consumer_test.go" + "https://github.com/erizocosmico/borges/blob/b1fcd3bf0ba810c05cb418babc09cc7f7783cc03/fixtures_test.go", + "https://github.com/src-d/borges/blob/e784f9d5f59d5c081c5f8f71b6c517918b899df0/fixtures_test.go" ) } From 98d4ef6b14c099249668ee2a6e20d6d543bfc46c Mon Sep 17 00:00:00 2001 From: Maxim Sukharev Date: Tue, 12 Feb 2019 13:32:12 +0100 Subject: [PATCH 05/10] fix incorrect order of buckets in parquet file Order of keys in map is random. But python code relies on indexes as element id. Signed-off-by: Maxim Sukharev --- src/main/scala/tech/sourced/gemini/Report.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/tech/sourced/gemini/Report.scala b/src/main/scala/tech/sourced/gemini/Report.scala index 06af45c3..e248fd38 100644 --- a/src/main/scala/tech/sourced/gemini/Report.scala +++ b/src/main/scala/tech/sourced/gemini/Report.scala @@ -253,7 +253,7 @@ class Report(conn: Session, log: Slf4jLogger, keyspace: String, tables: Tables) .withConf(parquetConf) .build() - elsToBuckets.foreach { case (_, bucket) => + elsToBuckets.toSeq.sortBy(_._1).foreach { case (_, bucket) => val record = new GenericRecordBuilder(schemaBuckets) .set("buckets", bucket.toArray) .build() From 63989c6aedcd2977fd30500125de78e8d7d78ccd Mon Sep 17 00:00:00 2001 From: Maxim Sukharev Date: Tue, 12 Feb 2019 18:43:20 +0100 Subject: [PATCH 06/10] add support for non-sequential element ids previous commits introduced filtering of elements that appear only in one bucket. But it breaks python logic. Signed-off-by: Maxim Sukharev --- .../community-detector/community_detector.py | 27 ++++++------ src/main/python/community-detector/report.py | 7 +-- .../test_community_detector.py | 43 ++++++++++++++++++- .../scala/tech/sourced/gemini/Report.scala | 4 +- 4 files changed, 60 insertions(+), 21 deletions(-) diff --git a/src/main/python/community-detector/community_detector.py b/src/main/python/community-detector/community_detector.py index 0b4e5db8..3bfc6920 100644 --- a/src/main/python/community-detector/community_detector.py +++ b/src/main/python/community-detector/community_detector.py @@ -9,11 +9,8 @@ def build_matrix(id_to_buckets): """Builds a CSR matrix from a list of lists of buckets - Same code as in Apollo ConnectedComponentsModel. - https://github.com/src-d/apollo/blob/f51c5a92c24cbedd54b9b30bab02f03e51fd27b3/apollo/graph.py#L28 - Args: - id_to_buckets: list of lists of buckets. The index is the element id + id_to_buckets: list of lists of elementid, buckets. Returns: A scipy.sparse.csr_matrix with the same contents @@ -22,14 +19,16 @@ def build_matrix(id_to_buckets): if len(id_to_buckets) == 0: return csr_matrix((0, 0), dtype=numpy.uint8) - data = numpy.ones(sum(map(len, id_to_buckets)), dtype=numpy.uint8) + max_el_id = max((item[0] for item in id_to_buckets)) + data = numpy.ones( + sum((len(item[1]) for item in id_to_buckets)), dtype=numpy.uint8) indices = numpy.zeros(len(data), dtype=numpy.uint32) - indptr = numpy.zeros(len(id_to_buckets) + 1, dtype=numpy.uint32) + indptr = numpy.zeros(max_el_id + 2, dtype=numpy.uint32) pos = 0 - for i, element in enumerate(id_to_buckets): - indices[pos:(pos + len(element))] = element - pos += len(element) - indptr[i + 1] = pos + for el_id, bucket in id_to_buckets: + indices[pos:(pos + len(bucket))] = bucket + pos += len(bucket) + indptr[el_id + 1:] = pos return csr_matrix((data, indices, indptr)) @@ -44,7 +43,7 @@ def detect_communities(ccs, https://github.com/src-d/apollo/blob/6b370b5f34ba9e31cf3310e70a2eff35dd978faa/apollo/graph.py#L191 Args: - ccs: list with the connected components. Index is the connected component, the + ccs: dict with the connected components. Index is the connected component, the value is the list of element ids buckets_matrix: scipy.sparse.csr_matrix with the buckets. One row for each element, with a column for each bucket. If the element is in a @@ -91,12 +90,12 @@ def detect_communities(ccs, fat_ccs.append(vertices) log.debug("Building %d graphs", len(fat_ccs)) + bucket_weights = buckets_matrix.sum(axis=0) for vertices in fat_ccs: if linear: edges = [] weights = [] - bucket_weights = buckets_matrix.sum(axis=0) buckets = set() for i in vertices: for j in range(buckindptr[i], buckindptr[i + 1]): @@ -113,8 +112,8 @@ def detect_communities(ccs, for j in range(buckindptr[i], buckindptr[i + 1]): buckets.add(buckindices[j]) for bucket in buckets: - buckverts = \ - buckmat_csc.indices[buckmat_csc.indptr[bucket]:buckmat_csc.indptr[bucket + 1]] + buckverts = buckmat_csc.indices[ + buckmat_csc.indptr[bucket]:buckmat_csc.indptr[bucket + 1]] for i, x in enumerate(buckverts): for y in buckverts: if x < y: diff --git a/src/main/python/community-detector/report.py b/src/main/python/community-detector/report.py index 9ec95cde..62fa52ea 100644 --- a/src/main/python/community-detector/report.py +++ b/src/main/python/community-detector/report.py @@ -13,11 +13,8 @@ def read_connected_components(filepath): def read_buckets_matrix(filepath): - dict = pq.read_table(filepath).to_pydict() - - id_to_buckets = dict['buckets'] - - return community_detector.build_matrix(id_to_buckets) + d = pq.read_table(filepath).to_pydict() + return community_detector.build_matrix(list(zip(d['elId'], d['buckets']))) def main(dirpath): diff --git a/src/main/python/community-detector/test_community_detector.py b/src/main/python/community-detector/test_community_detector.py index 08c16179..c4407ad9 100644 --- a/src/main/python/community-detector/test_community_detector.py +++ b/src/main/python/community-detector/test_community_detector.py @@ -6,7 +6,7 @@ from numpy.testing import assert_array_equal from scipy.sparse import csr_matrix -from community_detector import detect_communities +from community_detector import detect_communities, build_matrix dirname = os.path.dirname(__file__) @@ -50,6 +50,47 @@ def test_detect_communities(self): assert_array_equal(data, fixture_data) assert_array_equal(indptr, fixture_indptr) + def test_with_optimized_input(self): + # scala part would remove elements that appear only in 1 bucket + id_to_buckets = [ + [0, [0]], + [1, [0, 2]], + [5, [2]], + [6, [1]], + [7, [1]], + ] + ccs = { + 0: [1, 5, 0], + 1: [6, 7] + } + buckets = build_matrix(id_to_buckets) + communities = detect_communities(ccs, buckets) + + self.assertTrue(len(communities) == 3) + assert_array_equal(communities[0], [6, 7]) + assert_array_equal(communities[1], [1, 0]) + assert_array_equal(communities[2], [5]) + + # input without skipped ids should produce the same communites + id_to_buckets = [ + [0, [0]], + [1, [0, 2]], + [2, [2]], + [3, [1]], + [4, [1]], + ] + ccs = { + 0: [1, 2, 0], + 1: [3, 4] + } + buckets = build_matrix(id_to_buckets) + communities = detect_communities(ccs, buckets) + + self.assertTrue(len(communities) == 3) + assert_array_equal(communities[0], [3, 4]) + assert_array_equal(communities[1], [1, 0]) + assert_array_equal(communities[2], [2]) + if __name__ == '__main__': unittest.main() diff --git a/src/main/scala/tech/sourced/gemini/Report.scala b/src/main/scala/tech/sourced/gemini/Report.scala index e248fd38..36149e3e 100644 --- a/src/main/scala/tech/sourced/gemini/Report.scala +++ b/src/main/scala/tech/sourced/gemini/Report.scala @@ -240,6 +240,7 @@ class Report(conn: Session, log: Slf4jLogger, keyspace: String, tables: Tables) val schemaBuckets = SchemaBuilder .record("id_to_buckets") .fields() + .name("elId").`type`().intType().noDefault() .name("buckets").`type`().array().items().intType().noDefault() .endRecord() @@ -253,8 +254,9 @@ class Report(conn: Session, log: Slf4jLogger, keyspace: String, tables: Tables) .withConf(parquetConf) .build() - elsToBuckets.toSeq.sortBy(_._1).foreach { case (_, bucket) => + elsToBuckets.toSeq.sortBy(_._1).foreach { case (elId, bucket) => val record = new GenericRecordBuilder(schemaBuckets) + .set("elId", elId) .set("buckets", bucket.toArray) .build() From 1f89b4de6a080291fbda5880caf890465b2f45fb Mon Sep 17 00:00:00 2001 From: Maxim Sukharev Date: Tue, 12 Feb 2019 19:15:03 +0100 Subject: [PATCH 07/10] filter out buckets from communties because now python receives only elements appeared in more than 1 bucket it's possible that bucket id and element ids in scala will collide. Signed-off-by: Maxim Sukharev --- .../community-detector/community_detector.py | 3 ++- .../test_community_detector.py | 26 ++++++++----------- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/src/main/python/community-detector/community_detector.py b/src/main/python/community-detector/community_detector.py index 3bfc6920..01536e52 100644 --- a/src/main/python/community-detector/community_detector.py +++ b/src/main/python/community-detector/community_detector.py @@ -130,7 +130,8 @@ def detect_communities(ccs, log.debug("Launching the community detection") detector = CommunityDetector(algorithm=algorithm, config=algorithm_params) - communities.extend(chain.from_iterable((detector(g) for g in graphs))) + for community in chain.from_iterable((detector(g) for g in graphs)): + communities.append([i for i in community if i < total_nvertices]) if len(communities) > 0: log.debug("Overall communities: %d", len(communities)) diff --git a/src/main/python/community-detector/test_community_detector.py b/src/main/python/community-detector/test_community_detector.py index c4407ad9..1192a46a 100644 --- a/src/main/python/community-detector/test_community_detector.py +++ b/src/main/python/community-detector/test_community_detector.py @@ -31,24 +31,20 @@ def test_detect_communities(self): # Call community_detector communities = detect_communities(ccs, buckets) - # Replaces CommunitiesModel().construct(communities, ccsmodel.id_to_element).save(output) - size = sum(map(len, communities)) - data = numpy.zeros(size, dtype=numpy.uint32) - indptr = numpy.zeros(len(communities) + 1, dtype=numpy.int64) - pos = 0 - for i, community in enumerate(communities): - data[pos:pos + len(community)] = community - pos += len(community) - indptr[i + 1] = pos - # Read npz output with numpy.load("%s/fixtures/output.npz" % (dirname)) as output: - fixture_data = output['data'] fixture_indptr = output['indptr'] - - # Assert equality - assert_array_equal(data, fixture_data) - assert_array_equal(indptr, fixture_indptr) + fixture_data = output['data'] + fixture_communities = [] + for i in range(len(fixture_indptr) - 1): + ptr_from = fixture_indptr[i] + ptr_to = fixture_indptr[i + 1] + community = fixture_data[ptr_from:ptr_to] + # filter out buckets from fixture (apollo returns them) + fixture_communities.append( + [j for j in community if j < len(cc)]) + + assert_array_equal(communities, fixture_communities) def test_with_optimized_input(self): # scala part would remove elements that appear only in 1 bucket From 9a69a875d948786c46a276ac4fa4ceaba5d8eb7e Mon Sep 17 00:00:00 2001 From: Maxim Sukharev Date: Wed, 13 Feb 2019 12:05:40 +0100 Subject: [PATCH 08/10] Speedup duplicates calculation find dups in scala instead of a new query for each hash Signed-off-by: Maxim Sukharev --- .../scala/tech/sourced/gemini/Database.scala | 2 +- .../scala/tech/sourced/gemini/Report.scala | 23 ++++++++++++------- .../tech/sourced/gemini/cmd/ReportApp.scala | 2 +- 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/main/scala/tech/sourced/gemini/Database.scala b/src/main/scala/tech/sourced/gemini/Database.scala index dcb3f65a..2427b63d 100644 --- a/src/main/scala/tech/sourced/gemini/Database.scala +++ b/src/main/scala/tech/sourced/gemini/Database.scala @@ -53,7 +53,7 @@ object Database { conn.execute(query).asScala.map(rowToRepoFile(tables)) } - private def rowToRepoFile(tables: Tables)(row: Row): RepoFile = { + def rowToRepoFile(tables: Tables)(row: Row): RepoFile = { val cols = tables.metaCols RepoFile(row.getString(cols.repo), row.getString(cols.commit), row.getString(cols.path), row.getString(cols.sha)) } diff --git a/src/main/scala/tech/sourced/gemini/Report.scala b/src/main/scala/tech/sourced/gemini/Report.scala index 36149e3e..d44b16fd 100644 --- a/src/main/scala/tech/sourced/gemini/Report.scala +++ b/src/main/scala/tech/sourced/gemini/Report.scala @@ -65,6 +65,10 @@ class Report(conn: Session, log: Slf4jLogger, keyspace: String, tables: Tables) log.info(s"Finding ${mode} connected components") val cc = new DBConnectedComponents(log, conn, tables.hashtables(mode), keyspace) val (buckets, elementIds) = cc.makeBuckets() + if (buckets.isEmpty) { + return (Map[Int, Set[Int]](), Map[Int, List[Int]](), elementIds) + } + val elsToBuckets = cc.elementsToBuckets(buckets) val result = cc.findInBuckets(buckets, elsToBuckets) @@ -98,19 +102,18 @@ class Report(conn: Session, log: Slf4jLogger, keyspace: String, tables: Tables) */ def findAllDuplicateItems(): Iterable[Iterable[RepoFile]] = { val hash = tables.metaCols.sha - val distinctBlobHash = s"SELECT distinct $hash FROM $keyspace.${tables.meta}" + val distinctBlobHash = s"SELECT * FROM $keyspace.${tables.meta}" conn .execute(new SimpleStatement(distinctBlobHash)) .asScala - .flatMap { r => - val dupes = Database.findFilesByHash(r.getString(hash), conn, keyspace, tables) - if (dupes.size > 1) { - List(dupes) - } else { - List() - } + .foldLeft(Map[String, List[RepoFile]]()) { (map, row) => + val file = Database.rowToRepoFile(tables)(row) + val list = map.getOrElse(file.sha, List[RepoFile]()) :+ file + map + (file.sha -> list) } + .values + .filter(_.size > 1) } case class funcElem(sha1: String, name: String, line: String) @@ -185,6 +188,10 @@ class Report(conn: Session, log: Slf4jLogger, keyspace: String, tables: Tables) def findSimilarItems(ccDirPath: String, mode: String): Iterable[Iterable[SimilarItem]] = { val (connectedComponents, elsToBuckets, elementIds) = findConnectedComponents(mode) + if (connectedComponents.isEmpty) { + return Iterable[Iterable[SimilarItem]]() + } + saveConnectedComponents(connectedComponents, elsToBuckets, ccDirPath) log.info("Detecting communities in Python") diff --git a/src/main/scala/tech/sourced/gemini/cmd/ReportApp.scala b/src/main/scala/tech/sourced/gemini/cmd/ReportApp.scala index 0690c9e8..492ef121 100644 --- a/src/main/scala/tech/sourced/gemini/cmd/ReportApp.scala +++ b/src/main/scala/tech/sourced/gemini/cmd/ReportApp.scala @@ -109,7 +109,7 @@ object ReportApp extends App { if (similarities.isEmpty) { println(s"No similarities found.") } else { - similarities.foreach { community => + similarities.filter(_.nonEmpty).foreach { community => val count = community.size val typeName = community.head match { case SimilarFunc(_, _, _) => "functions" From 38849af2a9effa396828b01dac1f760d213e8287 Mon Sep 17 00:00:00 2001 From: Maxim Sukharev Date: Thu, 14 Feb 2019 13:00:24 +0100 Subject: [PATCH 09/10] apply review comments Signed-off-by: Maxim Sukharev --- .../community-detector/community_detector.py | 14 +++++++++----- src/main/scala/tech/sourced/gemini/Report.scala | 1 - 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/main/python/community-detector/community_detector.py b/src/main/python/community-detector/community_detector.py index 01536e52..d8e29acd 100644 --- a/src/main/python/community-detector/community_detector.py +++ b/src/main/python/community-detector/community_detector.py @@ -10,18 +10,22 @@ def build_matrix(id_to_buckets): """Builds a CSR matrix from a list of lists of buckets Args: - id_to_buckets: list of lists of elementid, buckets. + id_to_buckets: list of [elementid, buckets]. Returns: A scipy.sparse.csr_matrix with the same contents """ - if len(id_to_buckets) == 0: + if not id_to_buckets: return csr_matrix((0, 0), dtype=numpy.uint8) - max_el_id = max((item[0] for item in id_to_buckets)) - data = numpy.ones( - sum((len(item[1]) for item in id_to_buckets)), dtype=numpy.uint8) + max_el_id = 0 + data_size = 0 + for item in id_to_buckets: + max_el_id = max(max_el_id, item[0]) + data_size += len(item[1]) + + data = numpy.ones(data_size, dtype=numpy.uint8) indices = numpy.zeros(len(data), dtype=numpy.uint32) indptr = numpy.zeros(max_el_id + 2, dtype=numpy.uint32) pos = 0 diff --git a/src/main/scala/tech/sourced/gemini/Report.scala b/src/main/scala/tech/sourced/gemini/Report.scala index d44b16fd..2c0e3432 100644 --- a/src/main/scala/tech/sourced/gemini/Report.scala +++ b/src/main/scala/tech/sourced/gemini/Report.scala @@ -101,7 +101,6 @@ class Report(conn: Session, log: Slf4jLogger, keyspace: String, tables: Tables) * @return */ def findAllDuplicateItems(): Iterable[Iterable[RepoFile]] = { - val hash = tables.metaCols.sha val distinctBlobHash = s"SELECT * FROM $keyspace.${tables.meta}" conn From 611444f342bdbbdab19969e200c4a34cf5ccdf5b Mon Sep 17 00:00:00 2001 From: Maxim Sukharev Date: Thu, 14 Feb 2019 13:14:27 +0100 Subject: [PATCH 10/10] apply more code review comments Signed-off-by: Maxim Sukharev --- .../community-detector/community_detector.py | 11 +++++++--- .../test_community_detector.py | 20 +++++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/src/main/python/community-detector/community_detector.py b/src/main/python/community-detector/community_detector.py index d8e29acd..b4825534 100644 --- a/src/main/python/community-detector/community_detector.py +++ b/src/main/python/community-detector/community_detector.py @@ -7,10 +7,10 @@ def build_matrix(id_to_buckets): - """Builds a CSR matrix from a list of lists of buckets + """Builds a CSR matrix from a list of [elementid, buckets] Args: - id_to_buckets: list of [elementid, buckets]. + id_to_buckets: sorted list of [elementid, buckets] by elementid. Returns: A scipy.sparse.csr_matrix with the same contents @@ -29,10 +29,15 @@ def build_matrix(id_to_buckets): indices = numpy.zeros(len(data), dtype=numpy.uint32) indptr = numpy.zeros(max_el_id + 2, dtype=numpy.uint32) pos = 0 + from_el_id = 0 for el_id, bucket in id_to_buckets: indices[pos:(pos + len(bucket))] = bucket + # fill gap from previous elem id to current el id with prev pos value + indptr[from_el_id + 1:el_id + 1] = pos pos += len(bucket) - indptr[el_id + 1:] = pos + indptr[el_id + 1] = pos + from_el_id = el_id + 1 + return csr_matrix((data, indices, indptr)) diff --git a/src/main/python/community-detector/test_community_detector.py b/src/main/python/community-detector/test_community_detector.py index 1192a46a..d964bfaf 100644 --- a/src/main/python/community-detector/test_community_detector.py +++ b/src/main/python/community-detector/test_community_detector.py @@ -87,6 +87,26 @@ def test_with_optimized_input(self): assert_array_equal(communities[1], [1, 0]) assert_array_equal(communities[2], [2]) + def test_start_with_not_zero_input(self): + id_to_buckets = [ + [2, [0]], + [3, [0, 2]], + [7, [2]], + [8, [1]], + [9, [1]], + ] + ccs = { + 0: [3, 7, 2], + 1: [8, 9] + } + buckets = build_matrix(id_to_buckets) + communities = detect_communities(ccs, buckets) + + self.assertTrue(len(communities) == 3) + assert_array_equal(communities[0], [8, 9]) + assert_array_equal(communities[1], [3, 2]) + assert_array_equal(communities[2], [7]) + if __name__ == '__main__': unittest.main()