From e12ef653920e4f5da9be358e235f3886f11d95af Mon Sep 17 00:00:00 2001 From: sezruby Date: Wed, 25 Nov 2020 14:18:11 +0900 Subject: [PATCH 01/11] Add DeltaLakeFileBasedSource --- build.sbt | 6 + .../actions/RefreshActionBase.scala | 46 +++-- .../hyperspace/index/IndexConstants.scala | 2 + .../hyperspace/index/rules/RuleUtils.scala | 78 +++++---- .../FileBasedSourceProviderManager.scala | 6 +- .../default/DefaultFileBasedSource.scala | 8 + .../delta/DeltaLakeFileBasedSource.scala | 142 ++++++++++++++++ .../hyperspace/index/sources/interfaces.scala | 3 + .../hyperspace/util/HyperspaceConf.scala | 3 +- .../index/DeltaLakeIntegrationTest.scala | 158 ++++++++++++++++++ 10 files changed, 392 insertions(+), 60 deletions(-) create mode 100644 src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala create mode 100644 src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala diff --git a/build.sbt b/build.sbt index c6eeda951..e206f6727 100644 --- a/build.sbt +++ b/build.sbt @@ -30,6 +30,7 @@ libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided" withSources(), "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided" withSources(), "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided" withSources(), + "io.delta" %% "delta-core" % "0.6.1" % "provided" withSources(), // Test dependencies "org.scalatest" %% "scalatest" % "3.0.5" % "test", @@ -40,6 +41,11 @@ libraryDependencies ++= Seq( "org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests" ) +assemblyMergeStrategy in assembly := { + case PathList("run-tests.py") => MergeStrategy.first + case x => (assemblyMergeStrategy in assembly).value(x) +} + scalacOptions ++= Seq( "-target:jvm-1.8" ) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala index 914152571..59ed083d2 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala @@ -70,11 +70,19 @@ private[actions] abstract class RefreshActionBase( val latestRelation = Hyperspace.getContext(spark).sourceProviderManager.refreshRelation(relations.head) val dataSchema = DataType.fromJson(latestRelation.dataSchemaJson).asInstanceOf[StructType] - spark.read - .schema(dataSchema) - .format(latestRelation.fileFormat) - .options(latestRelation.options) - .load(latestRelation.rootPaths: _*) + if (latestRelation.rootPaths.size == 1) { + spark.read + .schema(dataSchema) + .format(latestRelation.fileFormat) + .options(latestRelation.options) + .load(latestRelation.rootPaths.head) + } else { + spark.read + .schema(dataSchema) + .format(latestRelation.fileFormat) + .options(latestRelation.options) + .load(latestRelation.rootPaths: _*) + } } protected lazy val indexConfig: IndexConfig = { @@ -113,25 +121,15 @@ private[actions] abstract class RefreshActionBase( * Build Set[FileInfo] to compare the source file list with the previous index version. */ protected lazy val currentFiles: Set[FileInfo] = { - df.queryExecution.optimizedPlan - .collect { - case LogicalRelation( - HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), - _, - _, - _) => - location - .allFiles() - .map { f => - // For each file, if it already has a file id, add that id to its corresponding - // FileInfo. 
Note that if content of an existing file is changed, it is treated - // as a new file (i.e. its current file id is no longer valid). - val id = fileIdTracker.addFile(f) - FileInfo(f, id, asFullPath = true) - } - } - .flatten - .toSet + val curFiles = df.queryExecution.optimizedPlan.collect { + case relation: LogicalRelation => + Hyperspace + .getContext(spark) + .sourceProviderManager + .allFiles(relation) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = true)) + } + curFiles.head.toSet } /** diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala index 35370b790..2a36b55b7 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala @@ -64,6 +64,8 @@ object IndexConstants { "spark.hyperspace.index.cache.expiryDurationInSeconds" val INDEX_CACHE_EXPIRY_DURATION_SECONDS_DEFAULT = "300" + val DELTA_FORMAT_STR = "delta" + // Operation Log constants val HYPERSPACE_LOG = "_hyperspace_log" val INDEX_VERSION_DIRECTORY_PREFIX = "v__" diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index 201356858..5021f9907 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -24,10 +24,12 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, In, Literal, Not} import org.apache.spark.sql.catalyst.optimizer.OptimizeIn import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.delta.files.TahoeLogFileIndex import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.types.{LongType, StructType} +import com.microsoft.hyperspace.Hyperspace import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.plans.logical.BucketUnion import com.microsoft.hyperspace.util.HyperspaceConf @@ -105,24 +107,23 @@ object RuleUtils { // TODO: Duplicate listing files for the given relation as in // [[transformPlanToUseHybridScan]] // See https://github.com/microsoft/hyperspace/issues/160 - val filesByRelations = plan - .collect { - case LogicalRelation( - HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), - _, - _, - _) => - location.allFiles.map( - f => - // For a given file, file id is only meaningful in the context of a given - // index. At this point, we do not know which index, if any, would be picked. - // Therefore, we simply set the file id to UNKNOWN_FILE_ID. - FileInfo( - f.getPath.toString, - f.getLen, - f.getModificationTime, - IndexConstants.UNKNOWN_FILE_ID)) - } + val filesByRelations = plan.collect { + case rel: LogicalRelation => + Hyperspace + .getContext(spark) + .sourceProviderManager + .allFiles(rel) + .map { f => + // For a given file, file id is only meaningful in the context of a given + // index. At this point, we do not know which index, if any, would be picked. + // Therefore, we simply set the file id to UNKNOWN_FILE_ID. 
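+            // (A concrete id is assigned later, e.g. via index.fileIdTracker.addFile, once a
+            // specific candidate index is selected.)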
+ FileInfo( + f.getPath.toString, + f.getLen, + f.getModificationTime, + IndexConstants.UNKNOWN_FILE_ID) + } + } assert(filesByRelations.length == 1) // index.deletedFiles and index.appendedFiles should be non-empty until Hybrid Scan // handles the lists properly. Otherwise, as the source file list of each index entry @@ -265,7 +266,8 @@ object RuleUtils { index: IndexLogEntry, plan: LogicalPlan, useBucketSpec: Boolean): LogicalPlan = { - val isParquetSourceFormat = index.relations.head.fileFormat.equals("parquet") + val fileFormat = index.relations.head.fileFormat + val isParquetSourceFormat = fileFormat.equals("parquet") || fileFormat.equals("delta") var unhandledAppendedFiles: Seq[Path] = Nil // Get transformed plan with index data and appended files if applicable. @@ -275,11 +277,14 @@ object RuleUtils { // can be transformed to 'Project -> Filter -> Logical Relation'. Thus, with transformDown, // it will be matched again and transformed recursively which causes stack overflow exception. case baseRelation @ LogicalRelation( - _ @HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), + _ @HadoopFsRelation(location: FileIndex, _, _, _, _, _), baseOutput, _, _) => - val curFiles = location.allFiles + val curFiles = Hyperspace + .getContext(spark) + .sourceProviderManager + .allFiles(baseRelation) .map(f => FileInfo(f, index.fileIdTracker.addFile(f), asFullPath = true)) val (filesDeleted, filesAppended) = @@ -407,11 +412,11 @@ object RuleUtils { // Transform the location of LogicalRelation with appended files. val planForAppended = originalPlan transformDown { case baseRelation @ LogicalRelation( - fsRelation @ HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), + fsRelation @ HadoopFsRelation(location: FileIndex, _, _, _, _, _), baseOutput, _, _) => - val options = extractBasePath(location.partitionSpec) + val options = extractBasePath(location) .map { basePath => // Set "basePath" so that partitioned columns are also included in the output schema. Map("basePath" -> basePath) @@ -437,19 +442,24 @@ object RuleUtils { planForAppended } - private def extractBasePath(partitionSpec: PartitionSpec): Option[String] = { - if (partitionSpec == PartitionSpec.emptySpec) { + private def extractBasePath(location: FileIndex): Option[String] = { + if (location.partitionSchema.isEmpty) { None } else { - // For example, we could have the following in PartitionSpec: - // - partition columns = "col1", "col2" - // - partitions: "/path/col1=1/col2=1", "/path/col1=1/col2=2", etc. - // , and going up the same number of directory levels as the number of partition columns - // will compute the base path. Note that PartitionSpec.partitions will always contain - // all the partitions in the path, so "partitions.head" is taken as an initial value. - val basePath = partitionSpec.partitionColumns - .foldLeft(partitionSpec.partitions.head.path)((path, _) => path.getParent) - Some(basePath.toString) + location match { + case p: PartitioningAwareFileIndex => + // For example, we could have the following in PartitionSpec: + // - partition columns = "col1", "col2" + // - partitions: "/path/col1=1/col2=1", "/path/col1=1/col2=2", etc. + // , and going up the same number of directory levels as the number of partition columns + // will compute the base path. Note that PartitionSpec.partitions will always contain + // all the partitions in the path, so "partitions.head" is taken as an initial value. 
+ val basePath = p.partitionSpec.partitionColumns + .foldLeft(p.partitionSpec.partitions.head.path)((path, _) => path.getParent) + Some(basePath.toString) + case d: TahoeLogFileIndex => + Some(d.path.toString) + } } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala index 91004136e..896533fc8 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala @@ -18,6 +18,7 @@ package com.microsoft.hyperspace.index.sources import scala.util.{Success, Try} +import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.util.hyperspace.Utils @@ -47,7 +48,6 @@ class FileBasedSourceProviderManager(spark: SparkSession) { * Runs createRelation() for each provider. * * @param logicalRelation Logical relation to create [[Relation]] from. - * @param fileIdTracker [[FileIdTracker]] to use when populating the data of [[Relation]]. * @return [[Relation]] created from the given logical relation. * @throws HyperspaceException if multiple providers returns [[Some]] or * if no providers return [[Some]]. @@ -80,6 +80,10 @@ class FileBasedSourceProviderManager(spark: SparkSession) { run(p => p.signature(logicalRelation)) } + def allFiles(logicalRelation: LogicalRelation): Seq[FileStatus] = { + run(p => p.allFiles(logicalRelation)) + } + /** * Runs the given function 'f', which executes a [[FileBasedSourceProvider]]'s API that returns * [[Option]] for each provider built. This function ensures that only one provider returns diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala index 68c1b8cbc..ff5399201 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala @@ -150,6 +150,14 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS fileStatus.getLen.toString + fileStatus.getModificationTime.toString + fileStatus.getPath.toString } + + override def allFiles(logicalRelation: LogicalRelation): Option[Seq[FileStatus]] = { + logicalRelation.relation match { + case HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _) => + Some(location.allFiles) + case _ => None + } + } } /** diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala new file mode 100644 index 000000000..d91d6efab --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala @@ -0,0 +1,142 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.sources.delta + +import java.util.Locale + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.delta.files.TahoeLogFileIndex +import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} +import org.apache.spark.sql.sources.DataSourceRegister + +import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, IndexConstants, Relation} +import com.microsoft.hyperspace.index.sources.{FileBasedSourceProvider, SourceProvider, SourceProviderBuilder} +import com.microsoft.hyperspace.util.{CacheWithTransform, HashingUtils, HyperspaceConf, PathUtils} + +/** + * Default implementation for file-based Spark built-in sources such as parquet, csv, json, etc. + * + * This source can support relations that meet the following criteria: + * - The relation is [[HadoopFsRelation]] with [[PartitioningAwareFileIndex]] as file index. + * - Its file format implements [[DataSourceRegister]]. + */ +class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBasedSourceProvider { + + /** + * Creates [[Relation]] for IndexLogEntry using the given [[LogicalRelation]]. + * + * @param logicalRelation logical relation to derive [[Relation]] from. + * @return [[Relation]] object if the given 'logicalRelation' can be processed by this provider. + * Otherwise, None. + */ + override def createRelation( + logicalRelation: LogicalRelation, + fileIdTracker: FileIdTracker): Option[Relation] = { + logicalRelation.relation match { + case HadoopFsRelation(location: TahoeLogFileIndex, _, dataSchema, _, _, options) => + val files = location + .getSnapshot(stalenessAcceptable = false) + .filesForScan(projection = Nil, location.partitionFilters, keepStats = false) + .files + .map { f => + new FileStatus( + /* length */ f.size, + /* isDir */ false, + /* blockReplication */ 0, + /* blockSize */ 1, + /* modificationTime */ f.modificationTime, + new Path(location.path, f.path)) + } + // Note that source files are currently fingerprinted when the optimized plan is + // fingerprinted by LogicalPlanFingerprint. + val sourceDataProperties = + Hdfs.Properties(Content.fromLeafFiles(files, fileIdTracker).get) + val fileFormatName = "delta" + // "path" key in options can incur multiple data read unexpectedly and keep + // the table version info as metadata. + val opts = options - "path" + ("versionAsOf" -> location.tableVersion.toString) + Some( + Relation( + Seq(PathUtils.makeAbsolute(location.path).toString), + Hdfs(sourceDataProperties), + dataSchema.json, + fileFormatName, + opts)) + case _ => None + } + } + + /** + * Given a [[Relation]], returns a new [[Relation]] that will have the latest source. + * + * @param relation [[Relation]] object to reconstruct [[DataFrame]] with. + * @return [[Relation]] object if the given 'relation' can be processed by this provider. + * Otherwise, None. 
+ */ + override def refreshRelation(relation: Relation): Option[Relation] = { + if (relation.fileFormat.equals(IndexConstants.DELTA_FORMAT_STR)) { + Some(relation.copy(options = relation.options - "versionAsOf" - "timestampAsOf")) + } else { + None + } + } + + /** + * Computes the signature using the given [[LogicalRelation]]. This computes a signature of + * using version info and table name. + * + * @param logicalRelation logical relation to compute signature from. + * @return Signature computed if the given 'logicalRelation' can be processed by this provider. + * Otherwise, None. + */ + override def signature(logicalRelation: LogicalRelation): Option[String] = { + logicalRelation.relation match { + case HadoopFsRelation(location: TahoeLogFileIndex, _, _, _, _, _) => + Some(location.tableVersion + location.path.toString) + case _ => None + } + } + + override def allFiles(logicalRelation: LogicalRelation): Option[Seq[FileStatus]] = { + logicalRelation.relation match { + case HadoopFsRelation(location: TahoeLogFileIndex, _, _, _, _, _) => + val files = location + .getSnapshot(stalenessAcceptable = false) + .filesForScan(projection = Nil, location.partitionFilters, keepStats = false) + .files + .map { f => + new FileStatus( + /* length */ f.size, + /* isDir */ false, + /* blockReplication */ 0, + /* blockSize */ 1, + /* modificationTime */ f.modificationTime, + new Path(location.path, f.path)) + } + Some(files) + case _ => None + } + } +} + +/** + * Builder for building [[DeltaLakeFileBasedSource]]. + */ +class DeltaLakeFileBasedSourceBuilder extends SourceProviderBuilder { + override def build(spark: SparkSession): SourceProvider = new DeltaLakeFileBasedSource(spark) +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala index 120396c54..bd2c0e7c0 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala @@ -16,6 +16,7 @@ package com.microsoft.hyperspace.index.sources +import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.execution.datasources.LogicalRelation @@ -101,4 +102,6 @@ trait FileBasedSourceProvider extends SourceProvider { * Otherwise, None. 
*/ def signature(logicalRelation: LogicalRelation): Option[String] + + def allFiles(logicalRelation: LogicalRelation): Option[Seq[FileStatus]] } diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala index b7dfef307..1d198602e 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala @@ -75,7 +75,8 @@ object HyperspaceConf { spark.sessionState.conf .getConfString( "spark.hyperspace.index.sources.fileBasedBuilders", - "com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder") + "com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder," + + "com.microsoft.hyperspace.index.sources.delta.DeltaLakeFileBasedSourceBuilder") } def supportedFileFormatsForDefaultFileBasedSource(spark: SparkSession): String = { diff --git a/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala b/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala new file mode 100644 index 000000000..e37ad9f07 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala @@ -0,0 +1,158 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index + +import io.delta.tables.DeltaTable +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.{DataFrame, QueryTest} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources._ + +import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData} + +class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite { + override val systemPath = new Path("src/test/resources/deltaLakeIntegrationTest") + + private val sampleData = SampleData.testData + private var hyperspace: Hyperspace = _ + + override def beforeAll(): Unit = { + super.beforeAll() + hyperspace = new Hyperspace(spark) + } + + before { + spark.enableHyperspace() + } + + after { + spark.disableHyperspace() + } + + test("Verify createIndex and refreshIndex on Delta Lake table.") { + withTempPathAsString { dataPath => + import spark.implicits._ + val dfFromSample = sampleData.toDF("Date", "RGUID", "Query", "imprs", "clicks") + dfFromSample.write.format("delta").save(dataPath) + + val deltaDf = spark.read.format("delta").load(dataPath) + hyperspace.createIndex(deltaDf, IndexConfig("deltaIndex", Seq("clicks"), Seq("Query"))) + + withIndex("deltaIndex") { + def query(version: Option[Long] = None): DataFrame = { + if (version.isDefined) { + val deltaDf = + spark.read.format("delta").option("versionAsOf", version.get).load(dataPath) + deltaDf.filter(deltaDf("clicks") <= 2000).select(deltaDf("query")) + } else { + val deltaDf = spark.read.format("delta").load(dataPath) + deltaDf.filter(deltaDf("clicks") <= 2000).select(deltaDf("query")) + } + } + + assert(verifyIndexUse(query().queryExecution.optimizedPlan, "deltaIndex")) + + // Create a new version by deleting entries. + val deltaTable = DeltaTable.forPath(dataPath) + deltaTable.delete("clicks > 2000") + + // The index should not be applied for the updated version. + assert(!verifyIndexUse(query().queryExecution.optimizedPlan, "deltaIndex")) + + // The index should be applied for the version at index creation. + assert(verifyIndexUse(query(Some(0)).queryExecution.optimizedPlan, "deltaIndex")) + + hyperspace.refreshIndex("deltaIndex") + + // The index should be applied for the updated version. + assert(verifyIndexUse(query().queryExecution.optimizedPlan, "deltaIndex/v__=1")) + + // The index should not be applied for the version at index creation. + assert(!verifyIndexUse(query(Some(0)).queryExecution.optimizedPlan, "deltaIndex")) + } + } + } + + test("Verify Hybrid Scan on Delta Lake table.") { + withTempPathAsString { dataPath => + import spark.implicits._ + val dfFromSample = sampleData.toDF("Date", "RGUID", "Query", "imprs", "clicks") + dfFromSample.write.format("delta").save(dataPath) + + val deltaDf = spark.read.format("delta").load(dataPath) + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { + hyperspace.createIndex(deltaDf, IndexConfig("deltaIndex", Seq("clicks"), Seq("Query"))) + } + + withIndex("deltaIndex") { + def query(version: Option[Long] = None): DataFrame = { + if (version.isDefined) { + val deltaDf = + spark.read.format("delta").option("versionAsOf", version.get).load(dataPath) + deltaDf.filter(deltaDf("clicks") <= 2000).select(deltaDf("query")) + } else { + val deltaDf = spark.read.format("delta").load(dataPath) + deltaDf.filter(deltaDf("clicks") <= 2000).select(deltaDf("query")) + } + } + + assert(verifyIndexUse(query().queryExecution.optimizedPlan, "deltaIndex", false)) + + // Create a new version by deleting entries. 
+ val deltaTable = DeltaTable.forPath(dataPath) + deltaTable.delete("clicks > 5000") + + withSQLConf( + "spark.hyperspace.index.hybridscan.enabled" -> "true", + "spark.hyperspace.index.hybridscan.delete.enabled" -> "true") { + // The index should be applied for the updated version. + assert(verifyIndexUse(query().queryExecution.optimizedPlan, "deltaIndex", true)) + + // Append data. + dfFromSample + .limit(3) + .write + .format("delta") + .mode("append") + .save(dataPath) + + // The index should be applied for the updated version. + assert(verifyIndexUse(query().queryExecution.optimizedPlan, "deltaIndex", true)) + } + } + } + } + + def verifyIndexUse( + plan: LogicalPlan, + indexName: String, + isHybridScan: Boolean = false): Boolean = { + val rootPaths = plan.collect { + case LogicalRelation( + HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _), + _, + _, + _) => + location.rootPaths + }.flatten + if (!isHybridScan) { + rootPaths.nonEmpty && rootPaths.forall(_.toString.contains(indexName)) + } else { + rootPaths.nonEmpty && rootPaths.count(_.toString.contains(indexName)) > 0 + } + } +} From 44528228e2c79d63479ebf0c36a8ba825762d8a9 Mon Sep 17 00:00:00 2001 From: sezruby Date: Wed, 25 Nov 2020 14:47:23 +0900 Subject: [PATCH 02/11] Test fix --- .../hyperspace/actions/CreateActionBase.scala | 10 ++++-- .../hyperspace/actions/OptimizeAction.scala | 2 +- .../index/rules/DeltaLakeRuleUtils.scala | 33 +++++++++++++++++++ .../hyperspace/index/rules/RuleUtils.scala | 5 +-- 4 files changed, 45 insertions(+), 5 deletions(-) create mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/DeltaLakeRuleUtils.scala diff --git a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala index 0b947e4ae..86a0a89d4 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.functions.input_file_name import com.microsoft.hyperspace.{Hyperspace, HyperspaceException} import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer +import com.microsoft.hyperspace.index.rules.DeltaLakeRuleUtils import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils} /** @@ -174,9 +175,14 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) // + file:/C:/hyperspace/src/test/part-00003.snappy.parquet import spark.implicits._ val dataPathColumn = "_data_path" - val lineageDF = fileIdTracker.getFileToIdMap.toSeq + val isDeltaLakeSource = DeltaLakeRuleUtils.isDeltaLakeSource(df.queryExecution.optimizedPlan) + val lineageDF = fileIdTracker.getFileToIdMap.toSeq .map { kv => - (kv._1._1.replace("file:/", "file:///"), kv._2) + if (isDeltaLakeSource) { + (kv._1._1, kv._2) + } else { + (kv._1._1.replace("file:/", "file:///"), kv._2) + } } .toDF(dataPathColumn, IndexConstants.DATA_FILE_NAME_ID) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala index 985047aa7..f2fb8575b 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala @@ -93,7 +93,7 @@ class OptimizeAction( repartitionedDf.write.saveWithBuckets( repartitionedDf, indexDataPath.toString, - logEntry.asInstanceOf[IndexLogEntry].numBuckets, 
+ numBuckets, indexConfig.indexedColumns, SaveMode.Overwrite) } diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/DeltaLakeRuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/DeltaLakeRuleUtils.scala new file mode 100644 index 000000000..a81c99ef9 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/DeltaLakeRuleUtils.scala @@ -0,0 +1,33 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.rules + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.delta.files.TahoeLogFileIndex +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} + +object DeltaLakeRuleUtils { + + def isDeltaLakeSource(plan: LogicalPlan): Boolean = { + val numTahoe = plan.collect { + case LogicalRelation(HadoopFsRelation(_: TahoeLogFileIndex, _, _, _, _, _), _, _, _) => + true + } + assert(numTahoe.size <= 1) + numTahoe.nonEmpty + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index 5021f9907..a6363a476 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -175,13 +175,14 @@ object RuleUtils { plan: LogicalPlan, useBucketSpec: Boolean): LogicalPlan = { // Check pre-requisite. - assert(getLogicalRelation(plan).isDefined) + val logicalRelation = getLogicalRelation(plan) + assert(logicalRelation.isDefined) // If there is no change in source data files, the index can be applied by // transformPlanToUseIndexOnlyScan regardless of Hybrid Scan config. // This tag should always exist if Hybrid Scan is enabled. 
lazy val hybridScanRequired = - index.getTagValue(getLogicalRelation(plan).get, IndexLogEntryTags.HYBRIDSCAN_REQUIRED).get + index.getTagValue(logicalRelation.get, IndexLogEntryTags.HYBRIDSCAN_REQUIRED).get val transformed = if (HyperspaceConf.hybridScanEnabled(spark) && hybridScanRequired) { transformPlanToUseHybridScan(spark, index, plan, useBucketSpec) From 8f07454a5e6d7b0d1c8bcde74036c7a736edb7c3 Mon Sep 17 00:00:00 2001 From: sezruby Date: Wed, 25 Nov 2020 14:57:05 +0900 Subject: [PATCH 03/11] Review commit --- .../hyperspace/actions/CreateActionBase.scala | 5 +++-- .../actions/RefreshActionBase.scala | 20 +++++++++---------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala index 86a0a89d4..a6de95417 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala @@ -175,8 +175,9 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) // + file:/C:/hyperspace/src/test/part-00003.snappy.parquet import spark.implicits._ val dataPathColumn = "_data_path" - val isDeltaLakeSource = DeltaLakeRuleUtils.isDeltaLakeSource(df.queryExecution.optimizedPlan) - val lineageDF = fileIdTracker.getFileToIdMap.toSeq + val isDeltaLakeSource = + DeltaLakeRuleUtils.isDeltaLakeSource(df.queryExecution.optimizedPlan) + val lineageDF = fileIdTracker.getFileToIdMap.toSeq .map { kv => if (isDeltaLakeSource) { (kv._1._1, kv._2) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala index 59ed083d2..0c5a20586 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala @@ -70,18 +70,18 @@ private[actions] abstract class RefreshActionBase( val latestRelation = Hyperspace.getContext(spark).sourceProviderManager.refreshRelation(relations.head) val dataSchema = DataType.fromJson(latestRelation.dataSchemaJson).asInstanceOf[StructType] + val df = spark.read + .schema(dataSchema) + .format(latestRelation.fileFormat) + .options(latestRelation.options) + // Due to the difference in how the "path" option is set: https://github.com/apache/spark/ + // blob/ef1441b56c5cab02335d8d2e4ff95cf7e9c9b9ca/sql/core/src/main/scala/org/apache/spark/ + // sql/DataFrameReader.scala#L197 + // load() with a single parameter needs to be handled differently. 
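+      // (load(path) internally calls option("path", path) before loading, so the single-path
+      // case keeps the "path" option that sources such as Delta Lake rely on.)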
if (latestRelation.rootPaths.size == 1) { - spark.read - .schema(dataSchema) - .format(latestRelation.fileFormat) - .options(latestRelation.options) - .load(latestRelation.rootPaths.head) + df.load(latestRelation.rootPaths.head) } else { - spark.read - .schema(dataSchema) - .format(latestRelation.fileFormat) - .options(latestRelation.options) - .load(latestRelation.rootPaths: _*) + df.load(latestRelation.rootPaths: _*) } } From 61c152ce73ddf97a0fb74775fcab0e2fe76f6b93 Mon Sep 17 00:00:00 2001 From: sezruby Date: Wed, 25 Nov 2020 15:58:14 +0900 Subject: [PATCH 04/11] Review commit --- .../hyperspace/actions/CreateActionBase.scala | 16 +++------ .../hyperspace/index/IndexConstants.scala | 2 -- .../index/rules/DeltaLakeRuleUtils.scala | 33 ----------------- .../hyperspace/index/rules/RuleUtils.scala | 19 ++-------- .../FileBasedSourceProviderManager.scala | 12 ++++++- .../default/DefaultFileBasedSource.scala | 35 +++++++++++++++++-- .../delta/DeltaLakeFileBasedSource.scala | 33 +++++++++++++---- .../hyperspace/index/sources/interfaces.scala | 9 ++++- 8 files changed, 86 insertions(+), 73 deletions(-) delete mode 100644 src/main/scala/com/microsoft/hyperspace/index/rules/DeltaLakeRuleUtils.scala diff --git a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala index a6de95417..d92350f8f 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala @@ -24,7 +24,6 @@ import org.apache.spark.sql.functions.input_file_name import com.microsoft.hyperspace.{Hyperspace, HyperspaceException} import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer -import com.microsoft.hyperspace.index.rules.DeltaLakeRuleUtils import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils} /** @@ -175,17 +174,10 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) // + file:/C:/hyperspace/src/test/part-00003.snappy.parquet import spark.implicits._ val dataPathColumn = "_data_path" - val isDeltaLakeSource = - DeltaLakeRuleUtils.isDeltaLakeSource(df.queryExecution.optimizedPlan) - val lineageDF = fileIdTracker.getFileToIdMap.toSeq - .map { kv => - if (isDeltaLakeSource) { - (kv._1._1, kv._2) - } else { - (kv._1._1.replace("file:/", "file:///"), kv._2) - } - } - .toDF(dataPathColumn, IndexConstants.DATA_FILE_NAME_ID) + val relation = df.queryExecution.optimizedPlan.asInstanceOf[LogicalRelation] + val lineagePairs = + Hyperspace.getContext(spark).sourceProviderManager.lineagePairs(relation, fileIdTracker) + val lineageDF = lineagePairs.toDF(dataPathColumn, IndexConstants.DATA_FILE_NAME_ID) df.withColumn(dataPathColumn, input_file_name()) .join(lineageDF.hint("broadcast"), dataPathColumn) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala index 2a36b55b7..35370b790 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala @@ -64,8 +64,6 @@ object IndexConstants { "spark.hyperspace.index.cache.expiryDurationInSeconds" val INDEX_CACHE_EXPIRY_DURATION_SECONDS_DEFAULT = "300" - val DELTA_FORMAT_STR = "delta" - // Operation Log constants val HYPERSPACE_LOG = "_hyperspace_log" val INDEX_VERSION_DIRECTORY_PREFIX = "v__" diff --git 
a/src/main/scala/com/microsoft/hyperspace/index/rules/DeltaLakeRuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/DeltaLakeRuleUtils.scala deleted file mode 100644 index a81c99ef9..000000000 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/DeltaLakeRuleUtils.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (2020) The Hyperspace Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.microsoft.hyperspace.index.rules - -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.delta.files.TahoeLogFileIndex -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} - -object DeltaLakeRuleUtils { - - def isDeltaLakeSource(plan: LogicalPlan): Boolean = { - val numTahoe = plan.collect { - case LogicalRelation(HadoopFsRelation(_: TahoeLogFileIndex, _, _, _, _, _), _, _, _) => - true - } - assert(numTahoe.size <= 1) - numTahoe.nonEmpty - } -} diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index a6363a476..a145c356a 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -417,7 +417,7 @@ object RuleUtils { baseOutput, _, _) => - val options = extractBasePath(location) + val options = extractBasePath(spark, location) .map { basePath => // Set "basePath" so that partitioned columns are also included in the output schema. Map("basePath" -> basePath) @@ -443,24 +443,11 @@ object RuleUtils { planForAppended } - private def extractBasePath(location: FileIndex): Option[String] = { + private def extractBasePath(spark: SparkSession, location: FileIndex): Option[String] = { if (location.partitionSchema.isEmpty) { None } else { - location match { - case p: PartitioningAwareFileIndex => - // For example, we could have the following in PartitionSpec: - // - partition columns = "col1", "col2" - // - partitions: "/path/col1=1/col2=1", "/path/col1=1/col2=2", etc. - // , and going up the same number of directory levels as the number of partition columns - // will compute the base path. Note that PartitionSpec.partitions will always contain - // all the partitions in the path, so "partitions.head" is taken as an initial value. 
- val basePath = p.partitionSpec.partitionColumns - .foldLeft(p.partitionSpec.partitions.head.path)((path, _) => path.getParent) - Some(basePath.toString) - case d: TahoeLogFileIndex => - Some(d.path.toString) - } + Some(Hyperspace.getContext(spark).sourceProviderManager.partitionBasePath(location)) } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala index 896533fc8..95056ad44 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala @@ -20,7 +20,7 @@ import scala.util.{Success, Try} import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.datasources.{FileIndex, LogicalRelation} import org.apache.spark.util.hyperspace.Utils import com.microsoft.hyperspace.HyperspaceException @@ -84,6 +84,16 @@ class FileBasedSourceProviderManager(spark: SparkSession) { run(p => p.allFiles(logicalRelation)) } + def partitionBasePath(location: FileIndex): String = { + run(p => p.partitionBasePath(location)) + } + + def lineagePairs( + logicalRelation: LogicalRelation, + fileIdTracker: FileIdTracker): Seq[(String, Long)] = { + run(p => p.lineagePairs(logicalRelation, fileIdTracker)) + } + /** * Runs the given function 'f', which executes a [[FileBasedSourceProvider]]'s API that returns * [[Option]] for each provider built. This function ensures that only one provider returns diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala index ff5399201..b5670a9e4 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala @@ -20,7 +20,7 @@ import java.util.Locale import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} +import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} import org.apache.spark.sql.sources.DataSourceRegister import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, Relation} @@ -46,7 +46,7 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS * Creates [[Relation]] for IndexLogEntry using the given [[LogicalRelation]]. * * @param logicalRelation logical relation to derive [[Relation]] from. - * @param fileIdTracker [[FileIdTracker]] to use when populating the data of [[Relation]]. + * @param fileIdTracker [[FileIdTracker]] to use when populating the data of [[Relation]]. * @return [[Relation]] object if the given 'logicalRelation' can be processed by this provider. * Otherwise, None. 
*/ @@ -158,6 +158,37 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS case _ => None } } + + override def partitionBasePath(location: FileIndex): Option[String] = { + location match { + case p: PartitioningAwareFileIndex => + // For example, we could have the following in PartitionSpec: + // - partition columns = "col1", "col2" + // - partitions: "/path/col1=1/col2=1", "/path/col1=1/col2=2", etc. + // , and going up the same number of directory levels as the number of partition columns + // will compute the base path. Note that PartitionSpec.partitions will always contain + // all the partitions in the path, so "partitions.head" is taken as an initial value. + val basePath = p.partitionSpec.partitionColumns + .foldLeft(p.partitionSpec.partitions.head.path)((path, _) => path.getParent) + Some(basePath.toString) + case _ => + None + } + } + + override def lineagePairs( + logicalRelation: LogicalRelation, + fileIdTracker: FileIdTracker): Option[Seq[(String, Long)]] = { + logicalRelation.relation match { + case HadoopFsRelation(_: PartitioningAwareFileIndex, _, _, _, format, _) + if isSupportedFileFormat(format) => + Some(fileIdTracker.getFileToIdMap.toSeq.map { kv => + (kv._1._1.replace("file:/", "file:///"), kv._2) + }) + case _ => + None + } + } } /** diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala index d91d6efab..ee2e1eda9 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala @@ -16,17 +16,15 @@ package com.microsoft.hyperspace.index.sources.delta -import java.util.Locale - import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.delta.files.TahoeLogFileIndex -import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} +import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.sources.DataSourceRegister -import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, IndexConstants, Relation} +import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, Relation} import com.microsoft.hyperspace.index.sources.{FileBasedSourceProvider, SourceProvider, SourceProviderBuilder} -import com.microsoft.hyperspace.util.{CacheWithTransform, HashingUtils, HyperspaceConf, PathUtils} +import com.microsoft.hyperspace.util.PathUtils /** * Default implementation for file-based Spark built-in sources such as parquet, csv, json, etc. @@ -36,6 +34,7 @@ import com.microsoft.hyperspace.util.{CacheWithTransform, HashingUtils, Hyperspa * - Its file format implements [[DataSourceRegister]]. */ class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBasedSourceProvider { + val DELTA_FORMAT_STR = "delta" /** * Creates [[Relation]] for IndexLogEntry using the given [[LogicalRelation]]. @@ -89,7 +88,7 @@ class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBase * Otherwise, None. 
*/ override def refreshRelation(relation: Relation): Option[Relation] = { - if (relation.fileFormat.equals(IndexConstants.DELTA_FORMAT_STR)) { + if (relation.fileFormat.equals(DELTA_FORMAT_STR)) { Some(relation.copy(options = relation.options - "versionAsOf" - "timestampAsOf")) } else { None @@ -132,6 +131,28 @@ class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBase case _ => None } } + + override def partitionBasePath(location: FileIndex): Option[String] = { + location match { + case d: TahoeLogFileIndex => + Some(d.path.toString) + case _ => + None + } + } + + override def lineagePairs( + logicalRelation: LogicalRelation, + fileIdTracker: FileIdTracker): Option[Seq[(String, Long)]] = { + logicalRelation.relation match { + case HadoopFsRelation(_: TahoeLogFileIndex, _, _, _, _, _) => + Some(fileIdTracker.getFileToIdMap.toSeq.map { kv => + (kv._1._1, kv._2) + }) + case _ => + None + } + } } /** diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala index bd2c0e7c0..049e1dd27 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala @@ -18,7 +18,7 @@ package com.microsoft.hyperspace.index.sources import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.datasources.{FileIndex, LogicalRelation} import com.microsoft.hyperspace.index.{FileIdTracker, Relation} @@ -104,4 +104,11 @@ trait FileBasedSourceProvider extends SourceProvider { def signature(logicalRelation: LogicalRelation): Option[String] def allFiles(logicalRelation: LogicalRelation): Option[Seq[FileStatus]] + + def partitionBasePath(location: FileIndex): Option[String] + + def lineagePairs( + logicalRelation: LogicalRelation, + fileIdTracker: FileIdTracker): Option[Seq[(String, Long)]] + } From 3455efbd7bebdcd0c6e48247be7e7ad23f3cf860 Mon Sep 17 00:00:00 2001 From: sezruby Date: Wed, 25 Nov 2020 16:44:50 +0900 Subject: [PATCH 05/11] Disable delta --- .../com/microsoft/hyperspace/util/HyperspaceConf.scala | 3 +-- .../hyperspace/index/DeltaLakeIntegrationTest.scala | 8 ++++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala index 1d198602e..b7dfef307 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala @@ -75,8 +75,7 @@ object HyperspaceConf { spark.sessionState.conf .getConfString( "spark.hyperspace.index.sources.fileBasedBuilders", - "com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder," + - "com.microsoft.hyperspace.index.sources.delta.DeltaLakeFileBasedSourceBuilder") + "com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder") } def supportedFileFormatsForDefaultFileBasedSource(spark: SparkSession): String = { diff --git a/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala b/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala index e37ad9f07..835f7a86f 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala @@ 
-32,9 +32,17 @@ class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
+    spark.conf.set("spark.hyperspace.index.sources.fileBasedBuilders",
+      "com.microsoft.hyperspace.index.sources.delta.DeltaLakeFileBasedSourceBuilder," +
+        "com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder")
     hyperspace = new Hyperspace(spark)
   }
 
+  override def afterAll(): Unit = {
+    super.afterAll()
+    spark.conf.unset("spark.hyperspace.index.sources.fileBasedBuilders")
+  }
+
   before {
     spark.enableHyperspace()
   }

From 02f8c4546531a1213f3b68ddb3d9017081f05085 Mon Sep 17 00:00:00 2001
From: sezruby
Date: Wed, 25 Nov 2020 17:35:09 +0900
Subject: [PATCH 06/11] Add comment

---
 .../FileBasedSourceProviderManager.scala      | 26 ++++++++++++++++++
 .../default/DefaultFileBasedSource.scala      | 23 +++++++++++++++-
 .../delta/DeltaLakeFileBasedSource.scala      | 27 ++++++++++++++++---
 .../hyperspace/index/sources/interfaces.scala | 23 +++++++++++++++-
 4 files changed, 93 insertions(+), 6 deletions(-)

diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala
index 95056ad44..64b81a672 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala
@@ -48,6 +48,7 @@ class FileBasedSourceProviderManager(spark: SparkSession) {
    * Runs createRelation() for each provider.
    *
    * @param logicalRelation Logical relation to create [[Relation]] from.
+   * @param fileIdTracker [[FileIdTracker]] to use when populating the data of [[Relation]].
    * @return [[Relation]] created from the given logical relation.
    * @throws HyperspaceException if multiple providers returns [[Some]] or
    *                             if no providers return [[Some]].
@@ -80,14 +81,39 @@ class FileBasedSourceProviderManager(spark: SparkSession) {
     run(p => p.signature(logicalRelation))
   }
 
+  /**
+   * Runs allFiles() for each provider.
+   *
+   * @param logicalRelation Logical relation to retrieve all input files.
+   * @return List of all input files.
+   * @throws HyperspaceException if multiple providers return [[Some]] or
+   *                             if no providers return [[Some]].
+   */
   def allFiles(logicalRelation: LogicalRelation): Seq[FileStatus] = {
     run(p => p.allFiles(logicalRelation))
   }
 
+  /**
+   * Runs partitionBasePath() for each provider.
+   *
+   * @param location Partitioned location.
+   * @return basePath string to read the given partitioned location.
+   * @throws HyperspaceException if multiple providers return [[Some]] or
+   *                             if no providers return [[Some]].
+   */
   def partitionBasePath(location: FileIndex): String = {
     run(p => p.partitionBasePath(location))
   }
 
+  /**
+   * Runs lineagePairs() for each provider.
+   *
+   * @param logicalRelation Logical relation to check the relation type.
+   * @param fileIdTracker [[FileIdTracker]] to create the list of (file path, file id).
+   * @return List of (file path, file id).
+   * @throws HyperspaceException if multiple providers return [[Some]] or
+   *                             if no providers return [[Some]].
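+   *
+   * Note: the returned paths must match the output of Spark's "input_file_name()",
+   * since CreateActionBase joins on that column to attach the lineage column.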
+ */ def lineagePairs( logicalRelation: LogicalRelation, fileIdTracker: FileIdTracker): Seq[(String, Long)] = { diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala index b5670a9e4..df2e3dc88 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala @@ -46,7 +46,7 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS * Creates [[Relation]] for IndexLogEntry using the given [[LogicalRelation]]. * * @param logicalRelation logical relation to derive [[Relation]] from. - * @param fileIdTracker [[FileIdTracker]] to use when populating the data of [[Relation]]. + * @param fileIdTracker [[FileIdTracker]] to use when populating the data of [[Relation]]. * @return [[Relation]] object if the given 'logicalRelation' can be processed by this provider. * Otherwise, None. */ @@ -151,6 +151,12 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS fileStatus.getPath.toString } + /** + * Retrieves all input files from the given [[LogicalRelation]]. + * + * @param logicalRelation Logical relation to retrieve input files from. + * @return List of [[FileStatus]] for the given relation. + */ override def allFiles(logicalRelation: LogicalRelation): Option[Seq[FileStatus]] = { logicalRelation.relation match { case HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _) => @@ -159,6 +165,12 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS } } + /** + * Constructs the basePath for the given [[FileIndex]]. + * + * @param location Partitioned data location. + * @return basePath to read the given partitioned location. + */ override def partitionBasePath(location: FileIndex): Option[String] = { location match { case p: PartitioningAwareFileIndex => @@ -176,6 +188,15 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS } } + /** + * Returns list of pairs of (file path, file id) to build lineage column. + * + * File paths should be the same format with "input_file_name()" of the given relation type. + * + * @param logicalRelation Logical relation to check the relation type. + * @param fileIdTracker [[FileIdTracker]] to create the list of (file path, file id). + * @return List of pairs of (file path, file id). 
+ */ override def lineagePairs( logicalRelation: LogicalRelation, fileIdTracker: FileIdTracker): Option[Seq[(String, Long)]] = { diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala index ee2e1eda9..3fdcd88b3 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala @@ -20,18 +20,16 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.delta.files.TahoeLogFileIndex import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation} -import org.apache.spark.sql.sources.DataSourceRegister import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, Relation} import com.microsoft.hyperspace.index.sources.{FileBasedSourceProvider, SourceProvider, SourceProviderBuilder} import com.microsoft.hyperspace.util.PathUtils /** - * Default implementation for file-based Spark built-in sources such as parquet, csv, json, etc. + * Delta Lake file-based source provider. * * This source can support relations that meet the following criteria: - * - The relation is [[HadoopFsRelation]] with [[PartitioningAwareFileIndex]] as file index. - * - Its file format implements [[DataSourceRegister]]. + * - The relation is [[HadoopFsRelation]] with [[TahoeLogFileIndex]] as file index. */ class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBasedSourceProvider { val DELTA_FORMAT_STR = "delta" @@ -111,6 +109,12 @@ class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBase } } + /** + * Retrieves all input files from the given [[LogicalRelation]]. + * + * @param logicalRelation Logical relation to retrieve input files from. + * @return List of [[FileStatus]] for the given relation. + */ override def allFiles(logicalRelation: LogicalRelation): Option[Seq[FileStatus]] = { logicalRelation.relation match { case HadoopFsRelation(location: TahoeLogFileIndex, _, _, _, _, _) => @@ -132,6 +136,12 @@ class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBase } } + /** + * Constructs the basePath for the given [[FileIndex]]. + * + * @param location Partitioned data location. + * @return basePath to read the given partitioned location. + */ override def partitionBasePath(location: FileIndex): Option[String] = { location match { case d: TahoeLogFileIndex => @@ -141,6 +151,15 @@ class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBase } } + /** + * Returns list of pairs of (file path, file id) to build lineage column. + * + * File paths should be the same format with "input_file_name()" of the given relation type. + * + * @param logicalRelation Logical relation to check the relation type. + * @param fileIdTracker [[FileIdTracker]] to create the list of (file path, file id). + * @return List of pairs of (file path, file id). 
diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala
index 049e1dd27..361a1fd02 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala
@@ -97,16 +97,37 @@ trait FileBasedSourceProvider extends SourceProvider {
    *
    * If the given logical relation does not belong to this provider, None should be returned.
    *
-   * @param logicalRelation logical relation to compute signature from.
+   * @param logicalRelation Logical relation to compute signature from.
    * @return Signature computed if the given 'logicalRelation' can be processed by this provider.
    *         Otherwise, None.
    */
   def signature(logicalRelation: LogicalRelation): Option[String]

+  /**
+   * Retrieves all input files from the given [[LogicalRelation]].
+   *
+   * @param logicalRelation Logical relation to retrieve input files from.
+   * @return List of [[FileStatus]] for the given relation.
+   */
   def allFiles(logicalRelation: LogicalRelation): Option[Seq[FileStatus]]

+  /**
+   * Constructs the basePath for the given [[FileIndex]].
+   *
+   * @param location Partitioned data location.
+   * @return basePath to read the given partitioned location.
+   */
   def partitionBasePath(location: FileIndex): Option[String]

+  /**
+   * Returns list of pairs of (file path, file id) to build lineage column.
+   *
+   * File paths should be the same format as "input_file_name()" of the given relation type.
+   *
+   * @param logicalRelation Logical relation to check the relation type.
+   * @param fileIdTracker [[FileIdTracker]] to create the list of (file path, file id).
+   * @return List of pairs of (file path, file id).
+   */
   def lineagePairs(
       logicalRelation: LogicalRelation,
       fileIdTracker: FileIdTracker): Option[Seq[(String, Long)]]
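Every method on this trait returns Option so that a provider can decline relations it does not own, and a manager can then ask each registered provider in turn. The following is a simplified, hedged sketch of that chain-of-responsibility contract with toy types standing in for Hyperspace's; it is not the FileBasedSourceProviderManager implementation itself.

    // Each provider answers with Some(...) only for relations it owns;
    // the manager picks the first provider that responds.
    trait FileLister[R] {
      def allFiles(relation: R): Option[Seq[String]]
    }

    object ProviderChainSketch extends App {
      def firstAnswer[R](providers: Seq[FileLister[R]], relation: R): Seq[String] =
        providers.view
          .flatMap(_.allFiles(relation))
          .headOption
          .getOrElse(throw new IllegalArgumentException("No provider supports this relation"))

      val deltaLike = new FileLister[String] {
        def allFiles(r: String) = if (r.startsWith("delta:")) Some(Seq(s"$r/f1")) else None
      }
      val default = new FileLister[String] {
        def allFiles(r: String) = Some(Seq(s"$r/p1"))
      }

      println(firstAnswer(Seq(deltaLike, default), "delta:/tbl"))   // answered by deltaLike
      println(firstAnswer(Seq(deltaLike, default), "parquet:/tbl")) // falls through to default
    }

Ordering matters under this contract: a catch-all provider like the default source must be registered last, as the integration test's builder list later shows.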
From c5e8b778ee93670391b4967c72e2ec99d1243572 Mon Sep 17 00:00:00 2001
From: sezruby
Date: Mon, 30 Nov 2020 13:23:06 +0900
Subject: [PATCH 07/11] Fix doc comment wording and remove an unused import

---
 .../com/microsoft/hyperspace/index/rules/RuleUtils.scala | 1 -
 .../index/sources/default/DefaultFileBasedSource.scala   | 8 ++++----
 .../index/sources/delta/DeltaLakeFileBasedSource.scala   | 4 ++--
 .../microsoft/hyperspace/index/sources/interfaces.scala  | 2 +-
 4 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
index a145c356a..47634af6f 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
@@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, In, Literal, Not}
 import org.apache.spark.sql.catalyst.optimizer.OptimizeIn
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.delta.files.TahoeLogFileIndex
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.types.{LongType, StructType}
diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala
index df2e3dc88..1e7013451 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala
@@ -45,7 +45,7 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS
   /**
    * Creates [[Relation]] for IndexLogEntry using the given [[LogicalRelation]].
    *
-   * @param logicalRelation logical relation to derive [[Relation]] from.
+   * @param logicalRelation Logical relation to derive [[Relation]] from.
    * @param fileIdTracker [[FileIdTracker]] to use when populating the data of [[Relation]].
    * @return [[Relation]] object if the given 'logicalRelation' can be processed by this provider.
    *         Otherwise, None.
@@ -123,7 +123,7 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS
    * Computes the signature using the given [[LogicalRelation]]. This computes a signature of
    * using all the files found in [[PartitioningAwareFileIndex]].
    *
-   * @param logicalRelation logical relation to compute signature from.
+   * @param logicalRelation Logical relation to compute signature from.
    * @return Signature computed if the given 'logicalRelation' can be processed by this provider.
    *         Otherwise, None.
    */
@@ -143,8 +143,8 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS
   /**
    * Fingerprints a file.
    *
-   * @param fileStatus file status.
-   * @return the fingerprint of a file.
+   * @param fileStatus File status.
+   * @return The fingerprint of a file.
    */
   private def fingerprint(fileStatus: FileStatus): String = {
     fileStatus.getLen.toString + fileStatus.getModificationTime.toString +
diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala
index 3fdcd88b3..58481ce87 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala
@@ -37,7 +37,7 @@ class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBase
   /**
    * Creates [[Relation]] for IndexLogEntry using the given [[LogicalRelation]].
    *
-   * @param logicalRelation logical relation to derive [[Relation]] from.
+   * @param logicalRelation Logical relation to derive [[Relation]] from.
    * @return [[Relation]] object if the given 'logicalRelation' can be processed by this provider.
    *         Otherwise, None.
    */
@@ -97,7 +97,7 @@ class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBase
    * Computes the signature using the given [[LogicalRelation]]. This computes a signature of
    * using version info and table name.
    *
-   * @param logicalRelation logical relation to compute signature from.
+   * @param logicalRelation Logical relation to compute signature from.
    * @return Signature computed if the given 'logicalRelation' can be processed by this provider.
    *         Otherwise, None.
    */
diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala
index 361a1fd02..5a7351b5c 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala
@@ -67,7 +67,7 @@ trait FileBasedSourceProvider extends SourceProvider {
    *
    * If the given logical relation does not belong to this provider, None should be returned.
    *
-   * @param logicalRelation logical relation to derive [[Relation]] from.
+   * @param logicalRelation Logical relation to derive [[Relation]] from.
    * @param fileIdTracker [[FileIdTracker]] to use when populating the data of [[Relation]].
    * @return [[Relation]] object if the given 'logicalRelation' can be processed by this provider.
    *         Otherwise, None.
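The fingerprint cleanup above is a good place to restate what the signature actually is for the default source: per file, length plus modification time plus path, concatenated across all files of the relation and hashed. A hedged, self-contained sketch follows; MD5 is an assumption chosen for illustration, and Hyperspace's actual hash function may differ.

    import java.security.MessageDigest

    object FingerprintSketch extends App {
      // One file's fingerprint: length + modification time + path.
      def fingerprint(len: Long, modTime: Long, path: String): String =
        len.toString + modTime.toString + path

      // A relation's signature: hash of all file fingerprints concatenated.
      def signature(files: Seq[(Long, Long, String)]): String = {
        val all = files.map((fingerprint _).tupled).mkString
        MessageDigest
          .getInstance("MD5")
          .digest(all.getBytes("UTF-8"))
          .map("%02x".format(_))
          .mkString
      }

      println(signature(Seq((10L, 1L, "/data/a.parquet"), (20L, 2L, "/data/b.parquet"))))
    }

Any change to a file's size, timestamp, or path changes the signature, which is exactly why an edited file is treated as a new file for indexing purposes.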
From 496c3456041c122cbf7e63220b805e5cc3ee54f1 Mon Sep 17 00:00:00 2001
From: sezruby
Date: Fri, 4 Dec 2020 18:48:26 +0900
Subject: [PATCH 08/11] Rename verifyIndexUse to verifyIndexUsage

---
 .../index/DeltaLakeIntegrationTest.scala | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala b/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala
index 835f7a86f..02cc5cc3f 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala
@@ -72,25 +72,25 @@ class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite {
       }
     }

-    assert(verifyIndexUse(query().queryExecution.optimizedPlan, "deltaIndex"))
+    assert(verifyIndexUsage(query().queryExecution.optimizedPlan, "deltaIndex"))

     // Create a new version by deleting entries.
     val deltaTable = DeltaTable.forPath(dataPath)
     deltaTable.delete("clicks > 2000")

     // The index should not be applied for the updated version.
-    assert(!verifyIndexUse(query().queryExecution.optimizedPlan, "deltaIndex"))
+    assert(!verifyIndexUsage(query().queryExecution.optimizedPlan, "deltaIndex"))

     // The index should be applied for the version at index creation.
-    assert(verifyIndexUse(query(Some(0)).queryExecution.optimizedPlan, "deltaIndex"))
+    assert(verifyIndexUsage(query(Some(0)).queryExecution.optimizedPlan, "deltaIndex"))

     hyperspace.refreshIndex("deltaIndex")

     // The index should be applied for the updated version.
-    assert(verifyIndexUse(query().queryExecution.optimizedPlan, "deltaIndex/v__=1"))
+    assert(verifyIndexUsage(query().queryExecution.optimizedPlan, "deltaIndex/v__=1"))

     // The index should not be applied for the version at index creation.
-    assert(!verifyIndexUse(query(Some(0)).queryExecution.optimizedPlan, "deltaIndex"))
+    assert(!verifyIndexUsage(query(Some(0)).queryExecution.optimizedPlan, "deltaIndex"))
   }
 }
@@ -118,7 +118,7 @@ class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite {
     }
   }

-    assert(verifyIndexUse(query().queryExecution.optimizedPlan, "deltaIndex", false))
+    assert(verifyIndexUsage(query().queryExecution.optimizedPlan, "deltaIndex", false))

     // Create a new version by deleting entries.
     val deltaTable = DeltaTable.forPath(dataPath)
@@ -128,7 +128,7 @@ class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite {
       "spark.hyperspace.index.hybridscan.enabled" -> "true",
       "spark.hyperspace.index.hybridscan.delete.enabled" -> "true") {
       // The index should be applied for the updated version.
-      assert(verifyIndexUse(query().queryExecution.optimizedPlan, "deltaIndex", true))
+      assert(verifyIndexUsage(query().queryExecution.optimizedPlan, "deltaIndex", true))

       // Append data.
       dfFromSample
@@ -139,13 +139,13 @@ class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite {
         .save(dataPath)

       // The index should be applied for the updated version.
-      assert(verifyIndexUse(query().queryExecution.optimizedPlan, "deltaIndex", true))
+      assert(verifyIndexUsage(query().queryExecution.optimizedPlan, "deltaIndex", true))
     }
   }
 }

-  def verifyIndexUse(
+  def verifyIndexUsage(
       plan: LogicalPlan,
       indexName: String,
       isHybridScan: Boolean = false): Boolean = {
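These assertions lean on Delta Lake time travel: query() reads the latest snapshot, while query(Some(0)) pins the version the index was created against. A hedged sketch of that versioned read path follows; the "versionAsOf" option is standard Delta Lake, and the table path here is a placeholder.

    import org.apache.spark.sql.{DataFrame, SparkSession}

    object TimeTravelSketch {
      // Read a Delta table either at its latest version or pinned to an older one.
      def readVersion(spark: SparkSession, path: String, version: Option[Long] = None): DataFrame = {
        val reader = spark.read.format("delta")
        version.fold(reader)(v => reader.option("versionAsOf", v)).load(path)
      }
    }

With this shape, index usage can be checked against both readVersion(spark, path) and readVersion(spark, path, Some(0)) plans, which is exactly the before/after-refresh pattern the test exercises.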
From b3dfe92c28aaa840b7beb5b561fe24f1b06895a4 Mon Sep 17 00:00:00 2001
From: sezruby
Date: Fri, 4 Dec 2020 22:48:17 +0900
Subject: [PATCH 09/11] Fix formatting in RuleUtils

---
 .../scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
index a4b6b9f74..6d86b24dc 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
@@ -171,7 +171,7 @@ object RuleUtils {
     // If there is no change in source data files, the index can be applied by
     // transformPlanToUseIndexOnlyScan regardless of Hybrid Scan config.
     // This tag should always exist if Hybrid Scan is enabled.
-    val hybridScanRequired = 
+    val hybridScanRequired =
       HyperspaceConf.hybridScanEnabled(spark) &&
         index.getTagValue(logicalRelation.get, IndexLogEntryTags.HYBRIDSCAN_REQUIRED).get
     // If the index has appended files and/or deleted files, which means the current index data
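The hybridScanRequired flag touched above encodes a three-way decision that is easy to lose in the rule plumbing. A plain-Scala sketch of the logic follows; the names and return values are illustrative only, since the real rule consults index tags and rewrites plans rather than returning strings.

    object HybridScanDecisionSketch extends App {
      def chooseStrategy(
          hybridScanEnabled: Boolean,
          appendedFiles: Set[String],
          deletedFiles: Set[String]): String = {
        val sourceChanged = appendedFiles.nonEmpty || deletedFiles.nonEmpty
        if (!sourceChanged) "index-only scan"
        else if (hybridScanEnabled) "hybrid scan (index plus appended files, minus deleted)"
        else "no index (fall back to source data)"
      }

      println(chooseStrategy(hybridScanEnabled = true, Set("f3"), Set.empty))
      println(chooseStrategy(hybridScanEnabled = false, Set("f3"), Set.empty))
    }

When the source files are unchanged, the index applies regardless of the Hybrid Scan config, which is precisely what the comment in the hunk above states.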
From 952d06a18eca2ffdc69e9658e5670847a8deae59 Mon Sep 17 00:00:00 2001
From: sezruby
Date: Tue, 8 Dec 2020 17:37:31 +0900
Subject: [PATCH 10/11] Address review comments

---
 .../default/DefaultFileBasedSource.scala |  4 ++-
 .../delta/DeltaLakeFileBasedSource.scala | 25 +++++++++++--------
 .../index/DeltaLakeIntegrationTest.scala | 24 +++++++++---------
 3 files changed, 30 insertions(+), 23 deletions(-)

diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala
index 8f66f98e3..dee45add1 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala
@@ -221,7 +221,9 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS
   /**
    * Returns list of pairs of (file path, file id) to build lineage column.
    *
-   * File paths should be the same format with "input_file_name()" of the given relation type.
+   * File paths should be the same format as "input_file_name()" of the given relation type.
+   * For [[DefaultFileBasedSource]], each file path should be in this format:
+   *   `file:///path/to/file`
    *
    * @param logicalRelation Logical relation to check the relation type.
    * @param fileIdTracker [[FileIdTracker]] to create the list of (file path, file id).
    * @return List of pairs of (file path, file id).
diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala
index 58481ce87..3c803a73b 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala
@@ -32,7 +32,17 @@ import com.microsoft.hyperspace.util.PathUtils
  *  - The relation is [[HadoopFsRelation]] with [[TahoeLogFileIndex]] as file index.
  */
 class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBasedSourceProvider {
-  val DELTA_FORMAT_STR = "delta"
+  private val DELTA_FORMAT_STR = "delta"
+
+  private def toFileStatus(fileSize: Long, modificationTime: Long, path: Path): FileStatus = {
+    new FileStatus(
+      /* length */ fileSize,
+      /* isDir */ false,
+      /* blockReplication */ 0,
+      /* blockSize */ 1,
+      /* modificationTime */ modificationTime,
+      path)
+  }
@@ -122,14 +132,7 @@ class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBase
         .getSnapshot(stalenessAcceptable = false)
         .filesForScan(projection = Nil, location.partitionFilters, keepStats = false)
         .files
-        .map { f =>
-          new FileStatus(
-            /* length */ f.size,
-            /* isDir */ false,
-            /* blockReplication */ 0,
-            /* blockSize */ 1,
-            /* modificationTime */ f.modificationTime,
-            new Path(location.path, f.path))
+        .map { f => toFileStatus(f.size, f.modificationTime, new Path(location.path, f.path))
         }
       Some(files)
     case _ => None
@@ -154,7 +157,9 @@ class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBase
   /**
    * Returns list of pairs of (file path, file id) to build lineage column.
    *
-   * File paths should be the same format with "input_file_name()" of the given relation type.
+   * File paths should be the same format as "input_file_name()" of the given relation type.
+   * For [[DeltaLakeFileBasedSource]], each file path should be in this format:
+   *   `file:/path/to/file`
    *
    * @param logicalRelation Logical relation to check the relation type.
    * @param fileIdTracker [[FileIdTracker]] to create the list of (file path, file id).
    * @return List of pairs of (file path, file id).
diff --git a/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala b/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala
index 02cc5cc3f..b61682c70 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala
@@ -72,25 +72,25 @@ class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite {
       }
     }

-    assert(verifyIndexUsage(query().queryExecution.optimizedPlan, "deltaIndex"))
+    assert(isIndexUsed(query().queryExecution.optimizedPlan, "deltaIndex"))

     // Create a new version by deleting entries.
     val deltaTable = DeltaTable.forPath(dataPath)
     deltaTable.delete("clicks > 2000")

     // The index should not be applied for the updated version.
-    assert(!verifyIndexUsage(query().queryExecution.optimizedPlan, "deltaIndex"))
+    assert(!isIndexUsed(query().queryExecution.optimizedPlan, "deltaIndex"))

     // The index should be applied for the version at index creation.
-    assert(verifyIndexUsage(query(Some(0)).queryExecution.optimizedPlan, "deltaIndex"))
+    assert(isIndexUsed(query(Some(0)).queryExecution.optimizedPlan, "deltaIndex"))

     hyperspace.refreshIndex("deltaIndex")

     // The index should be applied for the updated version.
-    assert(verifyIndexUsage(query().queryExecution.optimizedPlan, "deltaIndex/v__=1"))
+    assert(isIndexUsed(query().queryExecution.optimizedPlan, "deltaIndex/v__=1"))

     // The index should not be applied for the version at index creation.
-    assert(!verifyIndexUsage(query(Some(0)).queryExecution.optimizedPlan, "deltaIndex"))
+    assert(!isIndexUsed(query(Some(0)).queryExecution.optimizedPlan, "deltaIndex"))
   }
 }
@@ -118,7 +118,7 @@ class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite {
     }
   }

-    assert(verifyIndexUsage(query().queryExecution.optimizedPlan, "deltaIndex", false))
+    assert(isIndexUsed(query().queryExecution.optimizedPlan, "deltaIndex", false))

     // Create a new version by deleting entries.
     val deltaTable = DeltaTable.forPath(dataPath)
@@ -128,7 +128,7 @@ class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite {
       "spark.hyperspace.index.hybridscan.enabled" -> "true",
       "spark.hyperspace.index.hybridscan.delete.enabled" -> "true") {
       // The index should be applied for the updated version.
-      assert(verifyIndexUsage(query().queryExecution.optimizedPlan, "deltaIndex", true))
+      assert(isIndexUsed(query().queryExecution.optimizedPlan, "deltaIndex", true))

       // Append data.
       dfFromSample
@@ -139,15 +139,15 @@ class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite {
         .save(dataPath)

       // The index should be applied for the updated version.
-      assert(verifyIndexUsage(query().queryExecution.optimizedPlan, "deltaIndex", true))
+      assert(isIndexUsed(query().queryExecution.optimizedPlan, "deltaIndex", true))
     }
   }
 }

-  def verifyIndexUsage(
+  def isIndexUsed(
       plan: LogicalPlan,
-      indexName: String,
+      indexPathSubStr: String,
       isHybridScan: Boolean = false): Boolean = {
     val rootPaths = plan.collect {
       case LogicalRelation(
           HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _),
           _,
           _,
           _) =>
         location.rootPaths
     }.flatten
     if (!isHybridScan) {
-      rootPaths.nonEmpty && rootPaths.forall(_.toString.contains(indexName))
+      rootPaths.nonEmpty && rootPaths.forall(_.toString.contains(indexPathSubStr))
     } else {
-      rootPaths.nonEmpty && rootPaths.count(_.toString.contains(indexName)) > 0
+      rootPaths.nonEmpty && rootPaths.exists(_.toString.contains(indexPathSubStr))
     }
   }
 }
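Patch 10's main subtlety is the documented path formats: input_file_name() yields file:///... for plain file-based reads but file:/... for Delta, so lineage pairs must match the exact spelling or lineage lookups silently miss. The short demonstration below shows the two spellings differ as raw strings while naming the same file; the paths are placeholders.

    import java.net.URI

    object PathFormatSketch extends App {
      val defaultForm = "file:///data/part-00000.parquet" // default source form
      val deltaForm = "file:/data/part-00000.parquet"     // Delta source form
      println(defaultForm == deltaForm)                   // false: raw strings differ
      println(new URI(defaultForm).getPath == new URI(deltaForm).getPath) // true: same file
    }

This is why each provider documents its own expected format instead of sharing one path-normalization helper.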
From 56d572114dc99cb7f1426ea970128da4e8d3a64c Mon Sep 17 00:00:00 2001
From: sezruby
Date: Wed, 9 Dec 2020 10:35:33 +0900
Subject: [PATCH 11/11] Address review comments

---
 .../sources/delta/DeltaLakeFileBasedSource.scala    | 13 ++++---------
 .../hyperspace/index/DeltaLakeIntegrationTest.scala |  5 +++--
 2 files changed, 7 insertions(+), 11 deletions(-)

diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala
index 3c803a73b..c257162cc 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala
@@ -40,7 +40,7 @@ class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBase
       /* isDir */ false,
       /* blockReplication */ 0,
       /* blockSize */ 1,
-      /* modificationTime */ modificationTime, 
+      /* modificationTime */ modificationTime,
       path)
   }

@@ -61,13 +61,7 @@ class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBase
         .filesForScan(projection = Nil, location.partitionFilters, keepStats = false)
         .files
         .map { f =>
-          new FileStatus(
-            /* length */ f.size,
-            /* isDir */ false,
-            /* blockReplication */ 0,
-            /* blockSize */ 1,
-            /* modificationTime */ f.modificationTime,
-            new Path(location.path, f.path))
+          toFileStatus(f.size, f.modificationTime, new Path(location.path, f.path))
         }
       // Note that source files are currently fingerprinted when the optimized plan is
       // fingerprinted by LogicalPlanFingerprint.
@@ -132,7 +126,8 @@ class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBase .getSnapshot(stalenessAcceptable = false) .filesForScan(projection = Nil, location.partitionFilters, keepStats = false) .files - .map { f => toFileStatus(f.size, f.modificationTime, new Path(location.path, f.path)) + .map { f => + toFileStatus(f.size, f.modificationTime, new Path(location.path, f.path)) } Some(files) case _ => None diff --git a/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala b/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala index b61682c70..88fb3e213 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala @@ -32,9 +32,10 @@ class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite { override def beforeAll(): Unit = { super.beforeAll() - spark.conf.set("spark.hyperspace.index.sources.fileBasedBuilders", + spark.conf.set( + "spark.hyperspace.index.sources.fileBasedBuilders", "com.microsoft.hyperspace.index.sources.delta.DeltaLakeFileBasedSourceBuilder," + - "com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder") + "com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder") hyperspace = new Hyperspace(spark) }
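Taken together, the series lets Hyperspace index Delta tables once the Delta source builder is registered. Below is a hedged end-to-end sketch modeled on the integration test above; the builder-list config key is set exactly as in the test, while the table path and the column names in IndexConfig are placeholders.

    import org.apache.spark.sql.SparkSession

    import com.microsoft.hyperspace.Hyperspace
    import com.microsoft.hyperspace.index.IndexConfig

    object DeltaIndexUsageSketch extends App {
      val spark = SparkSession.builder
        .master("local[*]")
        .appName("delta-index-sketch")
        .getOrCreate()

      // Register the Delta source builder ahead of the default one.
      spark.conf.set(
        "spark.hyperspace.index.sources.fileBasedBuilders",
        "com.microsoft.hyperspace.index.sources.delta.DeltaLakeFileBasedSourceBuilder," +
          "com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder")

      val hyperspace = new Hyperspace(spark)
      val df = spark.read.format("delta").load("/data/delta-table") // placeholder path
      hyperspace.createIndex(df, IndexConfig("deltaIndex", Seq("clicks"), Seq("query")))

      // After the Delta table changes, refresh so the index tracks the new snapshot.
      hyperspace.refreshIndex("deltaIndex")
    }

Ordering the Delta builder first matters because the default builder accepts any file-based relation; registering it last keeps it as the fallback.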