diff --git a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala
index f4df20177..17fc1ca06 100644
--- a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala
+++ b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.index.IndexConstants.{OPTIMIZE_MODE_QUICK, REFRESH_MODE_FULL}
 import com.microsoft.hyperspace.index.plananalysis.PlanAnalyzer
+import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager
 
 class Hyperspace(spark: SparkSession) {
   private val indexManager: IndexManager = Hyperspace.getContext(spark).indexCollectionManager
@@ -169,6 +170,14 @@ object Hyperspace {
     context.get()
   }
 
+  private[hyperspace] def getContext: HyperspaceContext = {
+    val sparkSession = SparkSession.getActiveSession.getOrElse {
+      throw HyperspaceException("Could not find active SparkSession.")
+    }
+
+    getContext(sparkSession)
+  }
+
   def apply(): Hyperspace = {
     val sparkSession = SparkSession.getActiveSession.getOrElse {
       throw HyperspaceException("Could not find active SparkSession.")
@@ -180,4 +189,6 @@ object Hyperspace {
 
 private[hyperspace] class HyperspaceContext(val spark: SparkSession) {
   val indexCollectionManager = CachingIndexCollectionManager(spark)
+
+  val sourceProviderManager = new FileBasedSourceProviderManager(spark)
 }
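The parameterless getContext added above lets Hyperspace-internal code reach the shared managers through the active session. A minimal sketch (the object name is illustrative; the call site must live under com.microsoft.hyperspace because getContext is private[hyperspace]):

package com.microsoft.hyperspace

import com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager

object ContextSketch {
  // Resolves SparkSession.getActiveSession internally; throws HyperspaceException
  // if no session is active.
  def activeProviderManager(): FileBasedSourceProviderManager =
    Hyperspace.getContext.sourceProviderManager
}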
diff --git a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala
index 803a62568..0b947e4ae 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/CreateActionBase.scala
@@ -18,11 +18,10 @@ package com.microsoft.hyperspace.actions
 
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.functions.input_file_name
-import org.apache.spark.sql.sources.DataSourceRegister
 
-import com.microsoft.hyperspace.HyperspaceException
+import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
 import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils}
@@ -64,7 +63,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
 
     signatureProvider.signature(df.queryExecution.optimizedPlan) match {
       case Some(s) =>
-        val relations = sourceRelations(df)
+        val relations = sourceRelations(spark, df)
         // Currently we only support to create an index on a LogicalRelation.
         assert(relations.size == 1)
 
@@ -98,37 +97,10 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
     }
   }
 
-  protected def sourceRelations(df: DataFrame): Seq[Relation] =
+  protected def sourceRelations(spark: SparkSession, df: DataFrame): Seq[Relation] =
     df.queryExecution.optimizedPlan.collect {
-      case LogicalRelation(
-          HadoopFsRelation(
-            location: PartitioningAwareFileIndex,
-            _,
-            dataSchema,
-            _,
-            fileFormat,
-            options),
-          _,
-          _,
-          _) =>
-        val files = location.allFiles
-        // Note that source files are currently fingerprinted when the optimized plan is
-        // fingerprinted by LogicalPlanFingerprint.
-        val sourceDataProperties =
-          Hdfs.Properties(Content.fromLeafFiles(files, fileIdTracker).get)
-
-        val fileFormatName = fileFormat match {
-          case d: DataSourceRegister => d.shortName
-          case other => throw HyperspaceException(s"Unsupported file format: $other")
-        }
-        // "path" key in options can incur multiple data read unexpectedly.
-        val opts = options - "path"
-        Relation(
-          location.rootPaths.map(_.toString),
-          Hdfs(sourceDataProperties),
-          dataSchema.json,
-          fileFormatName,
-          opts)
+      case p: LogicalRelation =>
+        Hyperspace.getContext(spark).sourceProviderManager.createRelation(p, fileIdTracker)
     }
 
   protected def write(spark: SparkSession, df: DataFrame, indexConfig: IndexConfig): Unit = {
diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala
index dd6b82ff5..914152571 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
 import org.apache.spark.sql.types.{DataType, StructType}
 
-import com.microsoft.hyperspace.HyperspaceException
+import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
 import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, REFRESHING}
 import com.microsoft.hyperspace.index._
 
@@ -66,13 +66,15 @@ private[actions] abstract class RefreshActionBase(
 
   // Reconstruct a df from schema
   protected lazy val df = {
-    val rels = previousIndexLogEntry.relations
-    val dataSchema = DataType.fromJson(rels.head.dataSchemaJson).asInstanceOf[StructType]
+    val relations = previousIndexLogEntry.relations
+    val latestRelation =
+      Hyperspace.getContext(spark).sourceProviderManager.refreshRelation(relations.head)
+    val dataSchema = DataType.fromJson(latestRelation.dataSchemaJson).asInstanceOf[StructType]
     spark.read
       .schema(dataSchema)
-      .format(rels.head.fileFormat)
-      .options(rels.head.options)
-      .load(rels.head.rootPaths: _*)
+      .format(latestRelation.fileFormat)
+      .options(latestRelation.options)
+      .load(latestRelation.rootPaths: _*)
   }
 
   protected lazy val indexConfig: IndexConfig = {
diff --git a/src/main/scala/com/microsoft/hyperspace/index/FileBasedSignatureProvider.scala b/src/main/scala/com/microsoft/hyperspace/index/FileBasedSignatureProvider.scala
index 860fe0788..9bcd7fd1d 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/FileBasedSignatureProvider.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/FileBasedSignatureProvider.scala
@@ -16,10 +16,10 @@
 
 package com.microsoft.hyperspace.index
 
-import org.apache.hadoop.fs.FileStatus
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 
+import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
 import com.microsoft.hyperspace.util.HashingUtils
 
 /**
@@ -49,15 +49,8 @@ class FileBasedSignatureProvider extends LogicalPlanSignatureProvider {
   private def fingerprintVisitor(logicalPlan: LogicalPlan): Option[String] = {
     var fingerprint = ""
     logicalPlan.foreachUp {
-      // Currently we are only collecting plan fingerprint from hdfs file based scan nodes.
-      case LogicalRelation(
-          HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _),
-          _,
-          _,
-          _) =>
-        fingerprint ++= location.allFiles.sortBy(_.getPath.toString).foldLeft("")(
-          (accumulate: String, fileStatus: FileStatus) =>
-            HashingUtils.md5Hex(accumulate + getFingerprint(fileStatus)))
+      case p: LogicalRelation =>
+        fingerprint ++= Hyperspace.getContext.sourceProviderManager.signature(p)
       case _ =>
     }
 
@@ -66,15 +59,4 @@ class FileBasedSignatureProvider extends LogicalPlanSignatureProvider {
       case _ => Some(fingerprint)
     }
   }
-
-  /**
-   * Get the fingerprint of a file.
-   *
-   * @param fileStatus file status.
-   * @return the fingerprint of a file.
-   */
-  private def getFingerprint(fileStatus: FileStatus): String = {
-    fileStatus.getLen.toString + fileStatus.getModificationTime.toString +
-      fileStatus.getPath.toString
-  }
 }
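The refresh path above rebuilds the source DataFrame purely from the metadata carried by the (possibly refreshed) Relation. A hedged standalone sketch of that reconstruction (the object and method names are illustrative):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{DataType, StructType}

import com.microsoft.hyperspace.index.Relation

object RefreshSketch {
  // Mirrors RefreshActionBase.df: schema, format, options and root paths all
  // come from the Relation recorded in the index log entry.
  def reconstructDf(spark: SparkSession, latest: Relation): DataFrame = {
    val dataSchema = DataType.fromJson(latest.dataSchemaJson).asInstanceOf[StructType]
    spark.read
      .schema(dataSchema)
      .format(latest.fileFormat)
      .options(latest.options)
      .load(latest.rootPaths: _*)
  }
}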
diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala
new file mode 100644
index 000000000..91004136e
--- /dev/null
+++ b/src/main/scala/com/microsoft/hyperspace/index/sources/FileBasedSourceProviderManager.scala
@@ -0,0 +1,141 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index.sources
+
+import scala.util.{Success, Try}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.util.hyperspace.Utils
+
+import com.microsoft.hyperspace.HyperspaceException
+import com.microsoft.hyperspace.index.{FileIdTracker, Relation}
+import com.microsoft.hyperspace.util.{CacheWithTransform, HyperspaceConf}
+
+/**
+ * [[FileBasedSourceProviderManager]] is responsible for loading the source providers that
+ * implement [[FileBasedSourceProvider]] and for running each API across the loaded providers.
+ *
+ * Each API in [[FileBasedSourceProvider]] returns an [[Option]], and this manager ensures that
+ * exactly one provider returns [[Some]] for each API call.
+ *
+ * @param spark Spark session.
+ */
+class FileBasedSourceProviderManager(spark: SparkSession) {
+  private val sourceProviders: CacheWithTransform[String, Seq[FileBasedSourceProvider]] =
+    new CacheWithTransform[String, Seq[FileBasedSourceProvider]]({ () =>
+      HyperspaceConf.fileBasedSourceBuilders(spark)
+    }, { builderClassNames =>
+      buildProviders(builderClassNames)
+    })
+
+  /**
+   * Runs createRelation() for each provider.
+   *
+   * @param logicalRelation Logical relation to create [[Relation]] from.
+   * @param fileIdTracker [[FileIdTracker]] to use when populating the data of [[Relation]].
+   * @return [[Relation]] created from the given logical relation.
+   * @throws HyperspaceException if multiple providers return [[Some]] or
+   *         if no provider returns [[Some]].
+   */
+  def createRelation(logicalRelation: LogicalRelation, fileIdTracker: FileIdTracker): Relation = {
+    run(p => p.createRelation(logicalRelation, fileIdTracker))
+  }
+
+  /**
+   * Runs refreshRelation() for each provider.
+   *
+   * @param relation [[Relation]] to refresh.
+   * @return Refreshed [[Relation]].
+   * @throws HyperspaceException if multiple providers return [[Some]] or
+   *         if no provider returns [[Some]].
+   */
+  def refreshRelation(relation: Relation): Relation = {
+    run(p => p.refreshRelation(relation))
+  }
+
+  /**
+   * Runs signature() for each provider.
+   *
+   * @param logicalRelation Logical relation to compute signature from.
+   * @return Computed signature.
+   * @throws HyperspaceException if multiple providers return [[Some]] or
+   *         if no provider returns [[Some]].
+   */
+  def signature(logicalRelation: LogicalRelation): String = {
+    run(p => p.signature(logicalRelation))
+  }
+
+  /**
+   * Runs the given function 'f' over every provider built, where 'f' invokes a
+   * [[FileBasedSourceProvider]] API that returns an [[Option]]. This function ensures that
+   * exactly one provider returns [[Some]] when 'f' is executed.
+   *
+   * @param f Function that runs a [[FileBasedSourceProvider]]'s API that returns [[Option]]
+   *          given a provider.
+   * @tparam A Type of the object that 'f' returns, wrapped in [[Option]].
+   * @return The object in [[Some]] that 'f' returns.
+   */
+  private def run[A](f: FileBasedSourceProvider => Option[A]): A = {
+    sourceProviders
+      .load()
+      .foldLeft(Option.empty[(A, FileBasedSourceProvider)]) { (result, provider) =>
+        val cur = f(provider)
+        if (cur.isDefined) {
+          if (result.isDefined) {
+            throw HyperspaceException(
+              "Multiple source providers returned valid results: " +
+                s"'${provider.getClass.getName}' and '${result.get._2.getClass.getName}'")
+          }
+          Some(cur.get, provider)
+        } else {
+          result
+        }
+      }
+      .map(_._1)
+      .getOrElse {
+        throw HyperspaceException("No source provider returned valid results.")
+      }
+  }
+
+  /**
+   * Given comma-separated names of classes that implement [[SourceProviderBuilder]], builds the
+   * source providers that implement [[FileBasedSourceProvider]].
+   *
+   * @param builderClassNames Names of classes to load as [[SourceProviderBuilder]]s.
+   * @return [[FileBasedSourceProvider]] objects built.
+   * @throws HyperspaceException if a given builder cannot be loaded or
+   *         if it doesn't build a [[FileBasedSourceProvider]].
+   */
+  private def buildProviders(builderClassNames: String): Seq[FileBasedSourceProvider] = {
+    val builders = builderClassNames.split(",").map(_.trim).map { name =>
+      Try(Utils.classForName(name).getConstructor().newInstance()) match {
+        case Success(builder: SourceProviderBuilder) => builder
+        case _ => throw HyperspaceException(s"Cannot load SourceProviderBuilder: '$name'")
+      }
+    }
+
+    builders.map { builder =>
+      builder.build(spark) match {
+        case p: FileBasedSourceProvider => p
+        case other =>
+          throw HyperspaceException(
+            s"'$builder' did not build FileBasedSourceProvider: '$other'")
+      }
+    }
+  }
+}
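The builder list the manager loads is configuration-driven; the key and its default are added to HyperspaceConf later in this diff. A hedged sketch of chaining a custom builder after the default one (com.example.MyCustomSourceBuilder is hypothetical; spark is an active SparkSession):

spark.conf.set(
  "spark.hyperspace.index.sources.fileBasedBuilders",
  "com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder," +
    "com.example.MyCustomSourceBuilder") // second entry is hypothetical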
diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala
new file mode 100644
index 000000000..68c1b8cbc
--- /dev/null
+++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala
@@ -0,0 +1,160 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index.sources.default
+
+import java.util.Locale
+
+import org.apache.hadoop.fs.FileStatus
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
+import org.apache.spark.sql.sources.DataSourceRegister
+
+import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, Relation}
+import com.microsoft.hyperspace.index.sources.{FileBasedSourceProvider, SourceProvider, SourceProviderBuilder}
+import com.microsoft.hyperspace.util.{CacheWithTransform, HashingUtils, HyperspaceConf}
+
+/**
+ * Default implementation for file-based Spark built-in sources such as parquet, csv, json, etc.
+ *
+ * This source can support relations that meet the following criteria:
+ *   - The relation is [[HadoopFsRelation]] with [[PartitioningAwareFileIndex]] as file index.
+ *   - Its file format implements [[DataSourceRegister]].
+ */
+class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedSourceProvider {
+  private val supportedFormats: CacheWithTransform[String, Set[String]] =
+    new CacheWithTransform[String, Set[String]]({ () =>
+      HyperspaceConf.supportedFileFormatsForDefaultFileBasedSource(spark)
+    }, { formats =>
+      formats.toLowerCase(Locale.ROOT).split(",").map(_.trim).toSet
+    })
+
+  /**
+   * Creates [[Relation]] for IndexLogEntry using the given [[LogicalRelation]].
+   *
+   * @param logicalRelation logical relation to derive [[Relation]] from.
+   * @param fileIdTracker [[FileIdTracker]] to use when populating the data of [[Relation]].
+   * @return [[Relation]] object if the given 'logicalRelation' can be processed by this provider.
+   *         Otherwise, None.
+   */
+  override def createRelation(
+      logicalRelation: LogicalRelation,
+      fileIdTracker: FileIdTracker): Option[Relation] = {
+    logicalRelation.relation match {
+      case HadoopFsRelation(
+          location: PartitioningAwareFileIndex,
+          _,
+          dataSchema,
+          _,
+          fileFormat,
+          options) if isSupportedFileFormat(fileFormat) =>
+        val files = location.allFiles
+        // Note that source files are currently fingerprinted when the optimized plan is
+        // fingerprinted by LogicalPlanFingerprint.
+        val sourceDataProperties =
+          Hdfs.Properties(Content.fromLeafFiles(files, fileIdTracker).get)
+        val fileFormatName = fileFormat.asInstanceOf[DataSourceRegister].shortName
+        // "path" key in options can incur multiple data read unexpectedly.
+        val opts = options - "path"
+        Some(
+          Relation(
+            location.rootPaths.map(_.toString),
+            Hdfs(sourceDataProperties),
+            dataSchema.json,
+            fileFormatName,
+            opts))
+      case _ => None
+    }
+  }
+
+  /**
+   * Returns true if the given [[FileFormat]] is supported, false otherwise.
+   *
+   * @param format [[FileFormat]] object.
+   * @return true if the given [[FileFormat]] is supported, false otherwise.
+   */
+  private def isSupportedFileFormat(format: FileFormat): Boolean = {
+    format match {
+      case d: DataSourceRegister if isSupportedFileFormatName(d.shortName) => true
+      case _ => false
+    }
+  }
+
+  /**
+   * Returns true if the given format name is supported, false otherwise.
+   *
+   * @param name File format name (e.g., parquet, csv, json).
+   * @return true if the given format name is supported, false otherwise.
+   */
+  private def isSupportedFileFormatName(name: String): Boolean = {
+    supportedFormats.load().contains(name.toLowerCase(Locale.ROOT))
+  }
+
+  /**
+   * Given a [[Relation]], returns a new [[Relation]] that will have the latest source.
+   *
+   * @param relation [[Relation]] object to reconstruct [[DataFrame]] with.
+   * @return [[Relation]] object if the given 'relation' can be processed by this provider.
+   *         Otherwise, None.
+   */
+  override def refreshRelation(relation: Relation): Option[Relation] = {
+    if (isSupportedFileFormatName(relation.fileFormat)) {
+      // No change is needed because rootPaths will be pointing to the latest source files.
+      Some(relation)
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Computes the signature using the given [[LogicalRelation]]. This computes a signature
+   * using all the files found in the [[PartitioningAwareFileIndex]].
+   *
+   * @param logicalRelation logical relation to compute signature from.
+   * @return Signature computed if the given 'logicalRelation' can be processed by this provider.
+   *         Otherwise, None.
+   */
+  override def signature(logicalRelation: LogicalRelation): Option[String] = {
+    logicalRelation.relation match {
+      case HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, format, _)
+          if isSupportedFileFormat(format) =>
+        val result = location.allFiles.sortBy(_.getPath.toString).foldLeft("") {
+          (acc: String, f: FileStatus) =>
+            HashingUtils.md5Hex(acc + fingerprint(f))
+        }
+        Some(result)
+      case _ => None
+    }
+  }
+
+  /**
+   * Fingerprints a file.
+   *
+   * @param fileStatus file status.
+   * @return the fingerprint of a file.
+   */
+  private def fingerprint(fileStatus: FileStatus): String = {
+    fileStatus.getLen.toString + fileStatus.getModificationTime.toString +
+      fileStatus.getPath.toString
+  }
+}
+
+/**
+ * Builder for building [[DefaultFileBasedSource]].
+ */
+class DefaultFileBasedSourceBuilder extends SourceProviderBuilder {
+  override def build(spark: SparkSession): SourceProvider = new DefaultFileBasedSource(spark)
+}
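The format whitelist behind isSupportedFileFormatName is also configurable; the key and its default ("avro,csv,json,orc,parquet,text") appear in the HyperspaceConf change below. For example, to restrict the default source to parquet and csv only (spark is an active SparkSession):

spark.conf.set(
  "spark.hyperspace.index.sources.defaultFileBasedSource.supportedFileFormats",
  "parquet,csv")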
diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala
new file mode 100644
index 000000000..120396c54
--- /dev/null
+++ b/src/main/scala/com/microsoft/hyperspace/index/sources/interfaces.scala
@@ -0,0 +1,104 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.index.sources
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+import com.microsoft.hyperspace.index.{FileIdTracker, Relation}
+
+/**
+ * ::Experimental::
+ * A trait that a data source should implement so that an index can be created/managed and
+ * utilized for the data source.
+ *
+ * @since 0.4.0
+ */
+trait SourceProvider
+
+/**
+ * ::Experimental::
+ * A trait that a source provider's builder should implement. Each source provider should have an
+ * accompanying builder in order to be plugged into the SourceProviderManager.
+ *
+ * The reason for having a builder is to inject [[SparkSession]] into the source provider if
+ * needed.
+ *
+ * @since 0.4.0
+ */
+trait SourceProviderBuilder {
+
+  /**
+   * Builds a [[SourceProvider]].
+   *
+   * @param spark Spark session.
+   * @return [[SourceProvider]] object.
+   */
+  def build(spark: SparkSession): SourceProvider
+}
+
+/**
+ * ::Experimental::
+ * A trait that a file-based data source should implement so that an index can be created/managed
+ * and utilized for the data source.
+ *
+ * @since 0.4.0
+ */
+trait FileBasedSourceProvider extends SourceProvider {
+
+  /**
+   * Creates [[Relation]] for IndexLogEntry using the given [[LogicalRelation]].
+   *
+   * This API is used when an index is created.
+   *
+   * If the given logical relation does not belong to this provider, None should be returned.
+   *
+   * @param logicalRelation logical relation to derive [[Relation]] from.
+   * @param fileIdTracker [[FileIdTracker]] to use when populating the data of [[Relation]].
+   * @return [[Relation]] object if the given 'logicalRelation' can be processed by this provider.
+   *         Otherwise, None.
+   */
+  def createRelation(
+      logicalRelation: LogicalRelation,
+      fileIdTracker: FileIdTracker): Option[Relation]
+
+  /**
+   * Given a [[Relation]], returns a new [[Relation]] that will have the latest source.
+   *
+   * This API is used when an index is refreshed.
+   *
+   * If the given relation does not belong to this provider, None should be returned.
+   *
+   * @param relation [[Relation]] object to reconstruct [[DataFrame]] with.
+   * @return [[Relation]] object if the given 'relation' can be processed by this provider.
+   *         Otherwise, None.
+   */
+  def refreshRelation(relation: Relation): Option[Relation]
+
+  /**
+   * Computes the signature using the given [[LogicalRelation]].
+   *
+   * This API is used whenever the signature of a source needs to be computed, e.g., when
+   * creating an index or computing a query plan's signature.
+   *
+   * If the given logical relation does not belong to this provider, None should be returned.
+   *
+   * @param logicalRelation logical relation to compute signature from.
+   * @return Signature computed if the given 'logicalRelation' can be processed by this provider.
+   *         Otherwise, None.
+   */
+  def signature(logicalRelation: LogicalRelation): Option[String]
+}
diff --git a/src/main/scala/com/microsoft/hyperspace/util/CacheWithTransform.scala b/src/main/scala/com/microsoft/hyperspace/util/CacheWithTransform.scala
new file mode 100644
index 000000000..78c2b9bab
--- /dev/null
+++ b/src/main/scala/com/microsoft/hyperspace/util/CacheWithTransform.scala
@@ -0,0 +1,45 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.util
+
+/**
+ * A cache that stores both the initial and the final value.
+ * The initial value is produced by 'init' and the final value is produced by 'transform'
+ * using the initial value.
+ *
+ * The cached initial/final values are updated only when the initial value changes.
+ *
+ * @param init Function to produce an initial value.
+ * @param transform Function to transform the initial value.
+ * @tparam A Type of the initial value.
+ * @tparam B Type of the final value.
+ */
+class CacheWithTransform[A, B](val init: () => A, val transform: A => B) {
+  private var initValue: A = _
+  private var finalValue: B = _
+
+  def load(): B = {
+    val newInitValue = init()
+    if (initValue == newInitValue) {
+      finalValue
+    } else {
+      initValue = newInitValue
+      finalValue = transform(initValue)
+      finalValue
+    }
+  }
+}
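A usage sketch of CacheWithTransform, assuming the initial value is a Spark conf string as in the two sources above: the transform re-runs only when the conf string itself changes.

import org.apache.spark.sql.SparkSession

import com.microsoft.hyperspace.util.CacheWithTransform

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val formats = new CacheWithTransform[String, Set[String]]({ () =>
  spark.conf.get(
    "spark.hyperspace.index.sources.defaultFileBasedSource.supportedFileFormats",
    "avro,csv,json,orc,parquet,text")
}, { s =>
  s.split(",").map(_.trim).toSet
})
formats.load() // parses the conf string into a Set
formats.load() // conf unchanged, so the cached Set is returned without re-parsing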
diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
index 45cd47add..b7dfef307 100644
--- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
+++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
@@ -71,6 +71,20 @@ object HyperspaceConf {
       .toBoolean
   }
 
+  def fileBasedSourceBuilders(spark: SparkSession): String = {
+    spark.sessionState.conf
+      .getConfString(
+        "spark.hyperspace.index.sources.fileBasedBuilders",
+        "com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder")
+  }
+
+  def supportedFileFormatsForDefaultFileBasedSource(spark: SparkSession): String = {
+    spark.sessionState.conf
+      .getConfString(
+        "spark.hyperspace.index.sources.defaultFileBasedSource.supportedFileFormats",
+        "avro,csv,json,orc,parquet,text")
+  }
+
   /**
    * Returns the config value whose key matches the first key given multiple keys. If no keys are
    * matched, the given default value is returned.
diff --git a/src/test/scala/com/microsoft/hyperspace/actions/CreateActionTest.scala b/src/test/scala/com/microsoft/hyperspace/actions/CreateActionTest.scala
index c276fd640..8b8bf4240 100644
--- a/src/test/scala/com/microsoft/hyperspace/actions/CreateActionTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/actions/CreateActionTest.scala
@@ -38,7 +38,7 @@ class CreateActionTest extends SparkFunSuite with SparkInvolvedSuite with SQLHel
   private val mockDataManager: IndexDataManager = mock(classOf[IndexDataManager])
 
   object CreateActionBaseWrapper extends CreateActionBase(mockDataManager) {
-    def getSourceRelations(df: DataFrame): Seq[Relation] = sourceRelations(df)
+    def getSourceRelations(df: DataFrame): Seq[Relation] = sourceRelations(spark, df)
   }
 
   override def beforeAll(): Unit = {
diff --git a/src/test/scala/com/microsoft/hyperspace/actions/RefreshActionTest.scala b/src/test/scala/com/microsoft/hyperspace/actions/RefreshActionTest.scala
index 028f0c51a..513308c78 100644
--- a/src/test/scala/com/microsoft/hyperspace/actions/RefreshActionTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/actions/RefreshActionTest.scala
@@ -35,7 +35,7 @@ class RefreshActionTest extends SparkFunSuite with SparkInvolvedSuite {
   private var testLogEntry: LogEntry = _
 
   object CreateActionBaseWrapper extends CreateActionBase(mockDataManager) {
-    def getSourceRelations(df: DataFrame): Seq[Relation] = sourceRelations(df)
+    def getSourceRelations(df: DataFrame): Seq[Relation] = sourceRelations(spark, df)
   }
 
   private def updateSourceFiles(): Unit = {
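Putting the pieces together, a hedged end-to-end sketch of a third-party provider against the traits in interfaces.scala (all com.example names are hypothetical). Returning None from every API defers to the other configured providers; a real provider would return Some(...) for the relations it owns:

package com.example

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.LogicalRelation

import com.microsoft.hyperspace.index.{FileIdTracker, Relation}
import com.microsoft.hyperspace.index.sources.{FileBasedSourceProvider, SourceProvider, SourceProviderBuilder}

class MyCustomSource(private val spark: SparkSession) extends FileBasedSourceProvider {
  // Return Some(...) only for relations this provider owns; None lets other providers run.
  override def createRelation(
      logicalRelation: LogicalRelation,
      fileIdTracker: FileIdTracker): Option[Relation] = None

  override def refreshRelation(relation: Relation): Option[Relation] = None

  override def signature(logicalRelation: LogicalRelation): Option[String] = None
}

class MyCustomSourceBuilder extends SourceProviderBuilder {
  override def build(spark: SparkSession): SourceProvider = new MyCustomSource(spark)
}

The builder is then registered through the spark.hyperspace.index.sources.fileBasedBuilders configuration shown earlier, and the manager enforces that exactly one provider answers for any given relation.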