diff --git a/src/main/resources/schema.cql b/src/main/resources/schema.cql index 9bce2c15..af74b566 100644 --- a/src/main/resources/schema.cql +++ b/src/main/resources/schema.cql @@ -3,4 +3,5 @@ USE __KEYSPACE__; CREATE TABLE IF NOT EXISTS __KEYSPACE__.meta (sha1 ascii, repo text, commit ascii, path text, PRIMARY KEY (sha1, repo, commit, path)); CREATE TABLE IF NOT EXISTS __KEYSPACE__.hashtables_file (sha1 text, hashtable tinyint, value blob, PRIMARY KEY (hashtable, value, sha1)); CREATE TABLE IF NOT EXISTS __KEYSPACE__.hashtables_func (sha1 text, hashtable tinyint, value blob, PRIMARY KEY (hashtable, value, sha1)); -CREATE TABLE IF NOT EXISTS __KEYSPACE__.docfreq (id text, docs int, df map, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS __KEYSPACE__.features_docs (id text, docs int, PRIMARY KEY (id)); +CREATE TABLE IF NOT EXISTS __KEYSPACE__.features_freq (id text, feature text, weight int, PRIMARY KEY (id, feature)); diff --git a/src/main/scala/tech/sourced/gemini/Database.scala b/src/main/scala/tech/sourced/gemini/Database.scala index 01ba0984..dcb3f65a 100644 --- a/src/main/scala/tech/sourced/gemini/Database.scala +++ b/src/main/scala/tech/sourced/gemini/Database.scala @@ -7,22 +7,31 @@ import scala.collection.JavaConverters._ case class MetaCols(sha: String, repo: String, commit: String, path: String) case class HashtablesCols(sha: String, hashtable: String, value: String) -case class DocFreqCols(id: String, docs: String, df: String) +case class FeaturesDocsCols(id: String, docs: String) +case class FeaturesFreqCols(id: String, feature: String, weight: String) /** * Tables is static typed definition of DB schema * * @param meta name of meta table - * @param hashtables name of hashtables table - * @param metaCols - * @param hashtablesCols + * @param hashtables prefix of hashtables table + * @param featuresDocs name of features documents table + * @param featuresFreq name of features frequencies table + * @param metaCols columns of meta table + * @param 
hashtablesCols columns of hashtables table + * @param featuresDocsCols columns of features documents table + * @param featuresFreqCols columns of features frequencies table */ case class Tables(meta: String, hashtables: String, - docFreq: String, + featuresDocs: String, + featuresFreq: String, metaCols: MetaCols, hashtablesCols: HashtablesCols, - docFreqCols: DocFreqCols) + featuresDocsCols: FeaturesDocsCols, + featuresFreqCols: FeaturesFreqCols) { + def hashtables(mode: String): String = s"${hashtables}_$mode" +} /** * Database object contains common queries to DB diff --git a/src/main/scala/tech/sourced/gemini/FileQuery.scala b/src/main/scala/tech/sourced/gemini/FileQuery.scala index 41148d71..16400240 100644 --- a/src/main/scala/tech/sourced/gemini/FileQuery.scala +++ b/src/main/scala/tech/sourced/gemini/FileQuery.scala @@ -135,7 +135,6 @@ class FileQuery( case Gemini.funcSimilarityMode => FeaturesHash.funcParams } - val hashtablesTable = s"${tables.hashtables}_${mode}" val cols = tables.hashtablesCols val wmh = hashFile(featuresList, docFreq, sampleSize) @@ -143,7 +142,7 @@ class FileQuery( log.info("Looking for similar items") val similar = bands.zipWithIndex.foldLeft(Set[String]()) { case (sim, (band, i)) => - val cql = s"""SELECT ${cols.sha} FROM $keyspace.${hashtablesTable} + val cql = s"""SELECT ${cols.sha} FROM $keyspace.${tables.hashtables(mode)} WHERE ${cols.hashtable}=$i AND ${cols.value}=0x${MathUtil.bytes2hex(band)}""" log.debug(cql) @@ -186,18 +185,26 @@ class FileQuery( protected def readDocFreqFromDB(): Option[OrderedDocFreq] = { log.info(s"Reading docFreq from DB") - val cols = tables.docFreqCols - val row = conn.execute(s"SELECT * FROM ${tables.docFreq} WHERE ${cols.id} = '${mode}'").one() - if (row == null) { + val docsCols = tables.featuresDocsCols + val freqCols = tables.featuresFreqCols + val docsRow = conn.execute(s"SELECT * FROM ${tables.featuresDocs} WHERE ${docsCols.id} = '$mode'").one() + if (docsRow == null) { log.warn("Document 
frequency table is empty.") None } else { - val df = row - .getMap("df", classOf[java.lang.String], classOf[java.lang.Integer]) + var tokens = IndexedSeq[String]() + val df = conn + .execute(s"SELECT * FROM ${tables.featuresFreq} WHERE ${freqCols.id} = '$mode' ORDER BY ${freqCols.feature}") .asScala - .mapValues(_.toInt) + .map { row => + // tokens have to be sorted, df.keys isn't sorted + val name = row.getString(freqCols.feature) + tokens = tokens :+ name - Some(OrderedDocFreq(row.getInt(cols.docs), df.keys.toIndexedSeq, df)) + (name, row.getInt(freqCols.weight)) + }.toMap + + Some(OrderedDocFreq(docsRow.getInt(docsCols.docs), tokens, df)) } } diff --git a/src/main/scala/tech/sourced/gemini/Gemini.scala b/src/main/scala/tech/sourced/gemini/Gemini.scala index a04d520a..95b799d1 100644 --- a/src/main/scala/tech/sourced/gemini/Gemini.scala +++ b/src/main/scala/tech/sourced/gemini/Gemini.scala @@ -130,13 +130,18 @@ class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini. } def isDBEmpty(session: Session, mode: String): Boolean = { - var row = session.execute(s"select count(*) from $keyspace.${tables.docFreq} where id='$mode' limit 1").one() - if (row.getLong(0) > 0) { + var row = session.execute(s"select * from $keyspace.${tables.featuresDocs} where id='$mode' limit 1").one() + if (row != null) { return false } - row = session.execute(s"select count(*) from $keyspace.${tables.hashtables}_$mode").one() - if (row.getLong(0) > 0) { + row = session.execute(s"select * from $keyspace.${tables.featuresFreq} where id='$mode' limit 1").one() + if (row != null) { + return false + } + + row = session.execute(s"select * from $keyspace.${tables.hashtables(mode)} limit 1").one() + if (row != null) { return false } @@ -144,8 +149,9 @@ class Gemini(session: SparkSession, log: Slf4jLogger, keyspace: String = Gemini.
} def cleanDB(session: Session, mode: String): Unit = { - session.execute(s"delete from $keyspace.${tables.docFreq} where id='$mode'") - session.execute(s"truncate table $keyspace.${tables.hashtables}_$mode") + session.execute(s"delete from $keyspace.${tables.featuresDocs} where id='$mode'") + session.execute(s"delete from $keyspace.${tables.featuresFreq} where id='$mode'") + session.execute(s"truncate table $keyspace.${tables.hashtables(mode)}") } def applySchema(session: Session): Unit = { @@ -198,10 +204,12 @@ object Gemini { val tables = Tables( "meta", "hashtables", - "docfreq", + "features_docs", + "features_freq", MetaCols("sha1", "repo", "commit", "path"), HashtablesCols("sha1", "hashtable", "value"), - DocFreqCols("id", "docs", "df") + FeaturesDocsCols("id", "docs"), + FeaturesFreqCols("id", "feature", "weight") ) val formatter = new ObjectInserter.Formatter diff --git a/src/main/scala/tech/sourced/gemini/Hash.scala b/src/main/scala/tech/sourced/gemini/Hash.scala index 71525e8d..175597b0 100644 --- a/src/main/scala/tech/sourced/gemini/Hash.scala +++ b/src/main/scala/tech/sourced/gemini/Hash.scala @@ -229,14 +229,21 @@ class Hash(session: SparkSession, protected def saveDocFreqToDB(docFreq: OrderedDocFreq, keyspace: String, tables: Tables): Unit = { log.warn(s"save document frequencies to DB") - CassandraConnector(session.sparkContext).withSessionDo { cassandra => - val cols = tables.docFreqCols - val javaMap = docFreq.df.asJava + CassandraConnector(session.sparkContext).withSessionDo { cassandra => + val docsCols = tables.featuresDocsCols cassandra.execute( - s"INSERT INTO $keyspace.${tables.docFreq} (${cols.id}, ${cols.docs}, ${cols.df}) VALUES (?, ?, ?)", - mode, int2Integer(docFreq.docs), javaMap + s"INSERT INTO $keyspace.${tables.featuresDocs} (${docsCols.id}, ${docsCols.docs}) VALUES (?, ?)", + mode, int2Integer(docFreq.docs) ) + + val freqCols = tables.featuresFreqCols + val prepared = cassandra.prepare(s"INSERT INTO $keyspace.${tables.featuresFreq}" 
+ + s"(${freqCols.id}, ${freqCols.feature}, ${freqCols.weight}) VALUES (?, ?, ?)") + + docFreq.df.foreach { case(feature, weight) => + cassandra.execute(prepared.bind(mode, feature, int2Integer(weight))) + } } } @@ -267,7 +274,6 @@ class Hash(session: SparkSession, case Gemini.funcSimilarityMode => FeaturesHash.funcParams } - val hashtablesTable = s"${tables.hashtables}_${mode}" val cols = tables.hashtablesCols rdd .flatMap { case RDDHash(doc, wmh) => @@ -276,7 +282,7 @@ class Hash(session: SparkSession, .toDF(cols.sha, cols.hashtable, cols.value) .write .mode("append") - .cassandraFormat(hashtablesTable, keyspace) + .cassandraFormat(tables.hashtables(mode), keyspace) .save() } diff --git a/src/main/scala/tech/sourced/gemini/Report.scala b/src/main/scala/tech/sourced/gemini/Report.scala index 0260848e..06af45c3 100644 --- a/src/main/scala/tech/sourced/gemini/Report.scala +++ b/src/main/scala/tech/sourced/gemini/Report.scala @@ -63,8 +63,7 @@ class Report(conn: Session, log: Slf4jLogger, keyspace: String, tables: Tables) */ def findConnectedComponents(mode: String): (Map[Int, Set[Int]], Map[Int, List[Int]], Map[String, Int]) = { log.info(s"Finding ${mode} connected components") - val hashtablesTable = s"${tables.hashtables}_${mode}" - val cc = new DBConnectedComponents(log, conn, hashtablesTable, keyspace) + val cc = new DBConnectedComponents(log, conn, tables.hashtables(mode), keyspace) val (buckets, elementIds) = cc.makeBuckets() val elsToBuckets = cc.elementsToBuckets(buckets) diff --git a/src/test/scala/tech/sourced/gemini/BaseDBSpec.scala b/src/test/scala/tech/sourced/gemini/BaseDBSpec.scala index ce52469b..617e693c 100644 --- a/src/test/scala/tech/sourced/gemini/BaseDBSpec.scala +++ b/src/test/scala/tech/sourced/gemini/BaseDBSpec.scala @@ -40,10 +40,9 @@ trait BaseDBSpec extends BeforeAndAfterAll { } def insertHashtables(items: Iterable[HashtableItem], mode: String): Unit = { - val hashtablesTable = s"${Gemini.tables.hashtables}_${mode}" val cols = 
Gemini.tables.hashtablesCols items.foreach { case HashtableItem(ht, v, sha1) => - val cql = s"""INSERT INTO $keyspace.${hashtablesTable} + val cql = s"""INSERT INTO $keyspace.${Gemini.tables.hashtables(mode)} (${cols.hashtable}, ${cols.value}, ${cols.sha}) VALUES ($ht, $v, '$sha1')""" cassandra.execute(cql) @@ -51,13 +50,20 @@ trait BaseDBSpec extends BeforeAndAfterAll { } def insertDocFreq(docFreq: OrderedDocFreq, mode: String): Unit = { - val cols = Gemini.tables.docFreqCols - val javaMap = docFreq.df.asJava - + val docsCols = Gemini.tables.featuresDocsCols cassandra.execute( - s"INSERT INTO $keyspace.${Gemini.tables.docFreq} (${cols.id}, ${cols.docs}, ${cols.df}) VALUES (?, ?, ?)", - mode, int2Integer(docFreq.docs), javaMap + s"INSERT INTO $keyspace.${Gemini.tables.featuresDocs} (${docsCols.id}, ${docsCols.docs}) VALUES (?, ?)", + mode, int2Integer(docFreq.docs) ) + + val freqCols = Gemini.tables.featuresFreqCols + docFreq.df.foreach { case(feature, weight) => + cassandra.execute( + s"INSERT INTO $keyspace.${Gemini.tables.featuresFreq}" + + s"(${freqCols.id}, ${freqCols.feature}, ${freqCols.weight}) VALUES (?, ?, ?)", + mode, feature, int2Integer(weight) + ) + } } override def afterAll(): Unit = {