diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index 4d2a05bb3..45dc96de1 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -98,7 +98,7 @@ object RuleUtils { val isCandidate = isDeleteCandidate || isAppendOnlyCandidate if (isCandidate) { - entry.setTagValue(IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES, commonBytes) + entry.setTagValue(plan, IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES, commonBytes) // If there is no change in source dataset, the index will be applied by // transformPlanToUseIndexOnlyScan. diff --git a/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala index 1b0f0d7e8..9f57c5c77 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rankers/FilterIndexRankerTest.scala @@ -47,11 +47,11 @@ class FilterIndexRankerTest extends HyperspaceRuleTestSuite { test("rank() should return the head of the list by default.") { val ind1 = createIndexLogEntry("ind1", Seq(t1c1), Seq(t1c2), tempPlan, writeLog = false) - setCommonBytesTags(ind1, tempPlan, Nil) + setCommonSourceSizeInBytesTag(ind1, tempPlan, Nil) val ind2 = createIndexLogEntry("ind2", Seq(t1c1), Seq(t1c2), tempPlan, writeLog = false) - setCommonBytesTags(ind2, tempPlan, Nil) + setCommonSourceSizeInBytesTag(ind2, tempPlan, Nil) val ind3 = createIndexLogEntry("ind3", Seq(t2c1), Seq(t2c2), tempPlan, writeLog = false) - setCommonBytesTags(ind3, tempPlan, Nil) + setCommonSourceSizeInBytesTag(ind3, tempPlan, Nil) val indexes = Seq(ind1, ind2, ind3) assert(FilterIndexRanker.rank(spark, tempPlan, indexes).get.equals(ind1)) @@ -71,7 +71,7 @@ class FilterIndexRankerTest extends HyperspaceRuleTestSuite { tempPlan, inputFiles = fileList1, writeLog = false) - setCommonBytesTags(ind1, tempPlan, fileList1) + setCommonSourceSizeInBytesTag(ind1, tempPlan, fileList1) val ind2 = createIndexLogEntry( "ind2", Seq(t1c1), @@ -79,7 +79,7 @@ class FilterIndexRankerTest extends HyperspaceRuleTestSuite { tempPlan, inputFiles = fileList1 ++ fileList2, writeLog = false) - setCommonBytesTags(ind2, tempPlan, fileList1 ++ fileList2) + setCommonSourceSizeInBytesTag(ind2, tempPlan, fileList1 ++ fileList2) val ind3 = createIndexLogEntry( "ind3", Seq(t2c1), @@ -87,7 +87,7 @@ class FilterIndexRankerTest extends HyperspaceRuleTestSuite { tempPlan, inputFiles = fileList2, writeLog = false) - setCommonBytesTags(ind3, tempPlan, fileList2) + setCommonSourceSizeInBytesTag(ind3, tempPlan, fileList2) val indexes = Seq(ind1, ind2, ind3) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rankers/JoinIndexRankerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rankers/JoinIndexRankerTest.scala index ab106e71f..beb36492d 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rankers/JoinIndexRankerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rankers/JoinIndexRankerTest.scala @@ -79,13 +79,13 @@ class JoinIndexRankerTest extends HyperspaceRuleTestSuite with SQLHelper { val fileList1 = Seq(FileInfo("a", 1, 1, 1), FileInfo("b", 2, 1, 2)) val fileList2 = Seq(FileInfo("c", 1, 1, 3), FileInfo("d", 1, 1, 4)) val l_10 = createIndexLogEntry("l1", Seq(t1c1), Seq(t1c2), leftPlan, 10, fileList1, false) - setCommonBytesTags(l_10, leftPlan, fileList1) + setCommonSourceSizeInBytesTag(l_10, leftPlan, fileList1) val l_20 = createIndexLogEntry("l2", Seq(t1c1), Seq(t1c2), leftPlan, 20, fileList2, false) - setCommonBytesTags(l_20, leftPlan, fileList2) + setCommonSourceSizeInBytesTag(l_20, leftPlan, fileList2) val r_10 = createIndexLogEntry("r1", Seq(t2c1), Seq(t2c2), rightPlan, 10, fileList1, false) - setCommonBytesTags(r_10, rightPlan, fileList1) + setCommonSourceSizeInBytesTag(r_10, rightPlan, fileList1) val r_20 = createIndexLogEntry("r2", Seq(t2c1), Seq(t2c2), rightPlan, 20, fileList2, false) - setCommonBytesTags(r_20, rightPlan, fileList2) + setCommonSourceSizeInBytesTag(r_20, rightPlan, fileList2) { // Test rank algorithm without Hybrid Scan. @@ -108,13 +108,13 @@ class JoinIndexRankerTest extends HyperspaceRuleTestSuite with SQLHelper { { // If both indexes have the same amount of common bytes, follow the original algorithm. val l_10 = createIndexLogEntry("l1", Seq(t1c1), Seq(t1c2), leftPlan, 10, fileList1, false) - setCommonBytesTags(l_10, leftPlan, fileList1) + setCommonSourceSizeInBytesTag(l_10, leftPlan, fileList1) val l_20 = createIndexLogEntry("l2", Seq(t1c1), Seq(t1c2), leftPlan, 20, fileList1, false) - setCommonBytesTags(l_20, leftPlan, fileList1) + setCommonSourceSizeInBytesTag(l_20, leftPlan, fileList1) val r_10 = createIndexLogEntry("r1", Seq(t2c1), Seq(t2c2), rightPlan, 10, fileList1, false) - setCommonBytesTags(r_10, rightPlan, fileList1) + setCommonSourceSizeInBytesTag(r_10, rightPlan, fileList1) val r_20 = createIndexLogEntry("r2", Seq(t2c1), Seq(t2c2), rightPlan, 20, fileList1, false) - setCommonBytesTags(r_20, rightPlan, fileList1) + setCommonSourceSizeInBytesTag(r_20, rightPlan, fileList1) val indexPairs = Seq((l_10, r_10), (l_10, r_20), (l_20, r_20)) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala index 19da1d5a1..781847dc4 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala @@ -101,7 +101,10 @@ trait HyperspaceRuleTestSuite extends HyperspaceSuite { def getIndexRootPath(indexName: String): Path = new Path(systemPath, indexName) - def setCommonBytesTags(index: IndexLogEntry, plan: LogicalPlan, files: Seq[FileInfo]): Unit = { + def setCommonSourceSizeInBytesTag( + index: IndexLogEntry, + plan: LogicalPlan, + files: Seq[FileInfo]): Unit = { val commonBytes = files.foldLeft(0L) { (bytes, f) => bytes + f.size } diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala index bbfd213d5..03a86850f 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -124,6 +124,7 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { withIndex("index1") { val readDf = spark.read.parquet(dataPath) + val expectedCommonSourceBytes = FileUtils.getDirectorySize(new Path(dataPath)) withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { indexManager.create(readDf, IndexConfig("index1", Seq("id"))) } @@ -134,7 +135,8 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { hybridScanEnabled: Boolean, hybridScanDeleteEnabled: Boolean, expectCandidateIndex: Boolean, - expectedHybridScanTag: Option[Boolean]): Unit = { + expectedHybridScanTag: Option[Boolean], + expectedCommonSourceBytes: Option[Long]): Unit = { withSQLConf( "spark.hyperspace.index.hybridscan.enabled" -> hybridScanEnabled.toString, "spark.hyperspace.index.hybridscan.delete.enabled" -> @@ -147,6 +149,9 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { assert( indexes.head.getTagValue(plan, IndexLogEntryTags.HYBRIDSCAN_REQUIRED) === expectedHybridScanTag) + assert( + indexes.head.getTagValue(plan, IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES) + === expectedCommonSourceBytes) } else { assert(indexes.isEmpty) } @@ -162,13 +167,15 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { hybridScanEnabled = false, hybridScanDeleteEnabled = false, expectCandidateIndex = true, - expectedHybridScanTag = None) + expectedHybridScanTag = None, + expectedCommonSourceBytes = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = false, expectCandidateIndex = true, - expectedHybridScanTag = Some(false)) + expectedHybridScanTag = Some(false), + expectedCommonSourceBytes = Some(expectedCommonSourceBytes)) } // Scenario #1: Append new files. @@ -181,17 +188,22 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { hybridScanEnabled = false, hybridScanDeleteEnabled = false, expectCandidateIndex = false, - expectedHybridScanTag = None) + expectedHybridScanTag = None, + expectedCommonSourceBytes = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = false, expectCandidateIndex = true, - expectedHybridScanTag = Some(true)) + expectedHybridScanTag = Some(true), + expectedCommonSourceBytes = Some(expectedCommonSourceBytes)) } // Scenario #2: Delete 1 file. - FileUtils.delete(new Path(readDf.inputFiles.head)) + val deleteFilePath = new Path(readDf.inputFiles.head) + val updatedExpectedCommonSourceBytes = expectedCommonSourceBytes - FileUtils + .getDirectorySize(deleteFilePath) + FileUtils.delete(deleteFilePath) { val optimizedPlan = spark.read.parquet(dataPath).queryExecution.optimizedPlan @@ -200,19 +212,22 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { hybridScanEnabled = false, hybridScanDeleteEnabled = false, expectCandidateIndex = false, - expectedHybridScanTag = None) + expectedHybridScanTag = None, + expectedCommonSourceBytes = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = false, expectCandidateIndex = false, - expectedHybridScanTag = None) + expectedHybridScanTag = None, + expectedCommonSourceBytes = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = true, expectCandidateIndex = true, - expectedHybridScanTag = Some(true)) + expectedHybridScanTag = Some(true), + expectedCommonSourceBytes = Some(updatedExpectedCommonSourceBytes)) } // Scenario #3: Replace all files. @@ -225,13 +240,15 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { hybridScanEnabled = false, hybridScanDeleteEnabled = false, expectCandidateIndex = false, - expectedHybridScanTag = None) + expectedHybridScanTag = None, + expectedCommonSourceBytes = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = true, expectCandidateIndex = false, - expectedHybridScanTag = None) + expectedHybridScanTag = None, + expectedCommonSourceBytes = None) } } }