From f34f40c610db42630e1d117259e5bb02e38ec65e Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Tue, 15 Dec 2020 11:29:57 -0800 Subject: [PATCH 1/5] initial commit --- .../com/microsoft/hyperspace/Hyperspace.scala | 6 +- .../hyperspace/actions/RefreshScan.scala | 158 +++++++++++ .../index/CachingIndexCollectionManager.scala | 4 +- .../index/IndexCollectionManager.scala | 19 +- .../hyperspace/index/IndexManager.scala | 2 +- .../default/DefaultFileBasedSource.scala | 26 +- .../microsoft/hyperspace/util/PathUtils.scala | 22 ++ .../index/RefreshScanTestsGlobData.scala | 199 ++++++++++++++ .../RefreshScanTestsPartitionedData.scala | 245 ++++++++++++++++++ 9 files changed, 671 insertions(+), 10 deletions(-) create mode 100644 src/main/scala/com/microsoft/hyperspace/actions/RefreshScan.scala create mode 100644 src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsGlobData.scala create mode 100644 src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsPartitionedData.scala diff --git a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala index 17fc1ca06..07e6cd0e5 100644 --- a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala @@ -87,7 +87,11 @@ class Hyperspace(spark: SparkSession) { * @param mode Refresh mode. Currently supported modes are `incremental` and `full`. */ def refreshIndex(indexName: String, mode: String): Unit = { - indexManager.refresh(indexName, mode) + indexManager.refresh(indexName, mode, None) + } + + def refreshIndex(indexName: String, mode: String, scanPattern: Option[String]): Unit = { + indexManager.refresh(indexName, mode, scanPattern) } /** diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshScan.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshScan.scala new file mode 100644 index 000000000..f730e4d58 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshScan.scala @@ -0,0 +1,158 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.actions + +import scala.util.{Failure, Success, Try} + +import org.apache.commons.io.FilenameUtils +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} +import org.apache.spark.sql.types.{DataType, StructType} + +import com.microsoft.hyperspace.index._ + +class RefreshScan( + spark: SparkSession, + logManager: IndexLogManager, + dataManager: IndexDataManager, + scanPattern: String) + extends RefreshIncrementalAction(spark, logManager, dataManager) { + + /** df representing the complete set of data files which will be indexed once this refresh action + * finishes. 
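+   * With a scan pattern, deletions and appends are detected only under the resolved scan paths,
+   * while previously indexed files outside those paths remain part of this set.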
+   */
+  override protected lazy val df = {
+    val relation = previousIndexLogEntry.relations.head
+    val previouslyIndexedData = relation.data.properties.content
+    val newlyIndexedData = previouslyIndexedData.fileInfos -- deletedFiles ++ appendedFiles
+    val newlyIndexedDataFiles: Seq[String] = newlyIndexedData.map(_.name).toSeq
+
+    val dataSchema = DataType.fromJson(relation.dataSchemaJson).asInstanceOf[StructType]
+    spark.read
+      .schema(dataSchema)
+      .format(relation.fileFormat)
+      .options(relation.options)
+      .load(newlyIndexedDataFiles: _*)
+  }
+
+  // Returns true if the first segment of the scan pattern occurs somewhere in the given path.
+  def isMatch(path: String, scanPattern: String): Boolean = {
+    val scanSplits = scanPattern.split(Path.SEPARATOR)
+    scanSplits.nonEmpty && path.split(Path.SEPARATOR).contains(scanSplits.head)
+  }
+
+  // Merges a root path with the scan pattern segment by segment, keeping the more selective
+  // part of each pair. Throws if a path segment and a scan segment cannot be reconciled.
+  def resolve(path: String, scanPattern: String): String = {
+    val scanSplits: Array[String] = scanPattern.split(Path.SEPARATOR)
+    val pathSplits: Array[String] = path.split(Path.SEPARATOR)
+    val splitPoint: Int = pathSplits.lastIndexOf(scanSplits.head)
+    var (prefix, suffix) = pathSplits.splitAt(splitPoint)
+
+    for (j <- 0 until math.max(scanSplits.length, suffix.length)) {
+      val resolvedPart = (Try(suffix(j)), Try(scanSplits(j))) match {
+        case (Success(path), Success(scan)) if FilenameUtils.wildcardMatch(path, scan) => path
+        case (Success(path), Success(scan)) if FilenameUtils.wildcardMatch(scan, path) => scan
+        case (Success(_), Success(_)) => throw new Exception("Incompatible scan pattern")
+        case (Success(path), Failure(_)) => path
+        case (Failure(_), Success(scan)) => scan
+        case _ => throw new Exception("Unexpected Exception")
+      }
+
+      prefix :+= resolvedPart
+    }
+    prefix.mkString(Path.SEPARATOR)
+  }
+
+  /**
+   * Root paths resolved against the scan pattern. Each matching root path is merged with the
+   * scan pattern so that the more selective segments are kept, e.g. if a root path is
+   * "data/*" and the scan pattern is "data/c5=2000", the resolved path is "data/c5=2000".
+   */
+  private lazy val resolvedPaths = {
+    val relation = previousIndexLogEntry.relations.head
+    relation.rootPaths.collect {
+      case path if isMatch(path, scanPattern) => resolve(path, scanPattern)
+    }
+
+    // Remove this after testing
+    // Seq("glob2/y=2023")
+  }
+
+  override def logEntry: LogEntry = {
+    // TODO: Deduplicate from super.logEntry()
+    val entry = getIndexLogEntry(spark, df, indexConfig, indexDataPath)
+
+    // If there are no deleted files, this version contains index data files only for appended
+    // data, so we need to add the index data files of the previous index version.
+    // Otherwise, as previous index data is rewritten in this version while excluding
+    // indexed rows from deleted files, all necessary index data files exist in this version.
+    val updatedEntry = if (deletedFiles.isEmpty) {
+      // Merge new index files with old index files.
+      val mergedContent = Content(previousIndexLogEntry.content.root.merge(entry.content.root))
+      entry.copy(content = mergedContent)
+    } else {
+      // New entry.
+      entry
+    }
+
+    val relation = entry.source.plan.properties.relations.head
+    val updatedRelation =
+      relation.copy(rootPaths = previousIndexLogEntry.relations.head.rootPaths)
+    updatedEntry.copy(
+      source = updatedEntry.source.copy(plan = updatedEntry.source.plan.copy(
+        properties = updatedEntry.source.plan.properties.copy(relations = Seq(updatedRelation)))))
+  }
+
+  /** Deleted files which match the resolved paths. */
+  override protected lazy val deletedFiles: Seq[FileInfo] = {
+    // Helper function to check if a file belongs to one of the resolved paths.
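+    // Resolved paths may themselves contain wildcards, so wildcard matching is used here
+    // rather than a plain prefix check.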
+ def fromResolvedPaths(file: FileInfo): Boolean = { + resolvedPaths.exists(p => FilenameUtils.wildcardMatch(file.name, p)) + } + + val originalFiles = previousIndexLogEntry.relations.head.data.properties.content.fileInfos + .filter(fromResolvedPaths) + + (originalFiles -- currentFiles).toSeq + } + + override protected lazy val currentFiles: Set[FileInfo] = { + val relation = previousIndexLogEntry.relations.head + val dataSchema = DataType.fromJson(relation.dataSchemaJson).asInstanceOf[StructType] + val changedDf = spark.read + .schema(dataSchema) + .format(relation.fileFormat) + .options(relation.options) + .load(resolvedPaths: _*) + changedDf.queryExecution.optimizedPlan + .collect { + case LogicalRelation( + HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), + _, + _, + _) => + location + .allFiles() + .map { f => + // For each file, if it already has a file id, add that id to its corresponding + // FileInfo. Note that if content of an existing file is changed, it is treated + // as a new file (i.e. its current file id is no longer valid). + val id = fileIdTracker.addFile(f) + FileInfo(f, id, asFullPath = true) + } + } + .flatten + .toSet + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/CachingIndexCollectionManager.scala b/src/main/scala/com/microsoft/hyperspace/index/CachingIndexCollectionManager.scala index 23af1c28d..2c3040731 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/CachingIndexCollectionManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/CachingIndexCollectionManager.scala @@ -92,9 +92,9 @@ class CachingIndexCollectionManager( super.vacuum(indexName) } - override def refresh(indexName: String, mode: String): Unit = { + override def refresh(indexName: String, mode: String, scanPattern: Option[String]): Unit = { clearCache() - super.refresh(indexName, mode) + super.refresh(indexName, mode, scanPattern) } override def optimize(indexName: String, mode: String): Unit = { diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala index 6db059beb..7649eea0c 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.internal.SQLConf import com.microsoft.hyperspace.HyperspaceException -import com.microsoft.hyperspace.actions._ +import com.microsoft.hyperspace.actions.{RefreshIncrementalAction, _} import com.microsoft.hyperspace.index.IndexConstants.{REFRESH_MODE_FULL, REFRESH_MODE_INCREMENTAL, REFRESH_MODE_QUICK} class IndexCollectionManager( @@ -63,12 +63,25 @@ class IndexCollectionManager( } } - override def refresh(indexName: String, mode: String): Unit = { + override def refresh( + indexName: String, + mode: String, + scanPattern: Option[String] = None): Unit = { withLogManager(indexName) { logManager => val indexPath = PathResolver(spark.sessionState.conf).getIndexPath(indexName) val dataManager = indexDataManagerFactory.create(indexPath) + + if (scanPattern.isDefined && !mode.equalsIgnoreCase(REFRESH_MODE_INCREMENTAL)) { + throw HyperspaceException( + "Scan Patterns are currently only supported for " + + s"$REFRESH_MODE_INCREMENTAL.") + } + if (mode.equalsIgnoreCase(REFRESH_MODE_INCREMENTAL)) { - new RefreshIncrementalAction(spark, logManager, dataManager).run() + scanPattern match { + case Some(pattern) 
=> new RefreshScan(spark, logManager, dataManager, pattern).run() + case _ => new RefreshIncrementalAction(spark, logManager, dataManager).run() + } } else if (mode.equalsIgnoreCase(REFRESH_MODE_FULL)) { new RefreshAction(spark, logManager, dataManager).run() } else if (mode.equalsIgnoreCase(REFRESH_MODE_QUICK)) { diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala index 4bb2e3b98..6fcee5000 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala @@ -65,7 +65,7 @@ trait IndexManager { * * @param indexName Name of the index to refresh. */ - def refresh(indexName: String, mode: String): Unit + def refresh(indexName: String, mode: String, scanPattern: Option[String]): Unit /** * Optimize index by changing the underlying index data layout (e.g., compaction). diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala index d7492bf37..6dea7b3c3 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala @@ -18,6 +18,7 @@ package com.microsoft.hyperspace.index.sources.default import java.util.Locale +import org.apache.commons.io.FilenameUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.deploy.SparkHadoopUtil @@ -29,7 +30,7 @@ import com.microsoft.hyperspace.HyperspaceException import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, Relation} import com.microsoft.hyperspace.index.IndexConstants.GLOBBING_PATTERN_KEY import com.microsoft.hyperspace.index.sources.{FileBasedSourceProvider, SourceProvider, SourceProviderBuilder} -import com.microsoft.hyperspace.util.{CacheWithTransform, HashingUtils, HyperspaceConf} +import com.microsoft.hyperspace.util.{CacheWithTransform, HashingUtils, HyperspaceConf, PathUtils} /** * Default implementation for file-based Spark built-in sources such as parquet, csv, json, etc. @@ -72,8 +73,20 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS Hdfs.Properties(Content.fromLeafFiles(files, fileIdTracker).get) val fileFormatName = fileFormat.asInstanceOf[DataSourceRegister].shortName + // Store basePath of hive-partitioned data sources, if applicable. + val basePath = options.get("basePath") match { + case None => PathUtils.extractBasePath(location.partitionSpec()) + case p => p + } + // "path" key in options can incur multiple data read unexpectedly. - val opts = options - "path" + // Since "options" is case-insensitive map, it will change any previous entries of + // "basePath" to lowercase "basepath", making it unusable. + // Remove lowercase "basepath" and add proper cased "basePath". + val opts = basePath match { + case Some(path) => Map("basePath" -> path) ++ options - "path" - "basepath" + case _ => options - "path" - "basepath" + } val rootPaths = opts.get(GLOBBING_PATTERN_KEY) match { case Some(pattern) => @@ -93,7 +106,14 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS .toMap val globPathValues = globPaths.values.flatten.toSet - if (!location.rootPaths.forall(globPathValues.contains)) { + + // Root paths could be directories or leaf files. 
Make sure that all root paths either + // match the glob paths, in case of directories, or belong to glob paths, in case of + // files. + if (!location.rootPaths.forall(p => + globPathValues.exists(g => + FilenameUtils.equalsNormalized(p.toString, g.toString) || + FilenameUtils.directoryContains(g.toString, p.toString)))) { throw HyperspaceException( "Some glob patterns do not match with available root " + s"paths of the source data. Please check if $pattern matches all of " + diff --git a/src/main/scala/com/microsoft/hyperspace/util/PathUtils.scala b/src/main/scala/com/microsoft/hyperspace/util/PathUtils.scala index fc8bc4904..460ab52b7 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/PathUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/PathUtils.scala @@ -18,6 +18,7 @@ package com.microsoft.hyperspace.util import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, PathFilter} +import org.apache.spark.sql.execution.datasources.PartitionSpec object PathUtils { def makeAbsolute(path: String): Path = makeAbsolute(new Path(path)) @@ -27,6 +28,27 @@ object PathUtils { fs.makeQualified(path) } + /** + * Extract base data source path for from a given partition spec. + * @param partitionSpec PartitionSpec. + * @return Optional base path if partition spec is non empty. Else, None. + */ + def extractBasePath(partitionSpec: PartitionSpec): Option[String] = { + if (partitionSpec == PartitionSpec.emptySpec) { + None + } else { + // For example, we could have the following in PartitionSpec: + // - partition columns = "col1", "col2" + // - partitions: "/path/col1=1/col2=1", "/path/col1=1/col2=2", etc. + // , and going up the same number of directory levels as the number of partition columns + // will compute the base path. Note that PartitionSpec.partitions will always contain + // all the partitions in the path, so "partitions.head" is taken as an initial value. + val basePath = partitionSpec.partitionColumns + .foldLeft(partitionSpec.partitions.head.path)((path, _) => path.getParent) + Some(basePath.toString) + } + } + /* Definition taken from org.apache.spark.sql.execution.datasources.PartitionAwareFileIndex. */ // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be // counted as data files, so that they shouldn't participate partition discovery. diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsGlobData.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsGlobData.scala new file mode 100644 index 000000000..7fa5561a0 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsGlobData.scala @@ -0,0 +1,199 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.{DataFrame, QueryTest} + +import com.microsoft.hyperspace.TestUtils.latestIndexLogEntry +import com.microsoft.hyperspace.index.IndexConstants.GLOBBING_PATTERN_KEY +import com.microsoft.hyperspace.util.PathUtils +import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData} + +class RefreshScanTestsGlobData extends QueryTest with HyperspaceSuite { + override val systemPath = PathUtils.makeAbsolute("src/test/resources/indexLocation") + + var originalDf: DataFrame = _ + var partition1: DataFrame = _ + var partition2: DataFrame = _ + var partition3: DataFrame = _ + var partition4: DataFrame = _ + private var hs: Hyperspace = _ + + override def beforeAll(): Unit = { + super.beforeAll() + import spark.implicits._ + hs = new Hyperspace(spark) + originalDf = SampleData.testData.toDF("c1", "c2", "c3", "c4", "c5") + + partition1 = originalDf.where("c5 = 1000") + partition2 = originalDf.where("c5 = 2000") + partition3 = originalDf.where("c5 = 3000") + partition4 = originalDf.where("c5 = 4000") + } + + before { + // Clear index cache so a new test does not see stale indexes from previous ones. + clearCache() + } + + test("RefreshScan indexes eligible files when scan pattern with wildcards is provided.") { + withTempPathAsString { testPath => + withIndex("index") { + val dataPath = new Path(PathUtils.makeAbsolute(testPath), "data").toString + val globPath = dataPath + "/*" + val p1 = dataPath + "/1000" + val p2 = dataPath + "/2000" + + // Create index. + partition1.write.parquet(p1) + val df = spark.read.option(GLOBBING_PATTERN_KEY, globPath).parquet(globPath) + val indexConfig = IndexConfig("index", Seq("c1"), Seq("c5")) + hs.createIndex(df, indexConfig) + + // Append data in another location that satisfies globbed path. + partition2.write.parquet(p2) + + // Refresh index with scan pattern. + hs.refreshIndex("index", "incremental", Some("data/20*")) + + // Validate index contents. + val index = latestIndexLogEntry(systemPath, indexConfig.indexName) + val relation = index.relations.head + val indexedFiles = relation.data.properties.content.files + assert(relation.rootPaths.equals(Seq(globPath))) + assert(indexedFiles.forall(path => + path.toString.contains("data/1000") || path.toString.contains("data/2000"))) + assert(indexedFiles.exists(_.toString.contains("data/1000"))) + assert(indexedFiles.exists(_.toString.contains("data/2000"))) + + // Validate results. + val df2 = spark.read.parquet(globPath) + def query: DataFrame = df2.filter("c1 = '2017-09-03'").select("c1", "c5") + spark.disableHyperspace() + val baseQuery = query + val basePlan = baseQuery.queryExecution.optimizedPlan + + spark.enableHyperspace() + val queryWithHs = query + val planWithHs = queryWithHs.queryExecution.optimizedPlan + assert(!basePlan.equals(planWithHs)) + + val files = planWithHs.collect { + case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + fsRelation.location.inputFiles + }.flatten + + // Check data files are replaced by index files. 
+ assert(files.nonEmpty && files.forall(_.contains("index"))) + checkAnswer(baseQuery, queryWithHs) + } + } + } + + test("RefreshScan indexes doesn't include files not satisfying the scan pattern.") { + withTempPathAsString { testPath => + withIndex("index") { + val dataPath = new Path(PathUtils.makeAbsolute(testPath), "data").toString + val globPath = dataPath + "/*" + val p1 = dataPath + "/1000" + val p2 = dataPath + "/2000" + val p3 = dataPath + "/3000" + + // Create index. + partition1.write.parquet(p1) + val df = spark.read.option(GLOBBING_PATTERN_KEY, globPath).parquet(globPath) + val indexConfig = IndexConfig("index", Seq("c1"), Seq("c5")) + hs.createIndex(df, indexConfig) + + // Append data to two new locations that satisfy globbed path. + partition2.write.parquet(p2) + partition3.write.parquet(p3) + + // Refresh index with scan pattern. Note that only one new partition is being indexed. + hs.refreshIndex("index", "incremental", Some("data/20*")) + + // Validate index contents. + var index = latestIndexLogEntry(systemPath, indexConfig.indexName) + var relation = index.relations.head + var indexedFiles = relation.data.properties.content.files + assert(relation.rootPaths.equals(Seq(globPath))) + assert(indexedFiles.forall(path => + path.toString.contains("data/1000") || path.toString.contains("data/2000"))) + assert(indexedFiles.exists(_.toString.contains("data/1000"))) + assert(indexedFiles.exists(_.toString.contains("data/2000"))) + assert(!indexedFiles.exists(_.toString.contains("data/3000"))) + + // Validate results. + val df2 = spark.read.parquet(globPath) + def query: DataFrame = df2.filter("c1 = '2017-09-03'").select("c1", "c5") + + { + // Hyperspace should not pick index because all data files are not indexed. + spark.disableHyperspace() + val baseQuery = query + val basePlan = baseQuery.queryExecution.optimizedPlan + + spark.enableHyperspace() + val queryWithHs = query + val planWithHs = queryWithHs.queryExecution.optimizedPlan + assert(basePlan.equals(planWithHs)) + } + + // Refresh index for missing directory. + hs.refreshIndex("index", "incremental", Some("data/30*")) + + // Validate index contents. + index = latestIndexLogEntry(systemPath, indexConfig.indexName) + relation = index.relations.head + indexedFiles = relation.data.properties.content.files + assert(relation.rootPaths.equals(Seq(globPath))) + assert( + indexedFiles.forall( + path => + path.toString.contains("data/1000") || + path.toString.contains("data/2000") || + path.toString.contains("data/3000"))) + assert(indexedFiles.exists(_.toString.contains("data/1000"))) + assert(indexedFiles.exists(_.toString.contains("data/2000"))) + assert(indexedFiles.exists(_.toString.contains("data/3000"))) + + { + // Hyperspace should pick index because now all data files are indexed. + spark.disableHyperspace() + val baseQuery = query + val basePlan = baseQuery.queryExecution.optimizedPlan + + spark.enableHyperspace() + val queryWithHs = query + val planWithHs = queryWithHs.queryExecution.optimizedPlan + assert(!basePlan.equals(planWithHs)) + + val files = planWithHs.collect { + case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + fsRelation.location.inputFiles + }.flatten + + // Check data files are replaced by index files. 
+ assert(files.nonEmpty && files.forall(_.contains("index"))) + checkAnswer(baseQuery, queryWithHs) + } + } + } + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsPartitionedData.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsPartitionedData.scala new file mode 100644 index 000000000..62df775d8 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsPartitionedData.scala @@ -0,0 +1,245 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.{DataFrame, QueryTest} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} + +import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData} +import com.microsoft.hyperspace.TestUtils.latestIndexLogEntry +import com.microsoft.hyperspace.util.PathUtils + +class RefreshScanTestsPartitionedData extends QueryTest with HyperspaceSuite { + override val systemPath = PathUtils.makeAbsolute("src/test/resources/indexLocation") + + var originalDf: DataFrame = _ + var partition1: DataFrame = _ + var partition2: DataFrame = _ + var partition3: DataFrame = _ + var partition4: DataFrame = _ + private var hs: Hyperspace = _ + + override def beforeAll(): Unit = { + super.beforeAll() + import spark.implicits._ + hs = new Hyperspace(spark) + originalDf = SampleData.testData.toDF("c1", "c2", "c3", "c4", "c5") + + partition1 = originalDf.where("c5 = 1000") + partition2 = originalDf.where("c5 = 2000") + partition3 = originalDf.where("c5 = 3000") + partition4 = originalDf.where("c5 = 4000") + } + + before { + // Clear index cache so a new test does not see stale indexes from previous ones. + clearCache() + } + + test("RefreshScan indexes eligible files with scan pattern provided.") { + withTempPathAsString { testPath => + withIndex("index") { + val dataPath = new Path(PathUtils.makeAbsolute(testPath), "data").toString + + // Create index. + partition1.write.partitionBy("c5").parquet(dataPath) + val df = spark.read.parquet(dataPath) + val indexConfig = IndexConfig("index", Seq("c1"), Seq("c5")) + hs.createIndex(df, indexConfig) + + // Append another partition. + partition2.write.mode("append").partitionBy("c5").parquet(dataPath) + + // Refresh index with scan pattern. + hs.refreshIndex("index", "incremental", Some("data/c5=2000")) + + // Validate index contents. + val index = latestIndexLogEntry(systemPath, indexConfig.indexName) + val relation = index.relations.head + val indexedFiles = relation.data.properties.content.files + assert(relation.rootPaths.equals(Seq(dataPath))) + assert(indexedFiles.forall(path => + path.toString.contains("data/c5=1000") || path.toString.contains("data/c5=2000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=1000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=2000"))) + + // Validate results. 
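+        // Run the same filter query with Hyperspace disabled and enabled; the plans should
+        // differ while the answers match.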
+ val df2 = spark.read.parquet(dataPath) + def query: DataFrame = df2.filter("c1 = '2017-09-03'").select("c1", "c5") + spark.disableHyperspace() + val baseQuery = query + val basePlan = baseQuery.queryExecution.optimizedPlan + + spark.enableHyperspace() + val queryWithHs = query + val planWithHs = queryWithHs.queryExecution.optimizedPlan + assert(!basePlan.equals(planWithHs)) + + val files = planWithHs.collect { + case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + fsRelation.location.inputFiles + }.flatten + + // Check data files are replaced by index files. + assert(files.nonEmpty && files.forall(_.contains("index"))) + checkAnswer(baseQuery, queryWithHs) + } + } + } + + test("RefreshScan indexes doesn't include files not satisfying the scan pattern.") { + withTempPathAsString { testPath => + withIndex("index") { + val dataPath = new Path(PathUtils.makeAbsolute(testPath), "data").toString + + // Create index. + partition1.write.partitionBy("c5").parquet(dataPath) + val df = spark.read.parquet(dataPath) + val indexConfig = IndexConfig("index", Seq("c1"), Seq("c5")) + hs.createIndex(df, indexConfig) + + // Append 2 new partitions. + partition2.write.mode("append").partitionBy("c5").parquet(dataPath) + partition3.write.mode("append").partitionBy("c5").parquet(dataPath) + + // Refresh index with scan pattern. Note that only one new partition is being indexed. + hs.refreshIndex("index", "incremental", Some("data/c5=2000")) + + // Validate index contents. + var index = latestIndexLogEntry(systemPath, indexConfig.indexName) + var relation = index.relations.head + var indexedFiles = relation.data.properties.content.files + assert(relation.rootPaths.equals(Seq(dataPath))) + assert(indexedFiles.forall(path => + path.toString.contains("data/c5=1000") || path.toString.contains("data/c5=2000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=1000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=2000"))) + assert(!indexedFiles.exists(_.toString.contains("data/c5=3000"))) + + // Validate results. + val df2 = spark.read.parquet(dataPath) + + def query: DataFrame = df2.filter("c1 = '2017-09-03'").select("c1", "c5") + + { + // Hyperspace should not pick index because all data files are not indexed. + spark.disableHyperspace() + val baseQuery = query + val basePlan = baseQuery.queryExecution.optimizedPlan + + spark.enableHyperspace() + val queryWithHs = query + val planWithHs = queryWithHs.queryExecution.optimizedPlan + assert(basePlan.equals(planWithHs)) + } + + // Refresh index for missing partitions. + hs.refreshIndex("index", "incremental", Some("data/c5=3000")) + + // Validate index contents. + index = latestIndexLogEntry(systemPath, indexConfig.indexName) + relation = index.relations.head + indexedFiles = relation.data.properties.content.files + assert(relation.rootPaths.equals(Seq(dataPath))) + assert( + indexedFiles.forall( + path => + path.toString.contains("data/c5=1000") || + path.toString.contains("data/c5=2000") || + path.toString.contains("data/c5=3000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=1000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=2000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=3000"))) + + { + // Hyperspace should pick index because now all data files are indexed. 
+ spark.disableHyperspace() + val baseQuery = query + val basePlan = baseQuery.queryExecution.optimizedPlan + + spark.enableHyperspace() + val queryWithHs = query + val planWithHs = queryWithHs.queryExecution.optimizedPlan + assert(!basePlan.equals(planWithHs)) + + val files = planWithHs.collect { + case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + fsRelation.location.inputFiles + }.flatten + + // Check data files are replaced by index files. + assert(files.nonEmpty && files.forall(_.contains("index"))) + checkAnswer(baseQuery, queryWithHs) + } + } + } + } + + test("RefreshScan indexes eligible files when scan pattern with wildcards is provided.") { + withTempPathAsString { testPath => + withIndex("index") { + val dataPath = new Path(PathUtils.makeAbsolute(testPath), "data").toString + + // Create index. + partition1.write.partitionBy("c5").parquet(dataPath) + val df = spark.read.parquet(dataPath) + val indexConfig = IndexConfig("index", Seq("c1"), Seq("c5")) + hs.createIndex(df, indexConfig) + + // Append another partition. + partition2.write.mode("append").partitionBy("c5").parquet(dataPath) + + // Refresh index with scan pattern with wildcard. + hs.refreshIndex("index", "incremental", Some("data/c*=2*")) + + // Validate index contents. + val index = latestIndexLogEntry(systemPath, indexConfig.indexName) + val relation = index.relations.head + val indexedFiles = relation.data.properties.content.files + assert(relation.rootPaths.equals(Seq(dataPath))) + assert(indexedFiles.forall(path => + path.toString.contains("data/c5=1000") || path.toString.contains("data/c5=2000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=1000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=2000"))) + + // Validate results. + val df2 = spark.read.parquet(dataPath) + + def query: DataFrame = df2.filter("c1 = '2017-09-03'").select("c1", "c5") + + spark.disableHyperspace() + val baseQuery = query + val basePlan = baseQuery.queryExecution.optimizedPlan + + spark.enableHyperspace() + val queryWithHs = query + val planWithHs = queryWithHs.queryExecution.optimizedPlan + assert(!basePlan.equals(planWithHs)) + + val files = planWithHs.collect { + case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + fsRelation.location.inputFiles + }.flatten + + // Check data files are replaced by index files. 
+ assert(files.nonEmpty && files.forall(_.contains("index"))) + checkAnswer(baseQuery, queryWithHs) + } + } + } +} From ab2ff2f8396877eb8582394411250873d3f34b46 Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Tue, 15 Dec 2020 16:49:27 -0800 Subject: [PATCH 2/5] Trigger Build From 1009056da09eda74524de023795c7a4167d7109a Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Tue, 15 Dec 2020 16:50:26 -0800 Subject: [PATCH 3/5] scalastyle --- .../microsoft/hyperspace/index/RefreshScanTestsGlobData.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsGlobData.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsGlobData.scala index 7fa5561a0..e20799c7e 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsGlobData.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsGlobData.scala @@ -17,13 +17,13 @@ package com.microsoft.hyperspace.index import org.apache.hadoop.fs.Path -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.{DataFrame, QueryTest} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData} import com.microsoft.hyperspace.TestUtils.latestIndexLogEntry import com.microsoft.hyperspace.index.IndexConstants.GLOBBING_PATTERN_KEY import com.microsoft.hyperspace.util.PathUtils -import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData} class RefreshScanTestsGlobData extends QueryTest with HyperspaceSuite { override val systemPath = PathUtils.makeAbsolute("src/test/resources/indexLocation") From a3833d915d306ca4bb7fbb55ff07ede680d2751c Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Wed, 16 Dec 2020 17:31:32 -0800 Subject: [PATCH 4/5] support deleted directories --- .../hyperspace/actions/RefreshScan.scala | 71 +++++++++++-------- .../RefreshScanTestsPartitionedData.scala | 54 +++++++++++++- 2 files changed, 94 insertions(+), 31 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshScan.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshScan.scala index f730e4d58..3000d8d65 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshScan.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshScan.scala @@ -20,6 +20,7 @@ import scala.util.{Failure, Success, Try} import org.apache.commons.io.FilenameUtils import org.apache.hadoop.fs.Path +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} import org.apache.spark.sql.types.{DataType, StructType} @@ -49,12 +50,12 @@ class RefreshScan( .load(newlyIndexedDataFiles: _*) } - def isMatch(path: String, scanPattern: String): Boolean = { + private def isMatch(path: String, scanPattern: String): Boolean = { val scanSplits = scanPattern.split(Path.SEPARATOR) scanSplits.nonEmpty && path.split(Path.SEPARATOR).contains(scanSplits.head) } - def resolve(path: String, scanPattern: String): String = { + private def resolve(path: String, scanPattern: String): String = { val scanSplits: Array[String] = scanPattern.split(Path.SEPARATOR) val pathSplits: Array[String] = path.split(Path.SEPARATOR) val splitPoint: Int = pathSplits.lastIndexOf(scanSplits.head) @@ -84,9 +85,15 @@ class RefreshScan( relation.rootPaths.collect { case path if isMatch(path, 
scanPattern) => resolve(path, scanPattern) } + } - // Remove this after testing - // Seq("glob2/y=2023") + private def checkPathExists(path: String): Boolean = { + val hadoopConf = spark.sessionState.newHadoopConf() + val hdfsPath = new Path(path) + val fs = hdfsPath.getFileSystem(hadoopConf) + val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified) + globPath.nonEmpty && globPath.forall(fs.exists) } override def logEntry: LogEntry = { @@ -128,31 +135,35 @@ class RefreshScan( } override protected lazy val currentFiles: Set[FileInfo] = { - val relation = previousIndexLogEntry.relations.head - val dataSchema = DataType.fromJson(relation.dataSchemaJson).asInstanceOf[StructType] - val changedDf = spark.read - .schema(dataSchema) - .format(relation.fileFormat) - .options(relation.options) - .load(resolvedPaths: _*) - changedDf.queryExecution.optimizedPlan - .collect { - case LogicalRelation( - HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), - _, - _, - _) => - location - .allFiles() - .map { f => - // For each file, if it already has a file id, add that id to its corresponding - // FileInfo. Note that if content of an existing file is changed, it is treated - // as a new file (i.e. its current file id is no longer valid). - val id = fileIdTracker.addFile(f) - FileInfo(f, id, asFullPath = true) - } - } - .flatten - .toSet + resolvedPaths.filter(checkPathExists) match { + case Nil => Set.empty + case _ => + val relation = previousIndexLogEntry.relations.head + val dataSchema = DataType.fromJson(relation.dataSchemaJson).asInstanceOf[StructType] + val changedDf = spark.read + .schema(dataSchema) + .format(relation.fileFormat) + .options(relation.options) + .load(resolvedPaths: _*) + changedDf.queryExecution.optimizedPlan + .collect { + case LogicalRelation( + HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), + _, + _, + _) => + location + .allFiles() + .map { f => + // For each file, if it already has a file id, add that id to its corresponding + // FileInfo. Note that if content of an existing file is changed, it is treated + // as a new file (i.e. its current file id is no longer valid). 
+ val id = fileIdTracker.addFile(f) + FileInfo(f, id, asFullPath = true) + } + } + .flatten + .toSet + } } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsPartitionedData.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsPartitionedData.scala index 62df775d8..0b6c8394d 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsPartitionedData.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsPartitionedData.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRela import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData} import com.microsoft.hyperspace.TestUtils.latestIndexLogEntry -import com.microsoft.hyperspace.util.PathUtils +import com.microsoft.hyperspace.util.{FileUtils, PathUtils} class RefreshScanTestsPartitionedData extends QueryTest with HyperspaceSuite { override val systemPath = PathUtils.makeAbsolute("src/test/resources/indexLocation") @@ -102,6 +102,58 @@ class RefreshScanTestsPartitionedData extends QueryTest with HyperspaceSuite { } } + test("RefreshScan handles deleted files matching scan pattern.") { + withTempPathAsString { testPath => + withIndex("index") { + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { + val dataPath = new Path(PathUtils.makeAbsolute(testPath), "data").toString + + // Create index. + partition1.write.partitionBy("c5").parquet(dataPath) + partition2.write.mode("append").partitionBy("c5").parquet(dataPath) + + val df = spark.read.parquet(dataPath) + val indexConfig = IndexConfig("index", Seq("c1"), Seq("c5")) + hs.createIndex(df, indexConfig) + + // Delete Files from one partition. + FileUtils.delete(new Path(s"$dataPath/c5=2000")) + + // Refresh index with scan pattern matching deleted files. + hs.refreshIndex("index", "incremental", Some("data/c5=20*")) + + // Validate index contents. + val index = latestIndexLogEntry(systemPath, indexConfig.indexName) + val relation = index.relations.head + val indexedFiles = relation.data.properties.content.files + assert(relation.rootPaths.equals(Seq(dataPath))) + assert(indexedFiles.forall(path => path.toString.contains("data/c5=1000"))) + + // Validate results. + val df2 = spark.read.parquet(dataPath) + def query: DataFrame = df2.filter("c1 = '2017-09-03'").select("c1", "c5") + spark.disableHyperspace() + val baseQuery = query + val basePlan = baseQuery.queryExecution.optimizedPlan + + spark.enableHyperspace() + val queryWithHs = query + val planWithHs = queryWithHs.queryExecution.optimizedPlan + assert(!basePlan.equals(planWithHs)) + + val files = planWithHs.collect { + case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + fsRelation.location.inputFiles + }.flatten + + // Check data files are replaced by index files. 
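+          // Lineage is enabled above so the refresh can drop rows of the deleted partition
+          // from the index data instead of keeping stale entries.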
+ assert(files.nonEmpty && files.forall(_.contains("index"))) + checkAnswer(baseQuery, queryWithHs) + } + } + } + } + test("RefreshScan indexes doesn't include files not satisfying the scan pattern.") { withTempPathAsString { testPath => withIndex("index") { From a55060baedbb7ae53eb42b7081bddd9b654f550e Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Wed, 16 Dec 2020 19:21:46 -0800 Subject: [PATCH 5/5] test fix --- .../RefreshScanTestsPartitionedData.scala | 102 +++++++++--------- 1 file changed, 51 insertions(+), 51 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsPartitionedData.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsPartitionedData.scala index 0b6c8394d..c3868dd43 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsPartitionedData.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsPartitionedData.scala @@ -51,57 +51,6 @@ class RefreshScanTestsPartitionedData extends QueryTest with HyperspaceSuite { clearCache() } - test("RefreshScan indexes eligible files with scan pattern provided.") { - withTempPathAsString { testPath => - withIndex("index") { - val dataPath = new Path(PathUtils.makeAbsolute(testPath), "data").toString - - // Create index. - partition1.write.partitionBy("c5").parquet(dataPath) - val df = spark.read.parquet(dataPath) - val indexConfig = IndexConfig("index", Seq("c1"), Seq("c5")) - hs.createIndex(df, indexConfig) - - // Append another partition. - partition2.write.mode("append").partitionBy("c5").parquet(dataPath) - - // Refresh index with scan pattern. - hs.refreshIndex("index", "incremental", Some("data/c5=2000")) - - // Validate index contents. - val index = latestIndexLogEntry(systemPath, indexConfig.indexName) - val relation = index.relations.head - val indexedFiles = relation.data.properties.content.files - assert(relation.rootPaths.equals(Seq(dataPath))) - assert(indexedFiles.forall(path => - path.toString.contains("data/c5=1000") || path.toString.contains("data/c5=2000"))) - assert(indexedFiles.exists(_.toString.contains("data/c5=1000"))) - assert(indexedFiles.exists(_.toString.contains("data/c5=2000"))) - - // Validate results. - val df2 = spark.read.parquet(dataPath) - def query: DataFrame = df2.filter("c1 = '2017-09-03'").select("c1", "c5") - spark.disableHyperspace() - val baseQuery = query - val basePlan = baseQuery.queryExecution.optimizedPlan - - spark.enableHyperspace() - val queryWithHs = query - val planWithHs = queryWithHs.queryExecution.optimizedPlan - assert(!basePlan.equals(planWithHs)) - - val files = planWithHs.collect { - case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => - fsRelation.location.inputFiles - }.flatten - - // Check data files are replaced by index files. - assert(files.nonEmpty && files.forall(_.contains("index"))) - checkAnswer(baseQuery, queryWithHs) - } - } - } - test("RefreshScan handles deleted files matching scan pattern.") { withTempPathAsString { testPath => withIndex("index") { @@ -154,6 +103,57 @@ class RefreshScanTestsPartitionedData extends QueryTest with HyperspaceSuite { } } + test("RefreshScan indexes eligible files with scan pattern provided.") { + withTempPathAsString { testPath => + withIndex("index") { + val dataPath = new Path(PathUtils.makeAbsolute(testPath), "data").toString + + // Create index. 
+ partition1.write.partitionBy("c5").parquet(dataPath) + val df = spark.read.parquet(dataPath) + val indexConfig = IndexConfig("index", Seq("c1"), Seq("c5")) + hs.createIndex(df, indexConfig) + + // Append another partition. + partition2.write.mode("append").partitionBy("c5").parquet(dataPath) + + // Refresh index with scan pattern. + hs.refreshIndex("index", "incremental", Some("data/c5=2000")) + + // Validate index contents. + val index = latestIndexLogEntry(systemPath, indexConfig.indexName) + val relation = index.relations.head + val indexedFiles = relation.data.properties.content.files + assert(relation.rootPaths.equals(Seq(dataPath))) + assert(indexedFiles.forall(path => + path.toString.contains("data/c5=1000") || path.toString.contains("data/c5=2000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=1000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=2000"))) + + // Validate results. + val df2 = spark.read.parquet(dataPath) + def query: DataFrame = df2.filter("c1 = '2017-09-03'").select("c1", "c5") + spark.disableHyperspace() + val baseQuery = query + val basePlan = baseQuery.queryExecution.optimizedPlan + + spark.enableHyperspace() + val queryWithHs = query + val planWithHs = queryWithHs.queryExecution.optimizedPlan + assert(!basePlan.equals(planWithHs)) + + val files = planWithHs.collect { + case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + fsRelation.location.inputFiles + }.flatten + + // Check data files are replaced by index files. + assert(files.nonEmpty && files.forall(_.contains("index"))) + checkAnswer(baseQuery, queryWithHs) + } + } + } + test("RefreshScan indexes doesn't include files not satisfying the scan pattern.") { withTempPathAsString { testPath => withIndex("index") {
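Usage sketch (illustrative only, not part of the patch series): the calls below show how the scan-pattern refresh added in these commits would be driven from user code. The data path, index name, and column names are placeholders modeled on the tests above, and the layout is assumed to be Hive-partitioned (e.g. data/c5=2000).

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.IndexConfig

val hs = new Hyperspace(spark)

// Build an index over data partitioned by c5 (placeholder path and columns).
val df = spark.read.parquet("/path/to/data")
hs.createIndex(df, IndexConfig("index", Seq("c1"), Seq("c5")))

// After new files arrive under data/c5=2000, refresh only that sub-path; previously
// indexed files outside the scan pattern keep their index coverage.
hs.refreshIndex("index", "incremental", Some("data/c5=2000"))

// Scan patterns may contain wildcards; they are resolved against the relation's root paths.
hs.refreshIndex("index", "incremental", Some("data/c5=2*"))

// Scan patterns are only supported with the incremental mode; other modes throw
// HyperspaceException, as enforced in IndexCollectionManager.refresh above.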