diff --git a/build.sbt b/build.sbt index c6eeda951..e206f6727 100644 --- a/build.sbt +++ b/build.sbt @@ -30,6 +30,7 @@ libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided" withSources(), "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided" withSources(), "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided" withSources(), + "io.delta" %% "delta-core" % "0.6.1" % "provided" withSources(), // Test dependencies "org.scalatest" %% "scalatest" % "3.0.5" % "test", @@ -40,6 +41,11 @@ libraryDependencies ++= Seq( "org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests" ) +assemblyMergeStrategy in assembly := { + case PathList("run-tests.py") => MergeStrategy.first + case x => (assemblyMergeStrategy in assembly).value(x) +} + scalacOptions ++= Seq( "-target:jvm-1.8" ) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala index 0b947e4ae..d92350f8f 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala @@ -174,11 +174,10 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) // + file:/C:/hyperspace/src/test/part-00003.snappy.parquet import spark.implicits._ val dataPathColumn = "_data_path" - val lineageDF = fileIdTracker.getFileToIdMap.toSeq - .map { kv => - (kv._1._1.replace("file:/", "file:///"), kv._2) - } - .toDF(dataPathColumn, IndexConstants.DATA_FILE_NAME_ID) + val relation = df.queryExecution.optimizedPlan.asInstanceOf[LogicalRelation] + val lineagePairs = + Hyperspace.getContext(spark).sourceProviderManager.lineagePairs(relation, fileIdTracker) + val lineageDF = lineagePairs.toDF(dataPathColumn, IndexConstants.DATA_FILE_NAME_ID) df.withColumn(dataPathColumn, input_file_name()) .join(lineageDF.hint("broadcast"), dataPathColumn) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala index 985047aa7..f2fb8575b 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/OptimizeAction.scala @@ -93,7 +93,7 @@ class OptimizeAction( repartitionedDf.write.saveWithBuckets( repartitionedDf, indexDataPath.toString, - logEntry.asInstanceOf[IndexLogEntry].numBuckets, + numBuckets, indexConfig.indexedColumns, SaveMode.Overwrite) } diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala index 914152571..0c5a20586 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala @@ -70,11 +70,19 @@ private[actions] abstract class RefreshActionBase( val latestRelation = Hyperspace.getContext(spark).sourceProviderManager.refreshRelation(relations.head) val dataSchema = DataType.fromJson(latestRelation.dataSchemaJson).asInstanceOf[StructType] - spark.read + val df = spark.read .schema(dataSchema) .format(latestRelation.fileFormat) .options(latestRelation.options) - .load(latestRelation.rootPaths: _*) + // Due to the difference in how the "path" option is set: https://github.com/apache/spark/ + // blob/ef1441b56c5cab02335d8d2e4ff95cf7e9c9b9ca/sql/core/src/main/scala/org/apache/spark/ + // 
sql/DataFrameReader.scala#L197 + // load() with a single parameter needs to be handled differently. + if (latestRelation.rootPaths.size == 1) { + df.load(latestRelation.rootPaths.head) + } else { + df.load(latestRelation.rootPaths: _*) + } } protected lazy val indexConfig: IndexConfig = { @@ -113,25 +121,15 @@ private[actions] abstract class RefreshActionBase( * Build Set[FileInfo] to compare the source file list with the previous index version. */ protected lazy val currentFiles: Set[FileInfo] = { - df.queryExecution.optimizedPlan - .collect { - case LogicalRelation( - HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), - _, - _, - _) => - location - .allFiles() - .map { f => - // For each file, if it already has a file id, add that id to its corresponding - // FileInfo. Note that if content of an existing file is changed, it is treated - // as a new file (i.e. its current file id is no longer valid). - val id = fileIdTracker.addFile(f) - FileInfo(f, id, asFullPath = true) - } - } - .flatten - .toSet + val curFiles = df.queryExecution.optimizedPlan.collect { + case relation: LogicalRelation => + Hyperspace + .getContext(spark) + .sourceProviderManager + .allFiles(relation) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = true)) + } + curFiles.head.toSet } /** diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index f3ba04d33..6d86b24dc 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.types.{LongType, StructType} +import com.microsoft.hyperspace.Hyperspace import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.plans.logical.BucketUnion import com.microsoft.hyperspace.util.HyperspaceConf @@ -105,24 +106,23 @@ object RuleUtils { // TODO: Duplicate listing files for the given relation as in // [[transformPlanToUseHybridScan]] // See https://github.com/microsoft/hyperspace/issues/160 - val filesByRelations = plan - .collect { - case LogicalRelation( - HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), - _, - _, - _) => - location.allFiles.map( - f => - // For a given file, file id is only meaningful in the context of a given - // index. At this point, we do not know which index, if any, would be picked. - // Therefore, we simply set the file id to UNKNOWN_FILE_ID. - FileInfo( - f.getPath.toString, - f.getLen, - f.getModificationTime, - IndexConstants.UNKNOWN_FILE_ID)) - } + val filesByRelations = plan.collect { + case rel: LogicalRelation => + Hyperspace + .getContext(spark) + .sourceProviderManager + .allFiles(rel) + .map { f => + // For a given file, file id is only meaningful in the context of a given + // index. At this point, we do not know which index, if any, would be picked. + // Therefore, we simply set the file id to UNKNOWN_FILE_ID. + FileInfo( + f.getPath.toString, + f.getLen, + f.getModificationTime, + IndexConstants.UNKNOWN_FILE_ID) + } + } assert(filesByRelations.length == 1) indexes.filter(index => index.created && isHybridScanCandidate(index, filesByRelations.flatten)) @@ -165,13 +165,14 @@ object RuleUtils { plan: LogicalPlan, useBucketSpec: Boolean): LogicalPlan = { // Check pre-requisite. 
- assert(getLogicalRelation(plan).isDefined) + val logicalRelation = getLogicalRelation(plan) + assert(logicalRelation.isDefined) // If there is no change in source data files, the index can be applied by // transformPlanToUseIndexOnlyScan regardless of Hybrid Scan config. // This tag should always exist if Hybrid Scan is enabled. val hybridScanRequired = HyperspaceConf.hybridScanEnabled(spark) && - index.getTagValue(getLogicalRelation(plan).get, IndexLogEntryTags.HYBRIDSCAN_REQUIRED).get + index.getTagValue(logicalRelation.get, IndexLogEntryTags.HYBRIDSCAN_REQUIRED).get // If the index has appended files and/or deleted files, which means the current index data // is outdated, Hybrid Scan should be used to handle the newly updated source files. @@ -261,7 +262,8 @@ object RuleUtils { index: IndexLogEntry, plan: LogicalPlan, useBucketSpec: Boolean): LogicalPlan = { - val isParquetSourceFormat = index.relations.head.fileFormat.equals("parquet") + val fileFormat = index.relations.head.fileFormat + val isParquetSourceFormat = fileFormat.equals("parquet") || fileFormat.equals("delta") var unhandledAppendedFiles: Seq[Path] = Nil // Get transformed plan with index data and appended files if applicable. @@ -271,7 +273,7 @@ object RuleUtils { // can be transformed to 'Project -> Filter -> Logical Relation'. Thus, with transformDown, // it will be matched again and transformed recursively which causes stack overflow exception. case baseRelation @ LogicalRelation( - _ @HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), + _ @HadoopFsRelation(location: FileIndex, _, _, _, _, _), baseOutput, _, _) => @@ -281,7 +283,10 @@ object RuleUtils { // appendedFiles and deletedFiles in IndexLogEntry. (index.deletedFiles, index.appendedFiles.map(f => new Path(f.name)).toSeq) } else { - val curFiles = location.allFiles + val curFiles = Hyperspace + .getContext(spark) + .sourceProviderManager + .allFiles(baseRelation) .map(f => FileInfo(f, index.fileIdTracker.addFile(f), asFullPath = true)) if (HyperspaceConf.hybridScanDeleteEnabled(spark) && index.hasLineageColumn) { val (exist, nonExist) = curFiles.partition(index.sourceFileInfoSet.contains) @@ -410,11 +415,11 @@ object RuleUtils { // Transform the location of LogicalRelation with appended files. val planForAppended = originalPlan transformDown { case baseRelation @ LogicalRelation( - fsRelation @ HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), + fsRelation @ HadoopFsRelation(location: FileIndex, _, _, _, _, _), baseOutput, _, _) => - val options = extractBasePath(location.partitionSpec) + val options = extractBasePath(spark, location) .map { basePath => // Set "basePath" so that partitioned columns are also included in the output schema. Map("basePath" -> basePath) @@ -440,19 +445,11 @@ object RuleUtils { planForAppended } - private def extractBasePath(partitionSpec: PartitionSpec): Option[String] = { - if (partitionSpec == PartitionSpec.emptySpec) { + private def extractBasePath(spark: SparkSession, location: FileIndex): Option[String] = { + if (location.partitionSchema.isEmpty) { None } else { - // For example, we could have the following in PartitionSpec: - // - partition columns = "col1", "col2" - // - partitions: "/path/col1=1/col2=1", "/path/col1=1/col2=2", etc. - // , and going up the same number of directory levels as the number of partition columns - // will compute the base path. 
Note that PartitionSpec.partitions will always contain - // all the partitions in the path, so "partitions.head" is taken as an initial value. - val basePath = partitionSpec.partitionColumns - .foldLeft(partitionSpec.partitions.head.path)((path, _) => path.getParent) - Some(basePath.toString) + Some(Hyperspace.getContext(spark).sourceProviderManager.partitionBasePath(location)) } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala index 91004136e..64b81a672 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala @@ -18,8 +18,9 @@ package com.microsoft.hyperspace.index.sources import scala.util.{Success, Try} +import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.datasources.{FileIndex, LogicalRelation} import org.apache.spark.util.hyperspace.Utils import com.microsoft.hyperspace.HyperspaceException @@ -80,6 +81,45 @@ class FileBasedSourceProviderManager(spark: SparkSession) { run(p => p.signature(logicalRelation)) } + /** + * Runs allFiles() for each provider. + * + * @param logicalRelation Logical relation to retrieve all input files. + * @return List of all input files. + * @throws HyperspaceException if multiple providers returns [[Some]] or + * if no providers return [[Some]]. + */ + def allFiles(logicalRelation: LogicalRelation): Seq[FileStatus] = { + run(p => p.allFiles(logicalRelation)) + } + + /** + * Runs partitionBasePath() for each provider. + * + * @param location Partitioned location. + * @return basePath string to read the given partitioned location. + * @throws HyperspaceException if multiple providers returns [[Some]] or + * if no providers return [[Some]]. + */ + def partitionBasePath(location: FileIndex): String = { + run(p => p.partitionBasePath(location)) + } + + /** + * Runs lineagePairs() for each provider. + * + * @param logicalRelation Logical Relation to check the relation type. + * @param fileIdTracker [[FileIdTracker]] to create the list of (file path, file id). + * @return List of (file path, file id). + * @throws HyperspaceException if multiple providers returns [[Some]] or + * if no providers return [[Some]]. + */ + def lineagePairs( + logicalRelation: LogicalRelation, + fileIdTracker: FileIdTracker): Seq[(String, Long)] = { + run(p => p.lineagePairs(logicalRelation, fileIdTracker)) + } + /** * Runs the given function 'f', which executes a [[FileBasedSourceProvider]]'s API that returns * [[Option]] for each provider built. 
This function ensures that only one provider returns diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala index 974b53e7e..dee45add1 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} +import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} import org.apache.spark.sql.sources.DataSourceRegister import com.microsoft.hyperspace.HyperspaceException @@ -49,7 +49,7 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS /** * Creates [[Relation]] for IndexLogEntry using the given [[LogicalRelation]]. * - * @param logicalRelation logical relation to derive [[Relation]] from. + * @param logicalRelation Logical relation to derive [[Relation]] from. * @param fileIdTracker [[FileIdTracker]] to use when populating the data of [[Relation]]. * @return [[Relation]] object if the given 'logicalRelation' can be processed by this provider. * Otherwise, None. @@ -153,7 +153,7 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS * Computes the signature using the given [[LogicalRelation]]. This computes a signature of * using all the files found in [[PartitioningAwareFileIndex]]. * - * @param logicalRelation logical relation to compute signature from. + * @param logicalRelation Logical relation to compute signature from. * @return Signature computed if the given 'logicalRelation' can be processed by this provider. * Otherwise, None. */ @@ -173,13 +173,75 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS /** * Fingerprints a file. * - * @param fileStatus file status. - * @return the fingerprint of a file. + * @param fileStatus File status. + * @return The fingerprint of a file. */ private def fingerprint(fileStatus: FileStatus): String = { fileStatus.getLen.toString + fileStatus.getModificationTime.toString + fileStatus.getPath.toString } + + /** + * Retrieves all input files from the given [[LogicalRelation]]. + * + * @param logicalRelation Logical relation to retrieve input files from. + * @return List of [[FileStatus]] for the given relation. + */ + override def allFiles(logicalRelation: LogicalRelation): Option[Seq[FileStatus]] = { + logicalRelation.relation match { + case HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _) => + Some(location.allFiles) + case _ => None + } + } + + /** + * Constructs the basePath for the given [[FileIndex]]. + * + * @param location Partitioned data location. + * @return basePath to read the given partitioned location. + */ + override def partitionBasePath(location: FileIndex): Option[String] = { + location match { + case p: PartitioningAwareFileIndex => + // For example, we could have the following in PartitionSpec: + // - partition columns = "col1", "col2" + // - partitions: "/path/col1=1/col2=1", "/path/col1=1/col2=2", etc. 
+ // , and going up the same number of directory levels as the number of partition columns + // will compute the base path. Note that PartitionSpec.partitions will always contain + // all the partitions in the path, so "partitions.head" is taken as an initial value. + val basePath = p.partitionSpec.partitionColumns + .foldLeft(p.partitionSpec.partitions.head.path)((path, _) => path.getParent) + Some(basePath.toString) + case _ => + None + } + } + + /** + * Returns list of pairs of (file path, file id) to build lineage column. + * + * File paths should be the same format as "input_file_name()" of the given relation type. + * For [[DefaultFileBasedSource]], each file path should be in this format: + * `file:///path/to/file` + * + * @param logicalRelation Logical relation to check the relation type. + * @param fileIdTracker [[FileIdTracker]] to create the list of (file path, file id). + * @return List of pairs of (file path, file id). + */ + override def lineagePairs( + logicalRelation: LogicalRelation, + fileIdTracker: FileIdTracker): Option[Seq[(String, Long)]] = { + logicalRelation.relation match { + case HadoopFsRelation(_: PartitioningAwareFileIndex, _, _, _, format, _) + if isSupportedFileFormat(format) => + Some(fileIdTracker.getFileToIdMap.toSeq.map { kv => + (kv._1._1.replace("file:/", "file:///"), kv._2) + }) + case _ => + None + } + } } /** diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala new file mode 100644 index 000000000..c257162cc --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeFileBasedSource.scala @@ -0,0 +1,182 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.sources.delta + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.delta.files.TahoeLogFileIndex +import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation} + +import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, Relation} +import com.microsoft.hyperspace.index.sources.{FileBasedSourceProvider, SourceProvider, SourceProviderBuilder} +import com.microsoft.hyperspace.util.PathUtils + +/** + * Delta Lake file-based source provider. + * + * This source can support relations that meet the following criteria: + * - The relation is [[HadoopFsRelation]] with [[TahoeLogFileIndex]] as file index. 
+ */ +class DeltaLakeFileBasedSource(private val spark: SparkSession) extends FileBasedSourceProvider { + private val DELTA_FORMAT_STR = "delta" + + private def toFileStatus(fileSize: Long, modificationTime: Long, path: Path): FileStatus = { + new FileStatus( + /* length */ fileSize, + /* isDir */ false, + /* blockReplication */ 0, + /* blockSize */ 1, + /* modificationTime */ modificationTime, + path) + } + + /** + * Creates [[Relation]] for IndexLogEntry using the given [[LogicalRelation]]. + * + * @param logicalRelation Logical relation to derive [[Relation]] from. + * @return [[Relation]] object if the given 'logicalRelation' can be processed by this provider. + * Otherwise, None. + */ + override def createRelation( + logicalRelation: LogicalRelation, + fileIdTracker: FileIdTracker): Option[Relation] = { + logicalRelation.relation match { + case HadoopFsRelation(location: TahoeLogFileIndex, _, dataSchema, _, _, options) => + val files = location + .getSnapshot(stalenessAcceptable = false) + .filesForScan(projection = Nil, location.partitionFilters, keepStats = false) + .files + .map { f => + toFileStatus(f.size, f.modificationTime, new Path(location.path, f.path)) + } + // Note that source files are currently fingerprinted when the optimized plan is + // fingerprinted by LogicalPlanFingerprint. + val sourceDataProperties = + Hdfs.Properties(Content.fromLeafFiles(files, fileIdTracker).get) + val fileFormatName = "delta" + // "path" key in options can incur multiple data read unexpectedly and keep + // the table version info as metadata. + val opts = options - "path" + ("versionAsOf" -> location.tableVersion.toString) + Some( + Relation( + Seq(PathUtils.makeAbsolute(location.path).toString), + Hdfs(sourceDataProperties), + dataSchema.json, + fileFormatName, + opts)) + case _ => None + } + } + + /** + * Given a [[Relation]], returns a new [[Relation]] that will have the latest source. + * + * @param relation [[Relation]] object to reconstruct [[DataFrame]] with. + * @return [[Relation]] object if the given 'relation' can be processed by this provider. + * Otherwise, None. + */ + override def refreshRelation(relation: Relation): Option[Relation] = { + if (relation.fileFormat.equals(DELTA_FORMAT_STR)) { + Some(relation.copy(options = relation.options - "versionAsOf" - "timestampAsOf")) + } else { + None + } + } + + /** + * Computes the signature using the given [[LogicalRelation]]. This computes a signature of + * using version info and table name. + * + * @param logicalRelation Logical relation to compute signature from. + * @return Signature computed if the given 'logicalRelation' can be processed by this provider. + * Otherwise, None. + */ + override def signature(logicalRelation: LogicalRelation): Option[String] = { + logicalRelation.relation match { + case HadoopFsRelation(location: TahoeLogFileIndex, _, _, _, _, _) => + Some(location.tableVersion + location.path.toString) + case _ => None + } + } + + /** + * Retrieves all input files from the given [[LogicalRelation]]. + * + * @param logicalRelation Logical relation to retrieve input files from. + * @return List of [[FileStatus]] for the given relation. 
+ */ + override def allFiles(logicalRelation: LogicalRelation): Option[Seq[FileStatus]] = { + logicalRelation.relation match { + case HadoopFsRelation(location: TahoeLogFileIndex, _, _, _, _, _) => + val files = location + .getSnapshot(stalenessAcceptable = false) + .filesForScan(projection = Nil, location.partitionFilters, keepStats = false) + .files + .map { f => + toFileStatus(f.size, f.modificationTime, new Path(location.path, f.path)) + } + Some(files) + case _ => None + } + } + + /** + * Constructs the basePath for the given [[FileIndex]]. + * + * @param location Partitioned data location. + * @return basePath to read the given partitioned location. + */ + override def partitionBasePath(location: FileIndex): Option[String] = { + location match { + case d: TahoeLogFileIndex => + Some(d.path.toString) + case _ => + None + } + } + + /** + * Returns list of pairs of (file path, file id) to build lineage column. + * + * File paths should be the same format as "input_file_name()" of the given relation type. + * For [[DeltaLakeFileBasedSource]], each file path should be in this format: + * `file:/path/to/file` + * + * @param logicalRelation Logical relation to check the relation type. + * @param fileIdTracker [[FileIdTracker]] to create the list of (file path, file id). + * @return List of pairs of (file path, file id). + */ + override def lineagePairs( + logicalRelation: LogicalRelation, + fileIdTracker: FileIdTracker): Option[Seq[(String, Long)]] = { + logicalRelation.relation match { + case HadoopFsRelation(_: TahoeLogFileIndex, _, _, _, _, _) => + Some(fileIdTracker.getFileToIdMap.toSeq.map { kv => + (kv._1._1, kv._2) + }) + case _ => + None + } + } +} + +/** + * Builder for building [[DeltaLakeFileBasedSource]]. + */ +class DeltaLakeFileBasedSourceBuilder extends SourceProviderBuilder { + override def build(spark: SparkSession): SourceProvider = new DeltaLakeFileBasedSource(spark) +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala index 120396c54..5a7351b5c 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala @@ -16,8 +16,9 @@ package com.microsoft.hyperspace.index.sources +import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.datasources.{FileIndex, LogicalRelation} import com.microsoft.hyperspace.index.{FileIdTracker, Relation} @@ -66,7 +67,7 @@ trait FileBasedSourceProvider extends SourceProvider { * * If the given logical relation does not belong to this provider, None should be returned. * - * @param logicalRelation logical relation to derive [[Relation]] from. + * @param logicalRelation Logical relation to derive [[Relation]] from. * @param fileIdTracker [[FileIdTracker]] to use when populating the data of [[Relation]]. * @return [[Relation]] object if the given 'logicalRelation' can be processed by this provider. * Otherwise, None. @@ -96,9 +97,39 @@ trait FileBasedSourceProvider extends SourceProvider { * * If the given logical relation does not belong to this provider, None should be returned. * - * @param logicalRelation logical relation to compute signature from. + * @param logicalRelation Logical relation to compute signature from. 
* @return Signature computed if the given 'logicalRelation' can be processed by this provider. * Otherwise, None. */ def signature(logicalRelation: LogicalRelation): Option[String] + + /** + * Retrieves all input files from the given [[LogicalRelation]]. + * + * @param logicalRelation Logical relation to retrieve input files from. + * @return List of [[FileStatus]] for the given relation. + */ + def allFiles(logicalRelation: LogicalRelation): Option[Seq[FileStatus]] + + /** + * Constructs the basePath for the given [[FileIndex]]. + * + * @param location Partitioned data location. + * @return basePath to read the given partitioned location. + */ + def partitionBasePath(location: FileIndex): Option[String] + + /** + * Returns list of pairs of (file path, file id) to build lineage column. + * + * File paths should be the same format with "input_file_name()" of the given relation type. + * + * @param logicalRelation Logical relation to check the relation type. + * @param fileIdTracker [[FileIdTracker]] to create the list of (file path, file id). + * @return List of pairs of (file path, file id). + */ + def lineagePairs( + logicalRelation: LogicalRelation, + fileIdTracker: FileIdTracker): Option[Seq[(String, Long)]] + } diff --git a/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala b/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala new file mode 100644 index 000000000..88fb3e213 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala @@ -0,0 +1,167 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index + +import io.delta.tables.DeltaTable +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.{DataFrame, QueryTest} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources._ + +import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData} + +class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite { + override val systemPath = new Path("src/test/resources/deltaLakeIntegrationTest") + + private val sampleData = SampleData.testData + private var hyperspace: Hyperspace = _ + + override def beforeAll(): Unit = { + super.beforeAll() + spark.conf.set( + "spark.hyperspace.index.sources.fileBasedBuilders", + "com.microsoft.hyperspace.index.sources.delta.DeltaLakeFileBasedSourceBuilder," + + "com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder") + hyperspace = new Hyperspace(spark) + } + + override def afterAll(): Unit = { + super.afterAll() + spark.conf.unset("spark.hyperspace.index.sources.fileBasedBuilders") + } + + before { + spark.enableHyperspace() + } + + after { + spark.disableHyperspace() + } + + test("Verify createIndex and refreshIndex on Delta Lake table.") { + withTempPathAsString { dataPath => + import spark.implicits._ + val dfFromSample = sampleData.toDF("Date", "RGUID", "Query", "imprs", "clicks") + dfFromSample.write.format("delta").save(dataPath) + + val deltaDf = spark.read.format("delta").load(dataPath) + hyperspace.createIndex(deltaDf, IndexConfig("deltaIndex", Seq("clicks"), Seq("Query"))) + + withIndex("deltaIndex") { + def query(version: Option[Long] = None): DataFrame = { + if (version.isDefined) { + val deltaDf = + spark.read.format("delta").option("versionAsOf", version.get).load(dataPath) + deltaDf.filter(deltaDf("clicks") <= 2000).select(deltaDf("query")) + } else { + val deltaDf = spark.read.format("delta").load(dataPath) + deltaDf.filter(deltaDf("clicks") <= 2000).select(deltaDf("query")) + } + } + + assert(isIndexUsed(query().queryExecution.optimizedPlan, "deltaIndex")) + + // Create a new version by deleting entries. + val deltaTable = DeltaTable.forPath(dataPath) + deltaTable.delete("clicks > 2000") + + // The index should not be applied for the updated version. + assert(!isIndexUsed(query().queryExecution.optimizedPlan, "deltaIndex")) + + // The index should be applied for the version at index creation. + assert(isIndexUsed(query(Some(0)).queryExecution.optimizedPlan, "deltaIndex")) + + hyperspace.refreshIndex("deltaIndex") + + // The index should be applied for the updated version. + assert(isIndexUsed(query().queryExecution.optimizedPlan, "deltaIndex/v__=1")) + + // The index should not be applied for the version at index creation. 
+ assert(!isIndexUsed(query(Some(0)).queryExecution.optimizedPlan, "deltaIndex")) + } + } + } + + test("Verify Hybrid Scan on Delta Lake table.") { + withTempPathAsString { dataPath => + import spark.implicits._ + val dfFromSample = sampleData.toDF("Date", "RGUID", "Query", "imprs", "clicks") + dfFromSample.write.format("delta").save(dataPath) + + val deltaDf = spark.read.format("delta").load(dataPath) + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { + hyperspace.createIndex(deltaDf, IndexConfig("deltaIndex", Seq("clicks"), Seq("Query"))) + } + + withIndex("deltaIndex") { + def query(version: Option[Long] = None): DataFrame = { + if (version.isDefined) { + val deltaDf = + spark.read.format("delta").option("versionAsOf", version.get).load(dataPath) + deltaDf.filter(deltaDf("clicks") <= 2000).select(deltaDf("query")) + } else { + val deltaDf = spark.read.format("delta").load(dataPath) + deltaDf.filter(deltaDf("clicks") <= 2000).select(deltaDf("query")) + } + } + + assert(isIndexUsed(query().queryExecution.optimizedPlan, "deltaIndex", false)) + + // Create a new version by deleting entries. + val deltaTable = DeltaTable.forPath(dataPath) + deltaTable.delete("clicks > 5000") + + withSQLConf( + "spark.hyperspace.index.hybridscan.enabled" -> "true", + "spark.hyperspace.index.hybridscan.delete.enabled" -> "true") { + // The index should be applied for the updated version. + assert(isIndexUsed(query().queryExecution.optimizedPlan, "deltaIndex", true)) + + // Append data. + dfFromSample + .limit(3) + .write + .format("delta") + .mode("append") + .save(dataPath) + + // The index should be applied for the updated version. + assert(isIndexUsed(query().queryExecution.optimizedPlan, "deltaIndex", true)) + } + } + } + } + + def isIndexUsed( + plan: LogicalPlan, + indexPathSubStr: String, + isHybridScan: Boolean = false): Boolean = { + val rootPaths = plan.collect { + case LogicalRelation( + HadoopFsRelation(location: InMemoryFileIndex, _, _, _, _, _), + _, + _, + _) => + location.rootPaths + }.flatten + if (!isHybridScan) { + rootPaths.nonEmpty && rootPaths.forall(_.toString.contains(indexPathSubStr)) + } else { + rootPaths.nonEmpty && rootPaths.exists(_.toString.contains(indexPathSubStr)) + } + } +}
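For quick reference, below is a minimal end-to-end usage sketch distilled from the new DeltaLakeIntegrationTest in this patch. The configuration key, builder class names, and API calls mirror the test code above; the table path and index name are illustrative placeholders, not values from the patch.

    // A minimal sketch, assuming Spark with delta-core 0.6.1 and Hyperspace on the classpath.
    import org.apache.spark.sql.SparkSession
    import com.microsoft.hyperspace.{Hyperspace, Implicits}
    import com.microsoft.hyperspace.index.IndexConfig

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Register the Delta Lake source provider ahead of the default one,
    // mirroring the configuration used in DeltaLakeIntegrationTest.
    spark.conf.set(
      "spark.hyperspace.index.sources.fileBasedBuilders",
      "com.microsoft.hyperspace.index.sources.delta.DeltaLakeFileBasedSourceBuilder," +
        "com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder")

    val hyperspace = new Hyperspace(spark)

    // "/tmp/deltaTable" and "deltaIndex" are illustrative values.
    val deltaDf = spark.read.format("delta").load("/tmp/deltaTable")
    hyperspace.createIndex(deltaDf, IndexConfig("deltaIndex", Seq("clicks"), Seq("Query")))

    // Enable Hyperspace so the optimizer can substitute the index into query plans.
    spark.enableHyperspace()
    deltaDf.filter(deltaDf("clicks") <= 2000).select(deltaDf("Query")).show()

    // After the Delta table changes (e.g., deletes or appends), refresh the index
    // so it covers the latest table version; until then, Hybrid Scan (if enabled)
    // can still apply the index over the changed files.
    hyperspace.refreshIndex("deltaIndex")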