11 changes: 11 additions & 0 deletions src/main/scala/com/microsoft/hyperspace/Hyperspace.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL}
 import com.microsoft.hyperspace.index.plananalysis.PlanAnalyzer
+import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager

 class Hyperspace(spark: SparkSession) {
   private val indexManager: IndexManager = Hyperspace.getContext(spark).indexCollectionManager
@@ -169,6 +170,14 @@ object Hyperspace {
     context.get()
   }

+  private[hyperspace] def getContext: HyperspaceContext = {
+    val sparkSession = SparkSession.getActiveSession.getOrElse {
+      throw HyperspaceException("Could not find active SparkSession.")
+    }
+
+    getContext(sparkSession)
+  }
+
   def apply(): Hyperspace = {
     val sparkSession = SparkSession.getActiveSession.getOrElse {
       throw HyperspaceException("Could not find active SparkSession.")
@@ -180,4 +189,6 @@

 private[hyperspace] class HyperspaceContext(val spark: SparkSession) {
   val indexCollectionManager = CachingIndexCollectionManager(spark)
+
+  val sourceProviderManager = new FileBasedSourceProviderManager(spark)
 }
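The new parameterless getContext overload resolves the context from SparkSession.getActiveSession, so internal callers no longer need a session threaded through every call site, and the context now also exposes a FileBasedSourceProviderManager. A minimal usage sketch (illustrative only; since getContext is private[hyperspace], this compiles only from code inside that package):

import org.apache.spark.sql.SparkSession
import com.microsoft.hyperspace.Hyperspace

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("provider-manager-demo")
  .getOrCreate()

// Resolves the context from the active session; throws HyperspaceException
// if no SparkSession is active.
val providerManager = Hyperspace.getContext.sourceProviderManager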
src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala
@@ -18,11 +18,10 @@ package com.microsoft.hyperspace.actions

 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.functions.input_file_name
-import org.apache.spark.sql.sources.DataSourceRegister

-import com.microsoft.hyperspace.HyperspaceException
+import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
 import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils}
@@ -64,7 +63,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)

     signatureProvider.signature(df.queryExecution.optimizedPlan) match {
       case Some(s) =>
-        val relations = sourceRelations(df)
+        val relations = sourceRelations(spark, df)
         // Currently we only support creating an index on a LogicalRelation.
         assert(relations.size == 1)

@@ -98,37 +97,10 @@
     }
   }

-  protected def sourceRelations(df: DataFrame): Seq[Relation] =
+  protected def sourceRelations(spark: SparkSession, df: DataFrame): Seq[Relation] =
     df.queryExecution.optimizedPlan.collect {
-      case LogicalRelation(
-          HadoopFsRelation(
-            location: PartitioningAwareFileIndex,
-            _,
-            dataSchema,
-            _,
-            fileFormat,
-            options),
-          _,
-          _,
-          _) =>
-        val files = location.allFiles
-        // Note that source files are currently fingerprinted when the optimized plan is
-        // fingerprinted by LogicalPlanFingerprint.
-        val sourceDataProperties =
-          Hdfs.Properties(Content.fromLeafFiles(files, fileIdTracker).get)
-
-        val fileFormatName = fileFormat match {
-          case d: DataSourceRegister => d.shortName
-          case other => throw HyperspaceException(s"Unsupported file format: $other")
-        }
-        // "path" key in options can incur multiple data read unexpectedly.
-        val opts = options - "path"
-        Relation(
-          location.rootPaths.map(_.toString),
-          Hdfs(sourceDataProperties),
-          dataSchema.json,
-          fileFormatName,
-          opts)
+      case p: LogicalRelation =>
+        Hyperspace.getContext(spark).sourceProviderManager.createRelation(p, fileIdTracker)
     }

   protected def write(spark: SparkSession, df: DataFrame, indexConfig: IndexConfig): Unit = {
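The Relation construction deleted above gives a good picture of what a default file-based provider now has to produce. A sketch reconstructed from those deleted lines (the actual default provider introduced by this PR may differ):

import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
import org.apache.spark.sql.sources.DataSourceRegister

import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, Relation}

// Sketch: mirrors the deleted inline logic as a provider-style method that
// answers Some(...) only for HadoopFsRelation-backed scans.
def createRelation(p: LogicalRelation, fileIdTracker: FileIdTracker): Option[Relation] =
  p match {
    case LogicalRelation(
        HadoopFsRelation(location: PartitioningAwareFileIndex, _, dataSchema, _, fileFormat, options),
        _, _, _) =>
      // Source files are fingerprinted when the optimized plan is fingerprinted.
      val sourceDataProperties =
        Hdfs.Properties(Content.fromLeafFiles(location.allFiles, fileIdTracker).get)
      val fileFormatName = fileFormat match {
        case d: DataSourceRegister => d.shortName
        case other => throw HyperspaceException(s"Unsupported file format: $other")
      }
      // Dropping the "path" key avoids unexpected extra reads when the
      // relation is later reloaded from these options.
      Some(
        Relation(
          location.rootPaths.map(_.toString),
          Hdfs(sourceDataProperties),
          dataSchema.json,
          fileFormatName,
          options - "path"))
    case _ => None
  }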
src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
 import org.apache.spark.sql.types.{DataType, StructType}

-import com.microsoft.hyperspace.HyperspaceException
+import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
 import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, REFRESHING}
 import com.microsoft.hyperspace.index._

@@ -66,13 +66,15 @@ private[actions] abstract class RefreshActionBase(

   // Reconstruct a df from schema
   protected lazy val df = {
-    val rels = previousIndexLogEntry.relations
-    val dataSchema = DataType.fromJson(rels.head.dataSchemaJson).asInstanceOf[StructType]
+    val relations = previousIndexLogEntry.relations
+    val latestRelation =
+      Hyperspace.getContext(spark).sourceProviderManager.refreshRelation(relations.head)
+    val dataSchema = DataType.fromJson(latestRelation.dataSchemaJson).asInstanceOf[StructType]
     spark.read
       .schema(dataSchema)
-      .format(rels.head.fileFormat)
-      .options(rels.head.options)
-      .load(rels.head.rootPaths: _*)
+      .format(latestRelation.fileFormat)
+      .options(latestRelation.options)
+      .load(latestRelation.rootPaths: _*)
   }

   protected lazy val indexConfig: IndexConfig = {
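Routing the persisted Relation through refreshRelation before the read gives the source provider a chance to rewrite it, for example to reflect an updated file listing or to adjust options that should not be replayed verbatim. A trivial provider-side implementation could simply pass the relation through (a sketch, not the actual default behavior):

// Sketch: a provider that answers every refresh with the Relation unchanged.
def refreshRelation(relation: Relation): Option[Relation] = Some(relation)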
src/main/scala/com/microsoft/hyperspace/index/FileBasedSignatureProvider.scala
@@ -16,10 +16,10 @@

 package com.microsoft.hyperspace.index

-import org.apache.hadoop.fs.FileStatus
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
+import org.apache.spark.sql.execution.datasources.LogicalRelation

+import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
 import com.microsoft.hyperspace.util.HashingUtils

 /**
@@ -49,15 +49,8 @@ class FileBasedSignatureProvider extends LogicalPlanSignatureProvider {
   private def fingerprintVisitor(logicalPlan: LogicalPlan): Option[String] = {
     var fingerprint = ""
     logicalPlan.foreachUp {
-      // Currently we are only collecting plan fingerprint from hdfs file based scan nodes.
-      case LogicalRelation(
-          HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _),
-          _,
-          _,
-          _) =>
-        fingerprint ++= location.allFiles.sortBy(_.getPath.toString).foldLeft("")(
-          (accumulate: String, fileStatus: FileStatus) =>
-            HashingUtils.md5Hex(accumulate + getFingerprint(fileStatus)))
+      case p: LogicalRelation =>
+        fingerprint ++= Hyperspace.getContext.sourceProviderManager.signature(p)
       case _ =>
     }

@@ -66,15 +59,4 @@
       case _ => Some(fingerprint)
     }
   }
-
-  /**
-   * Get the fingerprint of a file.
-   *
-   * @param fileStatus file status.
-   * @return the fingerprint of a file.
-   */
-  private def getFingerprint(fileStatus: FileStatus): String = {
-    fileStatus.getLen.toString + fileStatus.getModificationTime.toString +
-      fileStatus.getPath.toString
-  }
 }
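The deleted getFingerprint/foldLeft pair documents the signature scheme that providers now own: files are sorted by path, and an MD5 chain is folded over each file's length, modification time, and path. An equivalent provider-side sketch, reconstructed from the deleted lines (the method name is illustrative):

import org.apache.hadoop.fs.FileStatus
import com.microsoft.hyperspace.util.HashingUtils

// Sketch: reproduces the deleted fingerprint logic over a file listing.
def signatureOf(files: Seq[FileStatus]): String =
  files.sortBy(_.getPath.toString).foldLeft("") { (acc, f) =>
    HashingUtils.md5Hex(
      acc + f.getLen.toString + f.getModificationTime.toString + f.getPath.toString)
  }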
src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala (new file)
@@ -0,0 +1,141 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.index.sources

import scala.util.{Success, Try}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.util.hyperspace.Utils

import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.index.{FileIdTracker, Relation}
import com.microsoft.hyperspace.util.{CacheWithTransform, HyperspaceConf}

/**
 * [[FileBasedSourceProviderManager]] is responsible for loading source providers that implement
 * [[FileBasedSourceProvider]] and for running each API across the loaded providers.
 *
 * Each API in [[FileBasedSourceProvider]] returns an [[Option]], and this manager ensures that
 * exactly one provider returns [[Some]] for each call.
*
* @param spark Spark session.
*/
class FileBasedSourceProviderManager(spark: SparkSession) {
[Review comment from the PR author on this class: "I need to create a test for this."]
private val sourceProviders: CacheWithTransform[String, Seq[FileBasedSourceProvider]] =
new CacheWithTransform[String, Seq[FileBasedSourceProvider]]({ () =>
HyperspaceConf.fileBasedSourceBuilders(spark)
}, { builderClassNames =>
buildProviders(builderClassNames)
})
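The FileBasedSourceProvider trait itself is not part of this diff; from the createRelation/refreshRelation/signature calls below, its shape can be inferred roughly as follows (signatures are assumptions):

// Inferred sketch; the actual trait in the sources package may differ.
trait FileBasedSourceProvider {
  // Each method answers Some(...) only if this provider handles the input.
  def createRelation(l: LogicalRelation, fileIdTracker: FileIdTracker): Option[Relation]
  def refreshRelation(relation: Relation): Option[Relation]
  def signature(l: LogicalRelation): Option[String]
}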

/**
* Runs createRelation() for each provider.
*
* @param logicalRelation Logical relation to create [[Relation]] from.
* @param fileIdTracker [[FileIdTracker]] to use when populating the data of [[Relation]].
* @return [[Relation]] created from the given logical relation.
* @throws HyperspaceException if multiple providers return [[Some]] or
* if no provider returns [[Some]].
*/
def createRelation(logicalRelation: LogicalRelation, fileIdTracker: FileIdTracker): Relation = {
run(p => p.createRelation(logicalRelation, fileIdTracker))
}

/**
* Runs refreshRelation() for each provider.
*
* @param relation [[Relation]] to refresh.
* @return Refreshed [[Relation]].
* @throws HyperspaceException if multiple providers return [[Some]] or
* if no provider returns [[Some]].
*/
def refreshRelation(relation: Relation): Relation = {
run(p => p.refreshRelation(relation))
}

/**
* Runs signature() for each provider.
*
* @param logicalRelation Logical relation to compute signature from.
* @return Computed signature.
* @throws HyperspaceException if multiple providers return [[Some]] or
* if no provider returns [[Some]].
*/
def signature(logicalRelation: LogicalRelation): String = {
run(p => p.signature(logicalRelation))
}

/**
* Runs the given function 'f', which invokes a [[FileBasedSourceProvider]] API returning an
* [[Option]], on each provider built. This function ensures that exactly one provider
* returns [[Some]] when 'f' is executed.
*
* @param f Function that runs a [[FileBasedSourceProvider]]'s API that returns [[Option]]
* given a provider.
* @tparam A Type of the object that 'f' returns, wrapped in [[Option]].
* @return The object in [[Some]] that 'f' returns.
*/
private def run[A](f: FileBasedSourceProvider => Option[A]): A = {
sourceProviders
.load()
.foldLeft(Option.empty[(A, FileBasedSourceProvider)]) { (result, provider) =>
val cur = f(provider)
if (cur.isDefined) {
if (result.isDefined) {
throw HyperspaceException(
"Multiple source providers returned valid results: " +
s"'${provider.getClass.getName}' and '${result.get._2.getClass.getName}'")
}
Some(cur.get, provider)
} else {
result
}
}
.map(_._1)
.getOrElse {
throw HyperspaceException("No source provider returned valid results.")
}
}
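Note that the fold keeps consulting providers after the first hit instead of short-circuiting, so ambiguity (two providers claiming the same relation) is surfaced as an error rather than silently resolved by provider order. The contract can be condensed into a standalone sketch (hypothetical helper, with plain Options standing in for provider answers):

// Sketch of run()'s exactly-one contract.
def exactlyOne[A](answers: Seq[Option[A]]): A = answers.flatten match {
  case Seq(single) => single
  case Seq() => throw new IllegalStateException("no provider returned a result")
  case _ => throw new IllegalStateException("multiple providers returned results")
}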

/**
* Given a comma-separated list of class names that implement [[SourceProviderBuilder]], this method
* builds source providers that implement [[FileBasedSourceProvider]].
*
* @param builderClassNames Name of classes to load as [[SourceProviderBuilder]].
* @return [[FileBasedSourceProvider]] objects built.
* @throws HyperspaceException if a given builder cannot be loaded or
* if a builder doesn't build a [[FileBasedSourceProvider]].
*/
private def buildProviders(builderClassNames: String): Seq[FileBasedSourceProvider] = {
val builders = builderClassNames.split(",").map(_.trim).map { name =>
Try(Utils.classForName(name).getConstructor().newInstance()) match {
case Success(builder: SourceProviderBuilder) => builder
case _ => throw HyperspaceException(s"Cannot load SourceProviderBuilder: '$name'")
}
}

builders.map { builder =>
builder.build(spark) match {
case p: FileBasedSourceProvider => p
case other =>
throw HyperspaceException(
s"'$builder' did not build FileBasedSourceProvider: '$other')")
}
}
}
}
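Which builders get loaded is controlled by a comma-separated Spark conf read through HyperspaceConf.fileBasedSourceBuilders. Assuming the key is spark.hyperspace.index.sources.fileBasedBuilders and that a default builder ships alongside a custom one (both names are illustrative, not confirmed by this diff), registration might look like:

// Assumed conf key and class names; check HyperspaceConf for the real ones.
spark.conf.set(
  "spark.hyperspace.index.sources.fileBasedBuilders",
  "com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder," +
    "com.example.MyCustomSourceBuilder")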