From b584275ca80f9f8a34a3c88ee3d3b686da111bc3 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Fri, 23 Oct 2020 17:59:16 -0700 Subject: [PATCH 01/13] initial commit --- .../com/microsoft/hyperspace/Hyperspace.scala | 12 +++ .../hyperspace/actions/CreateActionBase.scala | 48 ++++------ .../actions/RefreshActionBase.scala | 24 +++-- .../index/FileBasedSignatureProvider.scala | 30 ++---- .../default/DefaultSourceProvider.scala | 92 +++++++++++++++++++ .../hyperspace/index/sources/interfaces.scala | 30 ++++++ 6 files changed, 174 insertions(+), 62 deletions(-) create mode 100644 src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultSourceProvider.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala diff --git a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala index f4df20177..11cb67e96 100644 --- a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala @@ -21,6 +21,8 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL} import com.microsoft.hyperspace.index.plananalysis.PlanAnalyzer +import com.microsoft.hyperspace.index.sources.SourceProvider +import com.microsoft.hyperspace.index.sources.default.DefaultSourceProvider class Hyperspace(spark: SparkSession) { private val indexManager: IndexManager = Hyperspace.getContext(spark).indexCollectionManager @@ -169,6 +171,14 @@ object Hyperspace { context.get() } + private[hyperspace] def getContext: HyperspaceContext = { + val sparkSession = SparkSession.getActiveSession.getOrElse { + throw HyperspaceException("Could not find active SparkSession.") + } + + getContext(sparkSession) + } + def apply(): Hyperspace = { val sparkSession = SparkSession.getActiveSession.getOrElse { throw HyperspaceException("Could not find active SparkSession.") @@ -180,4 +190,6 @@ object Hyperspace { private[hyperspace] class HyperspaceContext(val spark: SparkSession) { val indexCollectionManager = CachingIndexCollectionManager(spark) + + val sourceProviders: Seq[SourceProvider] = Seq(DefaultSourceProvider) } diff --git a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala index 813891efa..78012559d 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala @@ -18,12 +18,16 @@ package com.microsoft.hyperspace.actions import org.apache.hadoop.fs.Path import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} +import org.apache.spark.sql.execution.datasources.{ + HadoopFsRelation, + LogicalRelation, + PartitioningAwareFileIndex +} import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.functions.{input_file_name, udf} import org.apache.spark.sql.sources.DataSourceRegister -import com.microsoft.hyperspace.HyperspaceException +import com.microsoft.hyperspace.{Hyperspace, HyperspaceException} import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils} @@ -63,7 +67,7 @@ private[actions] abstract 
class CreateActionBase(dataManager: IndexDataManager) signatureProvider.signature(df.queryExecution.optimizedPlan) match { case Some(s) => - val relations = sourceRelations(df) + val relations = sourceRelations(spark, df) // Currently we only support to create an index on a LogicalRelation. assert(relations.size == 1) @@ -90,35 +94,17 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) } } - protected def sourceRelations(df: DataFrame): Seq[Relation] = + protected def sourceRelations(spark: SparkSession, df: DataFrame): Seq[Relation] = df.queryExecution.optimizedPlan.collect { - case LogicalRelation( - HadoopFsRelation( - location: PartitioningAwareFileIndex, - _, - dataSchema, - _, - fileFormat, - options), - _, - _, - _) => - val files = location.allFiles - // Note that source files are currently fingerprinted when the optimized plan is - // fingerprinted by LogicalPlanFingerprint. - val sourceDataProperties = Hdfs.Properties(Content.fromLeafFiles(files)) - val fileFormatName = fileFormat match { - case d: DataSourceRegister => d.shortName - case other => throw HyperspaceException(s"Unsupported file format: $other") - } - // "path" key in options can incur multiple data read unexpectedly. - val opts = options - "path" - Relation( - location.rootPaths.map(_.toString), - Hdfs(sourceDataProperties), - dataSchema.json, - fileFormatName, - opts) + case p: LogicalRelation => + Hyperspace + .getContext(spark) + .sourceProviders + .view + .map(source => source.createRelation(p)) + .collectFirst { case Some(x) => x } + .getOrElse( + throw HyperspaceException("No source provider could reconstruct the given relation.")) } protected def write(spark: SparkSession, df: DataFrame, indexConfig: IndexConfig): Unit = { diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala index 0d99da8bd..0332f21c4 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala @@ -17,10 +17,14 @@ package com.microsoft.hyperspace.actions import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} +import org.apache.spark.sql.execution.datasources.{ + HadoopFsRelation, + LogicalRelation, + PartitioningAwareFileIndex +} import org.apache.spark.sql.types.{DataType, StructType} -import com.microsoft.hyperspace.HyperspaceException +import com.microsoft.hyperspace.{Hyperspace, HyperspaceException} import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, REFRESHING} import com.microsoft.hyperspace.index._ @@ -64,13 +68,15 @@ private[actions] abstract class RefreshActionBase( // Reconstruct a df from schema protected lazy val df = { - val rels = previousIndexLogEntry.relations - val dataSchema = DataType.fromJson(rels.head.dataSchemaJson).asInstanceOf[StructType] - spark.read - .schema(dataSchema) - .format(rels.head.fileFormat) - .options(rels.head.options) - .load(rels.head.rootPaths: _*) + val relations = previousIndexLogEntry.relations + Hyperspace + .getContext(spark) + .sourceProviders + .view + .map(source => source.reconstructDataFrame(spark, relations.head)) + .collectFirst { case Some(x) => x } + .getOrElse( + throw HyperspaceException("No source provider could reconstruct the given relation.")) } protected lazy val indexConfig: IndexConfig = { diff --git 
a/src/main/scala/com/microsoft/hyperspace/index/FileBasedSignatureProvider.scala b/src/main/scala/com/microsoft/hyperspace/index/FileBasedSignatureProvider.scala index fc4692146..c82e31ac0 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/FileBasedSignatureProvider.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/FileBasedSignatureProvider.scala @@ -16,11 +16,11 @@ package com.microsoft.hyperspace.index -import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} +import org.apache.spark.sql.execution.datasources.LogicalRelation import com.microsoft.hyperspace.util.HashingUtils +import com.microsoft.hyperspace.{Hyperspace, HyperspaceException} /** * [[FileBasedSignatureProvider]] provides the logical plan signature based on files in the @@ -49,15 +49,12 @@ class FileBasedSignatureProvider extends LogicalPlanSignatureProvider { private def fingerprintVisitor(logicalPlan: LogicalPlan): Option[String] = { var fingerprint = "" logicalPlan.foreachUp { - // Currently we are only collecting plan fingerprint from hdfs file based scan nodes. - case LogicalRelation( - HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), - _, - _, - _) => - fingerprint ++= location.allFiles.foldLeft("")( - (accumulate: String, fileStatus: FileStatus) => - HashingUtils.md5Hex(accumulate + getFingerprint(fileStatus))) + case p: LogicalRelation => + Hyperspace.getContext.sourceProviders.view + .map(source => source.signature(p)) + .collectFirst { case Some(x) => x } + .map(fingerprint ++= _) + .orElse(throw HyperspaceException("No signature is found from source providers")) case _ => } @@ -66,15 +63,4 @@ class FileBasedSignatureProvider extends LogicalPlanSignatureProvider { case _ => Some(fingerprint) } } - - /** - * Get the fingerprint of a file. - * - * @param fileStatus file status. - * @return the fingerprint of a file. - */ - private def getFingerprint(fileStatus: FileStatus): String = { - fileStatus.getLen.toString + fileStatus.getModificationTime.toString + - fileStatus.getPath.toString - } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultSourceProvider.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultSourceProvider.scala new file mode 100644 index 000000000..e3c4a26c7 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultSourceProvider.scala @@ -0,0 +1,92 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index.sources.default + +import org.apache.hadoop.fs.FileStatus +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.types.{DataType, StructType} + +import com.microsoft.hyperspace.HyperspaceException +import com.microsoft.hyperspace.index.{Content, Hdfs, Relation} +import com.microsoft.hyperspace.index.sources.SourceProvider +import com.microsoft.hyperspace.util.HashingUtils + +object DefaultSourceProvider extends SourceProvider { + override def createRelation(logicalRelation: LogicalRelation): Option[Relation] = { + logicalRelation.relation match { + case HadoopFsRelation( + location: PartitioningAwareFileIndex, + _, + dataSchema, + _, + fileFormat, + options) => + val files = location.allFiles + // Note that source files are currently fingerprinted when the optimized plan is + // fingerprinted by LogicalPlanFingerprint. + val sourceDataProperties = Hdfs.Properties(Content.fromLeafFiles(files)) + val fileFormatName = fileFormat match { + case d: DataSourceRegister => d.shortName + case other => throw HyperspaceException(s"Unsupported file format: $other") + } + // "path" key in options can incur multiple data read unexpectedly. + val opts = options - "path" + Some( + Relation( + location.rootPaths.map(_.toString), + Hdfs(sourceDataProperties), + dataSchema.json, + fileFormatName, + opts)) + case _ => None + } + } + + override def signature(relation: LogicalRelation): Option[String] = { + // Currently we are only collecting plan fingerprint from hdfs file based scan nodes. + relation.relation match { + case HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _) => + val fingerprint = location.allFiles.foldLeft("") { (acc: String, f: FileStatus) => + HashingUtils.md5Hex(acc + getFingerprint(f)) + } + Some(fingerprint) + } + } + + /** + * Get the fingerprint of a file. + * + * @param fileStatus file status. + * @return the fingerprint of a file. + */ + private def getFingerprint(fileStatus: FileStatus): String = { + fileStatus.getLen.toString + fileStatus.getModificationTime.toString + + fileStatus.getPath.toString + } + + override def reconstructDataFrame(spark: SparkSession, relation: Relation): Option[DataFrame] = { + val dataSchema = DataType.fromJson(relation.dataSchemaJson).asInstanceOf[StructType] + val df = spark.read + .schema(dataSchema) + .format(relation.fileFormat) + .options(relation.options) + .load(relation.rootPaths: _*) + Some(df) + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala new file mode 100644 index 000000000..2a67c38ea --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala @@ -0,0 +1,30 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.sources + +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.execution.datasources.LogicalRelation + +import com.microsoft.hyperspace.index.Relation + +trait SourceProvider { + def createRelation(logicalRelation: LogicalRelation): Option[Relation] + + def reconstructDataFrame(spark: SparkSession, relation: Relation): Option[DataFrame] + + def signature(relation: LogicalRelation): Option[String] +} From ecdb918887463ca64068233cc486eb6d1474d859 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Fri, 23 Oct 2020 19:17:21 -0700 Subject: [PATCH 02/13] fix tests --- .../microsoft/hyperspace/index/FileBasedSignatureProvider.scala | 2 +- .../com/microsoft/hyperspace/actions/CreateActionTest.scala | 2 +- .../com/microsoft/hyperspace/actions/RefreshActionTest.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/FileBasedSignatureProvider.scala b/src/main/scala/com/microsoft/hyperspace/index/FileBasedSignatureProvider.scala index c82e31ac0..d71ff8d5c 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/FileBasedSignatureProvider.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/FileBasedSignatureProvider.scala @@ -19,8 +19,8 @@ package com.microsoft.hyperspace.index import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.LogicalRelation -import com.microsoft.hyperspace.util.HashingUtils import com.microsoft.hyperspace.{Hyperspace, HyperspaceException} +import com.microsoft.hyperspace.util.HashingUtils /** * [[FileBasedSignatureProvider]] provides the logical plan signature based on files in the diff --git a/src/test/scala/com/microsoft/hyperspace/actions/CreateActionTest.scala b/src/test/scala/com/microsoft/hyperspace/actions/CreateActionTest.scala index c276fd640..8b8bf4240 100644 --- a/src/test/scala/com/microsoft/hyperspace/actions/CreateActionTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/actions/CreateActionTest.scala @@ -38,7 +38,7 @@ class CreateActionTest extends SparkFunSuite with SparkInvolvedSuite with SQLHel private val mockDataManager: IndexDataManager = mock(classOf[IndexDataManager]) object CreateActionBaseWrapper extends CreateActionBase(mockDataManager) { - def getSourceRelations(df: DataFrame): Seq[Relation] = sourceRelations(df) + def getSourceRelations(df: DataFrame): Seq[Relation] = sourceRelations(spark, df) } override def beforeAll(): Unit = { diff --git a/src/test/scala/com/microsoft/hyperspace/actions/RefreshActionTest.scala b/src/test/scala/com/microsoft/hyperspace/actions/RefreshActionTest.scala index a2743ac14..3ff60c686 100644 --- a/src/test/scala/com/microsoft/hyperspace/actions/RefreshActionTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/actions/RefreshActionTest.scala @@ -35,7 +35,7 @@ class RefreshActionTest extends SparkFunSuite with SparkInvolvedSuite { private var testLogEntry: LogEntry = _ object CreateActionBaseWrapper extends CreateActionBase(mockDataManager) { - def getSourceRelations(df: DataFrame): Seq[Relation] = sourceRelations(df) + def getSourceRelations(df: DataFrame): Seq[Relation] = sourceRelations(spark, df) } private def updateSourceFiles(): Unit = { From 8963ec64dff7324edf01c44b907ff85ce1c0882d Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Sat, 24 Oct 2020 14:54:22 -0700 Subject: [PATCH 03/13] Add comments --- 
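Note for reviewers: below is a minimal sketch of how the contract documented in this patch can be implemented by a custom source. The object name (CsvOnlySourceProvider) and its CSV-only rule are hypothetical and only illustrate the "return None for relations you do not own" convention; the method bodies mirror the existing DefaultSourceProvider code added in PATCH 01.

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
    import org.apache.spark.sql.sources.DataSourceRegister
    import org.apache.spark.sql.types.{DataType, StructType}

    import com.microsoft.hyperspace.index.{Content, Hdfs, Relation}
    import com.microsoft.hyperspace.index.sources.SourceProvider
    import com.microsoft.hyperspace.util.HashingUtils

    // Hypothetical provider that claims only CSV-backed file scans; every other
    // relation yields None so that the remaining providers are consulted.
    object CsvOnlySourceProvider extends SourceProvider {
      override def createRelation(logicalRelation: LogicalRelation): Option[Relation] =
        logicalRelation.relation match {
          case HadoopFsRelation(
                location: PartitioningAwareFileIndex,
                _,
                dataSchema,
                _,
                format: DataSourceRegister,
                options) if format.shortName == "csv" =>
            Some(
              Relation(
                location.rootPaths.map(_.toString),
                Hdfs(Hdfs.Properties(Content.fromLeafFiles(location.allFiles))),
                dataSchema.json,
                format.shortName,
                options - "path"))
          case _ => None
        }

      override def reconstructDataFrame(spark: SparkSession, relation: Relation): Option[DataFrame] =
        if (relation.fileFormat != "csv") {
          None
        } else {
          val dataSchema = DataType.fromJson(relation.dataSchemaJson).asInstanceOf[StructType]
          Some(
            spark.read
              .schema(dataSchema)
              .format(relation.fileFormat)
              .options(relation.options)
              .load(relation.rootPaths: _*))
        }

      override def signature(logicalRelation: LogicalRelation): Option[String] =
        logicalRelation.relation match {
          case HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, format: DataSourceRegister, _)
              if format.shortName == "csv" =>
            Some(location.allFiles.foldLeft("") { (acc, f) =>
              HashingUtils.md5Hex(acc + f.getLen + f.getModificationTime + f.getPath)
            })
          case _ => None
        }
    }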
.../hyperspace/index/sources/interfaces.scala | 44 ++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala index 2a67c38ea..fd99ccf8f 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala @@ -21,10 +21,52 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation import com.microsoft.hyperspace.index.Relation +/** + * ::Experimental:: + * A trait that a data source should implement so that an index can be created/managed and + * utilized for the data source. + * + * @since 0.3.0 + */ trait SourceProvider { + /** + * Creates [[Relation]] for IndexLogEntry using the given [[LogicalRelation]]. + * + * This API is used when an index is created. + * + * If the given logical relation does not belong to this provider, None should be returned. + * + * @param logicalRelation logical relation to derive [[Relation]] from. + * @return [[Relation]] object if the given 'logicalRelation' can be processed by this provider. + * Otherwise, None. + */ def createRelation(logicalRelation: LogicalRelation): Option[Relation] + /** + * Reconstructs [[DataFrame]] using the given [[Relation]]. + * + * This API is used when an index is refreshed. + * + * If the given relation does not belong to this provider, None should be returned. + * + * @param spark Spark session. + * @param relation [[Relation]] object to reconstruct [[DataFrame]] with. + * @return [[DataFrame]] object if the given 'relation' can be processed by this provider. + * Otherwise, None. + */ def reconstructDataFrame(spark: SparkSession, relation: Relation): Option[DataFrame] - def signature(relation: LogicalRelation): Option[String] + /** + * Computes the signature using the given [[LogicalRelation]]. + * + * This API is used when the signature of source needs to be computed, e.g., creating an index, + * computing query plan's signature, etc. + * + * If the given logical relation does not belong to this provider, None should be returned. + * + * @param logicalRelation logical relation to compute signature from. + * @return Signature computed if the given 'logicalRelation' can be processed by this provider. + * Otherwise, None. 
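+   *
+   * For illustration only, a file-based provider might fold per-file metadata into a rolling
+   * hash, as the default source does (an example, not a required scheme):
+   * {{{
+   *   location.allFiles.foldLeft("")((acc, f) =>
+   *     HashingUtils.md5Hex(acc + f.getLen + f.getModificationTime + f.getPath))
+   * }}}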
+ */ + def signature(logicalRelation: LogicalRelation): Option[String] } From 1f1e7a552353a092db2fbb8e4563c147c86cd65b Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Sat, 24 Oct 2020 22:57:36 -0700 Subject: [PATCH 04/13] introduce supported format --- .../com/microsoft/hyperspace/Hyperspace.scala | 4 +- .../hyperspace/actions/CreateActionBase.scala | 11 +- .../actions/RefreshActionBase.scala | 2 +- .../default/DefaultFileBasedSource.scala | 149 ++++++++++++++++++ .../default/DefaultSourceProvider.scala | 92 ----------- .../hyperspace/index/sources/interfaces.scala | 3 +- 6 files changed, 156 insertions(+), 105 deletions(-) create mode 100644 src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala delete mode 100644 src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultSourceProvider.scala diff --git a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala index 11cb67e96..f30767260 100644 --- a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala @@ -22,7 +22,7 @@ import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL} import com.microsoft.hyperspace.index.plananalysis.PlanAnalyzer import com.microsoft.hyperspace.index.sources.SourceProvider -import com.microsoft.hyperspace.index.sources.default.DefaultSourceProvider +import com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSource class Hyperspace(spark: SparkSession) { private val indexManager: IndexManager = Hyperspace.getContext(spark).indexCollectionManager @@ -191,5 +191,5 @@ object Hyperspace { private[hyperspace] class HyperspaceContext(val spark: SparkSession) { val indexCollectionManager = CachingIndexCollectionManager(spark) - val sourceProviders: Seq[SourceProvider] = Seq(DefaultSourceProvider) + val sourceProviders: Seq[SourceProvider] = Seq(new DefaultFileBasedSource(spark)) } diff --git a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala index 78012559d..1c7806415 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala @@ -18,14 +18,9 @@ package com.microsoft.hyperspace.actions import org.apache.hadoop.fs.Path import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.execution.datasources.{ - HadoopFsRelation, - LogicalRelation, - PartitioningAwareFileIndex -} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.functions.{input_file_name, udf} -import org.apache.spark.sql.sources.DataSourceRegister import com.microsoft.hyperspace.{Hyperspace, HyperspaceException} import com.microsoft.hyperspace.index._ @@ -103,8 +98,8 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) .view .map(source => source.createRelation(p)) .collectFirst { case Some(x) => x } - .getOrElse( - throw HyperspaceException("No source provider could reconstruct the given relation.")) + .getOrElse(throw HyperspaceException( + "No source providers could reconstruct the given relation.")) } protected def write(spark: SparkSession, df: DataFrame, indexConfig: IndexConfig): Unit = { diff --git 
a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala index 0332f21c4..60fd4c46f 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala @@ -73,7 +73,7 @@ private[actions] abstract class RefreshActionBase( .getContext(spark) .sourceProviders .view - .map(source => source.reconstructDataFrame(spark, relations.head)) + .map(source => source.reconstructDataFrame(relations.head)) .collectFirst { case Some(x) => x } .getOrElse( throw HyperspaceException("No source provider could reconstruct the given relation.")) diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala new file mode 100644 index 000000000..cbf22221e --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala @@ -0,0 +1,149 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index.sources.default + +import java.util.Locale + +import org.apache.hadoop.fs.FileStatus +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.types.{DataType, StructType} + +import com.microsoft.hyperspace.index.{Content, Hdfs, Relation} +import com.microsoft.hyperspace.index.sources.SourceProvider +import com.microsoft.hyperspace.util.HashingUtils + +/** + * Default implementation for file-based Spark built-in sources such as parquet, csv, json, etc. + * + * This source can support relations that meet the following criteria: + * - The relation is [[HadoopFsRelation]] with [[PartitioningAwareFileIndex]] as file index. + * - Its file format implements [[DataSourceRegister]]. + */ +class DefaultFileBasedSource(private val spark: SparkSession) extends SourceProvider { + private val supportedFormats = Set("avro", "csv", "json", "orc", "parquet", "text") + + /** + * Creates [[Relation]] for IndexLogEntry using the given [[LogicalRelation]]. + * + * @param logicalRelation logical relation to derive [[Relation]] from. + * @return [[Relation]] object if the given 'logicalRelation' can be processed by this provider. + * Otherwise, None. 
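+   *
+   * For example (illustrative values), a parquet scan rooted at "/data/t1" would yield
+   * {{{
+   *   Relation(Seq("/data/t1"), Hdfs(sourceDataProperties), dataSchema.json,
+   *     "parquet", options - "path")
+   * }}}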
+ */ + override def createRelation(logicalRelation: LogicalRelation): Option[Relation] = { + logicalRelation.relation match { + case HadoopFsRelation( + location: PartitioningAwareFileIndex, + _, + dataSchema, + _, + fileFormat, + options) if isSupportedFileFormat(fileFormat) => + val files = location.allFiles + // Note that source files are currently fingerprinted when the optimized plan is + // fingerprinted by LogicalPlanFingerprint. + val sourceDataProperties = Hdfs.Properties(Content.fromLeafFiles(files)) + val fileFormatName = fileFormat.asInstanceOf[DataSourceRegister].shortName + // "path" key in options can incur multiple data read unexpectedly. + val opts = options - "path" + Some( + Relation( + location.rootPaths.map(_.toString), + Hdfs(sourceDataProperties), + dataSchema.json, + fileFormatName, + opts)) + case _ => None + } + } + + private def isSupportedFileFormat(format: FileFormat): Boolean = { + format match { + case d: DataSourceRegister if isSupportedFileFormatName(d.shortName) => true + case _ => false + } + } + + private def isSupportedFileFormatName(name: String): Boolean = { + val supportedFileFormatsOverride = spark.sessionState.conf + .getConfString( + "spark.hyperspace.index.sources.defaultFileBasedSource.supportedFileFormats", + "") + if (supportedFileFormatsOverride.nonEmpty) { + supportedFileFormatsOverride + .toLowerCase(Locale.ROOT) + .split(",") + .map(_.trim) + .contains(name.toLowerCase(Locale.ROOT)) + } else { + supportedFormats.contains(name.toLowerCase(Locale.ROOT)) + } + } + + /** + * Reconstructs [[DataFrame]] using the given [[Relation]]. + * + * @param relation [[Relation]] object to reconstruct [[DataFrame]] with. + * @return [[DataFrame]] object if the given 'relation' can be processed by this provider. + * Otherwise, None. + */ + override def reconstructDataFrame(relation: Relation): Option[DataFrame] = { + if (isSupportedFileFormatName(relation.fileFormat)) { + val dataSchema = DataType.fromJson(relation.dataSchemaJson).asInstanceOf[StructType] + val df = spark.read + .schema(dataSchema) + .format(relation.fileFormat) + .options(relation.options) + .load(relation.rootPaths: _*) + Some(df) + } else { + None + } + } + + /** + * Computes the signature using the given [[LogicalRelation]]. + * + * @param logicalRelation logical relation to compute signature from. + * @return Signature computed if the given 'logicalRelation' can be processed by this provider. + * Otherwise, None. + */ + override def signature(logicalRelation: LogicalRelation): Option[String] = { + // Currently we are only collecting plan fingerprint from hdfs file based scan nodes. + logicalRelation.relation match { + case HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, format, _) + if isSupportedFileFormat(format) => + val result = location.allFiles.foldLeft("") { (acc: String, f: FileStatus) => + HashingUtils.md5Hex(acc + fingerprint(f)) + } + Some(result) + case _ => None + } + } + + /** + * Fingerprints a file. + * + * @param fileStatus file status. + * @return the fingerprint of a file. 
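+   *
+   * For example (illustrative values), a 1024-byte file at "/data/f1.parquet" with
+   * modification time 1603500000000 contributes "10241603500000000/data/f1.parquet".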
+ */ + private def fingerprint(fileStatus: FileStatus): String = { + fileStatus.getLen.toString + fileStatus.getModificationTime.toString + + fileStatus.getPath.toString + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultSourceProvider.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultSourceProvider.scala deleted file mode 100644 index e3c4a26c7..000000000 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultSourceProvider.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (2020) The Hyperspace Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.microsoft.hyperspace.index.sources.default - -import org.apache.hadoop.fs.FileStatus -import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} -import org.apache.spark.sql.sources.DataSourceRegister -import org.apache.spark.sql.types.{DataType, StructType} - -import com.microsoft.hyperspace.HyperspaceException -import com.microsoft.hyperspace.index.{Content, Hdfs, Relation} -import com.microsoft.hyperspace.index.sources.SourceProvider -import com.microsoft.hyperspace.util.HashingUtils - -object DefaultSourceProvider extends SourceProvider { - override def createRelation(logicalRelation: LogicalRelation): Option[Relation] = { - logicalRelation.relation match { - case HadoopFsRelation( - location: PartitioningAwareFileIndex, - _, - dataSchema, - _, - fileFormat, - options) => - val files = location.allFiles - // Note that source files are currently fingerprinted when the optimized plan is - // fingerprinted by LogicalPlanFingerprint. - val sourceDataProperties = Hdfs.Properties(Content.fromLeafFiles(files)) - val fileFormatName = fileFormat match { - case d: DataSourceRegister => d.shortName - case other => throw HyperspaceException(s"Unsupported file format: $other") - } - // "path" key in options can incur multiple data read unexpectedly. - val opts = options - "path" - Some( - Relation( - location.rootPaths.map(_.toString), - Hdfs(sourceDataProperties), - dataSchema.json, - fileFormatName, - opts)) - case _ => None - } - } - - override def signature(relation: LogicalRelation): Option[String] = { - // Currently we are only collecting plan fingerprint from hdfs file based scan nodes. - relation.relation match { - case HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _) => - val fingerprint = location.allFiles.foldLeft("") { (acc: String, f: FileStatus) => - HashingUtils.md5Hex(acc + getFingerprint(f)) - } - Some(fingerprint) - } - } - - /** - * Get the fingerprint of a file. - * - * @param fileStatus file status. - * @return the fingerprint of a file. 
- */ - private def getFingerprint(fileStatus: FileStatus): String = { - fileStatus.getLen.toString + fileStatus.getModificationTime.toString + - fileStatus.getPath.toString - } - - override def reconstructDataFrame(spark: SparkSession, relation: Relation): Option[DataFrame] = { - val dataSchema = DataType.fromJson(relation.dataSchemaJson).asInstanceOf[StructType] - val df = spark.read - .schema(dataSchema) - .format(relation.fileFormat) - .options(relation.options) - .load(relation.rootPaths: _*) - Some(df) - } -} diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala index fd99ccf8f..3fe938a92 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala @@ -49,12 +49,11 @@ trait SourceProvider { * * If the given relation does not belong to this provider, None should be returned. * - * @param spark Spark session. * @param relation [[Relation]] object to reconstruct [[DataFrame]] with. * @return [[DataFrame]] object if the given 'relation' can be processed by this provider. * Otherwise, None. */ - def reconstructDataFrame(spark: SparkSession, relation: Relation): Option[DataFrame] + def reconstructDataFrame(relation: Relation): Option[DataFrame] /** * Computes the signature using the given [[LogicalRelation]]. From 03fd1ed9e88b97b5001786664d8b68073982389d Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Sun, 25 Oct 2020 00:22:02 -0700 Subject: [PATCH 05/13] revamp session based provider --- .../com/microsoft/hyperspace/Hyperspace.scala | 5 +- .../hyperspace/actions/CreateActionBase.scala | 9 +- .../actions/RefreshActionBase.scala | 16 +--- .../index/FileBasedSignatureProvider.scala | 6 +- .../index/sources/SourceProviderManager.scala | 84 +++++++++++++++++++ .../default/DefaultFileBasedSource.scala | 6 +- .../hyperspace/index/sources/interfaces.scala | 19 +++++ 7 files changed, 114 insertions(+), 31 deletions(-) create mode 100644 src/main/scala/com/microsoft/hyperspace/index/sources/SourceProviderManager.scala diff --git a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala index f30767260..b4463c59e 100644 --- a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala @@ -21,8 +21,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL} import com.microsoft.hyperspace.index.plananalysis.PlanAnalyzer -import com.microsoft.hyperspace.index.sources.SourceProvider -import com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSource +import com.microsoft.hyperspace.index.sources.SourceProviderManager class Hyperspace(spark: SparkSession) { private val indexManager: IndexManager = Hyperspace.getContext(spark).indexCollectionManager @@ -191,5 +190,5 @@ object Hyperspace { private[hyperspace] class HyperspaceContext(val spark: SparkSession) { val indexCollectionManager = CachingIndexCollectionManager(spark) - val sourceProviders: Seq[SourceProvider] = Seq(new DefaultFileBasedSource(spark)) + val sourceProviderManager = new SourceProviderManager(spark) } diff --git a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala index 1c7806415..803703e99 
100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala @@ -92,14 +92,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) protected def sourceRelations(spark: SparkSession, df: DataFrame): Seq[Relation] = df.queryExecution.optimizedPlan.collect { case p: LogicalRelation => - Hyperspace - .getContext(spark) - .sourceProviders - .view - .map(source => source.createRelation(p)) - .collectFirst { case Some(x) => x } - .getOrElse(throw HyperspaceException( - "No source providers could reconstruct the given relation.")) + Hyperspace.getContext(spark).sourceProviderManager.createRelation(p) } protected def write(spark: SparkSession, df: DataFrame, indexConfig: IndexConfig): Unit = { diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala index 60fd4c46f..429a88a6a 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala @@ -17,12 +17,7 @@ package com.microsoft.hyperspace.actions import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.execution.datasources.{ - HadoopFsRelation, - LogicalRelation, - PartitioningAwareFileIndex -} -import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} import com.microsoft.hyperspace.{Hyperspace, HyperspaceException} import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, REFRESHING} @@ -69,14 +64,7 @@ private[actions] abstract class RefreshActionBase( // Reconstruct a df from schema protected lazy val df = { val relations = previousIndexLogEntry.relations - Hyperspace - .getContext(spark) - .sourceProviders - .view - .map(source => source.reconstructDataFrame(relations.head)) - .collectFirst { case Some(x) => x } - .getOrElse( - throw HyperspaceException("No source provider could reconstruct the given relation.")) + Hyperspace.getContext(spark).sourceProviderManager.reconstructDataFrame(relations.head) } protected lazy val indexConfig: IndexConfig = { diff --git a/src/main/scala/com/microsoft/hyperspace/index/FileBasedSignatureProvider.scala b/src/main/scala/com/microsoft/hyperspace/index/FileBasedSignatureProvider.scala index d71ff8d5c..9bcd7fd1d 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/FileBasedSignatureProvider.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/FileBasedSignatureProvider.scala @@ -50,11 +50,7 @@ class FileBasedSignatureProvider extends LogicalPlanSignatureProvider { var fingerprint = "" logicalPlan.foreachUp { case p: LogicalRelation => - Hyperspace.getContext.sourceProviders.view - .map(source => source.signature(p)) - .collectFirst { case Some(x) => x } - .map(fingerprint ++= _) - .orElse(throw HyperspaceException("No signature is found from source providers")) + fingerprint ++= Hyperspace.getContext.sourceProviderManager.signature(p) case _ => } diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/SourceProviderManager.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/SourceProviderManager.scala new file mode 100644 index 000000000..cc49a8d7e --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/SourceProviderManager.scala @@ -0,0 +1,84 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index.sources
+
+import scala.util.{Success, Try}
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.util.hyperspace.Utils
+
+import com.microsoft.hyperspace.HyperspaceException
+import com.microsoft.hyperspace.index.Relation
+
+/**
+ * Selects, among the configured [[SourceProvider]]s, the one that can handle a given relation.
+ * @param spark Spark session.
+ */
+class SourceProviderManager(spark: SparkSession) {
+  private lazy val builders: Seq[SourceProviderBuilder] = {
+    val builders = spark.sessionState.conf
+      .getConfString(
+        "spark.hyperspace.index.sources.builders",
+        "com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder")
+
+    builders.split(",").map(_.trim).map { name =>
+      Try(Utils.classForName(name).getConstructor().newInstance()) match {
+        case Success(builder: SourceProviderBuilder) => builder
+        case _ => throw HyperspaceException(s"Cannot load SourceProviderBuilder: '$name'")
+      }
+    }
+  }
+
+  private lazy val sourceProviders: Seq[SourceProvider] = builders.map(_.build(spark))
+
+  /**
+   * Creates a [[Relation]] that describes the given [[LogicalRelation]].
+   * @param logicalRelation logical relation to derive [[Relation]] from.
+   * @return [[Relation]] object created by the first provider that supports the relation.
+   */
+  def createRelation(logicalRelation: LogicalRelation): Relation = {
+    sourceProviders.view
+      .map(provider => provider.createRelation(logicalRelation))
+      .collectFirst { case Some(x) => x }
+      .getOrElse(
+        throw HyperspaceException("No source provider could create the given relation."))
+  }
+
+  /**
+   * Reconstructs a [[DataFrame]] from the given [[Relation]].
+   */
+  def reconstructDataFrame(relation: Relation): DataFrame = {
+    sourceProviders.view
+      .map(source => source.reconstructDataFrame(relation))
+      .collectFirst { case Some(x) => x }
+      .getOrElse(
+        throw HyperspaceException("No source provider could reconstruct the given relation."))
+  }
+
+  /**
+   * Computes the signature of the given [[LogicalRelation]].
+   * @param logicalRelation logical relation to compute a signature for.
+   * @return signature computed by the first provider that supports the relation.
+   */
+  def signature(logicalRelation: LogicalRelation): String = {
+    sourceProviders.view
+      .map(source => source.signature(logicalRelation))
+      .collectFirst { case Some(x) => x }
+      .getOrElse(throw HyperspaceException("No signature found from any source provider."))
+  }
+}
diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala
index cbf22221e..7426a685e 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types.{DataType, StructType}
 
 import com.microsoft.hyperspace.index.{Content, Hdfs, Relation}
-import com.microsoft.hyperspace.index.sources.SourceProvider
+import com.microsoft.hyperspace.index.sources.{SourceProvider, SourceProviderBuilder}
 import com.microsoft.hyperspace.util.HashingUtils
 
 /**
@@ -147,3 +147,7 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends SourceProv
fileStatus.getPath.toString } } + +class DefaultFileBasedSourceBuilder extends SourceProviderBuilder { + override def build(spark: SparkSession): SourceProvider = new DefaultFileBasedSource(spark) +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala index 3fe938a92..2667b0fd4 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala @@ -69,3 +69,22 @@ trait SourceProvider { */ def signature(logicalRelation: LogicalRelation): Option[String] } + +/** + * ::Experimental:: + * A trait that a source provider's builder should implement. Each source provider should have an + * accompanying builder in order to be plugged into the SourceProviderManager. + * + * The reason for having a builder is to inject [[SparkSession]] to the source provider if needed. + * + * @since 0.3.0 + */ +trait SourceProviderBuilder { + /** + * Builds a [[SourceProvider]]. + * + * @param spark Spark session. + * @return [[SourceProvider]] object. + */ + def build(spark: SparkSession): SourceProvider +} From 6705d798881c22669d13a31a6bfd409dc3f12140 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Sun, 25 Oct 2020 22:38:17 -0700 Subject: [PATCH 06/13] CacheWithTransform, few refactoring --- .../com/microsoft/hyperspace/Hyperspace.scala | 4 +- .../FileBasedSourceProviderManager.scala | 116 ++++++++++++++++++ .../index/sources/SourceProviderManager.scala | 84 ------------- .../default/DefaultFileBasedSource.scala | 27 ++-- .../hyperspace/index/sources/interfaces.scala | 49 +++++--- .../hyperspace/util/CacheWithTransform.scala | 40 ++++++ .../hyperspace/util/HyperspaceConf.scala | 16 ++- 7 files changed, 212 insertions(+), 124 deletions(-) create mode 100644 src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala delete mode 100644 src/main/scala/com/microsoft/hyperspace/index/sources/SourceProviderManager.scala create mode 100644 src/main/scala/com/microsoft/hyperspace/util/CacheWithTransform.scala diff --git a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala index b4463c59e..17fc1ca06 100644 --- a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL} import com.microsoft.hyperspace.index.plananalysis.PlanAnalyzer -import com.microsoft.hyperspace.index.sources.SourceProviderManager +import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager class Hyperspace(spark: SparkSession) { private val indexManager: IndexManager = Hyperspace.getContext(spark).indexCollectionManager @@ -190,5 +190,5 @@ object Hyperspace { private[hyperspace] class HyperspaceContext(val spark: SparkSession) { val indexCollectionManager = CachingIndexCollectionManager(spark) - val sourceProviderManager = new SourceProviderManager(spark) + val sourceProviderManager = new FileBasedSourceProviderManager(spark) } diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala new file mode 100644 index 000000000..40e8bfc9a --- /dev/null +++ 
b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala
@@ -0,0 +1,116 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index.sources
+
+import scala.util.{Success, Try}
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.util.hyperspace.Utils
+
+import com.microsoft.hyperspace.HyperspaceException
+import com.microsoft.hyperspace.index.Relation
+import com.microsoft.hyperspace.util.{CacheWithTransform, HyperspaceConf}
+
+/**
+ * Dispatches each request to the single [[FileBasedSourceProvider]] that can handle it.
+ * @param spark Spark session.
+ */
+class FileBasedSourceProviderManager(spark: SparkSession) {
+  private val sourceProviders: CacheWithTransform[String, Seq[FileBasedSourceProvider]] =
+    new CacheWithTransform[String, Seq[FileBasedSourceProvider]]({ () =>
+      HyperspaceConf.fileBasedSourceBuilders(spark)
+    }, { builderClassNames =>
+      providers(builderClassNames)
+    })
+
+  /**
+   * Creates a [[Relation]] that describes the given [[LogicalRelation]].
+   * @param logicalRelation logical relation to derive [[Relation]] from.
+   * @return [[Relation]] object created by the provider that supports the relation.
+   */
+  def createRelation(logicalRelation: LogicalRelation): Relation = {
+    run(p => p.createRelation(logicalRelation))
+  }
+
+  /**
+   * Reconstructs a [[DataFrame]] from the given [[Relation]].
+   */
+  def reconstructDataFrame(relation: Relation): DataFrame = {
+    run(p => p.reconstructDataFrame(relation))
+  }
+
+  /**
+   * Computes the signature of the given [[LogicalRelation]].
+   * @param logicalRelation logical relation to compute a signature for.
+   * @return signature computed by the provider that supports the relation.
+   */
+  def signature(logicalRelation: LogicalRelation): String = {
+    run(p => p.signature(logicalRelation))
+  }
+
+  /**
+   * Runs the given function 'f' over the providers, ensuring that exactly one returns Some.
+   * @param f function to evaluate against each provider.
+   * @tparam A type of the value returned by 'f'.
+   * @return the single defined result; fails if no provider (or more than one) returns one.
+   */
+  private def run[A](f: FileBasedSourceProvider => Option[A]): A = {
+    sourceProviders
+      .load()
+      .foldLeft(Option.empty[(A, FileBasedSourceProvider)]) { (acc, p) =>
+        val result = f(p)
+        if (result.isDefined) {
+          if (acc.isDefined) {
+            throw HyperspaceException(
+              "Multiple source providers returned valid results: " +
+                s"'${p.getClass.getName}' and '${acc.get._2.getClass.getName}'")
+          }
+          Some(result.get, p)
+        } else {
+          acc
+        }
+      }
+      .map(_._1)
+      .getOrElse {
+        throw HyperspaceException("No source provider returned valid results.")
+      }
+  }
+
+  /**
+   * Builds [[FileBasedSourceProvider]]s from a comma-separated list of builder class names.
+   * @param builderClassNames comma-separated [[SourceProviderBuilder]] class names.
+   * @return source providers produced by instantiating each builder and calling 'build'.
+   */
+  private def providers(builderClassNames: String): Seq[FileBasedSourceProvider] = {
+    val builders = builderClassNames.split(",").map(_.trim).map { name =>
+      Try(Utils.classForName(name).getConstructor().newInstance()) match {
+        case Success(builder: SourceProviderBuilder) => builder
+        case _ => throw HyperspaceException(s"Cannot load SourceProviderBuilder: '$name'")
+      }
+    }
+
+    builders.map { builder =>
+      builder.build(spark) match {
+        case p: FileBasedSourceProvider => p
+        case other =>
+          throw HyperspaceException(
+            s"Builder '$builder' did not build FileBasedSourceProvider: '$other'")
+      }
+    }
+  }
+}
diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/SourceProviderManager.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/SourceProviderManager.scala
deleted file mode 100644
index cc49a8d7e..000000000
--- 
a/src/main/scala/com/microsoft/hyperspace/index/sources/SourceProviderManager.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright (2020) The Hyperspace Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.microsoft.hyperspace.index.sources
-
-import scala.util.{Success, Try}
-
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.util.hyperspace.Utils
-
-import com.microsoft.hyperspace.HyperspaceException
-import com.microsoft.hyperspace.index.Relation
-
-/**
- * Selects, among the configured [[SourceProvider]]s, the one that can handle a given relation.
- * @param spark Spark session.
- */
-class SourceProviderManager(spark: SparkSession) {
-  private lazy val builders: Seq[SourceProviderBuilder] = {
-    val builders = spark.sessionState.conf
-      .getConfString(
-        "spark.hyperspace.index.sources.builders",
-        "com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder")
-
-    builders.split(",").map(_.trim).map { name =>
-      Try(Utils.classForName(name).getConstructor().newInstance()) match {
-        case Success(builder: SourceProviderBuilder) => builder
-        case _ => throw HyperspaceException(s"Cannot load SourceProviderBuilder: '$name'")
-      }
-    }
-  }
-
-  private lazy val sourceProviders: Seq[SourceProvider] = builders.map(_.build(spark))
-
-  /**
-   * Creates a [[Relation]] that describes the given [[LogicalRelation]].
-   * @param logicalRelation logical relation to derive [[Relation]] from.
-   * @return [[Relation]] object created by the first provider that supports the relation.
-   */
-  def createRelation(logicalRelation: LogicalRelation): Relation = {
-    sourceProviders.view
-      .map(provider => provider.createRelation(logicalRelation))
-      .collectFirst { case Some(x) => x }
-      .getOrElse(
-        throw HyperspaceException("No source provider could create the given relation."))
-  }
-
-  /**
-   * Reconstructs a [[DataFrame]] from the given [[Relation]].
-   */
-  def reconstructDataFrame(relation: Relation): DataFrame = {
-    sourceProviders.view
-      .map(source => source.reconstructDataFrame(relation))
-      .collectFirst { case Some(x) => x }
-      .getOrElse(
-        throw HyperspaceException("No source provider could reconstruct the given relation."))
-  }
-
-  /**
-   * Computes the signature of the given [[LogicalRelation]].
-   * @param logicalRelation logical relation to compute a signature for.
-   * @return signature computed by the first provider that supports the relation.
-   */
-  def signature(logicalRelation: LogicalRelation): String = {
-    sourceProviders.view
-      .map(source => source.signature(logicalRelation))
-      .collectFirst { case Some(x) => x }
-      .getOrElse(throw HyperspaceException("No signature found from any source provider."))
-  }
-}
diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala
index 7426a685e..2f490b040 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala
@@ -25,8 +25,8 @@ import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types.{DataType, StructType}
 
 import com.microsoft.hyperspace.index.{Content, Hdfs, Relation}
-import com.microsoft.hyperspace.index.sources.{SourceProvider, SourceProviderBuilder}
+import
com.microsoft.hyperspace.index.sources.{FileBasedSourceProvider, SourceProvider, SourceProviderBuilder} +import com.microsoft.hyperspace.util.{CacheWithTransform, HashingUtils, HyperspaceConf} /** * Default implementation for file-based Spark built-in sources such as parquet, csv, json, etc. @@ -35,8 +35,13 @@ import com.microsoft.hyperspace.util.HashingUtils * - The relation is [[HadoopFsRelation]] with [[PartitioningAwareFileIndex]] as file index. * - Its file format implements [[DataSourceRegister]]. */ -class DefaultFileBasedSource(private val spark: SparkSession) extends SourceProvider { - private val supportedFormats = Set("avro", "csv", "json", "orc", "parquet", "text") +class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedSourceProvider { + private val supportedFormats: CacheWithTransform[String, Set[String]] = + new CacheWithTransform[String, Set[String]]({ () => + HyperspaceConf.supportedFileFormatsForDefaultFileBasedSource(spark) + }, { formats => + formats.toLowerCase(Locale.ROOT).split(",").map(_.trim).toSet + }) /** * Creates [[Relation]] for IndexLogEntry using the given [[LogicalRelation]]. @@ -80,19 +85,7 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends SourceProv } private def isSupportedFileFormatName(name: String): Boolean = { - val supportedFileFormatsOverride = spark.sessionState.conf - .getConfString( - "spark.hyperspace.index.sources.defaultFileBasedSource.supportedFileFormats", - "") - if (supportedFileFormatsOverride.nonEmpty) { - supportedFileFormatsOverride - .toLowerCase(Locale.ROOT) - .split(",") - .map(_.trim) - .contains(name.toLowerCase(Locale.ROOT)) - } else { - supportedFormats.contains(name.toLowerCase(Locale.ROOT)) - } + supportedFormats.load().contains(name.toLowerCase(Locale.ROOT)) } /** diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala index 2667b0fd4..6ffff0dfd 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala @@ -28,7 +28,35 @@ import com.microsoft.hyperspace.index.Relation * * @since 0.3.0 */ -trait SourceProvider { +trait SourceProvider + +/** + * ::Experimental:: + * A trait that a source provider's builder should implement. Each source provider should have an + * accompanying builder in order to be plugged into the SourceProviderManager. + * + * The reason for having a builder is to inject [[SparkSession]] to the source provider if needed. + * + * @since 0.3.0 + */ +trait SourceProviderBuilder { + /** + * Builds a [[SourceProvider]]. + * + * @param spark Spark session. + * @return [[SourceProvider]] object. + */ + def build(spark: SparkSession): SourceProvider +} + +/** + * ::Experimental:: + * A trait that a data source should implement so that an index can be created/managed and + * utilized for the data source. + * + * @since 0.3.0 + */ +trait FileBasedSourceProvider extends SourceProvider { /** * Creates [[Relation]] for IndexLogEntry using the given [[LogicalRelation]]. * @@ -69,22 +97,3 @@ trait SourceProvider { */ def signature(logicalRelation: LogicalRelation): Option[String] } - -/** - * ::Experimental:: - * A trait that a source provider's builder should implement. Each source provider should have an - * accompanying builder in order to be plugged into the SourceProviderManager. 
- * - * The reason for having a builder is to inject [[SparkSession]] to the source provider if needed. - * - * @since 0.3.0 - */ -trait SourceProviderBuilder { - /** - * Builds a [[SourceProvider]]. - * - * @param spark Spark session. - * @return [[SourceProvider]] object. - */ - def build(spark: SparkSession): SourceProvider -} diff --git a/src/main/scala/com/microsoft/hyperspace/util/CacheWithTransform.scala b/src/main/scala/com/microsoft/hyperspace/util/CacheWithTransform.scala new file mode 100644 index 000000000..71c87cba1 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/util/CacheWithTransform.scala @@ -0,0 +1,40 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.util + +/** + * + * @param init + * @param transform + * @tparam A + * @tparam B + */ +class CacheWithTransform[A, B](val init: () => A, val transform: A => B) { + private var initValue: A = _ + private var finalValue: B = _ + + def load(): B = { + val newInitValue = init() + if (initValue == newInitValue) { + finalValue + } else { + initValue = newInitValue + finalValue = transform(initValue) + finalValue + } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala index ea7afc488..20775d759 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala @@ -18,7 +18,7 @@ package com.microsoft.hyperspace.util import org.apache.spark.sql.SparkSession -import com.microsoft.hyperspace.index.{Content, CoveringIndex, IndexConstants, IndexLogEntry, LogEntry, Source} +import com.microsoft.hyperspace.index.IndexConstants /** * Helper class to extract Hyperspace-related configs from SparkSession. 
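The hunk below adds the two config accessors that back the pluggable-source machinery. As a hedged sketch of how a user would drive these settings from a Spark session (the custom builder class name is hypothetical; the default builder is the one introduced in this patch series):

// Sketch only: overriding the new Hyperspace source configs on a SparkSession.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
// Register a hypothetical custom builder alongside the default one; the value is
// a comma-separated list of SourceProviderBuilder class names.
spark.conf.set(
  "spark.hyperspace.index.sources.fileBasedBuilders",
  "com.example.MyCustomSourceBuilder," +
    "com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder")
// Narrow the default file-based source to a subset of formats.
spark.conf.set(
  "spark.hyperspace.index.sources.defaultFileBasedSource.supportedFileFormats",
  "parquet,csv")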
@@ -71,4 +71,18 @@ object HyperspaceConf { IndexConstants.INDEX_LINEAGE_ENABLED_DEFAULT) .toBoolean } + + def fileBasedSourceBuilders(spark: SparkSession): String = { + spark.sessionState.conf + .getConfString( + "spark.hyperspace.index.sources.fileBasedBuilders", + "com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder") + } + + def supportedFileFormatsForDefaultFileBasedSource(spark: SparkSession): String = { + spark.sessionState.conf + .getConfString( + "spark.hyperspace.index.sources.defaultFileBasedSource.supportedFileFormats", + "avro,csv,json,orc,parquet,text") + } } From 28acf25e12730ec0270fdf03f2177943f01dea19 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 27 Oct 2020 14:18:17 -0700 Subject: [PATCH 07/13] Fix compilation after merging --- .../index/sources/default/DefaultFileBasedSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala index 2f490b040..c45ff41aa 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala @@ -62,7 +62,7 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS val files = location.allFiles // Note that source files are currently fingerprinted when the optimized plan is // fingerprinted by LogicalPlanFingerprint. - val sourceDataProperties = Hdfs.Properties(Content.fromLeafFiles(files)) + val sourceDataProperties = Hdfs.Properties(Content.fromLeafFiles(files).get) val fileFormatName = fileFormat.asInstanceOf[DataSourceRegister].shortName // "path" key in options can incur multiple data read unexpectedly. 
val opts = options - "path" From ee3224513373e814f79b217a2b758e13599cce1d Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 27 Oct 2020 16:47:41 -0700 Subject: [PATCH 08/13] Update comments / refreshRelation API --- .../actions/RefreshActionBase.scala | 10 +++++- .../FileBasedSourceProviderManager.scala | 9 ++--- .../default/DefaultFileBasedSource.scala | 35 ++++++++++++------- .../hyperspace/index/sources/interfaces.scala | 6 ++-- .../hyperspace/util/CacheWithTransform.scala | 13 ++++--- 5 files changed, 48 insertions(+), 25 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala index 54b6a2972..d0e7442d9 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala @@ -18,6 +18,7 @@ package com.microsoft.hyperspace.actions import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} +import org.apache.spark.sql.types.{DataType, StructType} import com.microsoft.hyperspace.{Hyperspace, HyperspaceException} import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, REFRESHING} @@ -64,7 +65,14 @@ private[actions] abstract class RefreshActionBase( // Reconstruct a df from schema protected lazy val df = { val relations = previousIndexLogEntry.relations - Hyperspace.getContext(spark).sourceProviderManager.reconstructDataFrame(relations.head) + val latestRelation = + Hyperspace.getContext(spark).sourceProviderManager.refreshRelation(relations.head) + val dataSchema = DataType.fromJson(latestRelation.dataSchemaJson).asInstanceOf[StructType] + spark.read + .schema(dataSchema) + .format(latestRelation.fileFormat) + .options(latestRelation.options) + .load(latestRelation.rootPaths: _*) } protected lazy val indexConfig: IndexConfig = { diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala index 40e8bfc9a..332c221cc 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala @@ -18,7 +18,7 @@ package com.microsoft.hyperspace.index.sources import scala.util.{Success, Try} -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.util.hyperspace.Utils @@ -27,6 +27,7 @@ import com.microsoft.hyperspace.index.Relation import com.microsoft.hyperspace.util.{CacheWithTransform, HyperspaceConf} /** + * * * @param spark */ @@ -50,8 +51,8 @@ class FileBasedSourceProviderManager(spark: SparkSession) { /** * */ - def reconstructDataFrame(relation: Relation): DataFrame = { - run(p => p.reconstructDataFrame(relation)) + def refreshRelation(relation: Relation): Relation = { + run(p => p.refreshRelation(relation)) } /** @@ -109,7 +110,7 @@ class FileBasedSourceProviderManager(spark: SparkSession) { case p: FileBasedSourceProvider => p case other => throw HyperspaceException( - s"Builder '$builder' did not build FileBasedSourceProvider: '$other')") + s"'$builder' did not build FileBasedSourceProvider: '$other')") } } } diff --git 
a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala index c45ff41aa..c51106af5 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala @@ -22,7 +22,6 @@ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} import org.apache.spark.sql.sources.DataSourceRegister -import org.apache.spark.sql.types.{DataType, StructType} import com.microsoft.hyperspace.index.{Content, Hdfs, Relation} import com.microsoft.hyperspace.index.sources.{FileBasedSourceProvider, SourceProvider, SourceProviderBuilder} @@ -77,6 +76,12 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS } } + /** + * Returns true if the given [[FileFormat]] is supported, false otherwise. + * + * @param format [[FileFormat]] object. + * @return true if the given [[FileFormat]] is supported, false otherwise. + */ private def isSupportedFileFormat(format: FileFormat): Boolean = { format match { case d: DataSourceRegister if isSupportedFileFormatName(d.shortName) => true @@ -84,40 +89,41 @@ } } + /** + * Returns true if the given format name is supported, false otherwise. + * + * @param name File format name (e.g., parquet, csv, json). + * @return true if the given format name is supported, false otherwise. + */ private def isSupportedFileFormatName(name: String): Boolean = { supportedFormats.load().contains(name.toLowerCase(Locale.ROOT)) } /** - * Reconstructs [[DataFrame]] using the given [[Relation]]. + * Given a [[Relation]], returns a new [[Relation]] that will have the latest source. * - * @param relation [[Relation]] object to reconstruct [[DataFrame]] with. + * @param relation [[Relation]] object to refresh. - * @return [[DataFrame]] object if the given 'relation' can be processed by this provider. + * @return [[Relation]] object if the given 'relation' can be processed by this provider. * Otherwise, None. */ - override def reconstructDataFrame(relation: Relation): Option[DataFrame] = { + override def refreshRelation(relation: Relation): Option[Relation] = { if (isSupportedFileFormatName(relation.fileFormat)) { - val dataSchema = DataType.fromJson(relation.dataSchemaJson).asInstanceOf[StructType] - val df = spark.read - .schema(dataSchema) - .format(relation.fileFormat) - .options(relation.options) - .load(relation.rootPaths: _*) - Some(df) + // No change is needed because rootPaths will point to the latest source files. + Some(relation) } else { None } } /** - * Computes the signature using the given [[LogicalRelation]]. + * Computes the signature using the given [[LogicalRelation]]. This computes a signature + * using all the files found in [[PartitioningAwareFileIndex]]. * * @param logicalRelation logical relation to compute signature from. * @return Signature computed if the given 'logicalRelation' can be processed by this provider. * Otherwise, None. */ override def signature(logicalRelation: LogicalRelation): Option[String] = { - // Currently we are only collecting plan fingerprint from hdfs file based scan nodes.
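+ // The fold below chains hashes over every file in the file index: starting from
+ // an empty string, acc = HashingUtils.md5Hex(acc + fingerprint(f)), so a change
+ // to any single file changes the resulting signature.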
logicalRelation.relation match { case HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, format, _) if isSupportedFileFormat(format) => val result = location.allFiles.foldLeft("") { (acc: String, f: FileStatus) => HashingUtils.md5Hex(acc + fingerprint(f)) } Some(result) case _ => None @@ -141,6 +147,9 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS } } +/** + * Builder for [[DefaultFileBasedSource]]. + */ class DefaultFileBasedSourceBuilder extends SourceProviderBuilder { override def build(spark: SparkSession): SourceProvider = new DefaultFileBasedSource(spark) } diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala index 6ffff0dfd..6182fe913 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala @@ -71,17 +71,17 @@ trait FileBasedSourceProvider extends SourceProvider { def createRelation(logicalRelation: LogicalRelation): Option[Relation] /** - * Reconstructs [[DataFrame]] using the given [[Relation]]. + * Given a [[Relation]], returns a new [[Relation]] that will have the latest source. * * This API is used when an index is refreshed. * * If the given relation does not belong to this provider, None should be returned. * - * @param relation [[Relation]] object to reconstruct [[DataFrame]] with. + * @param relation [[Relation]] object to refresh. - * @return [[DataFrame]] object if the given 'relation' can be processed by this provider. + * @return [[Relation]] object if the given 'relation' can be processed by this provider. * Otherwise, None. */ - def reconstructDataFrame(relation: Relation): Option[DataFrame] + def refreshRelation(relation: Relation): Option[Relation] /** * Computes the signature using the given [[LogicalRelation]]. diff --git a/src/main/scala/com/microsoft/hyperspace/util/CacheWithTransform.scala b/src/main/scala/com/microsoft/hyperspace/util/CacheWithTransform.scala index 71c87cba1..78c2b9bab 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/CacheWithTransform.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/CacheWithTransform.scala @@ -17,11 +17,16 @@ package com.microsoft.hyperspace.util /** + * A cache object that stores both an initial and a final value. + * The initial value is produced by 'init' and the final value is produced by 'transform' + * using the initial value. * - * @param init - * @param transform - * @tparam A - * @tparam B + * The cached initial/final values are updated only when the initial value changes. + * + * @param init Function to produce an initial value. + * @param transform Function to transform the initial value. + * @tparam A Type of the initial value. + * @tparam B Type of the final value.
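+ *
+ * For example, [[DefaultFileBasedSource]] wraps its supported-formats conf in a
+ * `CacheWithTransform[String, Set[String]]` so that the comma-separated conf string
+ * is re-split into a set only when the conf value itself changes.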
*/ class CacheWithTransform[A, B](val init: () => A, val transform: A => B) { private var initValue: A = _ From d10909d33124306a6ad2bd0f56d8ef5b620ea9ca Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 27 Oct 2020 19:05:18 -0700 Subject: [PATCH 09/13] Add comments --- .../FileBasedSourceProviderManager.scala | 61 +++++++++++++------ 1 file changed, 42 insertions(+), 19 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala index 332c221cc..6f52f45e5 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala @@ -27,63 +27,82 @@ import com.microsoft.hyperspace.index.Relation import com.microsoft.hyperspace.util.{CacheWithTransform, HyperspaceConf} /** + * [[FileBasedSourceProviderManager]] is responsible for loading source providers that implement + * [[FileBasedSourceProvider]] and for running APIs against each loaded provider. * + * Each API in [[FileBasedSourceProvider]] returns an [[Option]], and this manager ensures that + * exactly one provider returns [[Some]] for each API call. * - * @param spark + * @param spark Spark session. */ class FileBasedSourceProviderManager(spark: SparkSession) { private val sourceProviders: CacheWithTransform[String, Seq[FileBasedSourceProvider]] = new CacheWithTransform[String, Seq[FileBasedSourceProvider]]({ () => HyperspaceConf.fileBasedSourceBuilders(spark) }, { builderClassNames => - providers(builderClassNames) + buildProviders(builderClassNames) }) /** + * Runs createRelation() for each provider. * - * @param logicalRelation - * @return + * @param logicalRelation Logical relation to create [[Relation]] from. + * @return [[Relation]] created from the given logical relation. + * @throws HyperspaceException if multiple providers return [[Some]] or + * if no providers return [[Some]]. */ def createRelation(logicalRelation: LogicalRelation): Relation = { run(p => p.createRelation(logicalRelation)) } /** + * Runs refreshRelation() for each provider. * + * @param relation [[Relation]] to refresh. + * @return Refreshed [[Relation]]. + * @throws HyperspaceException if multiple providers return [[Some]] or + * if no providers return [[Some]]. */ def refreshRelation(relation: Relation): Relation = { run(p => p.refreshRelation(relation)) } /** + * Runs signature() for each provider. * - * @param logicalRelation - * @return + * @param logicalRelation Logical relation to compute signature from. + * @return Computed signature. + * @throws HyperspaceException if multiple providers return [[Some]] or + * if no providers return [[Some]]. */ def signature(logicalRelation: LogicalRelation): String = { run(p => p.signature(logicalRelation)) } /** + * Runs the given function 'f', which executes an [[Option]]-returning + * [[FileBasedSourceProvider]] API, on each provider built. This function ensures that + * exactly one provider returns [[Some]] when 'f' is executed. * - * @param f - * @tparam A - * @return + * @param f Function that runs a [[FileBasedSourceProvider]]'s API that returns [[Option]] + * given a provider. + * @tparam A Type of the object that 'f' returns, wrapped in [[Option]]. + * @return The object in [[Some]] that 'f' returns.
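+ * @throws HyperspaceException if multiple providers return [[Some]] for 'f' or
+ * if no provider returns [[Some]].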
*/ private def run[A](f: FileBasedSourceProvider => Option[A]): A = { sourceProviders .load() - .foldLeft(Option.empty[(A, FileBasedSourceProvider)]) { (acc, p) => - val result = f(p) - if (result.isDefined) { - if (acc.isDefined) { + .foldLeft(Option.empty[(A, FileBasedSourceProvider)]) { (result, provider) => + val cur = f(provider) + if (cur.isDefined) { + if (result.isDefined) { throw HyperspaceException( "Multiple source providers returned valid results: " + - s"'${p.getClass.getName}' and '${acc.get._2.getClass.getName}'") + s"'${provider.getClass.getName}' and '${result.get._2.getClass.getName}'") } - Some(result.get, p) + Some(cur.get, provider) } else { - acc + result } } .map(_._1) @@ -93,11 +112,15 @@ class FileBasedSourceProviderManager(spark: SparkSession) { } /** + * Given a comma-separated list of class names that implement [[SourceProviderBuilder]], this + * method builds source providers that implement [[FileBasedSourceProvider]]. * - * @param builderClassNames - * @return + * @param builderClassNames Names of classes to load as [[SourceProviderBuilder]]s. + * @return [[FileBasedSourceProvider]] objects built. + * @throws HyperspaceException if a given builder cannot be loaded or + * does not build a [[FileBasedSourceProvider]]. */ - private def providers(builderClassNames: String): Seq[FileBasedSourceProvider] = { + private def buildProviders(builderClassNames: String): Seq[FileBasedSourceProvider] = { val builders = builderClassNames.split(",").map(_.trim).map { name => Try(Utils.classForName(name).getConstructor().newInstance()) match { case Success(builder: SourceProviderBuilder) => builder From 4f2f4b799b08cc9f77c6d5b4330a3d2a26b87274 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Sun, 22 Nov 2020 20:57:54 -0800 Subject: [PATCH 10/13] Add fileIdTracker to API --- .../index/sources/FileBasedSourceProviderManager.scala | 7 ++++--- .../index/sources/default/DefaultFileBasedSource.scala | 10 +++++++--- .../hyperspace/index/sources/interfaces.scala | 9 +++++++-- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala index 6f52f45e5..91004136e 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.util.hyperspace.Utils import com.microsoft.hyperspace.HyperspaceException -import com.microsoft.hyperspace.index.Relation +import com.microsoft.hyperspace.index.{FileIdTracker, Relation} import com.microsoft.hyperspace.util.{CacheWithTransform, HyperspaceConf} /** @@ -47,12 +47,13 @@ class FileBasedSourceProviderManager(spark: SparkSession) { * Runs createRelation() for each provider. * * @param logicalRelation Logical relation to create [[Relation]] from. + * @param fileIdTracker [[FileIdTracker]] to use when populating the data of [[Relation]]. * @return [[Relation]] created from the given logical relation. * @throws HyperspaceException if multiple providers return [[Some]] or * if no providers return [[Some]].
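 * For example, [[CreateActionBase.sourceRelations]] calls this method for every
 * [[LogicalRelation]] leaf of the optimized plan, passing along its [[FileIdTracker]].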
*/ - def createRelation(logicalRelation: LogicalRelation): Relation = { - run(p => p.createRelation(logicalRelation)) + def createRelation(logicalRelation: LogicalRelation, fileIdTracker: FileIdTracker): Relation = { + run(p => p.createRelation(logicalRelation, fileIdTracker)) } /** diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala index c51106af5..723de32e8 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} import org.apache.spark.sql.sources.DataSourceRegister -import com.microsoft.hyperspace.index.{Content, Hdfs, Relation} +import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, Relation} import com.microsoft.hyperspace.index.sources.{FileBasedSourceProvider, SourceProvider, SourceProviderBuilder} import com.microsoft.hyperspace.util.{CacheWithTransform, HashingUtils, HyperspaceConf} @@ -46,10 +46,13 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS * Creates [[Relation]] for IndexLogEntry using the given [[LogicalRelation]]. * * @param logicalRelation logical relation to derive [[Relation]] from. + * @param fileIdTracker [[FileIdTracker]] to use when populating the data of [[Relation]]. * @return [[Relation]] object if the given 'logicalRelation' can be processed by this provider. * Otherwise, None. */ - override def createRelation(logicalRelation: LogicalRelation): Option[Relation] = { + override def createRelation( + logicalRelation: LogicalRelation, + fileIdTracker: FileIdTracker): Option[Relation] = { logicalRelation.relation match { case HadoopFsRelation( location: PartitioningAwareFileIndex, @@ -61,7 +64,8 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS val files = location.allFiles // Note that source files are currently fingerprinted when the optimized plan is // fingerprinted by LogicalPlanFingerprint. - val sourceDataProperties = Hdfs.Properties(Content.fromLeafFiles(files).get) + val sourceDataProperties = + Hdfs.Properties(Content.fromLeafFiles(files, fileIdTracker).get) val fileFormatName = fileFormat.asInstanceOf[DataSourceRegister].shortName // "path" key in options can incur multiple data read unexpectedly. val opts = options - "path" diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala index 6182fe913..03cb0b431 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala @@ -19,7 +19,7 @@ package com.microsoft.hyperspace.index.sources import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.execution.datasources.LogicalRelation -import com.microsoft.hyperspace.index.Relation +import com.microsoft.hyperspace.index.{FileIdTracker, Relation} /** * ::Experimental:: @@ -40,6 +40,7 @@ trait SourceProvider * @since 0.3.0 */ trait SourceProviderBuilder { + /** * Builds a [[SourceProvider]]. 
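 * For example, [[DefaultFileBasedSourceBuilder]] simply returns
 * `new DefaultFileBasedSource(spark)`.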
* @@ -57,6 +58,7 @@ trait SourceProviderBuilder { * @since 0.3.0 */ trait FileBasedSourceProvider extends SourceProvider { + /** * Creates [[Relation]] for IndexLogEntry using the given [[LogicalRelation]]. * @@ -65,10 +67,13 @@ trait FileBasedSourceProvider extends SourceProvider { * If the given logical relation does not belong to this provider, None should be returned. * * @param logicalRelation logical relation to derive [[Relation]] from. + * @param fileIdTracker [[FileIdTracker]] to use when populating the data of [[Relation]]. * @return [[Relation]] object if the given 'logicalRelation' can be processed by this provider. * Otherwise, None. */ - def createRelation(logicalRelation: LogicalRelation): Option[Relation] + def createRelation( + logicalRelation: LogicalRelation, + fileIdTracker: FileIdTracker): Option[Relation] /** * Given a [[Relation]], returns a new [[Relation]] that will have the latest source. From b513cb318b026754ab17c92d8ad3272070bac7d9 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 24 Nov 2020 13:27:01 -0800 Subject: [PATCH 11/13] Fix tests --- .../index/sources/default/DefaultFileBasedSource.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala index 723de32e8..68c1b8cbc 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala @@ -131,8 +131,9 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS logicalRelation.relation match { case HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, format, _) if isSupportedFileFormat(format) => - val result = location.allFiles.foldLeft("") { (acc: String, f: FileStatus) => - HashingUtils.md5Hex(acc + fingerprint(f)) + val result = location.allFiles.sortBy(_.getPath.toString).foldLeft("") { + (acc: String, f: FileStatus) => + HashingUtils.md5Hex(acc + fingerprint(f)) } Some(result) case _ => None From 717ea966ebfbf05316639441a57a6247bcadef64 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 24 Nov 2020 13:28:13 -0800 Subject: [PATCH 12/13] Add final --- .../com/microsoft/hyperspace/actions/CreateActionBase.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala index e8c13a1ad..0ef60f86a 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala @@ -97,13 +97,13 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) } } - protected def sourceRelations(spark: SparkSession, df: DataFrame): Seq[Relation] = + final protected def sourceRelations(spark: SparkSession, df: DataFrame): Seq[Relation] = df.queryExecution.optimizedPlan.collect { case p: LogicalRelation => Hyperspace.getContext(spark).sourceProviderManager.createRelation(p, fileIdTracker) } - protected def write(spark: SparkSession, df: DataFrame, indexConfig: IndexConfig): Unit = { + final protected def write(spark: SparkSession, df: DataFrame, indexConfig: IndexConfig): Unit = { val numBuckets = numBucketsForIndex(spark) val (indexDataFrame, resolvedIndexedColumns, _) = From 
cf737e52f5b25922d5a5f97c2df1d6e3c936a0d3 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 24 Nov 2020 19:24:49 -0800 Subject: [PATCH 13/13] Update @since version to 0.4.0 --- .../com/microsoft/hyperspace/actions/CreateActionBase.scala | 6 +++--- .../com/microsoft/hyperspace/index/sources/interfaces.scala | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala index 0ef60f86a..0b947e4ae 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala @@ -47,7 +47,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) HyperspaceConf.indexLineageEnabled(spark) } - final protected def getIndexLogEntry( + protected def getIndexLogEntry( spark: SparkSession, df: DataFrame, indexConfig: IndexConfig, @@ -97,13 +97,13 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager) } } - final protected def sourceRelations(spark: SparkSession, df: DataFrame): Seq[Relation] = + protected def sourceRelations(spark: SparkSession, df: DataFrame): Seq[Relation] = df.queryExecution.optimizedPlan.collect { case p: LogicalRelation => Hyperspace.getContext(spark).sourceProviderManager.createRelation(p, fileIdTracker) } - final protected def write(spark: SparkSession, df: DataFrame, indexConfig: IndexConfig): Unit = { + protected def write(spark: SparkSession, df: DataFrame, indexConfig: IndexConfig): Unit = { val numBuckets = numBucketsForIndex(spark) val (indexDataFrame, resolvedIndexedColumns, _) = diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala index 03cb0b431..120396c54 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala @@ -26,7 +26,7 @@ import com.microsoft.hyperspace.index.{FileIdTracker, Relation} * A trait that a data source should implement so that an index can be created/managed and * utilized for the data source. * - * @since 0.3.0 + * @since 0.4.0 */ trait SourceProvider @@ -37,7 +37,7 @@ trait SourceProvider * * The reason for having a builder is to inject [[SparkSession]] to the source provider if needed. * - * @since 0.3.0 + * @since 0.4.0 */ trait SourceProviderBuilder { @@ -55,7 +55,7 @@ trait SourceProviderBuilder { * A trait that a data source should implement so that an index can be created/managed and * utilized for the data source. * - * @since 0.3.0 + * @since 0.4.0 */ trait FileBasedSourceProvider extends SourceProvider {
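Taken together, these interfaces define the plug-in surface for external data sources. The sketch below is illustrative only: `JsonOnlySource` and `JsonOnlySourceBuilder` are hypothetical names, the per-file fingerprint (path, length, modification time) is an assumption rather than Hyperspace's own helper, and a real provider would be registered through the `spark.hyperspace.index.sources.fileBasedBuilders` config shown earlier.

package com.example.hyperspace

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
import org.apache.spark.sql.sources.DataSourceRegister

import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, Relation}
import com.microsoft.hyperspace.index.sources.{FileBasedSourceProvider, SourceProvider, SourceProviderBuilder}
import com.microsoft.hyperspace.util.HashingUtils

// Hypothetical provider that claims only the built-in "json" format and returns
// None for everything else, leaving other relations to the remaining providers.
class JsonOnlySource(private val spark: SparkSession) extends FileBasedSourceProvider {

  // Matches only file-based relations whose format registers the "json" short name.
  private def asJsonRelation(l: LogicalRelation): Option[HadoopFsRelation] = l.relation match {
    case h @ HadoopFsRelation(_: PartitioningAwareFileIndex, _, _, _, d: DataSourceRegister, _)
        if d.shortName == "json" =>
      Some(h)
    case _ => None
  }

  override def createRelation(
      logicalRelation: LogicalRelation,
      fileIdTracker: FileIdTracker): Option[Relation] = {
    asJsonRelation(logicalRelation).map { h =>
      val location = h.location.asInstanceOf[PartitioningAwareFileIndex]
      // Mirrors DefaultFileBasedSource: fingerprint the leaf files and drop the
      // "path" option to avoid unexpected multiple data reads.
      val props = Hdfs.Properties(Content.fromLeafFiles(location.allFiles, fileIdTracker).get)
      Relation(
        location.rootPaths.map(_.toString),
        Hdfs(props),
        h.dataSchema.json,
        "json",
        h.options - "path")
    }
  }

  override def refreshRelation(relation: Relation): Option[Relation] = {
    // As with the default source, rootPaths already point at the latest files.
    if (relation.fileFormat == "json") Some(relation) else None
  }

  override def signature(logicalRelation: LogicalRelation): Option[String] = {
    asJsonRelation(logicalRelation).map { h =>
      val location = h.location.asInstanceOf[PartitioningAwareFileIndex]
      location.allFiles.sortBy(_.getPath.toString).foldLeft("") { (acc, f) =>
        // Assumed per-file fingerprint: path + length + modification time.
        HashingUtils.md5Hex(acc + f.getPath.toString + f.getLen + f.getModificationTime)
      }
    }
  }
}

// The builder injects the SparkSession, matching DefaultFileBasedSourceBuilder.
class JsonOnlySourceBuilder extends SourceProviderBuilder {
  override def build(spark: SparkSession): SourceProvider = new JsonOnlySource(spark)
}

Because FileBasedSourceProviderManager requires that exactly one provider returns Some for any given relation, a provider like this must be careful to return None for every relation it does not own.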