6 changes: 6 additions & 0 deletions build.sbt
@@ -30,6 +30,7 @@ libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided" withSources(),
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided" withSources(),
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided" withSources(),
"io.delta" %% "delta-core" % "0.6.1" % "provided" withSources(),

// Test dependencies
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
@@ -40,6 +41,11 @@
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests"
)

assemblyMergeStrategy in assembly := {
case PathList("run-tests.py") => MergeStrategy.first
case x => (assemblyMergeStrategy in assembly).value(x)
}

scalacOptions ++= Seq(
"-target:jvm-1.8"
)
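For context on the new delta-core dependency: the sketch below shows the standard way a Delta table is read through Spark's DataFrameReader, which is the kind of relation the Delta Lake support in this PR is building toward. It is illustrative only and not part of the diff; the table path is a placeholder.

import org.apache.spark.sql.SparkSession

// Illustrative only: with delta-core on the classpath, a Delta table is read through
// the regular DataFrameReader. The path below is made up.
val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("delta-read-sketch")
  .getOrCreate()

val deltaDf = spark.read.format("delta").load("/tmp/delta-table")
deltaDf.show()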
@@ -174,11 +174,10 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
// + file:/C:/hyperspace/src/test/part-00003.snappy.parquet
import spark.implicits._
val dataPathColumn = "_data_path"
val lineageDF = fileIdTracker.getFileToIdMap.toSeq
.map { kv =>
(kv._1._1.replace("file:/", "file:///"), kv._2)
}
.toDF(dataPathColumn, IndexConstants.DATA_FILE_NAME_ID)
val relation = df.queryExecution.optimizedPlan.asInstanceOf[LogicalRelation]
val lineagePairs =
Hyperspace.getContext(spark).sourceProviderManager.lineagePairs(relation, fileIdTracker)
val lineageDF = lineagePairs.toDF(dataPathColumn, IndexConstants.DATA_FILE_NAME_ID)

df.withColumn(dataPathColumn, input_file_name())
.join(lineageDF.hint("broadcast"), dataPathColumn)
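As a self-contained illustration of the broadcast lineage join above, the sketch below builds a toy lineage DataFrame and joins it on input_file_name(). The column names, file paths, and ids are made-up stand-ins for the IndexConstants values and FileIdTracker output, not the actual Hyperspace constants.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.input_file_name

// Toy illustration of the lineage join pattern; all values below are placeholders.
val spark = SparkSession.builder().master("local[*]").appName("lineage-sketch").getOrCreate()
import spark.implicits._

val sourceDf = spark.read.parquet("/tmp/source")  // placeholder source data
val lineageDF = Seq(
  ("file:///tmp/source/part-00000.snappy.parquet", 0L),
  ("file:///tmp/source/part-00001.snappy.parquet", 1L)
).toDF("_data_path", "_data_file_id")

// Tag each row with the file it came from, then broadcast-join to attach the file id.
val withLineage = sourceDf
  .withColumn("_data_path", input_file_name())
  .join(lineageDF.hint("broadcast"), "_data_path")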
@@ -93,7 +93,7 @@ class OptimizeAction(
repartitionedDf.write.saveWithBuckets(
repartitionedDf,
indexDataPath.toString,
logEntry.asInstanceOf[IndexLogEntry].numBuckets,
numBuckets,
indexConfig.indexedColumns,
SaveMode.Overwrite)
}
@@ -70,11 +70,19 @@ private[actions] abstract class RefreshActionBase(
val latestRelation =
Hyperspace.getContext(spark).sourceProviderManager.refreshRelation(relations.head)
val dataSchema = DataType.fromJson(latestRelation.dataSchemaJson).asInstanceOf[StructType]
spark.read
val df = spark.read
.schema(dataSchema)
.format(latestRelation.fileFormat)
.options(latestRelation.options)
.load(latestRelation.rootPaths: _*)
// Due to the difference in how the "path" option is set: https://github.com/apache/spark/
// blob/ef1441b56c5cab02335d8d2e4ff95cf7e9c9b9ca/sql/core/src/main/scala/org/apache/spark/
// sql/DataFrameReader.scala#L197
// load() with a single parameter needs to be handled differently.
if (latestRelation.rootPaths.size == 1) {
Review comment (Collaborator, Author): Delta Lake only allows one path in load().

Review comment (Contributor): Yea, we can do something like:

val df = spark.read
  .schema(dataSchema)
  .format(latestRelation.fileFormat)
  .options(latestRelation.options)
// Due to the difference in how the "path" option is set: https://github.com/apache/spark/blob/ef1441b56c5cab02335d8d2e4ff95cf7e9c9b9ca/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L197,
// load() with a single parameter needs to be handled differently.
if (latestRelation.rootPaths.size == 1) {
  df.load(latestRelation.rootPaths.head)
} else {
  df.load(latestRelation.rootPaths: _*)
}

Review comment (Contributor): Btw, what happens if the Delta Lake implementation adds the "path" option in latestRelation.options?

df.load(latestRelation.rootPaths.head)
} else {
df.load(latestRelation.rootPaths: _*)
}
}

protected lazy val indexConfig: IndexConfig = {
@@ -113,25 +121,15 @@ private[actions] abstract class RefreshActionBase(
* Build Set[FileInfo] to compare the source file list with the previous index version.
*/
protected lazy val currentFiles: Set[FileInfo] = {
df.queryExecution.optimizedPlan
.collect {
case LogicalRelation(
HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _),
_,
_,
_) =>
location
.allFiles()
.map { f =>
// For each file, if it already has a file id, add that id to its corresponding
// FileInfo. Note that if content of an existing file is changed, it is treated
// as a new file (i.e. its current file id is no longer valid).
val id = fileIdTracker.addFile(f)
FileInfo(f, id, asFullPath = true)
}
}
.flatten
.toSet
val curFiles = df.queryExecution.optimizedPlan.collect {
case relation: LogicalRelation =>
Hyperspace
.getContext(spark)
.sourceProviderManager
.allFiles(relation)
.map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = true))
}
curFiles.head.toSet
}
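The removed comment above notes that a source file whose content changed is treated as a new file, i.e. its previous id is no longer valid. A minimal, standalone sketch of a tracker keyed by (path, size, modification time) shows how that behavior falls out; this is illustrative only, not Hyperspace's actual FileIdTracker.

import scala.collection.mutable

// Illustrative sketch: keying by (path, size, modificationTime) means a rewritten file
// maps to a fresh id, while an unchanged file keeps the id it was assigned earlier.
class SketchFileIdTracker {
  private val fileToId = mutable.HashMap.empty[(String, Long, Long), Long]
  private var maxId: Long = -1L

  def addFile(path: String, size: Long, modificationTime: Long): Long =
    fileToId.getOrElseUpdate((path, size, modificationTime), { maxId += 1; maxId })
}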

/**
69 changes: 33 additions & 36 deletions src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.{LongType, StructType}

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.plans.logical.BucketUnion
import com.microsoft.hyperspace.util.HyperspaceConf
@@ -105,24 +106,23 @@ object RuleUtils {
// TODO: Duplicate listing files for the given relation as in
// [[transformPlanToUseHybridScan]]
// See https://github.com/microsoft/hyperspace/issues/160
val filesByRelations = plan
.collect {
case LogicalRelation(
HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _),
_,
_,
_) =>
location.allFiles.map(
f =>
// For a given file, file id is only meaningful in the context of a given
// index. At this point, we do not know which index, if any, would be picked.
// Therefore, we simply set the file id to UNKNOWN_FILE_ID.
FileInfo(
f.getPath.toString,
f.getLen,
f.getModificationTime,
IndexConstants.UNKNOWN_FILE_ID))
}
val filesByRelations = plan.collect {
case rel: LogicalRelation =>
Hyperspace
.getContext(spark)
.sourceProviderManager
.allFiles(rel)
.map { f =>
// For a given file, file id is only meaningful in the context of a given
// index. At this point, we do not know which index, if any, would be picked.
// Therefore, we simply set the file id to UNKNOWN_FILE_ID.
FileInfo(
f.getPath.toString,
f.getLen,
f.getModificationTime,
IndexConstants.UNKNOWN_FILE_ID)
}
}
assert(filesByRelations.length == 1)
indexes.filter(index =>
index.created && isHybridScanCandidate(index, filesByRelations.flatten))
@@ -165,13 +165,14 @@ object RuleUtils {
plan: LogicalPlan,
useBucketSpec: Boolean): LogicalPlan = {
// Check pre-requisite.
assert(getLogicalRelation(plan).isDefined)
val logicalRelation = getLogicalRelation(plan)
assert(logicalRelation.isDefined)

// If there is no change in source data files, the index can be applied by
// transformPlanToUseIndexOnlyScan regardless of Hybrid Scan config.
// This tag should always exist if Hybrid Scan is enabled.
val hybridScanRequired = HyperspaceConf.hybridScanEnabled(spark) &&
index.getTagValue(getLogicalRelation(plan).get, IndexLogEntryTags.HYBRIDSCAN_REQUIRED).get
index.getTagValue(logicalRelation.get, IndexLogEntryTags.HYBRIDSCAN_REQUIRED).get

// If the index has appended files and/or deleted files, which means the current index data
// is outdated, Hybrid Scan should be used to handle the newly updated source files.
@@ -261,7 +262,8 @@ object RuleUtils {
index: IndexLogEntry,
plan: LogicalPlan,
useBucketSpec: Boolean): LogicalPlan = {
val isParquetSourceFormat = index.relations.head.fileFormat.equals("parquet")
val fileFormat = index.relations.head.fileFormat
val isParquetSourceFormat = fileFormat.equals("parquet") || fileFormat.equals("delta")
Review comment (Contributor): Hmm, does this mean adding a new source provider is not enough?

Can we introduce hasParquetAsSourceFormat to the provider API and record this info in the metadata?

Review comment (Contributor): You can do this as a separate PR if that is preferred. Please create an issue in that case.

Review comment (Collaborator, Author): I think this case is too specific to add an API to the source provider; also, it refers to the fileFormat string in the index metadata, not the relation.
So it's better to create the function in IndexLogEntry or some Utils class if needed. WDYT? @imback82

Review comment (Contributor, @imback82, Dec 8, 2020): You should be able to plug in a source provider defined externally without changing the Hyperspace codebase. For example, say I have a source format "blah" that uses parquet internally; how can I plug it in without modifying Hyperspace? One easy way to think about it is whether you could implement the Delta source provider outside Hyperspace.

> I think this case is too specific to add an API to the source provider; also, it refers to the fileFormat string in the index metadata, not the relation.

You can do this in the create path and record it in the metadata.

Review comment (Collaborator, Author): Got your point - it should be possible to add a new source provider externally.
Let me handle this with a new PR & issue. Thanks!
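A rough sketch of the provider-level hook being discussed, deferred here to a follow-up PR; every name, type, and signature below is an assumption for illustration, not the API that was eventually merged.

// Hypothetical sketch only; names, types, and signature are assumptions.
case class RelationMetadata(fileFormat: String) // stand-in for Hyperspace's Relation metadata

trait SourceFormatHook {
  // Some(true/false) if this provider recognizes the relation metadata and knows whether
  // its underlying storage is parquet; None if the relation is not handled by this provider.
  def hasParquetAsSourceFormat(relation: RelationMetadata): Option[Boolean]
}

object DeltaLakeSourceHookSketch extends SourceFormatHook {
  override def hasParquetAsSourceFormat(relation: RelationMetadata): Option[Boolean] =
    if (relation.fileFormat.equalsIgnoreCase("delta")) Some(true) else None
}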

var unhandledAppendedFiles: Seq[Path] = Nil

// Get transformed plan with index data and appended files if applicable.
Expand All @@ -271,7 +273,7 @@ object RuleUtils {
// can be transformed to 'Project -> Filter -> Logical Relation'. Thus, with transformDown,
// it will be matched again and transformed recursively which causes stack overflow exception.
case baseRelation @ LogicalRelation(
_ @HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _),
_ @HadoopFsRelation(location: FileIndex, _, _, _, _, _),
baseOutput,
_,
_) =>
Expand All @@ -281,7 +283,10 @@ object RuleUtils {
// appendedFiles and deletedFiles in IndexLogEntry.
(index.deletedFiles, index.appendedFiles.map(f => new Path(f.name)).toSeq)
} else {
val curFiles = location.allFiles
val curFiles = Hyperspace
.getContext(spark)
.sourceProviderManager
.allFiles(baseRelation)
.map(f => FileInfo(f, index.fileIdTracker.addFile(f), asFullPath = true))
if (HyperspaceConf.hybridScanDeleteEnabled(spark) && index.hasLineageColumn) {
val (exist, nonExist) = curFiles.partition(index.sourceFileInfoSet.contains)
@@ -410,11 +415,11 @@ object RuleUtils {
// Transform the location of LogicalRelation with appended files.
val planForAppended = originalPlan transformDown {
case baseRelation @ LogicalRelation(
fsRelation @ HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _),
fsRelation @ HadoopFsRelation(location: FileIndex, _, _, _, _, _),
baseOutput,
_,
_) =>
val options = extractBasePath(location.partitionSpec)
val options = extractBasePath(spark, location)
.map { basePath =>
// Set "basePath" so that partitioned columns are also included in the output schema.
Map("basePath" -> basePath)
Expand All @@ -440,19 +445,11 @@ object RuleUtils {
planForAppended
}

private def extractBasePath(partitionSpec: PartitionSpec): Option[String] = {
if (partitionSpec == PartitionSpec.emptySpec) {
private def extractBasePath(spark: SparkSession, location: FileIndex): Option[String] = {
if (location.partitionSchema.isEmpty) {
None
} else {
// For example, we could have the following in PartitionSpec:
// - partition columns = "col1", "col2"
// - partitions: "/path/col1=1/col2=1", "/path/col1=1/col2=2", etc.
// , and going up the same number of directory levels as the number of partition columns
// will compute the base path. Note that PartitionSpec.partitions will always contain
// all the partitions in the path, so "partitions.head" is taken as an initial value.
val basePath = partitionSpec.partitionColumns
.foldLeft(partitionSpec.partitions.head.path)((path, _) => path.getParent)
Some(basePath.toString)
Some(Hyperspace.getContext(spark).sourceProviderManager.partitionBasePath(location))
}
}

@@ -18,8 +18,9 @@ package com.microsoft.hyperspace.index.sources

import scala.util.{Success, Try}

import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.{FileIndex, LogicalRelation}
import org.apache.spark.util.hyperspace.Utils

import com.microsoft.hyperspace.HyperspaceException
@@ -80,6 +81,45 @@ class FileBasedSourceProviderManager(spark: SparkSession) {
run(p => p.signature(logicalRelation))
}

/**
* Runs allFiles() for each provider.
*
* @param logicalRelation Logical relation to retrieve all input files.
* @return List of all input files.
* @throws HyperspaceException if multiple providers return [[Some]] or
* if no providers return [[Some]].
*/
def allFiles(logicalRelation: LogicalRelation): Seq[FileStatus] = {
run(p => p.allFiles(logicalRelation))
}

/**
* Runs partitionBasePath() for each provider.
*
* @param location Partitioned location.
* @return basePath string to read the given partitioned location.
* @throws HyperspaceException if multiple providers return [[Some]] or
* if no providers return [[Some]].
*/
def partitionBasePath(location: FileIndex): String = {
run(p => p.partitionBasePath(location))
}

/**
* Runs lineagePairs() for each provider.
*
* @param logicalRelation Logical relation to check the relation type.
* @param fileIdTracker [[FileIdTracker]] to create the list of (file path, file id).
* @return List of (file path, file id).
* @throws HyperspaceException if multiple providers return [[Some]] or
* if no providers return [[Some]].
*/
def lineagePairs(
logicalRelation: LogicalRelation,
fileIdTracker: FileIdTracker): Seq[(String, Long)] = {
run(p => p.lineagePairs(logicalRelation, fileIdTracker))
}

/**
* Runs the given function 'f', which executes a [[FileBasedSourceProvider]]'s API that returns
* [[Option]] for each provider built. This function ensures that only one provider returns
@@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
import org.apache.spark.sql.sources.DataSourceRegister

import com.microsoft.hyperspace.HyperspaceException
@@ -49,7 +49,7 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS
/**
* Creates [[Relation]] for IndexLogEntry using the given [[LogicalRelation]].
*
* @param logicalRelation logical relation to derive [[Relation]] from.
* @param logicalRelation Logical relation to derive [[Relation]] from.
* @param fileIdTracker [[FileIdTracker]] to use when populating the data of [[Relation]].
* @return [[Relation]] object if the given 'logicalRelation' can be processed by this provider.
* Otherwise, None.
@@ -153,7 +153,7 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS
* Computes the signature using the given [[LogicalRelation]]. This computes a signature of
* using all the files found in [[PartitioningAwareFileIndex]].
*
* @param logicalRelation logical relation to compute signature from.
* @param logicalRelation Logical relation to compute signature from.
* @return Signature computed if the given 'logicalRelation' can be processed by this provider.
* Otherwise, None.
*/
Expand All @@ -173,13 +173,75 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS
/**
* Fingerprints a file.
*
* @param fileStatus file status.
* @return the fingerprint of a file.
* @param fileStatus File status.
* @return The fingerprint of a file.
*/
private def fingerprint(fileStatus: FileStatus): String = {
fileStatus.getLen.toString + fileStatus.getModificationTime.toString +
fileStatus.getPath.toString
}

/**
* Retrieves all input files from the given [[LogicalRelation]].
*
* @param logicalRelation Logical relation to retrieve input files from.
* @return List of [[FileStatus]] for the given relation.
*/
override def allFiles(logicalRelation: LogicalRelation): Option[Seq[FileStatus]] = {
logicalRelation.relation match {
case HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _) =>
Some(location.allFiles)
case _ => None
}
}

/**
* Constructs the basePath for the given [[FileIndex]].
*
* @param location Partitioned data location.
* @return basePath to read the given partitioned location.
*/
override def partitionBasePath(location: FileIndex): Option[String] = {
location match {
case p: PartitioningAwareFileIndex =>
// For example, we could have the following in PartitionSpec:
// - partition columns = "col1", "col2"
// - partitions: "/path/col1=1/col2=1", "/path/col1=1/col2=2", etc.
// , and going up the same number of directory levels as the number of partition columns
// will compute the base path. Note that PartitionSpec.partitions will always contain
// all the partitions in the path, so "partitions.head" is taken as an initial value.
val basePath = p.partitionSpec.partitionColumns
.foldLeft(p.partitionSpec.partitions.head.path)((path, _) => path.getParent)
Some(basePath.toString)
case _ =>
None
}
}
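A small standalone example of the fold above, with made-up partition columns and paths:

import org.apache.hadoop.fs.Path

// With partition columns Seq("col1", "col2") and a first partition at
// /data/table/col1=1/col2=2, going up one directory level per partition column
// recovers the base path /data/table.
val partitionColumns = Seq("col1", "col2")
val firstPartitionPath = new Path("/data/table/col1=1/col2=2")
val basePath = partitionColumns.foldLeft(firstPartitionPath)((path, _) => path.getParent)
assert(basePath.toString == "/data/table")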

/**
* Returns list of pairs of (file path, file id) to build lineage column.
*
* File paths should be the same format as "input_file_name()" of the given relation type.
* For [[DefaultFileBasedSource]], each file path should be in this format:
* `file:///path/to/file`
*
* @param logicalRelation Logical relation to check the relation type.
* @param fileIdTracker [[FileIdTracker]] to create the list of (file path, file id).
* @return List of pairs of (file path, file id).
*/
override def lineagePairs(
logicalRelation: LogicalRelation,
fileIdTracker: FileIdTracker): Option[Seq[(String, Long)]] = {
logicalRelation.relation match {
case HadoopFsRelation(_: PartitioningAwareFileIndex, _, _, _, format, _)
if isSupportedFileFormat(format) =>
Some(fileIdTracker.getFileToIdMap.toSeq.map { kv =>
(kv._1._1.replace("file:/", "file:///"), kv._2)
})
case _ =>
None
}
}
}
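A small standalone illustration of the replace in lineagePairs above, using the example path from the CreateActionBase comment earlier in this diff: FileIdTracker holds the Hadoop-style URI ("file:/..."), while input_file_name() yields the triple-slash form, so the join key is normalized to match.

// The tracked path uses Hadoop's "file:/" prefix; input_file_name() uses "file:///".
val trackedPath = "file:/C:/hyperspace/src/test/part-00003.snappy.parquet"
val joinKey = trackedPath.replace("file:/", "file:///")
// joinKey == "file:///C:/hyperspace/src/test/part-00003.snappy.parquet"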

/**