diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 6446a025b..cc36557ac 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -20,12 +20,14 @@ import java.io.FileNotFoundException import scala.annotation.tailrec import scala.collection.mutable.{HashMap, ListBuffer} +import scala.collection.mutable import com.fasterxml.jackson.annotation.JsonIgnore import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types.{DataType, StructType} import com.microsoft.hyperspace.HyperspaceException @@ -486,8 +488,41 @@ case class IndexLogEntry( override def hashCode(): Int = { config.hashCode + signature.hashCode + numBuckets.hashCode + content.hashCode } + + /** + * A mutable map for holding auxiliary information of this index log entry while applying rules. 
+ */ + @JsonIgnore + private val tags: mutable.Map[(LogicalPlan, IndexLogEntryTag[_]), Any] = mutable.Map.empty + + def setTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T], value: T): Unit = { + tags((plan, tag)) = value + } + + def getTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Option[T] = { + tags.get((plan, tag)).map(_.asInstanceOf[T]) + } + + def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = { + tags.remove((plan, tag)) + } + + def setTagValue[T](tag: IndexLogEntryTag[T], value: T): Unit = { + tags((null, tag)) = value + } + + def getTagValue[T](tag: IndexLogEntryTag[T]): Option[T] = { + tags.get((null, tag)).map(_.asInstanceOf[T]) + } + + def unsetTagValue[T](tag: IndexLogEntryTag[T]): Unit = { + tags.remove((null, tag)) + } } +// A tag of an `IndexLogEntry`, which defines name and type. +case class IndexLogEntryTag[T](name: String) + object IndexLogEntry { val VERSION: String = "0.1" diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala new file mode 100644 index 000000000..e9e5c8044 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntryTags.scala @@ -0,0 +1,24 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index + +object IndexLogEntryTags { + // HYBRIDSCAN_REQUIRED indicates if Hybrid Scan is required for this index or not. + // This is set in getCandidateIndexes and utilized in transformPlanToUseIndex. + val HYBRIDSCAN_REQUIRED: IndexLogEntryTag[Boolean] = + IndexLogEntryTag[Boolean]("hybridScanRequired") +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index 9bfc222ff..44bc69c03 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFil import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.types.{StringType, StructType} -import com.microsoft.hyperspace.index.{FileInfo, IndexConstants, IndexLogEntry, LogicalPlanSignatureProvider} +import com.microsoft.hyperspace.index.{FileInfo, IndexConstants, IndexLogEntry, IndexLogEntryTags, LogicalPlanSignatureProvider} import com.microsoft.hyperspace.index.plans.logical.BucketUnion import com.microsoft.hyperspace.util.HyperspaceConf @@ -82,12 +82,23 @@ object RuleUtils { val commonCnt = inputSourceFiles.count(entry.sourceFileInfoSet.contains) val deletedCnt = entry.sourceFileInfoSet.size - commonCnt - if (hybridScanDeleteEnabled && entry.hasLineageColumn(spark)) { + lazy val isDeleteCandidate = hybridScanDeleteEnabled && entry.hasLineageColumn(spark) && commonCnt > 0 && deletedCnt <= HyperspaceConf.hybridScanDeleteMaxNumFiles(spark) - } else { - // For append-only Hybrid Scan, deleted files are not allowed. - deletedCnt == 0 && commonCnt > 0 + + // For append-only Hybrid Scan, deleted files are not allowed. 
+ lazy val isAppendOnlyCandidate = !hybridScanDeleteEnabled && deletedCnt == 0 && + commonCnt > 0 + + val isCandidate = isDeleteCandidate || isAppendOnlyCandidate + if (isCandidate) { + // If there is no change in source dataset, the index will be applied by + // transformPlanToUseIndexOnlyScan. + entry.setTagValue( + plan, + IndexLogEntryTags.HYBRIDSCAN_REQUIRED, + !(commonCnt == entry.sourceFileInfoSet.size && commonCnt == inputSourceFiles.size)) } + isCandidate } if (hybridScanEnabled) { @@ -157,7 +168,14 @@ object RuleUtils { // Check pre-requisite. assert(getLogicalRelation(plan).isDefined) - val transformed = if (HyperspaceConf.hybridScanEnabled(spark)) { + // If there is no change in source data files, the index can be applied by + // transformPlanToUseIndexOnlyScan regardless of Hybrid Scan config. + // This tag should always exist if Hybrid Scan is enabled. + lazy val hybridScanRequired = index.getTagValue( + getLogicalRelation(plan).get, + IndexLogEntryTags.HYBRIDSCAN_REQUIRED).get + + val transformed = if (HyperspaceConf.hybridScanEnabled(spark) && hybridScanRequired) { transformPlanToUseHybridScan(spark, index, plan, useBucketSpec) } else { transformPlanToUseIndexOnlyScan(spark, index, plan, useBucketSpec) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala index cfdc4f04f..3bd3c10c0 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFil import org.apache.spark.sql.types.{IntegerType, StringType} import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.index.{FileInfo, IndexCollectionManager, IndexConfig, IndexConstants, LogicalPlanFingerprint, Signature} +import com.microsoft.hyperspace.index.{FileInfo, 
IndexCollectionManager, IndexConfig, IndexConstants, IndexLogEntryTags, LogicalPlanFingerprint, Signature} import com.microsoft.hyperspace.index.IndexConstants.INDEX_HYBRID_SCAN_ENABLED import com.microsoft.hyperspace.util.{FileUtils, PathUtils} @@ -133,7 +133,8 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { plan: LogicalPlan, hybridScanEnabled: Boolean, hybridScanDeleteEnabled: Boolean, - expectCandidateIndex: Boolean): Unit = { + expectCandidateIndex: Boolean, + expectedHybridScanTag: Option[Boolean]): Unit = { withSQLConf( "spark.hyperspace.index.hybridscan.enabled" -> hybridScanEnabled.toString, "spark.hyperspace.index.hybridscan.delete.enabled" -> @@ -141,8 +142,11 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { val indexes = RuleUtils .getCandidateIndexes(spark, allIndexes, plan) if (expectCandidateIndex) { - assert(indexes.length == 1) - assert(indexes.head.name == "index1") + assert(indexes.length === 1) + assert(indexes.head.name === "index1") + assert( + indexes.head.getTagValue(plan, IndexLogEntryTags.HYBRIDSCAN_REQUIRED) + === expectedHybridScanTag) } else { assert(indexes.isEmpty) } @@ -157,12 +161,14 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { optimizedPlan, hybridScanEnabled = false, hybridScanDeleteEnabled = false, - expectCandidateIndex = true) + expectCandidateIndex = true, + expectedHybridScanTag = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = false, - expectCandidateIndex = true) + expectCandidateIndex = true, + expectedHybridScanTag = Some(false)) } // Scenario #1: Append new files. 
@@ -174,12 +180,14 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { optimizedPlan, hybridScanEnabled = false, hybridScanDeleteEnabled = false, - expectCandidateIndex = false) + expectCandidateIndex = false, + expectedHybridScanTag = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = false, - expectCandidateIndex = true) + expectCandidateIndex = true, + expectedHybridScanTag = Some(true)) } // Scenario #2: Delete 1 file. @@ -191,17 +199,20 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { optimizedPlan, hybridScanEnabled = false, hybridScanDeleteEnabled = false, - expectCandidateIndex = false) + expectCandidateIndex = false, + expectedHybridScanTag = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = false, - expectCandidateIndex = false) + expectCandidateIndex = false, + expectedHybridScanTag = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = true, - expectCandidateIndex = true) + expectCandidateIndex = true, + expectedHybridScanTag = Some(true)) } // Scenario #3: Replace all files. @@ -213,12 +224,14 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { optimizedPlan, hybridScanEnabled = false, hybridScanDeleteEnabled = false, - expectCandidateIndex = false) + expectCandidateIndex = false, + expectedHybridScanTag = None) verify( optimizedPlan, hybridScanEnabled = true, hybridScanDeleteEnabled = true, - expectCandidateIndex = false) + expectCandidateIndex = false, + expectedHybridScanTag = None) } } }