From d36691325c5604b0e219ddf6b5d82d68bde0539c Mon Sep 17 00:00:00 2001 From: Maxim Sukharev Date: Mon, 28 Jan 2019 16:34:36 +0100 Subject: [PATCH 1/4] move feature frequencies to separate table Map field of cassanda/scylladb works great but on many repositories dictionary becomes too big and scylladb can't commit the row with default configuration: exception during mutation write to xx.xx.xx.xx: std::invalid_argument (Mutation of 45358979 bytes is too large for the maxiumum size of 16777216) it's possible to increase commit size but for really huge dataset the dictionary would exceed any reasonable limit. Signed-off-by: Maxim Sukharev --- src/main/resources/schema.cql | 3 ++- .../scala/tech/sourced/gemini/Database.scala | 21 +++++++++++----- .../scala/tech/sourced/gemini/FileQuery.scala | 25 ++++++++++++------- .../scala/tech/sourced/gemini/Gemini.scala | 6 +++-- src/main/scala/tech/sourced/gemini/Hash.scala | 21 ++++++++++------ .../scala/tech/sourced/gemini/Report.scala | 3 +-- .../tech/sourced/gemini/BaseDBSpec.scala | 20 +++++++++------ 7 files changed, 65 insertions(+), 34 deletions(-) diff --git a/src/main/resources/schema.cql b/src/main/resources/schema.cql index 9bce2c15..af74b566 100644 --- a/src/main/resources/schema.cql +++ b/src/main/resources/schema.cql @@ -3,4 +3,5 @@ USE __KEYSPACE__; CREATE TABLE IF NOT EXISTS __KEYSPACE__.meta (sha1 ascii, repo text, commit ascii, path text, PRIMARY KEY (sha1, repo, commit, path)); CREATE TABLE IF NOT EXISTS __KEYSPACE__.hashtables_file (sha1 text, hashtable tinyint, value blob, PRIMARY KEY (hashtable, value, sha1)); CREATE TABLE IF NOT EXISTS __KEYSPACE__.hashtables_func (sha1 text, hashtable tinyint, value blob, PRIMARY KEY (hashtable, value, sha1)); -CREATE TABLE IF NOT EXISTS __KEYSPACE__.docfreq (id text, docs int, df map, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS __KEYSPACE__.features_docs (id text, docs int, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS __KEYSPACE__.features_freq (id text, feature text, weight int, PRIMARY KEY (id, feature)); diff --git a/src/main/scala/tech/sourced/gemini/Database.scala b/src/main/scala/tech/sourced/gemini/Database.scala index 01ba0984..dcb3f65a 100644 --- a/src/main/scala/tech/sourced/gemini/Database.scala +++ b/src/main/scala/tech/sourced/gemini/Database.scala @@ -7,22 +7,31 @@ import scala.collection.JavaConverters._ case class MetaCols(sha: String, repo: String, commit: String, path: String) case class HashtablesCols(sha: String, hashtable: String, value: String) -case class DocFreqCols(id: String, docs: String, df: String) +case class FeaturesDocsCols(id: String, docs: String) +case class FeaturesFreqCols(id: String, feature: String, weight: String) /** * Tables is static typed definition of DB schema * * @param meta name of meta table - * @param hashtables name of hashtables table - * @param metaCols - * @param hashtablesCols + * @param hashtables prefix of hashtables table + * @param featuresDocs name of features documents table + * @param featuresFreq name of features frequencies table + * @param metaCols columns of meta table + * @param hashtablesCols columns of hashtables table + * @param featuresDocsCols columns of features documents table + * @param featuresFreqCols columns of features frequencies table */ case class Tables(meta: String, hashtables: String, - docFreq: String, + featuresDocs: String, + featuresFreq: String, metaCols: MetaCols, hashtablesCols: HashtablesCols, - docFreqCols: DocFreqCols) + featuresDocsCols: FeaturesDocsCols, + featuresFreqCols: FeaturesFreqCols) { + def hashtables(mode: String): String = s"${hashtables}_$mode" +} /** * Database object contains common queries to DB diff --git a/src/main/scala/tech/sourced/gemini/FileQuery.scala b/src/main/scala/tech/sourced/gemini/FileQuery.scala index 41148d71..51cc4788 100644 --- a/src/main/scala/tech/sourced/gemini/FileQuery.scala +++ b/src/main/scala/tech/sourced/gemini/FileQuery.scala @@ -135,7 +135,6 @@ class FileQuery( case Gemini.funcSimilarityMode => FeaturesHash.funcParams } - val hashtablesTable = s"${tables.hashtables}_${mode}" val cols = tables.hashtablesCols val wmh = hashFile(featuresList, docFreq, sampleSize) @@ -143,7 +142,7 @@ class FileQuery( log.info("Looking for similar items") val similar = bands.zipWithIndex.foldLeft(Set[String]()) { case (sim, (band, i)) => - val cql = s"""SELECT ${cols.sha} FROM $keyspace.${hashtablesTable} + val cql = s"""SELECT ${cols.sha} FROM $keyspace.${tables.hashtables(mode)} WHERE ${cols.hashtable}=$i AND ${cols.value}=0x${MathUtil.bytes2hex(band)}""" log.debug(cql) @@ -186,18 +185,26 @@ class FileQuery( protected def readDocFreqFromDB(): Option[OrderedDocFreq] = { log.info(s"Reading docFreq from DB") - val cols = tables.docFreqCols - val row = conn.execute(s"SELECT * FROM ${tables.docFreq} WHERE ${cols.id} = '${mode}'").one() - if (row == null) { + val docsCols = tables.featuresDocsCols + val freqCols = tables.featuresFreqCols + val docsRow = conn.execute(s"SELECT * FROM ${tables.featuresDocs} WHERE ${docsCols.id} = '$mode'").one() + if (docsRow == null) { log.warn("Document frequency table is empty.") None } else { - val df = row - .getMap("df", classOf[java.lang.String], classOf[java.lang.Integer]) + var tokens = IndexedSeq[String]() + val df = conn + .execute(s"SELECT * FROM ${tables.featuresFreq} WHERE ${freqCols.id} = '$mode'") .asScala - .mapValues(_.toInt) + .map { row => + // tokens have to be sorted, df.keys isn't sorted + val name = row.getString(freqCols.feature) + tokens = tokens :+ name - Some(OrderedDocFreq(row.getInt(cols.docs), df.keys.toIndexedSeq, df)) + (name, row.getInt(freqCols.weight)) + }.toMap + + Some(OrderedDocFreq(docsRow.getInt(docsCols.docs), tokens, df)) } } diff --git a/src/main/scala/tech/sourced/gemini/Gemini.scala b/src/main/scala/tech/sourced/gemini/Gemini.scala index a04d520a..b69af01a 100644 --- a/src/main/scala/tech/sourced/gemini/Gemini.scala +++ b/src/main/scala/tech/sourced/gemini/Gemini.scala @@ -198,10 +198,12 @@ object Gemini { val tables = Tables( "meta", "hashtables", - "docfreq", + "features_docs", + "features_freq", MetaCols("sha1", "repo", "commit", "path"), HashtablesCols("sha1", "hashtable", "value"), - DocFreqCols("id", "docs", "df") + FeaturesDocsCols("id", "docs"), + FeaturesFreqCols("id", "feature", "weight") ) val formatter = new ObjectInserter.Formatter diff --git a/src/main/scala/tech/sourced/gemini/Hash.scala b/src/main/scala/tech/sourced/gemini/Hash.scala index 71525e8d..af77ce0a 100644 --- a/src/main/scala/tech/sourced/gemini/Hash.scala +++ b/src/main/scala/tech/sourced/gemini/Hash.scala @@ -229,14 +229,22 @@ class Hash(session: SparkSession, protected def saveDocFreqToDB(docFreq: OrderedDocFreq, keyspace: String, tables: Tables): Unit = { log.warn(s"save document frequencies to DB") - CassandraConnector(session.sparkContext).withSessionDo { cassandra => - val cols = tables.docFreqCols - val javaMap = docFreq.df.asJava + CassandraConnector(session.sparkContext).withSessionDo { cassandra => + val docsCols = tables.featuresDocsCols cassandra.execute( - s"INSERT INTO $keyspace.${tables.docFreq} (${cols.id}, ${cols.docs}, ${cols.df}) VALUES (?, ?, ?)", - mode, int2Integer(docFreq.docs), javaMap + s"INSERT INTO $keyspace.${tables.featuresDocs} (${docsCols.id}, ${docsCols.docs}) VALUES (?, ?)", + mode, int2Integer(docFreq.docs) ) + + val freqCols = tables.featuresFreqCols + docFreq.df.foreach { case(feature, weight) => + cassandra.execute( + s"INSERT INTO $keyspace.${tables.featuresFreq}" + + s"(${freqCols.id}, ${freqCols.feature}, ${freqCols.weight}) VALUES (?, ?, ?)", + mode, feature, int2Integer(weight) + ) + } } } @@ -267,7 +275,6 @@ class Hash(session: SparkSession, case Gemini.funcSimilarityMode => FeaturesHash.funcParams } - val hashtablesTable = s"${tables.hashtables}_${mode}" val cols = tables.hashtablesCols rdd .flatMap { case RDDHash(doc, wmh) => @@ -276,7 +283,7 @@ class Hash(session: SparkSession, .toDF(cols.sha, cols.hashtable, cols.value) .write .mode("append") - .cassandraFormat(hashtablesTable, keyspace) + .cassandraFormat(tables.hashtables(mode), keyspace) .save() } diff --git a/src/main/scala/tech/sourced/gemini/Report.scala b/src/main/scala/tech/sourced/gemini/Report.scala index 0260848e..06af45c3 100644 --- a/src/main/scala/tech/sourced/gemini/Report.scala +++ b/src/main/scala/tech/sourced/gemini/Report.scala @@ -63,8 +63,7 @@ class Report(conn: Session, log: Slf4jLogger, keyspace: String, tables: Tables) */ def findConnectedComponents(mode: String): (Map[Int, Set[Int]], Map[Int, List[Int]], Map[String, Int]) = { log.info(s"Finding ${mode} connected components") - val hashtablesTable = s"${tables.hashtables}_${mode}" - val cc = new DBConnectedComponents(log, conn, hashtablesTable, keyspace) + val cc = new DBConnectedComponents(log, conn, tables.hashtables(mode), keyspace) val (buckets, elementIds) = cc.makeBuckets() val elsToBuckets = cc.elementsToBuckets(buckets) diff --git a/src/test/scala/tech/sourced/gemini/BaseDBSpec.scala b/src/test/scala/tech/sourced/gemini/BaseDBSpec.scala index ce52469b..617e693c 100644 --- a/src/test/scala/tech/sourced/gemini/BaseDBSpec.scala +++ b/src/test/scala/tech/sourced/gemini/BaseDBSpec.scala @@ -40,10 +40,9 @@ trait BaseDBSpec extends BeforeAndAfterAll { } def insertHashtables(items: Iterable[HashtableItem], mode: String): Unit = { - val hashtablesTable = s"${Gemini.tables.hashtables}_${mode}" val cols = Gemini.tables.hashtablesCols items.foreach { case HashtableItem(ht, v, sha1) => - val cql = s"""INSERT INTO $keyspace.${hashtablesTable} + val cql = s"""INSERT INTO $keyspace.${Gemini.tables.hashtables(mode)} (${cols.hashtable}, ${cols.value}, ${cols.sha}) VALUES ($ht, $v, '$sha1')""" cassandra.execute(cql) @@ -51,13 +50,20 @@ trait BaseDBSpec extends BeforeAndAfterAll { } def insertDocFreq(docFreq: OrderedDocFreq, mode: String): Unit = { - val cols = Gemini.tables.docFreqCols - val javaMap = docFreq.df.asJava - + val docsCols = Gemini.tables.featuresDocsCols cassandra.execute( - s"INSERT INTO $keyspace.${Gemini.tables.docFreq} (${cols.id}, ${cols.docs}, ${cols.df}) VALUES (?, ?, ?)", - mode, int2Integer(docFreq.docs), javaMap + s"INSERT INTO $keyspace.${Gemini.tables.featuresDocs} (${docsCols.id}, ${docsCols.docs}) VALUES (?, ?)", + mode, int2Integer(docFreq.docs) ) + + val freqCols = Gemini.tables.featuresFreqCols + docFreq.df.foreach { case(feature, weight) => + cassandra.execute( + s"INSERT INTO $keyspace.${Gemini.tables.featuresFreq}" + + s"(${freqCols.id}, ${freqCols.feature}, ${freqCols.weight}) VALUES (?, ?, ?)", + mode, feature, int2Integer(weight) + ) + } } override def afterAll(): Unit = { From 4623061d7c6e64b210cd7fbbacbcf059efb18c47 Mon Sep 17 00:00:00 2001 From: Maxim Sukharev Date: Tue, 29 Jan 2019 15:54:27 +0100 Subject: [PATCH 2/4] use prepared statement for docfreq insertion Signed-off-by: Maxim Sukharev --- src/main/scala/tech/sourced/gemini/Hash.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/scala/tech/sourced/gemini/Hash.scala b/src/main/scala/tech/sourced/gemini/Hash.scala index af77ce0a..175597b0 100644 --- a/src/main/scala/tech/sourced/gemini/Hash.scala +++ b/src/main/scala/tech/sourced/gemini/Hash.scala @@ -238,12 +238,11 @@ class Hash(session: SparkSession, ) val freqCols = tables.featuresFreqCols + val prepared = cassandra.prepare(s"INSERT INTO $keyspace.${tables.featuresFreq}" + + s"(${freqCols.id}, ${freqCols.feature}, ${freqCols.weight}) VALUES (?, ?, ?)") + docFreq.df.foreach { case(feature, weight) => - cassandra.execute( - s"INSERT INTO $keyspace.${tables.featuresFreq}" + - s"(${freqCols.id}, ${freqCols.feature}, ${freqCols.weight}) VALUES (?, ?, ?)", - mode, feature, int2Integer(weight) - ) + cassandra.execute(prepared.bind(mode, feature, int2Integer(weight))) } } } From 605b80c32893846883796cc5af66e3768046f3e3 Mon Sep 17 00:00:00 2001 From: Maxim Sukharev Date: Tue, 29 Jan 2019 16:03:31 +0100 Subject: [PATCH 3/4] add order by to cql query to make sorting explicit Signed-off-by: Maxim Sukharev --- src/main/scala/tech/sourced/gemini/FileQuery.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/tech/sourced/gemini/FileQuery.scala b/src/main/scala/tech/sourced/gemini/FileQuery.scala index 51cc4788..16400240 100644 --- a/src/main/scala/tech/sourced/gemini/FileQuery.scala +++ b/src/main/scala/tech/sourced/gemini/FileQuery.scala @@ -194,7 +194,7 @@ class FileQuery( } else { var tokens = IndexedSeq[String]() val df = conn - .execute(s"SELECT * FROM ${tables.featuresFreq} WHERE ${freqCols.id} = '$mode'") + .execute(s"SELECT * FROM ${tables.featuresFreq} WHERE ${freqCols.id} = '$mode' ORDER BY ${freqCols.feature}") .asScala .map { row => // tokens have to be sorted, df.keys isn't sorted From 3ae28ba7706f725d55ffc1198255b1834f8222ac Mon Sep 17 00:00:00 2001 From: Maxim Sukharev Date: Tue, 29 Jan 2019 17:22:03 +0100 Subject: [PATCH 4/4] solve conflicts after rebase Signed-off-by: Maxim Sukharev --- .../scala/tech/sourced/gemini/Gemini.scala | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/main/scala/tech/sourced/gemini/Gemini.scala b/src/main/scala/tech/sourced/gemini/Gemini.scala index b69af01a..95b799d1 100644 --- a/src/main/scala/tech/sourced/gemini/Gemini.scala +++ b/src/main/scala/tech/sourced/gemini/Gemini.scala @@ -130,13 +130,18 @@ class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini. } def isDBEmpty(session: Session, mode: String): Boolean = { - var row = session.execute(s"select count(*) from $keyspace.${tables.docFreq} where id='$mode' limit 1").one() - if (row.getLong(0) > 0) { + var row = session.execute(s"select * from $keyspace.${tables.featuresDocs} where id='$mode' limit 1").one() + if (row != null) { return false } - row = session.execute(s"select count(*) from $keyspace.${tables.hashtables}_$mode").one() - if (row.getLong(0) > 0) { + row = session.execute(s"select * from $keyspace.${tables.featuresFreq} where id='$mode' limit 1").one() + if (row != null) { + return false + } + + row = session.execute(s"select * from $keyspace.${tables.hashtables}_$mode limit 1").one() + if (row != null) { return false } @@ -144,8 +149,9 @@ class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini. } def cleanDB(session: Session, mode: String): Unit = { - session.execute(s"delete from $keyspace.${tables.docFreq} where id='$mode'") - session.execute(s"truncate table $keyspace.${tables.hashtables}_$mode") + session.execute(s"delete from $keyspace.${tables.featuresDocs} where id='$mode'") + session.execute(s"delete from $keyspace.${tables.featuresFreq} where id='$mode'") + session.execute(s"truncate table $keyspace.${tables.hashtables(mode)}") } def applySchema(session: Session): Unit = {