diff --git a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala index 53b8cc126..621040a49 100644 --- a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala @@ -87,7 +87,11 @@ class Hyperspace(spark: SparkSession) { * @param mode Refresh mode. Currently supported modes are `incremental` and `full`. */ def refreshIndex(indexName: String, mode: String): Unit = { - indexManager.refresh(indexName, mode) + indexManager.refresh(indexName, mode, None) + } + + def refreshIndex(indexName: String, mode: String, scanPattern: Option[String]): Unit = { + indexManager.refresh(indexName, mode, scanPattern) } /** diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshScan.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshScan.scala new file mode 100644 index 000000000..3000d8d65 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshScan.scala @@ -0,0 +1,169 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.actions + +import scala.util.{Failure, Success, Try} + +import org.apache.commons.io.FilenameUtils +import org.apache.hadoop.fs.Path +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} +import org.apache.spark.sql.types.{DataType, StructType} + +import com.microsoft.hyperspace.index._ + +class RefreshScan( + spark: SparkSession, + logManager: IndexLogManager, + dataManager: IndexDataManager, + scanPattern: String) + extends RefreshIncrementalAction(spark, logManager, dataManager) { + + /** df representing the complete set of data files which will be indexed once this refresh action + * finishes. 
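+   * The data is the previously indexed files, minus [[deletedFiles]], plus [[appendedFiles]],
+   * loaded with the schema, file format and options recorded in the previous index log entry.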
+   */
+  override protected lazy val df = {
+    val relation = previousIndexLogEntry.relations.head
+    val previouslyIndexedData = relation.data.properties.content
+    val newlyIndexedData = previouslyIndexedData.fileInfos -- deletedFiles ++ appendedFiles
+    val newlyIndexedDataFiles: Seq[String] = newlyIndexedData.map(_.name).toSeq
+
+    val dataSchema = DataType.fromJson(relation.dataSchemaJson).asInstanceOf[StructType]
+    spark.read
+      .schema(dataSchema)
+      .format(relation.fileFormat)
+      .options(relation.options)
+      .load(newlyIndexedDataFiles: _*)
+  }
+
+  private def isMatch(path: String, scanPattern: String): Boolean = {
+    val scanSplits = scanPattern.split(Path.SEPARATOR)
+    scanSplits.nonEmpty && path.split(Path.SEPARATOR).contains(scanSplits.head)
+  }
+
+  private def resolve(path: String, scanPattern: String): String = {
+    val scanSplits: Array[String] = scanPattern.split(Path.SEPARATOR)
+    val pathSplits: Array[String] = path.split(Path.SEPARATOR)
+    val splitPoint: Int = pathSplits.lastIndexOf(scanSplits.head)
+    var (prefix, suffix) = pathSplits.splitAt(splitPoint)
+
+    for (j <- 0 until math.max(scanSplits.length, suffix.length)) {
+      val resolvedPart = (Try(suffix(j)), Try(scanSplits(j))) match {
+        case (Success(path), Success(scan)) if FilenameUtils.wildcardMatch(path, scan) => path
+        case (Success(path), Success(scan)) if FilenameUtils.wildcardMatch(scan, path) => scan
+        case (Success(_), Success(_)) => throw new Exception("Incompatible scan pattern")
+        case (Success(path), Failure(_)) => path
+        case (Failure(_), Success(scan)) => scan
+        case _ => throw new Exception("Unexpected exception")
+      }
+
+      prefix :+= resolvedPart
+    }
+    prefix.mkString(Path.SEPARATOR)
+  }
+
+  /**
+   * Root paths resolved against the scan pattern.
+   * Each matching root path is merged with the scan pattern, keeping the more selective
+   * segment at every level. For example, a root path ending in "data/*" combined with the
+   * scan pattern "data/c5=2000" resolves to ".../data/c5=2000".
+   */
+  private lazy val resolvedPaths = {
+    val relation = previousIndexLogEntry.relations.head
+    relation.rootPaths.collect {
+      case path if isMatch(path, scanPattern) => resolve(path, scanPattern)
+    }
+  }
+
+  private def checkPathExists(path: String): Boolean = {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    val hdfsPath = new Path(path)
+    val fs = hdfsPath.getFileSystem(hadoopConf)
+    val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+    globPath.nonEmpty && globPath.forall(fs.exists)
+  }
+
+  override def logEntry: LogEntry = {
+    // TODO: Deduplicate from super.logEntry()
+    val entry = getIndexLogEntry(spark, df, indexConfig, indexDataPath)
+
+    // If there are no deleted files, this version contains index data files only for the
+    // appended data, so we need to add the index data files of the previous index version.
+    // Otherwise, as previous index data is rewritten in this version while excluding
+    // indexed rows from deleted files, all necessary index data files exist in this version.
+    val updatedEntry = if (deletedFiles.isEmpty) {
+      // Merge new index files with old index files.
+      val mergedContent = Content(previousIndexLogEntry.content.root.merge(entry.content.root))
+      entry.copy(content = mergedContent)
+    } else {
+      // New entry.
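+      // Note: for scan-based refresh, deletedFiles is overridden below to contain only the
+      // deleted files that fall under the resolved scan paths.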
+ entry + } + + val relation = entry.source.plan.properties.relations.head + val updatedRelation = + relation.copy(rootPaths = previousIndexLogEntry.relations.head.rootPaths) + updatedEntry.copy( + source = updatedEntry.source.copy(plan = updatedEntry.source.plan.copy( + properties = updatedEntry.source.plan.properties.copy(relations = Seq(updatedRelation))))) + } + + /** Deleted files which match resolved paths */ + override protected lazy val deletedFiles: Seq[FileInfo] = { + // Helper function to check if a file belongs to one of the resolved paths. + def fromResolvedPaths(file: FileInfo): Boolean = { + resolvedPaths.exists(p => FilenameUtils.wildcardMatch(file.name, p)) + } + + val originalFiles = previousIndexLogEntry.relations.head.data.properties.content.fileInfos + .filter(fromResolvedPaths) + + (originalFiles -- currentFiles).toSeq + } + + override protected lazy val currentFiles: Set[FileInfo] = { + resolvedPaths.filter(checkPathExists) match { + case Nil => Set.empty + case _ => + val relation = previousIndexLogEntry.relations.head + val dataSchema = DataType.fromJson(relation.dataSchemaJson).asInstanceOf[StructType] + val changedDf = spark.read + .schema(dataSchema) + .format(relation.fileFormat) + .options(relation.options) + .load(resolvedPaths: _*) + changedDf.queryExecution.optimizedPlan + .collect { + case LogicalRelation( + HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), + _, + _, + _) => + location + .allFiles() + .map { f => + // For each file, if it already has a file id, add that id to its corresponding + // FileInfo. Note that if content of an existing file is changed, it is treated + // as a new file (i.e. its current file id is no longer valid). + val id = fileIdTracker.addFile(f) + FileInfo(f, id, asFullPath = true) + } + } + .flatten + .toSet + } + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/CachingIndexCollectionManager.scala b/src/main/scala/com/microsoft/hyperspace/index/CachingIndexCollectionManager.scala index 23af1c28d..2c3040731 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/CachingIndexCollectionManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/CachingIndexCollectionManager.scala @@ -92,9 +92,9 @@ class CachingIndexCollectionManager( super.vacuum(indexName) } - override def refresh(indexName: String, mode: String): Unit = { + override def refresh(indexName: String, mode: String, scanPattern: Option[String]): Unit = { clearCache() - super.refresh(indexName, mode) + super.refresh(indexName, mode, scanPattern) } override def optimize(indexName: String, mode: String): Unit = { diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala index 186819232..5d73909ff 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala @@ -64,12 +64,25 @@ class IndexCollectionManager( } } - override def refresh(indexName: String, mode: String): Unit = { + override def refresh( + indexName: String, + mode: String, + scanPattern: Option[String] = None): Unit = { withLogManager(indexName) { logManager => val indexPath = PathResolver(spark.sessionState.conf).getIndexPath(indexName) val dataManager = indexDataManagerFactory.create(indexPath) + + if (scanPattern.isDefined && !mode.equalsIgnoreCase(REFRESH_MODE_INCREMENTAL)) { + throw HyperspaceException( + "Scan Patterns are currently only supported 
for " + + s"$REFRESH_MODE_INCREMENTAL.") + } + if (mode.equalsIgnoreCase(REFRESH_MODE_INCREMENTAL)) { - new RefreshIncrementalAction(spark, logManager, dataManager).run() + scanPattern match { + case Some(pattern) => new RefreshScan(spark, logManager, dataManager, pattern).run() + case _ => new RefreshIncrementalAction(spark, logManager, dataManager).run() + } } else if (mode.equalsIgnoreCase(REFRESH_MODE_FULL)) { new RefreshAction(spark, logManager, dataManager).run() } else if (mode.equalsIgnoreCase(REFRESH_MODE_QUICK)) { diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala index 9910d718a..194d917d9 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala @@ -65,7 +65,7 @@ trait IndexManager { * * @param indexName Name of the index to refresh. */ - def refresh(indexName: String, mode: String): Unit + def refresh(indexName: String, mode: String, scanPattern: Option[String]): Unit /** * Optimize index by changing the underlying index data layout (e.g., compaction). diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala index 5a77224df..8f2046082 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala @@ -18,6 +18,7 @@ package com.microsoft.hyperspace.index.sources.default import java.util.Locale +import org.apache.commons.io.FilenameUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.deploy.SparkHadoopUtil @@ -106,7 +107,14 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS .toMap val globPathValues = globPaths.values.flatten.toSet - if (!location.rootPaths.forall(globPathValues.contains)) { + + // Root paths could be directories or leaf files. Make sure that all root paths either + // match the glob paths, in case of directories, or belong to glob paths, in case of + // files. + if (!location.rootPaths.forall(p => + globPathValues.exists(g => + FilenameUtils.equalsNormalized(p.toString, g.toString) || + FilenameUtils.directoryContains(g.toString, p.toString)))) { throw HyperspaceException( "Some glob patterns do not match with available root " + s"paths of the source data. Please check if $pattern matches all of " + @@ -281,7 +289,7 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS // Keep the `asInstanceOf` to force casting or fallback because Databrick's // `InMemoryFileIndex` implementation returns `SerializableFileStatus` instead of the // standard API's `FileStatus`. 
- index.allFiles.map(_.asInstanceOf[FileStatus]) + index.allFiles } catch { case e: ClassCastException if e.getMessage.contains("SerializableFileStatus") => val dbClassName = "org.apache.spark.sql.execution.datasources.SerializableFileStatus" diff --git a/src/main/scala/com/microsoft/hyperspace/util/PathUtils.scala b/src/main/scala/com/microsoft/hyperspace/util/PathUtils.scala index fc8bc4904..460ab52b7 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/PathUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/PathUtils.scala @@ -18,6 +18,7 @@ package com.microsoft.hyperspace.util import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, PathFilter} +import org.apache.spark.sql.execution.datasources.PartitionSpec object PathUtils { def makeAbsolute(path: String): Path = makeAbsolute(new Path(path)) @@ -27,6 +28,27 @@ object PathUtils { fs.makeQualified(path) } + /** + * Extract base data source path for from a given partition spec. + * @param partitionSpec PartitionSpec. + * @return Optional base path if partition spec is non empty. Else, None. + */ + def extractBasePath(partitionSpec: PartitionSpec): Option[String] = { + if (partitionSpec == PartitionSpec.emptySpec) { + None + } else { + // For example, we could have the following in PartitionSpec: + // - partition columns = "col1", "col2" + // - partitions: "/path/col1=1/col2=1", "/path/col1=1/col2=2", etc. + // , and going up the same number of directory levels as the number of partition columns + // will compute the base path. Note that PartitionSpec.partitions will always contain + // all the partitions in the path, so "partitions.head" is taken as an initial value. + val basePath = partitionSpec.partitionColumns + .foldLeft(partitionSpec.partitions.head.path)((path, _) => path.getParent) + Some(basePath.toString) + } + } + /* Definition taken from org.apache.spark.sql.execution.datasources.PartitionAwareFileIndex. */ // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be // counted as data files, so that they shouldn't participate partition discovery. diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsGlobData.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsGlobData.scala new file mode 100644 index 000000000..e20799c7e --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsGlobData.scala @@ -0,0 +1,199 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.index + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.{DataFrame, QueryTest} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} + +import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData} +import com.microsoft.hyperspace.TestUtils.latestIndexLogEntry +import com.microsoft.hyperspace.index.IndexConstants.GLOBBING_PATTERN_KEY +import com.microsoft.hyperspace.util.PathUtils + +class RefreshScanTestsGlobData extends QueryTest with HyperspaceSuite { + override val systemPath = PathUtils.makeAbsolute("src/test/resources/indexLocation") + + var originalDf: DataFrame = _ + var partition1: DataFrame = _ + var partition2: DataFrame = _ + var partition3: DataFrame = _ + var partition4: DataFrame = _ + private var hs: Hyperspace = _ + + override def beforeAll(): Unit = { + super.beforeAll() + import spark.implicits._ + hs = new Hyperspace(spark) + originalDf = SampleData.testData.toDF("c1", "c2", "c3", "c4", "c5") + + partition1 = originalDf.where("c5 = 1000") + partition2 = originalDf.where("c5 = 2000") + partition3 = originalDf.where("c5 = 3000") + partition4 = originalDf.where("c5 = 4000") + } + + before { + // Clear index cache so a new test does not see stale indexes from previous ones. + clearCache() + } + + test("RefreshScan indexes eligible files when scan pattern with wildcards is provided.") { + withTempPathAsString { testPath => + withIndex("index") { + val dataPath = new Path(PathUtils.makeAbsolute(testPath), "data").toString + val globPath = dataPath + "/*" + val p1 = dataPath + "/1000" + val p2 = dataPath + "/2000" + + // Create index. + partition1.write.parquet(p1) + val df = spark.read.option(GLOBBING_PATTERN_KEY, globPath).parquet(globPath) + val indexConfig = IndexConfig("index", Seq("c1"), Seq("c5")) + hs.createIndex(df, indexConfig) + + // Append data in another location that satisfies globbed path. + partition2.write.parquet(p2) + + // Refresh index with scan pattern. + hs.refreshIndex("index", "incremental", Some("data/20*")) + + // Validate index contents. + val index = latestIndexLogEntry(systemPath, indexConfig.indexName) + val relation = index.relations.head + val indexedFiles = relation.data.properties.content.files + assert(relation.rootPaths.equals(Seq(globPath))) + assert(indexedFiles.forall(path => + path.toString.contains("data/1000") || path.toString.contains("data/2000"))) + assert(indexedFiles.exists(_.toString.contains("data/1000"))) + assert(indexedFiles.exists(_.toString.contains("data/2000"))) + + // Validate results. + val df2 = spark.read.parquet(globPath) + def query: DataFrame = df2.filter("c1 = '2017-09-03'").select("c1", "c5") + spark.disableHyperspace() + val baseQuery = query + val basePlan = baseQuery.queryExecution.optimizedPlan + + spark.enableHyperspace() + val queryWithHs = query + val planWithHs = queryWithHs.queryExecution.optimizedPlan + assert(!basePlan.equals(planWithHs)) + + val files = planWithHs.collect { + case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + fsRelation.location.inputFiles + }.flatten + + // Check data files are replaced by index files. 
+ assert(files.nonEmpty && files.forall(_.contains("index"))) + checkAnswer(baseQuery, queryWithHs) + } + } + } + + test("RefreshScan indexes doesn't include files not satisfying the scan pattern.") { + withTempPathAsString { testPath => + withIndex("index") { + val dataPath = new Path(PathUtils.makeAbsolute(testPath), "data").toString + val globPath = dataPath + "/*" + val p1 = dataPath + "/1000" + val p2 = dataPath + "/2000" + val p3 = dataPath + "/3000" + + // Create index. + partition1.write.parquet(p1) + val df = spark.read.option(GLOBBING_PATTERN_KEY, globPath).parquet(globPath) + val indexConfig = IndexConfig("index", Seq("c1"), Seq("c5")) + hs.createIndex(df, indexConfig) + + // Append data to two new locations that satisfy globbed path. + partition2.write.parquet(p2) + partition3.write.parquet(p3) + + // Refresh index with scan pattern. Note that only one new partition is being indexed. + hs.refreshIndex("index", "incremental", Some("data/20*")) + + // Validate index contents. + var index = latestIndexLogEntry(systemPath, indexConfig.indexName) + var relation = index.relations.head + var indexedFiles = relation.data.properties.content.files + assert(relation.rootPaths.equals(Seq(globPath))) + assert(indexedFiles.forall(path => + path.toString.contains("data/1000") || path.toString.contains("data/2000"))) + assert(indexedFiles.exists(_.toString.contains("data/1000"))) + assert(indexedFiles.exists(_.toString.contains("data/2000"))) + assert(!indexedFiles.exists(_.toString.contains("data/3000"))) + + // Validate results. + val df2 = spark.read.parquet(globPath) + def query: DataFrame = df2.filter("c1 = '2017-09-03'").select("c1", "c5") + + { + // Hyperspace should not pick index because all data files are not indexed. + spark.disableHyperspace() + val baseQuery = query + val basePlan = baseQuery.queryExecution.optimizedPlan + + spark.enableHyperspace() + val queryWithHs = query + val planWithHs = queryWithHs.queryExecution.optimizedPlan + assert(basePlan.equals(planWithHs)) + } + + // Refresh index for missing directory. + hs.refreshIndex("index", "incremental", Some("data/30*")) + + // Validate index contents. + index = latestIndexLogEntry(systemPath, indexConfig.indexName) + relation = index.relations.head + indexedFiles = relation.data.properties.content.files + assert(relation.rootPaths.equals(Seq(globPath))) + assert( + indexedFiles.forall( + path => + path.toString.contains("data/1000") || + path.toString.contains("data/2000") || + path.toString.contains("data/3000"))) + assert(indexedFiles.exists(_.toString.contains("data/1000"))) + assert(indexedFiles.exists(_.toString.contains("data/2000"))) + assert(indexedFiles.exists(_.toString.contains("data/3000"))) + + { + // Hyperspace should pick index because now all data files are indexed. + spark.disableHyperspace() + val baseQuery = query + val basePlan = baseQuery.queryExecution.optimizedPlan + + spark.enableHyperspace() + val queryWithHs = query + val planWithHs = queryWithHs.queryExecution.optimizedPlan + assert(!basePlan.equals(planWithHs)) + + val files = planWithHs.collect { + case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + fsRelation.location.inputFiles + }.flatten + + // Check data files are replaced by index files. 
+ assert(files.nonEmpty && files.forall(_.contains("index"))) + checkAnswer(baseQuery, queryWithHs) + } + } + } + } +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsPartitionedData.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsPartitionedData.scala new file mode 100644 index 000000000..c3868dd43 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshScanTestsPartitionedData.scala @@ -0,0 +1,297 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.index + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.{DataFrame, QueryTest} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} + +import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData} +import com.microsoft.hyperspace.TestUtils.latestIndexLogEntry +import com.microsoft.hyperspace.util.{FileUtils, PathUtils} + +class RefreshScanTestsPartitionedData extends QueryTest with HyperspaceSuite { + override val systemPath = PathUtils.makeAbsolute("src/test/resources/indexLocation") + + var originalDf: DataFrame = _ + var partition1: DataFrame = _ + var partition2: DataFrame = _ + var partition3: DataFrame = _ + var partition4: DataFrame = _ + private var hs: Hyperspace = _ + + override def beforeAll(): Unit = { + super.beforeAll() + import spark.implicits._ + hs = new Hyperspace(spark) + originalDf = SampleData.testData.toDF("c1", "c2", "c3", "c4", "c5") + + partition1 = originalDf.where("c5 = 1000") + partition2 = originalDf.where("c5 = 2000") + partition3 = originalDf.where("c5 = 3000") + partition4 = originalDf.where("c5 = 4000") + } + + before { + // Clear index cache so a new test does not see stale indexes from previous ones. + clearCache() + } + + test("RefreshScan handles deleted files matching scan pattern.") { + withTempPathAsString { testPath => + withIndex("index") { + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { + val dataPath = new Path(PathUtils.makeAbsolute(testPath), "data").toString + + // Create index. + partition1.write.partitionBy("c5").parquet(dataPath) + partition2.write.mode("append").partitionBy("c5").parquet(dataPath) + + val df = spark.read.parquet(dataPath) + val indexConfig = IndexConfig("index", Seq("c1"), Seq("c5")) + hs.createIndex(df, indexConfig) + + // Delete Files from one partition. + FileUtils.delete(new Path(s"$dataPath/c5=2000")) + + // Refresh index with scan pattern matching deleted files. + hs.refreshIndex("index", "incremental", Some("data/c5=20*")) + + // Validate index contents. + val index = latestIndexLogEntry(systemPath, indexConfig.indexName) + val relation = index.relations.head + val indexedFiles = relation.data.properties.content.files + assert(relation.rootPaths.equals(Seq(dataPath))) + assert(indexedFiles.forall(path => path.toString.contains("data/c5=1000"))) + + // Validate results. 
+ val df2 = spark.read.parquet(dataPath) + def query: DataFrame = df2.filter("c1 = '2017-09-03'").select("c1", "c5") + spark.disableHyperspace() + val baseQuery = query + val basePlan = baseQuery.queryExecution.optimizedPlan + + spark.enableHyperspace() + val queryWithHs = query + val planWithHs = queryWithHs.queryExecution.optimizedPlan + assert(!basePlan.equals(planWithHs)) + + val files = planWithHs.collect { + case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + fsRelation.location.inputFiles + }.flatten + + // Check data files are replaced by index files. + assert(files.nonEmpty && files.forall(_.contains("index"))) + checkAnswer(baseQuery, queryWithHs) + } + } + } + } + + test("RefreshScan indexes eligible files with scan pattern provided.") { + withTempPathAsString { testPath => + withIndex("index") { + val dataPath = new Path(PathUtils.makeAbsolute(testPath), "data").toString + + // Create index. + partition1.write.partitionBy("c5").parquet(dataPath) + val df = spark.read.parquet(dataPath) + val indexConfig = IndexConfig("index", Seq("c1"), Seq("c5")) + hs.createIndex(df, indexConfig) + + // Append another partition. + partition2.write.mode("append").partitionBy("c5").parquet(dataPath) + + // Refresh index with scan pattern. + hs.refreshIndex("index", "incremental", Some("data/c5=2000")) + + // Validate index contents. + val index = latestIndexLogEntry(systemPath, indexConfig.indexName) + val relation = index.relations.head + val indexedFiles = relation.data.properties.content.files + assert(relation.rootPaths.equals(Seq(dataPath))) + assert(indexedFiles.forall(path => + path.toString.contains("data/c5=1000") || path.toString.contains("data/c5=2000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=1000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=2000"))) + + // Validate results. + val df2 = spark.read.parquet(dataPath) + def query: DataFrame = df2.filter("c1 = '2017-09-03'").select("c1", "c5") + spark.disableHyperspace() + val baseQuery = query + val basePlan = baseQuery.queryExecution.optimizedPlan + + spark.enableHyperspace() + val queryWithHs = query + val planWithHs = queryWithHs.queryExecution.optimizedPlan + assert(!basePlan.equals(planWithHs)) + + val files = planWithHs.collect { + case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + fsRelation.location.inputFiles + }.flatten + + // Check data files are replaced by index files. + assert(files.nonEmpty && files.forall(_.contains("index"))) + checkAnswer(baseQuery, queryWithHs) + } + } + } + + test("RefreshScan indexes doesn't include files not satisfying the scan pattern.") { + withTempPathAsString { testPath => + withIndex("index") { + val dataPath = new Path(PathUtils.makeAbsolute(testPath), "data").toString + + // Create index. + partition1.write.partitionBy("c5").parquet(dataPath) + val df = spark.read.parquet(dataPath) + val indexConfig = IndexConfig("index", Seq("c1"), Seq("c5")) + hs.createIndex(df, indexConfig) + + // Append 2 new partitions. + partition2.write.mode("append").partitionBy("c5").parquet(dataPath) + partition3.write.mode("append").partitionBy("c5").parquet(dataPath) + + // Refresh index with scan pattern. Note that only one new partition is being indexed. + hs.refreshIndex("index", "incremental", Some("data/c5=2000")) + + // Validate index contents. 
+ var index = latestIndexLogEntry(systemPath, indexConfig.indexName) + var relation = index.relations.head + var indexedFiles = relation.data.properties.content.files + assert(relation.rootPaths.equals(Seq(dataPath))) + assert(indexedFiles.forall(path => + path.toString.contains("data/c5=1000") || path.toString.contains("data/c5=2000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=1000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=2000"))) + assert(!indexedFiles.exists(_.toString.contains("data/c5=3000"))) + + // Validate results. + val df2 = spark.read.parquet(dataPath) + + def query: DataFrame = df2.filter("c1 = '2017-09-03'").select("c1", "c5") + + { + // Hyperspace should not pick index because all data files are not indexed. + spark.disableHyperspace() + val baseQuery = query + val basePlan = baseQuery.queryExecution.optimizedPlan + + spark.enableHyperspace() + val queryWithHs = query + val planWithHs = queryWithHs.queryExecution.optimizedPlan + assert(basePlan.equals(planWithHs)) + } + + // Refresh index for missing partitions. + hs.refreshIndex("index", "incremental", Some("data/c5=3000")) + + // Validate index contents. + index = latestIndexLogEntry(systemPath, indexConfig.indexName) + relation = index.relations.head + indexedFiles = relation.data.properties.content.files + assert(relation.rootPaths.equals(Seq(dataPath))) + assert( + indexedFiles.forall( + path => + path.toString.contains("data/c5=1000") || + path.toString.contains("data/c5=2000") || + path.toString.contains("data/c5=3000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=1000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=2000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=3000"))) + + { + // Hyperspace should pick index because now all data files are indexed. + spark.disableHyperspace() + val baseQuery = query + val basePlan = baseQuery.queryExecution.optimizedPlan + + spark.enableHyperspace() + val queryWithHs = query + val planWithHs = queryWithHs.queryExecution.optimizedPlan + assert(!basePlan.equals(planWithHs)) + + val files = planWithHs.collect { + case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + fsRelation.location.inputFiles + }.flatten + + // Check data files are replaced by index files. + assert(files.nonEmpty && files.forall(_.contains("index"))) + checkAnswer(baseQuery, queryWithHs) + } + } + } + } + + test("RefreshScan indexes eligible files when scan pattern with wildcards is provided.") { + withTempPathAsString { testPath => + withIndex("index") { + val dataPath = new Path(PathUtils.makeAbsolute(testPath), "data").toString + + // Create index. + partition1.write.partitionBy("c5").parquet(dataPath) + val df = spark.read.parquet(dataPath) + val indexConfig = IndexConfig("index", Seq("c1"), Seq("c5")) + hs.createIndex(df, indexConfig) + + // Append another partition. + partition2.write.mode("append").partitionBy("c5").parquet(dataPath) + + // Refresh index with scan pattern with wildcard. + hs.refreshIndex("index", "incremental", Some("data/c*=2*")) + + // Validate index contents. 
+ val index = latestIndexLogEntry(systemPath, indexConfig.indexName) + val relation = index.relations.head + val indexedFiles = relation.data.properties.content.files + assert(relation.rootPaths.equals(Seq(dataPath))) + assert(indexedFiles.forall(path => + path.toString.contains("data/c5=1000") || path.toString.contains("data/c5=2000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=1000"))) + assert(indexedFiles.exists(_.toString.contains("data/c5=2000"))) + + // Validate results. + val df2 = spark.read.parquet(dataPath) + + def query: DataFrame = df2.filter("c1 = '2017-09-03'").select("c1", "c5") + + spark.disableHyperspace() + val baseQuery = query + val basePlan = baseQuery.queryExecution.optimizedPlan + + spark.enableHyperspace() + val queryWithHs = query + val planWithHs = queryWithHs.queryExecution.optimizedPlan + assert(!basePlan.equals(planWithHs)) + + val files = planWithHs.collect { + case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + fsRelation.location.inputFiles + }.flatten + + // Check data files are replaced by index files. + assert(files.nonEmpty && files.forall(_.contains("index"))) + checkAnswer(baseQuery, queryWithHs) + } + } + } +}
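
Usage note (not part of the patch): a minimal sketch of the scan-pattern refresh API added above, following the calls exercised in the tests. The index name, data paths and scan pattern are illustrative; `spark` is assumed to be an existing SparkSession.

  import org.apache.spark.sql.SparkSession
  import com.microsoft.hyperspace.Hyperspace
  import com.microsoft.hyperspace.index.IndexConfig

  val spark: SparkSession = SparkSession.builder.getOrCreate()
  val hs = new Hyperspace(spark)

  // Create an index over partitioned data (illustrative path and columns).
  val df = spark.read.parquet("/data/events")
  hs.createIndex(df, IndexConfig("eventsIndex", Seq("c1"), Seq("c5")))

  // After new files land under /data/events/c5=2000, refresh only that part of the data.
  // The scan pattern is resolved against the relation's root paths and may contain wildcards.
  // Scan patterns are only supported with the "incremental" refresh mode.
  hs.refreshIndex("eventsIndex", "incremental", Some("events/c5=2000"))

  // The existing two-argument overload still refreshes against all appended and deleted files.
  hs.refreshIndex("eventsIndex", "incremental")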