Skip to content
This repository was archived by the owner on Jun 14, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ object RuleUtils {

val isCandidate = isDeleteCandidate || isAppendOnlyCandidate
if (isCandidate) {
entry.setTagValue(IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES, commonBytes)
entry.setTagValue(plan, IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES, commonBytes)

// If there is no change in source dataset, the index will be applied by
// transformPlanToUseIndexOnlyScan.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ class FilterIndexRankerTest extends HyperspaceRuleTestSuite {

test("rank() should return the head of the list by default.") {
val ind1 = createIndexLogEntry("ind1", Seq(t1c1), Seq(t1c2), tempPlan, writeLog = false)
setCommonBytesTags(ind1, tempPlan, Nil)
setCommonSourceSizeInBytesTag(ind1, tempPlan, Nil)
val ind2 = createIndexLogEntry("ind2", Seq(t1c1), Seq(t1c2), tempPlan, writeLog = false)
setCommonBytesTags(ind2, tempPlan, Nil)
setCommonSourceSizeInBytesTag(ind2, tempPlan, Nil)
val ind3 = createIndexLogEntry("ind3", Seq(t2c1), Seq(t2c2), tempPlan, writeLog = false)
setCommonBytesTags(ind3, tempPlan, Nil)
setCommonSourceSizeInBytesTag(ind3, tempPlan, Nil)

val indexes = Seq(ind1, ind2, ind3)
assert(FilterIndexRanker.rank(spark, tempPlan, indexes).get.equals(ind1))
Expand All @@ -71,23 +71,23 @@ class FilterIndexRankerTest extends HyperspaceRuleTestSuite {
tempPlan,
inputFiles = fileList1,
writeLog = false)
setCommonBytesTags(ind1, tempPlan, fileList1)
setCommonSourceSizeInBytesTag(ind1, tempPlan, fileList1)
val ind2 = createIndexLogEntry(
"ind2",
Seq(t1c1),
Seq(t1c2),
tempPlan,
inputFiles = fileList1 ++ fileList2,
writeLog = false)
setCommonBytesTags(ind2, tempPlan, fileList1 ++ fileList2)
setCommonSourceSizeInBytesTag(ind2, tempPlan, fileList1 ++ fileList2)
val ind3 = createIndexLogEntry(
"ind3",
Seq(t2c1),
Seq(t1c2),
tempPlan,
inputFiles = fileList2,
writeLog = false)
setCommonBytesTags(ind3, tempPlan, fileList2)
setCommonSourceSizeInBytesTag(ind3, tempPlan, fileList2)

val indexes = Seq(ind1, ind2, ind3)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ class JoinIndexRankerTest extends HyperspaceRuleTestSuite with SQLHelper {
val fileList1 = Seq(FileInfo("a", 1, 1, 1), FileInfo("b", 2, 1, 2))
val fileList2 = Seq(FileInfo("c", 1, 1, 3), FileInfo("d", 1, 1, 4))
val l_10 = createIndexLogEntry("l1", Seq(t1c1), Seq(t1c2), leftPlan, 10, fileList1, false)
setCommonBytesTags(l_10, leftPlan, fileList1)
setCommonSourceSizeInBytesTag(l_10, leftPlan, fileList1)
val l_20 = createIndexLogEntry("l2", Seq(t1c1), Seq(t1c2), leftPlan, 20, fileList2, false)
setCommonBytesTags(l_20, leftPlan, fileList2)
setCommonSourceSizeInBytesTag(l_20, leftPlan, fileList2)
val r_10 = createIndexLogEntry("r1", Seq(t2c1), Seq(t2c2), rightPlan, 10, fileList1, false)
setCommonBytesTags(r_10, rightPlan, fileList1)
setCommonSourceSizeInBytesTag(r_10, rightPlan, fileList1)
val r_20 = createIndexLogEntry("r2", Seq(t2c1), Seq(t2c2), rightPlan, 20, fileList2, false)
setCommonBytesTags(r_20, rightPlan, fileList2)
setCommonSourceSizeInBytesTag(r_20, rightPlan, fileList2)

{
// Test rank algorithm without Hybrid Scan.
Expand All @@ -108,13 +108,13 @@ class JoinIndexRankerTest extends HyperspaceRuleTestSuite with SQLHelper {
{
// If both indexes have the same amount of common bytes, follow the original algorithm.
val l_10 = createIndexLogEntry("l1", Seq(t1c1), Seq(t1c2), leftPlan, 10, fileList1, false)
setCommonBytesTags(l_10, leftPlan, fileList1)
setCommonSourceSizeInBytesTag(l_10, leftPlan, fileList1)
val l_20 = createIndexLogEntry("l2", Seq(t1c1), Seq(t1c2), leftPlan, 20, fileList1, false)
setCommonBytesTags(l_20, leftPlan, fileList1)
setCommonSourceSizeInBytesTag(l_20, leftPlan, fileList1)
val r_10 = createIndexLogEntry("r1", Seq(t2c1), Seq(t2c2), rightPlan, 10, fileList1, false)
setCommonBytesTags(r_10, rightPlan, fileList1)
setCommonSourceSizeInBytesTag(r_10, rightPlan, fileList1)
val r_20 = createIndexLogEntry("r2", Seq(t2c1), Seq(t2c2), rightPlan, 20, fileList1, false)
setCommonBytesTags(r_20, rightPlan, fileList1)
setCommonSourceSizeInBytesTag(r_20, rightPlan, fileList1)


val indexPairs = Seq((l_10, r_10), (l_10, r_20), (l_20, r_20))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ trait HyperspaceRuleTestSuite extends HyperspaceSuite {
def getIndexRootPath(indexName: String): Path =
new Path(systemPath, indexName)

def setCommonBytesTags(index: IndexLogEntry, plan: LogicalPlan, files: Seq[FileInfo]): Unit = {
def setCommonSourceSizeInBytesTag(
index: IndexLogEntry,
plan: LogicalPlan,
files: Seq[FileInfo]): Unit = {
val commonBytes = files.foldLeft(0L) { (bytes, f) =>
bytes + f.size
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper {

withIndex("index1") {
val readDf = spark.read.parquet(dataPath)
val expectedCommonSourceBytes = FileUtils.getDirectorySize(new Path(dataPath))
withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") {
indexManager.create(readDf, IndexConfig("index1", Seq("id")))
}
Expand All @@ -134,7 +135,8 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper {
hybridScanEnabled: Boolean,
hybridScanDeleteEnabled: Boolean,
expectCandidateIndex: Boolean,
expectedHybridScanTag: Option[Boolean]): Unit = {
expectedHybridScanTag: Option[Boolean],
expectedCommonSourceBytes: Option[Long]): Unit = {
withSQLConf(
"spark.hyperspace.index.hybridscan.enabled" -> hybridScanEnabled.toString,
"spark.hyperspace.index.hybridscan.delete.enabled" ->
Expand All @@ -147,6 +149,9 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper {
assert(
indexes.head.getTagValue(plan, IndexLogEntryTags.HYBRIDSCAN_REQUIRED)
=== expectedHybridScanTag)
assert(
indexes.head.getTagValue(plan, IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES)
=== expectedCommonSourceBytes)
} else {
assert(indexes.isEmpty)
}
Expand All @@ -162,13 +167,15 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper {
hybridScanEnabled = false,
hybridScanDeleteEnabled = false,
expectCandidateIndex = true,
expectedHybridScanTag = None)
expectedHybridScanTag = None,
expectedCommonSourceBytes = None)
verify(
optimizedPlan,
hybridScanEnabled = true,
hybridScanDeleteEnabled = false,
expectCandidateIndex = true,
expectedHybridScanTag = Some(false))
expectedHybridScanTag = Some(false),
expectedCommonSourceBytes = Some(expectedCommonSourceBytes))
}

// Scenario #1: Append new files.
Expand All @@ -181,17 +188,22 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper {
hybridScanEnabled = false,
hybridScanDeleteEnabled = false,
expectCandidateIndex = false,
expectedHybridScanTag = None)
expectedHybridScanTag = None,
expectedCommonSourceBytes = None)
verify(
optimizedPlan,
hybridScanEnabled = true,
hybridScanDeleteEnabled = false,
expectCandidateIndex = true,
expectedHybridScanTag = Some(true))
expectedHybridScanTag = Some(true),
expectedCommonSourceBytes = Some(expectedCommonSourceBytes))
}

// Scenario #2: Delete 1 file.
FileUtils.delete(new Path(readDf.inputFiles.head))
val deleteFilePath = new Path(readDf.inputFiles.head)
val updatedExpectedCommonSourceBytes = expectedCommonSourceBytes - FileUtils
.getDirectorySize(deleteFilePath)
FileUtils.delete(deleteFilePath)

{
val optimizedPlan = spark.read.parquet(dataPath).queryExecution.optimizedPlan
Expand All @@ -200,19 +212,22 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper {
hybridScanEnabled = false,
hybridScanDeleteEnabled = false,
expectCandidateIndex = false,
expectedHybridScanTag = None)
expectedHybridScanTag = None,
expectedCommonSourceBytes = None)
verify(
optimizedPlan,
hybridScanEnabled = true,
hybridScanDeleteEnabled = false,
expectCandidateIndex = false,
expectedHybridScanTag = None)
expectedHybridScanTag = None,
expectedCommonSourceBytes = None)
verify(
optimizedPlan,
hybridScanEnabled = true,
hybridScanDeleteEnabled = true,
expectCandidateIndex = true,
expectedHybridScanTag = Some(true))
expectedHybridScanTag = Some(true),
expectedCommonSourceBytes = Some(updatedExpectedCommonSourceBytes))
}

// Scenario #3: Replace all files.
Expand All @@ -225,13 +240,15 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper {
hybridScanEnabled = false,
hybridScanDeleteEnabled = false,
expectCandidateIndex = false,
expectedHybridScanTag = None)
expectedHybridScanTag = None,
expectedCommonSourceBytes = None)
verify(
optimizedPlan,
hybridScanEnabled = true,
hybridScanDeleteEnabled = true,
expectCandidateIndex = false,
expectedHybridScanTag = None)
expectedHybridScanTag = None,
expectedCommonSourceBytes = None)
}
}
}
Expand Down