From 0f5b0f896f62a855a32953d7a0c429e53af45df4 Mon Sep 17 00:00:00 2001 From: sezruby Date: Wed, 6 Jan 2021 17:29:30 +0900 Subject: [PATCH 1/2] Quick fix for hybrid scan rank algorithm --- .../microsoft/hyperspace/index/rules/RuleUtils.scala | 2 +- .../hyperspace/index/rules/RuleUtilsTest.scala | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index 4d2a05bb3..45dc96de1 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -98,7 +98,7 @@ object RuleUtils { val isCandidate = isDeleteCandidate || isAppendOnlyCandidate if (isCandidate) { - entry.setTagValue(IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES, commonBytes) + entry.setTagValue(plan, IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES, commonBytes) // If there is no change in source dataset, the index will be applied by // transformPlanToUseIndexOnlyScan. diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala index bbfd213d5..aee4b4612 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -121,9 +121,11 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { val df = spark.range(1, 5).toDF("id") val dataPath = tempPath.getAbsolutePath df.write.parquet(dataPath) + var expectedCommonSourceBytes = 0L withIndex("index1") { val readDf = spark.read.parquet(dataPath) + expectedCommonSourceBytes = FileUtils.getDirectorySize(new Path(dataPath)) withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { indexManager.create(readDf, IndexConfig("index1", Seq("id"))) } @@ -147,6 +149,10 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { assert( indexes.head.getTagValue(plan, IndexLogEntryTags.HYBRIDSCAN_REQUIRED) === expectedHybridScanTag) + assert( + !hybridScanEnabled || indexes.head + .getTagValue(plan, IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES) === Some( + expectedCommonSourceBytes)) } else { assert(indexes.isEmpty) } @@ -191,7 +197,9 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { } // Scenario #2: Delete 1 file. - FileUtils.delete(new Path(readDf.inputFiles.head)) + val deleteFilePath = new Path(readDf.inputFiles.head) + expectedCommonSourceBytes -= FileUtils.getDirectorySize(deleteFilePath) + FileUtils.delete(deleteFilePath) { val optimizedPlan = spark.read.parquet(dataPath).queryExecution.optimizedPlan From 1d77755e76cfd7e75f0727f03ff64d3f3dff844d Mon Sep 17 00:00:00 2001 From: sezruby Date: Thu, 7 Jan 2021 12:04:47 +0900 Subject: [PATCH 2/2] Review commit --- .../index/rankers/FilterIndexRankerTest.scala | 12 +++--- .../index/rankers/JoinIndexRankerTest.scala | 16 ++++---- .../index/rules/HyperspaceRuleTestSuite.scala | 5 ++- .../index/rules/RuleUtilsTest.scala | 41 +++++++++++-------- 4 files changed, 43 insertions(+), 31 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala index 1b0f0d7e8..9f57c5c77 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala @@ -47,11 +47,11 @@ class FilterIndexRankerTest extends HyperspaceRuleTestSuite { test("rank() should return the head of the list by default.") { val ind1 = createIndexLogEntry("ind1", Seq(t1c1), Seq(t1c2), tempPlan, writeLog = false) - setCommonBytesTags(ind1, tempPlan, Nil) + setCommonSourceSizeInBytesTag(ind1, tempPlan, Nil) val ind2 = createIndexLogEntry("ind2", Seq(t1c1), Seq(t1c2), tempPlan, writeLog = false) - setCommonBytesTags(ind2, tempPlan, Nil) + setCommonSourceSizeInBytesTag(ind2, tempPlan, Nil) val ind3 = createIndexLogEntry("ind3", Seq(t2c1), Seq(t2c2), tempPlan, writeLog = false) - setCommonBytesTags(ind3, tempPlan, Nil) + setCommonSourceSizeInBytesTag(ind3, tempPlan, Nil) val indexes = Seq(ind1, ind2, ind3) assert(FilterIndexRanker.rank(spark, tempPlan, indexes).get.equals(ind1)) @@ -71,7 +71,7 @@ class FilterIndexRankerTest extends HyperspaceRuleTestSuite { tempPlan, inputFiles = fileList1, writeLog = false) - setCommonBytesTags(ind1, tempPlan, fileList1) + setCommonSourceSizeInBytesTag(ind1, tempPlan, fileList1) val ind2 = createIndexLogEntry( "ind2", Seq(t1c1), @@ -79,7 +79,7 @@ class FilterIndexRankerTest extends HyperspaceRuleTestSuite { tempPlan, inputFiles = fileList1 ++ fileList2, writeLog = false) - setCommonBytesTags(ind2, tempPlan, fileList1 ++ fileList2) + setCommonSourceSizeInBytesTag(ind2, tempPlan, fileList1 ++ fileList2) val ind3 = createIndexLogEntry( "ind3", Seq(t2c1), @@ -87,7 +87,7 @@ class FilterIndexRankerTest extends HyperspaceRuleTestSuite { tempPlan, inputFiles = fileList2, writeLog = false) - setCommonBytesTags(ind3, tempPlan, fileList2) + setCommonSourceSizeInBytesTag(ind3, tempPlan, fileList2) val indexes = Seq(ind1, ind2, ind3) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rankers/JoinIndexRankerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rankers/JoinIndexRankerTest.scala index ab106e71f..beb36492d 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rankers/JoinIndexRankerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rankers/JoinIndexRankerTest.scala @@ -79,13 +79,13 @@ class JoinIndexRankerTest extends HyperspaceRuleTestSuite with SQLHelper { val fileList1 = Seq(FileInfo("a", 1, 1, 1), FileInfo("b", 2, 1, 2)) val fileList2 = Seq(FileInfo("c", 1, 1, 3), FileInfo("d", 1, 1, 4)) val l_10 = createIndexLogEntry("l1", Seq(t1c1), Seq(t1c2), leftPlan, 10, fileList1, false) - setCommonBytesTags(l_10, leftPlan, fileList1) + setCommonSourceSizeInBytesTag(l_10, leftPlan, fileList1) val l_20 = createIndexLogEntry("l2", Seq(t1c1), Seq(t1c2), leftPlan, 20, fileList2, false) - setCommonBytesTags(l_20, leftPlan, fileList2) + setCommonSourceSizeInBytesTag(l_20, leftPlan, fileList2) val r_10 = createIndexLogEntry("r1", Seq(t2c1), Seq(t2c2), rightPlan, 10, fileList1, false) - setCommonBytesTags(r_10, rightPlan, fileList1) + setCommonSourceSizeInBytesTag(r_10, rightPlan, fileList1) val r_20 = createIndexLogEntry("r2", Seq(t2c1), Seq(t2c2), rightPlan, 20, fileList2, false) - setCommonBytesTags(r_20, rightPlan, fileList2) + setCommonSourceSizeInBytesTag(r_20, rightPlan, fileList2) { // Test rank algorithm without Hybrid Scan. @@ -108,13 +108,13 @@ class JoinIndexRankerTest extends HyperspaceRuleTestSuite with SQLHelper { { // If both indexes have the same amount of common bytes, follow the original algorithm. val l_10 = createIndexLogEntry("l1", Seq(t1c1), Seq(t1c2), leftPlan, 10, fileList1, false) - setCommonBytesTags(l_10, leftPlan, fileList1) + setCommonSourceSizeInBytesTag(l_10, leftPlan, fileList1) val l_20 = createIndexLogEntry("l2", Seq(t1c1), Seq(t1c2), leftPlan, 20, fileList1, false) - setCommonBytesTags(l_20, leftPlan, fileList1) + setCommonSourceSizeInBytesTag(l_20, leftPlan, fileList1) val r_10 = createIndexLogEntry("r1", Seq(t2c1), Seq(t2c2), rightPlan, 10, fileList1, false) - setCommonBytesTags(r_10, rightPlan, fileList1) + setCommonSourceSizeInBytesTag(r_10, rightPlan, fileList1) val r_20 = createIndexLogEntry("r2", Seq(t2c1), Seq(t2c2), rightPlan, 20, fileList1, false) - setCommonBytesTags(r_20, rightPlan, fileList1) + setCommonSourceSizeInBytesTag(r_20, rightPlan, fileList1) val indexPairs = Seq((l_10, r_10), (l_10, r_20), (l_20, r_20)) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala index 19da1d5a1..781847dc4 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala @@ -101,7 +101,10 @@ trait HyperspaceRuleTestSuite extends HyperspaceSuite { def getIndexRootPath(indexName: String): Path = new Path(systemPath, indexName) - def setCommonBytesTags(index: IndexLogEntry, plan: LogicalPlan, files: Seq[FileInfo]): Unit = { + def setCommonSourceSizeInBytesTag( + index: IndexLogEntry, + plan: LogicalPlan, + files: Seq[FileInfo]): Unit = { val commonBytes = files.foldLeft(0L) { (bytes, f) => bytes + f.size } diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala index aee4b4612..03a86850f 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -121,11 +121,10 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { val df = spark.range(1, 5).toDF("id") val dataPath = tempPath.getAbsolutePath df.write.parquet(dataPath) - var expectedCommonSourceBytes = 0L withIndex("index1") { val readDf = spark.read.parquet(dataPath) - expectedCommonSourceBytes = FileUtils.getDirectorySize(new Path(dataPath)) + val expectedCommonSourceBytes = FileUtils.getDirectorySize(new Path(dataPath)) withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { indexManager.create(readDf, IndexConfig("index1", Seq("id"))) } @@ -136,7 +135,8 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { hybridScanEnabled: Boolean, hybridScanDeleteEnabled: Boolean, expectCandidateIndex: Boolean, - expectedHybridScanTag: Option[Boolean]): Unit = { + expectedHybridScanTag: Option[Boolean], + expectedCommonSourceBytes: Option[Long]): Unit = { withSQLConf( "spark.hyperspace.index.hybridscan.enabled" -> hybridScanEnabled.toString, "spark.hyperspace.index.hybridscan.delete.enabled" -> @@ -150,9 +150,8 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { indexes.head.getTagValue(plan, IndexLogEntryTags.HYBRIDSCAN_REQUIRED) === expectedHybridScanTag) assert( - !hybridScanEnabled || indexes.head - .getTagValue(plan, IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES) === Some( - expectedCommonSourceBytes)) + indexes.head.getTagValue(plan, IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES) + === expectedCommonSourceBytes) } else { assert(indexes.isEmpty) } @@ -168,13 +167,15 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { hybridScanEnabled = false, hybridScanDeleteEnabled = false, expectCandidateIndex = true, - expectedHybridScanTag = None) + expectedHybridScanTag = None, + expectedCommonSourceBytes = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = false, expectCandidateIndex = true, - expectedHybridScanTag = Some(false)) + expectedHybridScanTag = Some(false), + expectedCommonSourceBytes = Some(expectedCommonSourceBytes)) } // Scenario #1: Append new files. @@ -187,18 +188,21 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { hybridScanEnabled = false, hybridScanDeleteEnabled = false, expectCandidateIndex = false, - expectedHybridScanTag = None) + expectedHybridScanTag = None, + expectedCommonSourceBytes = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = false, expectCandidateIndex = true, - expectedHybridScanTag = Some(true)) + expectedHybridScanTag = Some(true), + expectedCommonSourceBytes = Some(expectedCommonSourceBytes)) } // Scenario #2: Delete 1 file. val deleteFilePath = new Path(readDf.inputFiles.head) - expectedCommonSourceBytes -= FileUtils.getDirectorySize(deleteFilePath) + val updatedExpectedCommonSourceBytes = expectedCommonSourceBytes - FileUtils + .getDirectorySize(deleteFilePath) FileUtils.delete(deleteFilePath) { @@ -208,19 +212,22 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { hybridScanEnabled = false, hybridScanDeleteEnabled = false, expectCandidateIndex = false, - expectedHybridScanTag = None) + expectedHybridScanTag = None, + expectedCommonSourceBytes = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = false, expectCandidateIndex = false, - expectedHybridScanTag = None) + expectedHybridScanTag = None, + expectedCommonSourceBytes = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = true, expectCandidateIndex = true, - expectedHybridScanTag = Some(true)) + expectedHybridScanTag = Some(true), + expectedCommonSourceBytes = Some(updatedExpectedCommonSourceBytes)) } // Scenario #3: Replace all files. @@ -233,13 +240,15 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { hybridScanEnabled = false, hybridScanDeleteEnabled = false, expectCandidateIndex = false, - expectedHybridScanTag = None) + expectedHybridScanTag = None, + expectedCommonSourceBytes = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = true, expectCandidateIndex = false, - expectedHybridScanTag = None) + expectedHybridScanTag = None, + expectedCommonSourceBytes = None) } } }