diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index aeb7eb4ed..f47bce98c 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -107,7 +107,10 @@ object RuleUtils { indexes.filter(index => index.created && isHybridScanCandidate(index, filesByRelations.flatten)) } else { - indexes.filter(index => index.created && signatureValid(index)) + indexes.filter( + index => + index.created && signatureValid(index) && + index.deletedFiles.isEmpty && index.appendedFiles.isEmpty) } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala index 5e66afd4c..dadee7dab 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala @@ -51,10 +51,10 @@ class FilterIndexRuleTest extends HyperspaceRuleTestSuite { scanNode = LogicalRelation(relation, Seq(c1, c2, c3, c4), None, false) val indexPlan = Project(Seq(c1, c2, c3), scanNode) - createIndex(indexName1, Seq(c3, c2), Seq(c1), indexPlan) + createIndexLogEntry(indexName1, Seq(c3, c2), Seq(c1), indexPlan) val index2Plan = Project(Seq(c1, c2, c3, c4), scanNode) - createIndex(indexName2, Seq(c4, c2), Seq(c1, c3), index2Plan) + createIndexLogEntry(indexName2, Seq(c4, c2), Seq(c1, c3), index2Plan) } before { @@ -134,9 +134,7 @@ class FilterIndexRuleTest extends HyperspaceRuleTestSuite { // Mark the relation that the rule is applied and verify the plan does not change. val newPlan = plan transform { case r @ LogicalRelation(h: HadoopFsRelation, _, _, _) => - r.copy( - relation = - h.copy(options = Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark)) + r.copy(relation = h.copy(options = Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark)) } assert(FilterIndexRule(newPlan).equals(newPlan)) } diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala index 4b5c29b98..d4f4f4471 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala @@ -30,11 +30,13 @@ import com.microsoft.hyperspace.index.Hdfs.Properties trait HyperspaceRuleTestSuite extends HyperspaceSuite { private val filenames = Seq("f1.parquet", "f2.parquet") - def createIndex( + def createIndexLogEntry( name: String, indexCols: Seq[AttributeReference], includedCols: Seq[AttributeReference], - plan: LogicalPlan): IndexLogEntry = { + plan: LogicalPlan, + appendedFiles: Seq[String] = Seq(), + deletedFiles: Seq[String] = Seq()): IndexLogEntry = { val signClass = new RuleTestHelper.TestSignatureProvider().getClass.getName LogicalPlanSignatureProvider.create(signClass).signature(plan) match { @@ -43,7 +45,7 @@ trait HyperspaceRuleTestSuite extends HyperspaceSuite { Seq( Relation( Seq("dummy"), - Hdfs(Properties(Content(Directory("/")))), + Hdfs(Properties(Content(Directory("/")), appendedFiles, deletedFiles)), "schema", "format", Map())), @@ -69,7 +71,7 @@ trait HyperspaceRuleTestSuite extends HyperspaceSuite { val logManager = new IndexLogManagerImpl(getIndexRootPath(name)) indexLogEntry.state = Constants.States.ACTIVE - logManager.writeLog(0, indexLogEntry) + assert(logManager.writeLog(0, indexLogEntry)) indexLogEntry case None => throw HyperspaceException("Invalid plan for index dataFrame.") diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala index c5a51eea1..fb31302b2 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala @@ -90,11 +90,11 @@ class JoinIndexRuleTest extends HyperspaceRuleTestSuite with SQLHelper { // +- Filter isnotnull(t2c1#4) // +- Relation[t2c1#4,t2c2#5,t2c3#6,t2c4#7] parquet - createIndex("t1i1", Seq(t1c1), Seq(t1c3), t1ProjectNode) - createIndex("t1i2", Seq(t1c1, t1c2), Seq(t1c3), t1ProjectNode) - createIndex("t1i3", Seq(t1c2), Seq(t1c3), t1ProjectNode) - createIndex("t2i1", Seq(t2c1), Seq(t2c3), t2ProjectNode) - createIndex("t2i2", Seq(t2c1, t2c2), Seq(t2c3), t2ProjectNode) + createIndexLogEntry("t1i1", Seq(t1c1), Seq(t1c3), t1ProjectNode) + createIndexLogEntry("t1i2", Seq(t1c1, t1c2), Seq(t1c3), t1ProjectNode) + createIndexLogEntry("t1i3", Seq(t1c2), Seq(t1c3), t1ProjectNode) + createIndexLogEntry("t2i1", Seq(t2c1), Seq(t2c3), t2ProjectNode) + createIndexLogEntry("t2i2", Seq(t2c1, t2c2), Seq(t2c3), t2ProjectNode) } before { @@ -208,8 +208,10 @@ class JoinIndexRuleTest extends HyperspaceRuleTestSuite with SQLHelper { { // Test: should update plan if index exists to cover all implicit columns - val t1TestIndex = createIndex("t1Idx", Seq(t1c1), Seq(t1c2, t1c3, t1c4), t1FilterNode) - val t2TestIndex = createIndex("t2Idx", Seq(t2c1), Seq(t2c2, t2c3, t2c4), t2FilterNode) + val t1TestIndex = + createIndexLogEntry("t1Idx", Seq(t1c1), Seq(t1c2, t1c3, t1c4), t1FilterNode) + val t2TestIndex = + createIndexLogEntry("t2Idx", Seq(t2c1), Seq(t2c2, t2c3, t2c4), t2FilterNode) // clear cache so the new indexes gets added to it clearCache() @@ -406,9 +408,7 @@ class JoinIndexRuleTest extends HyperspaceRuleTestSuite with SQLHelper { // Mark the relation that the rule is applied and verify the plan does not change. val newPlan = plan transform { case r @ LogicalRelation(h: HadoopFsRelation, _, _, _) => - r.copy( - relation = - h.copy(options = Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark)) + r.copy(relation = h.copy(options = Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark)) } assert(JoinIndexRule(newPlan).equals(newPlan)) } diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala index 757592ca2..1932bd081 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -20,12 +20,13 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, IsNotNull} import org.apache.spark.sql.catalyst.plans.{JoinType, SQLHelper} -import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project, RepartitionByExpression} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation, NoopCache} import org.apache.spark.sql.types.{IntegerType, StringType} import com.microsoft.hyperspace.actions.Constants import com.microsoft.hyperspace.index.{IndexCollectionManager, IndexConfig, IndexConstants} +import com.microsoft.hyperspace.index.IndexConstants.INDEX_HYBRID_SCAN_ENABLED import com.microsoft.hyperspace.util.{FileUtils, PathUtils} class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { @@ -79,11 +80,11 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { // +- Filter isnotnull(t2c1#4) // +- Relation[t2c1#4,t2c2#5,t2c3#6,t2c4#7] parquet - createIndex("t1i1", Seq(t1c1), Seq(t1c3), t1ProjectNode) - createIndex("t1i2", Seq(t1c1, t1c2), Seq(t1c3), t1ProjectNode) - createIndex("t1i3", Seq(t1c2), Seq(t1c3), t1ProjectNode) - createIndex("t2i1", Seq(t2c1), Seq(t2c3), t2ProjectNode) - createIndex("t2i2", Seq(t2c1, t2c2), Seq(t2c3), t2ProjectNode) + createIndexLogEntry("t1i1", Seq(t1c1), Seq(t1c3), t1ProjectNode) + createIndexLogEntry("t1i2", Seq(t1c1, t1c2), Seq(t1c3), t1ProjectNode) + createIndexLogEntry("t1i3", Seq(t1c2), Seq(t1c3), t1ProjectNode) + createIndexLogEntry("t2i1", Seq(t2c1), Seq(t2c3), t2ProjectNode) + createIndexLogEntry("t2i2", Seq(t2c1, t2c2), Seq(t2c3), t2ProjectNode) } test("Verify indexes are matched by signature correctly.") { @@ -280,6 +281,19 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { } } + test( + "RuleUtils.getCandidateIndexes: Verify indexes with non-empty 'deletedFiles' or " + + "'appendedFiles' are not usable indexes if hybrid scan is disabled.") { + withSQLConf(INDEX_HYBRID_SCAN_ENABLED -> "false") { + val entry1 = createIndexLogEntry("t1iTest", Seq(t1c1), Seq(t1c3), t1ProjectNode) + val entry2 = entry1.withAppendedAndDeletedFiles(Seq(), Seq("f1")) + val entry3 = entry1.withAppendedAndDeletedFiles(Seq("f2"), Seq()) + val usableIndexes = + RuleUtils.getCandidateIndexes(spark, Seq(entry1, entry2, entry3), t1ProjectNode) + assert(usableIndexes.equals(Seq(entry1))) + } + } + private def validateLogicalRelation(plan: LogicalPlan, expected: LogicalRelation): Unit = { val r = RuleUtils.getLogicalRelation(plan) assert(r.isDefined)