From 0c665772440d2125eba142b4d71e96a228f5dda5 Mon Sep 17 00:00:00 2001 From: EJ Song Date: Wed, 21 Oct 2020 13:08:36 +0900 Subject: [PATCH 01/11] Introduce IndexLogEntryTag for auxiliary data while applying rules --- .../hyperspace/index/IndexConstants.scala | 6 ++++ .../hyperspace/index/IndexLogEntry.scala | 22 ++++++++++++ .../hyperspace/index/rules/RuleUtils.scala | 19 ++++++++--- .../index/rules/RuleUtilsTest.scala | 34 +++++++++++++------ 4 files changed, 66 insertions(+), 15 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala index 77c4ebeb1..0e393b24b 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala @@ -56,6 +56,12 @@ object IndexConstants { // See https://github.com/microsoft/hyperspace/issues/185 val INDEX_RELATION_IDENTIFIER: (String, String) = "indexRelation" -> "true" + // Tags are used while applying rules. + // INDEX_HYBRIDSCAN_REQUIRED_TAG indicates if Hybrid Scan is required for this index or not. + // This is set in getCandidateIndexes and utilized in transformPlanToUseIndex. + val INDEX_HYBRIDSCAN_REQUIRED_TAG: IndexLogEntryTag[Boolean] = + IndexLogEntryTag[Boolean]("hybridScanRequired") + // Default number of buckets is set the default value of "spark.sql.shuffle.partitions". val INDEX_NUM_BUCKETS_DEFAULT: Int = SQLConf.SHUFFLE_PARTITIONS.defaultValue.get diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 073aa7473..5051eb5ef 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -20,6 +20,7 @@ import java.io.FileNotFoundException import scala.annotation.tailrec import scala.collection.mutable.{HashMap, ListBuffer} +import scala.collection.mutable import com.fasterxml.jackson.annotation.JsonIgnore import org.apache.hadoop.conf.Configuration @@ -449,8 +450,29 @@ case class IndexLogEntry( override def hashCode(): Int = { config.hashCode + signature.hashCode + numBuckets.hashCode + content.hashCode } + + /** + * A mutable map for holding auxiliary information of this index log entry while applying rules. + */ + @JsonIgnore + private val tags: mutable.Map[IndexLogEntryTag[_], Any] = mutable.Map.empty + + def setTagValue[T](tag: IndexLogEntryTag[T], value: T): Unit = { + tags(tag) = value + } + + def getTagValue[T](tag: IndexLogEntryTag[T]): Option[T] = { + tags.get(tag).map(_.asInstanceOf[T]) + } + + def unsetTagValue[T](tag: IndexLogEntryTag[T]): Unit = { + tags -= tag + } } +// A tag of a `IndexLogEntry`, which defines name and type. +case class IndexLogEntryTag[T](name: String) + object IndexLogEntry { val VERSION: String = "0.1" diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index 802b1cb32..ecb4ea785 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -82,6 +82,12 @@ object RuleUtils { val commonCnt = inputSourceFiles.count(entry.allSourceFileInfos.contains) val deletedCnt = entry.allSourceFileInfos.size - commonCnt + // If there is no change in source dataset, this index can be applied by + // transformPlanToUseIndexOnlyScan. + entry.setTagValue( + IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG, + !(commonCnt == entry.allSourceFileInfos.size && commonCnt == inputSourceFiles.size)) + if (hybridScanDeleteEnabled && entry.hasLineageColumn(spark)) { commonCnt > 0 && deletedCnt <= HyperspaceConf.hybridScanDeleteMaxNumFiles(spark) } else { @@ -172,11 +178,14 @@ object RuleUtils { // Check pre-requisite. assert(getLogicalRelation(plan).isDefined) - val transformed = if (HyperspaceConf.hybridScanEnabled(spark)) { - transformPlanToUseHybridScan(spark, index, plan, useBucketSpec) - } else { - transformPlanToUseIndexOnlyScan(spark, index, plan, useBucketSpec) - } + val transformed = + if (!HyperspaceConf.hybridScanEnabled(spark) || !index + .getTagValue(IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG) + .getOrElse(false)) { + transformPlanToUseIndexOnlyScan(spark, index, plan, useBucketSpec) + } else { + transformPlanToUseHybridScan(spark, index, plan, useBucketSpec) + } assert(!transformed.equals(plan)) transformed } diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala index 1932bd081..9b5cf8909 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -133,7 +133,8 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { plan: LogicalPlan, hybridScanEnabled: Boolean, hybridScanDeleteEnabled: Boolean, - expectCandidateIndex: Boolean): Unit = { + expectCandidateIndex: Boolean, + expectedHybridScanTag: Boolean): Unit = { withSQLConf( "spark.hyperspace.index.hybridscan.enabled" -> hybridScanEnabled.toString, "spark.hyperspace.index.hybridscan.delete.enabled" -> @@ -143,6 +144,10 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { if (expectCandidateIndex) { assert(indexes.length == 1) assert(indexes.head.name == "index1") + assert( + indexes.head + .getTagValue(IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG) + .getOrElse(false) == expectedHybridScanTag) } else { assert(indexes.isEmpty) } @@ -157,12 +162,14 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { optimizedPlan, hybridScanEnabled = false, hybridScanDeleteEnabled = false, - expectCandidateIndex = true) + expectCandidateIndex = true, + expectedHybridScanTag = false) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = false, - expectCandidateIndex = true) + expectCandidateIndex = true, + expectedHybridScanTag = false) } // Scenario #1: Append new files. @@ -174,12 +181,14 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { optimizedPlan, hybridScanEnabled = false, hybridScanDeleteEnabled = false, - expectCandidateIndex = false) + expectCandidateIndex = false, + expectedHybridScanTag = false) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = false, - expectCandidateIndex = true) + expectCandidateIndex = true, + expectedHybridScanTag = true) } // Scenario #2: Delete 1 file. @@ -191,17 +200,20 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { optimizedPlan, hybridScanEnabled = false, hybridScanDeleteEnabled = false, - expectCandidateIndex = false) + expectCandidateIndex = false, + expectedHybridScanTag = false) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = false, - expectCandidateIndex = false) + expectCandidateIndex = false, + expectedHybridScanTag = false) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = true, - expectCandidateIndex = true) + expectCandidateIndex = true, + expectedHybridScanTag = true) } // Scenario #3: Replace all files. @@ -213,12 +225,14 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { optimizedPlan, hybridScanEnabled = false, hybridScanDeleteEnabled = false, - expectCandidateIndex = false) + expectCandidateIndex = false, + expectedHybridScanTag = false) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = true, - expectCandidateIndex = false) + expectCandidateIndex = false, + expectedHybridScanTag = false) } } } From 4a453207c3c024e775f6fbf469679b8e36ad9600 Mon Sep 17 00:00:00 2001 From: EJ Song Date: Sat, 24 Oct 2020 09:58:15 +0900 Subject: [PATCH 02/11] minor fix --- .../com/microsoft/hyperspace/index/rules/RuleUtils.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index ecb4ea785..69b68f745 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -179,12 +179,12 @@ object RuleUtils { assert(getLogicalRelation(plan).isDefined) val transformed = - if (!HyperspaceConf.hybridScanEnabled(spark) || !index + if (HyperspaceConf.hybridScanEnabled(spark) && index .getTagValue(IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG) .getOrElse(false)) { - transformPlanToUseIndexOnlyScan(spark, index, plan, useBucketSpec) - } else { transformPlanToUseHybridScan(spark, index, plan, useBucketSpec) + } else { + transformPlanToUseIndexOnlyScan(spark, index, plan, useBucketSpec) } assert(!transformed.equals(plan)) transformed From af91613c8871ec4c4655313601152bfdd8052b9b Mon Sep 17 00:00:00 2001 From: sezruby Date: Sat, 24 Oct 2020 15:08:45 +0900 Subject: [PATCH 03/11] review commit --- .../hyperspace/index/IndexLogEntry.scala | 27 ++++++++++++------- .../hyperspace/index/rules/RuleUtils.scala | 5 ++-- .../index/rules/RuleUtilsTest.scala | 2 +- 3 files changed, 21 insertions(+), 13 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 5051eb5ef..1d7cdb092 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types.{DataType, StructType} import com.microsoft.hyperspace.HyperspaceException @@ -455,19 +456,25 @@ case class IndexLogEntry( * A mutable map for holding auxiliary information of this index log entry while applying rules. */ @JsonIgnore - private val tags: mutable.Map[IndexLogEntryTag[_], Any] = mutable.Map.empty + private val tags: mutable.Map[(LogicalPlan, IndexLogEntryTag[_]), Any] = mutable.Map.empty - def setTagValue[T](tag: IndexLogEntryTag[T], value: T): Unit = { - tags(tag) = value - } + def setTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T], value: T): Unit = + tags((plan, tag)) = value - def getTagValue[T](tag: IndexLogEntryTag[T]): Option[T] = { - tags.get(tag).map(_.asInstanceOf[T]) - } + def getTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Option[T] = + tags.get((plan, tag)).map(_.asInstanceOf[T]) - def unsetTagValue[T](tag: IndexLogEntryTag[T]): Unit = { - tags -= tag - } + def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = + tags.remove((plan, tag)) + + def setTagValue[T](tag: IndexLogEntryTag[T], value: T): Unit = + tags((null, tag)) = value + + def getTagValue[T](tag: IndexLogEntryTag[T]): Option[T] = + tags.get((null, tag)).map(_.asInstanceOf[T]) + + def unsetTagValue[T](tag: IndexLogEntryTag[T]): Unit = + tags.remove((null, tag)) } // A tag of a `IndexLogEntry`, which defines name and type. diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index 69b68f745..c772e0795 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -85,6 +85,7 @@ object RuleUtils { // If there is no change in source dataset, this index can be applied by // transformPlanToUseIndexOnlyScan. entry.setTagValue( + plan, IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG, !(commonCnt == entry.allSourceFileInfos.size && commonCnt == inputSourceFiles.size)) @@ -180,8 +181,8 @@ object RuleUtils { val transformed = if (HyperspaceConf.hybridScanEnabled(spark) && index - .getTagValue(IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG) - .getOrElse(false)) { + .getTagValue(plan, IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG) + .getOrElse(false)) { transformPlanToUseHybridScan(spark, index, plan, useBucketSpec) } else { transformPlanToUseIndexOnlyScan(spark, index, plan, useBucketSpec) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala index 9b5cf8909..0358e20f8 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -146,7 +146,7 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { assert(indexes.head.name == "index1") assert( indexes.head - .getTagValue(IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG) + .getTagValue(plan, IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG) .getOrElse(false) == expectedHybridScanTag) } else { assert(indexes.isEmpty) From 93c665738c0e18f458b16ca7aa62667c0f0b3013 Mon Sep 17 00:00:00 2001 From: sezruby Date: Sat, 24 Oct 2020 15:35:56 +0900 Subject: [PATCH 04/11] fix --- .../scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index c772e0795..56edbdb80 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -181,7 +181,7 @@ object RuleUtils { val transformed = if (HyperspaceConf.hybridScanEnabled(spark) && index - .getTagValue(plan, IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG) + .getTagValue(getLogicalRelation(plan).get, IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG) .getOrElse(false)) { transformPlanToUseHybridScan(spark, index, plan, useBucketSpec) } else { From c99e84c6b2753fc98e75ca937c21da43e9097732 Mon Sep 17 00:00:00 2001 From: sezruby Date: Sat, 24 Oct 2020 17:01:36 +0900 Subject: [PATCH 05/11] review commit --- .../hyperspace/index/rules/RuleUtils.scala | 52 +++++++++++++------ .../index/rules/RuleUtilsTest.scala | 29 +++++------ 2 files changed, 49 insertions(+), 32 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index 56edbdb80..1404f311c 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -81,19 +81,32 @@ object RuleUtils { // Find the number of common files between the source relations & index source files. val commonCnt = inputSourceFiles.count(entry.allSourceFileInfos.contains) val deletedCnt = entry.allSourceFileInfos.size - commonCnt - - // If there is no change in source dataset, this index can be applied by - // transformPlanToUseIndexOnlyScan. - entry.setTagValue( - plan, - IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG, - !(commonCnt == entry.allSourceFileInfos.size && commonCnt == inputSourceFiles.size)) + lazy val hybridScanRequired = + !(commonCnt == entry.allSourceFileInfos.size && commonCnt == inputSourceFiles.size) if (hybridScanDeleteEnabled && entry.hasLineageColumn(spark)) { - commonCnt > 0 && deletedCnt <= HyperspaceConf.hybridScanDeleteMaxNumFiles(spark) + if (commonCnt > 0 && deletedCnt <= HyperspaceConf.hybridScanDeleteMaxNumFiles(spark)) { + // If there is no change in source dataset, this index can be applied by + // transformPlanToUseIndexOnlyScan. + entry.setTagValue( + plan, + IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG, + hybridScanRequired) + true + } else { + false + } } else { // For append-only Hybrid Scan, deleted files are not allowed. - deletedCnt == 0 && commonCnt > 0 + if (deletedCnt == 0 && commonCnt > 0) { + entry.setTagValue( + plan, + IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG, + hybridScanRequired) + true + } else { + false + } } } @@ -179,14 +192,19 @@ object RuleUtils { // Check pre-requisite. assert(getLogicalRelation(plan).isDefined) - val transformed = - if (HyperspaceConf.hybridScanEnabled(spark) && index - .getTagValue(getLogicalRelation(plan).get, IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG) - .getOrElse(false)) { - transformPlanToUseHybridScan(spark, index, plan, useBucketSpec) - } else { - transformPlanToUseIndexOnlyScan(spark, index, plan, useBucketSpec) - } + // If there is no need to do HybridScan even with HybridScanConfig enabled, the index can + // be applied with the general way -transformPlanToUseIndexOnlyScan- and the outcome will + // be same as there is no source data change. + // This tag should always exist if Hybrid Scan is enabled. + lazy val hybridScanRequired = index.getTagValue( + getLogicalRelation(plan).get, + IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG) + + val transformed = if (HyperspaceConf.hybridScanEnabled(spark) && hybridScanRequired.get) { + transformPlanToUseHybridScan(spark, index, plan, useBucketSpec) + } else { + transformPlanToUseIndexOnlyScan(spark, index, plan, useBucketSpec) + } assert(!transformed.equals(plan)) transformed } diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala index 0358e20f8..3328097d9 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -134,7 +134,7 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { hybridScanEnabled: Boolean, hybridScanDeleteEnabled: Boolean, expectCandidateIndex: Boolean, - expectedHybridScanTag: Boolean): Unit = { + expectedHybridScanTag: Option[Boolean]): Unit = { withSQLConf( "spark.hyperspace.index.hybridscan.enabled" -> hybridScanEnabled.toString, "spark.hyperspace.index.hybridscan.delete.enabled" -> @@ -142,12 +142,11 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { val indexes = RuleUtils .getCandidateIndexes(spark, allIndexes, plan) if (expectCandidateIndex) { - assert(indexes.length == 1) - assert(indexes.head.name == "index1") + assert(indexes.length === 1) + assert(indexes.head.name === "index1") assert( - indexes.head - .getTagValue(plan, IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG) - .getOrElse(false) == expectedHybridScanTag) + indexes.head.getTagValue(plan, IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG) + === expectedHybridScanTag) } else { assert(indexes.isEmpty) } @@ -163,13 +162,13 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { hybridScanEnabled = false, hybridScanDeleteEnabled = false, expectCandidateIndex = true, - expectedHybridScanTag = false) + expectedHybridScanTag = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = false, expectCandidateIndex = true, - expectedHybridScanTag = false) + expectedHybridScanTag = Some(false)) } // Scenario #1: Append new files. @@ -182,13 +181,13 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { hybridScanEnabled = false, hybridScanDeleteEnabled = false, expectCandidateIndex = false, - expectedHybridScanTag = false) + expectedHybridScanTag = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = false, expectCandidateIndex = true, - expectedHybridScanTag = true) + expectedHybridScanTag = Some(true)) } // Scenario #2: Delete 1 file. @@ -201,19 +200,19 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { hybridScanEnabled = false, hybridScanDeleteEnabled = false, expectCandidateIndex = false, - expectedHybridScanTag = false) + expectedHybridScanTag = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = false, expectCandidateIndex = false, - expectedHybridScanTag = false) + expectedHybridScanTag = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = true, expectCandidateIndex = true, - expectedHybridScanTag = true) + expectedHybridScanTag = Some(true)) } // Scenario #3: Replace all files. @@ -226,13 +225,13 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { hybridScanEnabled = false, hybridScanDeleteEnabled = false, expectCandidateIndex = false, - expectedHybridScanTag = false) + expectedHybridScanTag = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = true, expectCandidateIndex = false, - expectedHybridScanTag = false) + expectedHybridScanTag = None) } } } From 7acc70313dd8a95ae5c24436d40dd2dc17a87167 Mon Sep 17 00:00:00 2001 From: sezruby Date: Sat, 24 Oct 2020 17:24:30 +0900 Subject: [PATCH 06/11] minor fix --- .../hyperspace/index/rules/RuleUtils.scala | 44 ++++++++----------- 1 file changed, 18 insertions(+), 26 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index 1404f311c..5359d0740 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -81,33 +81,25 @@ object RuleUtils { // Find the number of common files between the source relations & index source files. val commonCnt = inputSourceFiles.count(entry.allSourceFileInfos.contains) val deletedCnt = entry.allSourceFileInfos.size - commonCnt - lazy val hybridScanRequired = - !(commonCnt == entry.allSourceFileInfos.size && commonCnt == inputSourceFiles.size) - - if (hybridScanDeleteEnabled && entry.hasLineageColumn(spark)) { - if (commonCnt > 0 && deletedCnt <= HyperspaceConf.hybridScanDeleteMaxNumFiles(spark)) { - // If there is no change in source dataset, this index can be applied by - // transformPlanToUseIndexOnlyScan. - entry.setTagValue( - plan, - IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG, - hybridScanRequired) - true - } else { - false - } - } else { - // For append-only Hybrid Scan, deleted files are not allowed. - if (deletedCnt == 0 && commonCnt > 0) { - entry.setTagValue( - plan, - IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG, - hybridScanRequired) - true - } else { - false - } + + lazy val isDeleteCandidate = hybridScanDeleteEnabled && entry.hasLineageColumn(spark) && + commonCnt > 0 && deletedCnt <= HyperspaceConf.hybridScanDeleteMaxNumFiles(spark) + + // For append-only Hybrid Scan, deleted files are not allowed + lazy val isAppendOnlyCandidate = !hybridScanDeleteEnabled && deletedCnt == 0 && + commonCnt > 0 + + val isCandidate = isDeleteCandidate || isAppendOnlyCandidate + if (isCandidate) { + // If there is no change in source dataset, the index can be applied by + // transformPlanToUseIndexOnlyScan. + entry.setTagValue( + plan, + IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG, + !(commonCnt == entry.allSourceFileInfos.size && + commonCnt == inputSourceFiles.size)) } + isCandidate } if (hybridScanEnabled) { From e3368cba746ea58bd521c18fd1c9522327aae0d7 Mon Sep 17 00:00:00 2001 From: sezruby Date: Sat, 24 Oct 2020 17:28:34 +0900 Subject: [PATCH 07/11] minor fix --- .../com/microsoft/hyperspace/index/rules/RuleUtils.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index 5359d0740..356274de3 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -184,9 +184,8 @@ object RuleUtils { // Check pre-requisite. assert(getLogicalRelation(plan).isDefined) - // If there is no need to do HybridScan even with HybridScanConfig enabled, the index can - // be applied with the general way -transformPlanToUseIndexOnlyScan- and the outcome will - // be same as there is no source data change. + // If there is no change in source data files, the index can be applied with the general + // way, transformPlanToUseIndexOnlyScan, regardless of Hybrid Scan config. // This tag should always exist if Hybrid Scan is enabled. lazy val hybridScanRequired = index.getTagValue( getLogicalRelation(plan).get, From 0d1290b6ee2f4d855db2f79943ac4298b2cfd632 Mon Sep 17 00:00:00 2001 From: sezruby Date: Sun, 25 Oct 2020 12:09:55 +0900 Subject: [PATCH 08/11] Create IndexLogEntryTags --- .../hyperspace/index/IndexConstants.scala | 6 ----- .../hyperspace/index/IndexLogEntryTags.scala | 24 +++++++++++++++++++ .../hyperspace/index/rules/RuleUtils.scala | 6 ++--- .../index/rules/RuleUtilsTest.scala | 4 ++-- 4 files changed, 29 insertions(+), 11 deletions(-) create mode 100644 src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala index 0e393b24b..77c4ebeb1 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala @@ -56,12 +56,6 @@ object IndexConstants { // See https://github.com/microsoft/hyperspace/issues/185 val INDEX_RELATION_IDENTIFIER: (String, String) = "indexRelation" -> "true" - // Tags are used while applying rules. - // INDEX_HYBRIDSCAN_REQUIRED_TAG indicates if Hybrid Scan is required for this index or not. - // This is set in getCandidateIndexes and utilized in transformPlanToUseIndex. - val INDEX_HYBRIDSCAN_REQUIRED_TAG: IndexLogEntryTag[Boolean] = - IndexLogEntryTag[Boolean]("hybridScanRequired") - // Default number of buckets is set the default value of "spark.sql.shuffle.partitions". val INDEX_NUM_BUCKETS_DEFAULT: Int = SQLConf.SHUFFLE_PARTITIONS.defaultValue.get diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala new file mode 100644 index 000000000..c321a4303 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala @@ -0,0 +1,24 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index + +object IndexLogEntryTags { + // INDEX_HYBRIDSCAN_REQUIRED_TAG indicates if Hybrid Scan is required for this index or not. + // This is set in getCandidateIndexes and utilized in transformPlanToUseIndex. + val INDEX_HYBRIDSCAN_REQUIRED_TAG: IndexLogEntryTag[Boolean] = + IndexLogEntryTag[Boolean]("hybridScanRequired") +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index 356274de3..56494ead2 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFil import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.types.{StringType, StructType} -import com.microsoft.hyperspace.index.{FileInfo, IndexConstants, IndexLogEntry, LogicalPlanSignatureProvider} +import com.microsoft.hyperspace.index.{FileInfo, IndexConstants, IndexLogEntry, IndexLogEntryTags, LogicalPlanSignatureProvider} import com.microsoft.hyperspace.index.plans.logical.BucketUnion import com.microsoft.hyperspace.util.HyperspaceConf @@ -95,7 +95,7 @@ object RuleUtils { // transformPlanToUseIndexOnlyScan. entry.setTagValue( plan, - IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG, + IndexLogEntryTags.INDEX_HYBRIDSCAN_REQUIRED_TAG, !(commonCnt == entry.allSourceFileInfos.size && commonCnt == inputSourceFiles.size)) } @@ -189,7 +189,7 @@ object RuleUtils { // This tag should always exist if Hybrid Scan is enabled. lazy val hybridScanRequired = index.getTagValue( getLogicalRelation(plan).get, - IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG) + IndexLogEntryTags.INDEX_HYBRIDSCAN_REQUIRED_TAG) val transformed = if (HyperspaceConf.hybridScanEnabled(spark) && hybridScanRequired.get) { transformPlanToUseHybridScan(spark, index, plan, useBucketSpec) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala index 3328097d9..9f03db6c7 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFil import org.apache.spark.sql.types.{IntegerType, StringType} import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.index.{IndexCollectionManager, IndexConfig, IndexConstants} +import com.microsoft.hyperspace.index.{IndexCollectionManager, IndexConfig, IndexConstants, IndexLogEntryTags} import com.microsoft.hyperspace.index.IndexConstants.INDEX_HYBRID_SCAN_ENABLED import com.microsoft.hyperspace.util.{FileUtils, PathUtils} @@ -145,7 +145,7 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { assert(indexes.length === 1) assert(indexes.head.name === "index1") assert( - indexes.head.getTagValue(plan, IndexConstants.INDEX_HYBRIDSCAN_REQUIRED_TAG) + indexes.head.getTagValue(plan, IndexLogEntryTags.INDEX_HYBRIDSCAN_REQUIRED_TAG) === expectedHybridScanTag) } else { assert(indexes.isEmpty) From 94b73f8257ae2675ea74dafcbf87b1b36da0a224 Mon Sep 17 00:00:00 2001 From: sezruby Date: Wed, 28 Oct 2020 21:38:28 +0900 Subject: [PATCH 09/11] build fix --- .../hyperspace/index/rules/RuleUtils.scala | 33 +++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index 004674358..58d4b0523 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -96,8 +96,7 @@ object RuleUtils { entry.setTagValue( plan, IndexLogEntryTags.INDEX_HYBRIDSCAN_REQUIRED_TAG, - !(commonCnt == entry.allSourceFileInfos.size && - commonCnt == inputSourceFiles.size)) + !(commonCnt == entry.sourceFileInfoSet.size && commonCnt == inputSourceFiles.size)) } isCandidate } @@ -133,21 +132,6 @@ object RuleUtils { } } - /** - * Extract the LogicalRelation node if the given logical plan is linear. - * - * @param logicalPlan given logical plan to extract LogicalRelation from. - * @return if the plan is linear, the LogicalRelation node; Otherwise None. - */ - def getLogicalRelation(logicalPlan: LogicalPlan): Option[LogicalRelation] = { - val lrs = logicalPlan.collect { case r: LogicalRelation => r } - if (lrs.length == 1) { - Some(lrs.head) - } else { - None // logicalPlan is non-linear or it has no LogicalRelation. - } - } - /** * Check if an index was applied the given relation or not. * This can be determined by an identifier in options field of HadoopFsRelation. @@ -200,6 +184,21 @@ object RuleUtils { transformed } + /** + * Extract the LogicalRelation node if the given logical plan is linear. + * + * @param logicalPlan given logical plan to extract LogicalRelation from. + * @return if the plan is linear, the LogicalRelation node; Otherwise None. + */ + def getLogicalRelation(logicalPlan: LogicalPlan): Option[LogicalRelation] = { + val lrs = logicalPlan.collect { case r: LogicalRelation => r } + if (lrs.length == 1) { + Some(lrs.head) + } else { + None // logicalPlan is non-linear or it has no LogicalRelation. + } + } + /** * Transform the current plan to utilize index. * The transformed plan reads data from indexes instead of the source relations. From 80c6d57fcec4952328e4ed7c71e69c6e67ec52bf Mon Sep 17 00:00:00 2001 From: EJ Song Date: Fri, 6 Nov 2020 11:15:38 +0900 Subject: [PATCH 10/11] Minor fix --- .../microsoft/hyperspace/index/rules/RuleUtils.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index d34ad6178..3f6a49700 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -85,7 +85,7 @@ object RuleUtils { lazy val isDeleteCandidate = hybridScanDeleteEnabled && entry.hasLineageColumn(spark) && commonCnt > 0 && deletedCnt <= HyperspaceConf.hybridScanDeleteMaxNumFiles(spark) - // For append-only Hybrid Scan, deleted files are not allowed + // For append-only Hybrid Scan, deleted files are not allowed. lazy val isAppendOnlyCandidate = !hybridScanDeleteEnabled && deletedCnt == 0 && commonCnt > 0 @@ -168,14 +168,14 @@ object RuleUtils { // Check pre-requisite. assert(getLogicalRelation(plan).isDefined) - // If there is no change in source data files, the index can be applied with the general - // way, transformPlanToUseIndexOnlyScan, regardless of Hybrid Scan config. + // If there is no change in source data files, the index can be applied by + // transformPlanToUseIndexOnlyScan regardless of Hybrid Scan config. // This tag should always exist if Hybrid Scan is enabled. lazy val hybridScanRequired = index.getTagValue( getLogicalRelation(plan).get, - IndexLogEntryTags.INDEX_HYBRIDSCAN_REQUIRED_TAG) + IndexLogEntryTags.INDEX_HYBRIDSCAN_REQUIRED_TAG).get - val transformed = if (HyperspaceConf.hybridScanEnabled(spark) && hybridScanRequired.get) { + val transformed = if (HyperspaceConf.hybridScanEnabled(spark) && hybridScanRequired) { transformPlanToUseHybridScan(spark, index, plan, useBucketSpec) } else { transformPlanToUseIndexOnlyScan(spark, index, plan, useBucketSpec) From 11a1a27bd48839aa963383c1a99893ba956c468f Mon Sep 17 00:00:00 2001 From: EJ Song Date: Fri, 13 Nov 2020 15:24:22 +0900 Subject: [PATCH 11/11] Review commit --- .../hyperspace/index/IndexLogEntry.scala | 18 ++++++++++++------ .../hyperspace/index/IndexLogEntryTags.scala | 4 ++-- .../hyperspace/index/rules/RuleUtils.scala | 6 +++--- .../hyperspace/index/rules/RuleUtilsTest.scala | 2 +- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index b44ad5382..0f3f044ba 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -495,23 +495,29 @@ case class IndexLogEntry( @JsonIgnore private val tags: mutable.Map[(LogicalPlan, IndexLogEntryTag[_]), Any] = mutable.Map.empty - def setTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T], value: T): Unit = + def setTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T], value: T): Unit = { tags((plan, tag)) = value + } - def getTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Option[T] = + def getTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Option[T] = { tags.get((plan, tag)).map(_.asInstanceOf[T]) + } - def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = + def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = { tags.remove((plan, tag)) + } - def setTagValue[T](tag: IndexLogEntryTag[T], value: T): Unit = + def setTagValue[T](tag: IndexLogEntryTag[T], value: T): Unit = { tags((null, tag)) = value + } - def getTagValue[T](tag: IndexLogEntryTag[T]): Option[T] = + def getTagValue[T](tag: IndexLogEntryTag[T]): Option[T] = { tags.get((null, tag)).map(_.asInstanceOf[T]) + } - def unsetTagValue[T](tag: IndexLogEntryTag[T]): Unit = + def unsetTagValue[T](tag: IndexLogEntryTag[T]): Unit = { tags.remove((null, tag)) + } } // A tag of a `IndexLogEntry`, which defines name and type. diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala index c321a4303..e9e5c8044 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala @@ -17,8 +17,8 @@ package com.microsoft.hyperspace.index object IndexLogEntryTags { - // INDEX_HYBRIDSCAN_REQUIRED_TAG indicates if Hybrid Scan is required for this index or not. + // HYBRIDSCAN_REQUIRED indicates if Hybrid Scan is required for this index or not. // This is set in getCandidateIndexes and utilized in transformPlanToUseIndex. - val INDEX_HYBRIDSCAN_REQUIRED_TAG: IndexLogEntryTag[Boolean] = + val HYBRIDSCAN_REQUIRED: IndexLogEntryTag[Boolean] = IndexLogEntryTag[Boolean]("hybridScanRequired") } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index 3f6a49700..44bc69c03 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -91,11 +91,11 @@ object RuleUtils { val isCandidate = isDeleteCandidate || isAppendOnlyCandidate if (isCandidate) { - // If there is no change in source dataset, the index can be applied by + // If there is no change in source dataset, the index will be applied by // transformPlanToUseIndexOnlyScan. entry.setTagValue( plan, - IndexLogEntryTags.INDEX_HYBRIDSCAN_REQUIRED_TAG, + IndexLogEntryTags.HYBRIDSCAN_REQUIRED, !(commonCnt == entry.sourceFileInfoSet.size && commonCnt == inputSourceFiles.size)) } isCandidate @@ -173,7 +173,7 @@ object RuleUtils { // This tag should always exist if Hybrid Scan is enabled. lazy val hybridScanRequired = index.getTagValue( getLogicalRelation(plan).get, - IndexLogEntryTags.INDEX_HYBRIDSCAN_REQUIRED_TAG).get + IndexLogEntryTags.HYBRIDSCAN_REQUIRED).get val transformed = if (HyperspaceConf.hybridScanEnabled(spark) && hybridScanRequired) { transformPlanToUseHybridScan(spark, index, plan, useBucketSpec) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala index 61842029e..3bd3c10c0 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -145,7 +145,7 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { assert(indexes.length === 1) assert(indexes.head.name === "index1") assert( - indexes.head.getTagValue(plan, IndexLogEntryTags.INDEX_HYBRIDSCAN_REQUIRED_TAG) + indexes.head.getTagValue(plan, IndexLogEntryTags.HYBRIDSCAN_REQUIRED) === expectedHybridScanTag) } else { assert(indexes.isEmpty)