diff --git a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala index cdc786ed0..f1203ebf9 100644 --- a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala @@ -19,6 +19,7 @@ package com.microsoft.hyperspace import org.apache.spark.sql.{DataFrame, SparkSession} import com.microsoft.hyperspace.index._ +import com.microsoft.hyperspace.index.IndexConstants.REFRESH_MODE_FULL import com.microsoft.hyperspace.index.plananalysis.PlanAnalyzer class Hyperspace(spark: SparkSession) { @@ -74,7 +75,18 @@ class Hyperspace(spark: SparkSession) { * @param indexName Name of the index to refresh. */ def refreshIndex(indexName: String): Unit = { - indexManager.refresh(indexName) + refreshIndex(indexName, REFRESH_MODE_FULL) + } + + /** + * Update indexes for the latest version of the data. This API supports the refresh + * modes listed below. + * + * @param indexName Name of the index to refresh. + * @param mode Refresh mode. Currently supported modes are `incremental` and `full`. + */ + def refreshIndex(indexName: String, mode: String): Unit = { + indexManager.refresh(indexName, mode) } /** diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAction.scala index 547fa0dfa..8b9af26fd 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAction.scala @@ -37,9 +37,10 @@ class RefreshAction( extends RefreshActionBase(spark, logManager, dataManager) { final override def op(): Unit = { - // TODO: The current implementation picks the number of buckets from session config. - // This should be user-configurable to allow maintain the existing bucket numbers - // in the index log entry. + // TODO: The current implementation picks the number of buckets and + // "spark.hyperspace.index.lineage.enabled" from session config. This should be + // user-configurable to allow maintaining the existing bucket numbers in the index log entry. + // https://github.com/microsoft/hyperspace/issues/196. write(spark, df, indexConfig) } diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala index def6d9f08..133b48720 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala @@ -41,9 +41,10 @@ class RefreshAppendAction( dataManager: IndexDataManager) extends RefreshActionBase(spark, logManager, dataManager) { final override def op(): Unit = { - // TODO: The current implementation picks the number of buckets from session config. - // This should be user-configurable to allow maintain the existing bucket numbers - // in the index log entry. + // TODO: The current implementation picks the number of buckets and + // "spark.hyperspace.index.lineage.enabled" from session config. This should be + // user-configurable to allow maintaining the existing bucket numbers in the index log entry. + // https://github.com/microsoft/hyperspace/issues/196.
write(spark, dfWithAppendedFiles, indexConfig) } diff --git a/src/main/scala/com/microsoft/hyperspace/index/CachingIndexCollectionManager.scala b/src/main/scala/com/microsoft/hyperspace/index/CachingIndexCollectionManager.scala index dc5a47323..56fbc6c4a 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/CachingIndexCollectionManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/CachingIndexCollectionManager.scala @@ -92,9 +92,9 @@ class CachingIndexCollectionManager( super.vacuum(indexName) } - override def refresh(indexName: String): Unit = { + override def refresh(indexName: String, mode: String): Unit = { clearCache() - super.refresh(indexName) + super.refresh(indexName, mode) } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala index cc3903e21..b8fccdc52 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.internal.SQLConf import com.microsoft.hyperspace.HyperspaceException import com.microsoft.hyperspace.actions._ -import com.microsoft.hyperspace.util.HyperspaceConf +import com.microsoft.hyperspace.index.IndexConstants.{REFRESH_MODE_FULL, REFRESH_MODE_INCREMENTAL} class IndexCollectionManager( spark: SparkSession, @@ -63,16 +63,17 @@ class IndexCollectionManager( } } - override def refresh(indexName: String): Unit = { + override def refresh(indexName: String, mode: String): Unit = { withLogManager(indexName) { logManager => val indexPath = PathResolver(spark.sessionState.conf).getIndexPath(indexName) val dataManager = indexDataManagerFactory.create(indexPath) - if (HyperspaceConf.refreshDeleteEnabled(spark)) { + if (mode.equalsIgnoreCase(REFRESH_MODE_INCREMENTAL)) { new RefreshDeleteAction(spark, logManager, dataManager).run() - } else if (HyperspaceConf.refreshAppendEnabled(spark)) { new RefreshAppendAction(spark, logManager, dataManager).run() - } else { + } else if (mode.equalsIgnoreCase(REFRESH_MODE_FULL)) { new RefreshAction(spark, logManager, dataManager).run() + } else { + throw HyperspaceException(s"Unsupported refresh mode '$mode' found.") } } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala index 98b234a79..363a631f2 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala @@ -73,16 +73,6 @@ object IndexConstants { val INDEX_LINEAGE_ENABLED = "spark.hyperspace.index.lineage.enabled" val INDEX_LINEAGE_ENABLED_DEFAULT = "false" - val REFRESH_DELETE_ENABLED = "spark.hyperspace.index.refresh.delete.enabled" - val REFRESH_DELETE_ENABLED_DEFAULT = "false" - - /** - * This flag enables refreshing index if additional data files are appended to the source. When - * set to false, the refresh call will not run RefreshAppendAction. It will instead go for full - * refresh. - * This flag is temporary, and will be removed when both Append and Delete actions are merged - * for refreshing indexes. 
- */ - val REFRESH_APPEND_ENABLED = "spark.hyperspace.index.refresh.append.enabled" - val REFRESH_APPEND_ENABLED_DEFAULT = "false" + val REFRESH_MODE_INCREMENTAL = "incremental" + val REFRESH_MODE_FULL = "full" } diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala index 87d3c638c..993912513 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala @@ -65,7 +65,7 @@ trait IndexManager { * * @param indexName Name of the index to refresh. */ - def refresh(indexName: String): Unit + def refresh(indexName: String, mode: String): Unit /** * Cancel api to bring back index from an inconsistent state to the last known stable state. diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala index 655391b91..e84d51bac 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala @@ -39,18 +39,4 @@ object HyperspaceConf { IndexConstants.INDEX_HYBRID_SCAN_DELETE_ENABLED_DEFAULT) .toBoolean } - - def refreshDeleteEnabled(spark: SparkSession): Boolean = { - spark.conf - .get(IndexConstants.REFRESH_DELETE_ENABLED, - IndexConstants.REFRESH_DELETE_ENABLED_DEFAULT) - .toBoolean - } - - def refreshAppendEnabled(spark: SparkSession): Boolean = { - spark.conf - .get(IndexConstants.REFRESH_APPEND_ENABLED, - IndexConstants.REFRESH_APPEND_ENABLED_DEFAULT) - .toBoolean - } } diff --git a/src/test/scala/com/microsoft/hyperspace/TestUtils.scala b/src/test/scala/com/microsoft/hyperspace/TestUtils.scala index 78b8f6ede..96bce037a 100644 --- a/src/test/scala/com/microsoft/hyperspace/TestUtils.scala +++ b/src/test/scala/com/microsoft/hyperspace/TestUtils.scala @@ -16,11 +16,13 @@ package com.microsoft.hyperspace +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import com.microsoft.hyperspace.MockEventLogger.reset import com.microsoft.hyperspace.index.IndexLogEntry import com.microsoft.hyperspace.telemetry.{EventLogger, HyperspaceEvent} +import com.microsoft.hyperspace.util.FileUtils object TestUtils { def copyWithState(index: IndexLogEntry, state: String): IndexLogEntry = { @@ -46,6 +48,28 @@ object TestUtils { path.getName +: splitPath(path.getParent) } } + + /** + * Delete files from a given path. + * + * @param path Path to the folder containing files. + * @param pattern File name pattern to delete. + * @param numFilesToDelete Number of files to delete. + * @return Paths to the deleted files.
+ */ + def deleteFiles(path: String, pattern: String, numFilesToDelete: Int): Seq[Path] = { + val pathToDelete = new Path(path, pattern) + val fileNames = pathToDelete + .getFileSystem(new Configuration) + .globStatus(pathToDelete) + .map(_.getPath) + + assert(fileNames.length >= numFilesToDelete) + val filesToDelete = fileNames.take(numFilesToDelete) + filesToDelete.foreach(FileUtils.delete(_)) + + filesToDelete + } } /** diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala index d804b8099..0af0dbd33 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala @@ -18,19 +18,19 @@ package com.microsoft.hyperspace.index import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.sql.{AnalysisException, DataFrame, Row} -import org.apache.spark.sql.catalyst.plans.SQLHelper +import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.execution.SortExec import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation} import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec -import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData} +import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestUtils} +import com.microsoft.hyperspace.index.IndexConstants.REFRESH_MODE_INCREMENTAL import com.microsoft.hyperspace.index.execution.BucketUnionStrategy import com.microsoft.hyperspace.index.rules.{FilterIndexRule, JoinIndexRule} -import com.microsoft.hyperspace.util.{FileUtils, PathUtils} +import com.microsoft.hyperspace.util.PathUtils -class E2EHyperspaceRulesTests extends HyperspaceSuite with SQLHelper { +class E2EHyperspaceRulesTests extends QueryTest with HyperspaceSuite { private val testDir = "src/test/resources/e2eTests/" private val nonPartitionedDataPath = testDir + "sampleparquet" private val partitionedDataPath = testDir + "samplepartitionedparquet" @@ -409,9 +409,126 @@ class E2EHyperspaceRulesTests extends HyperspaceSuite with SQLHelper { sortedRowsWithHyperspaceDisabled.sameElements(getSortedRows(dfAfterHyperspaceDisabled))) } - test("Verify Join Index Rule utilizes indexes correctly after incremental refresh.") { + test("Verify JoinIndexRule utilizes indexes correctly after incremental refresh (append-only).") { withTempPathAsString { testPath => - withSQLConf(IndexConstants.REFRESH_APPEND_ENABLED -> "true") { + // Setup. Create data. + val indexConfig = IndexConfig("index", Seq("c2"), Seq("c4")) + import spark.implicits._ + SampleData.testData + .toDF("c1", "c2", "c3", "c4", "c5") + .limit(10) + .write + .parquet(testPath) + val df = spark.read.load(testPath) + + // Create index. + hyperspace.createIndex(df, indexConfig) + + // Append to original data. + SampleData.testData + .toDF("c1", "c2", "c3", "c4", "c5") + .limit(3) + .write + .mode("append") + .parquet(testPath) + + // Refresh index. + hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_INCREMENTAL) + + // Create a join query. 
+ val leftDf = spark.read.parquet(testPath) + val rightDf = spark.read.parquet(testPath) + + def query(): DataFrame = { + leftDf + .join(rightDf, leftDf("c2") === rightDf("c2")) + .select(leftDf("c2"), rightDf("c4")) + } + + // Verify indexes are used, and all index files are picked. + verifyIndexUsage( + query, + getIndexFilesPath(indexConfig.indexName, Seq(0, 1)) ++ + getIndexFilesPath(indexConfig.indexName, Seq(0, 1))) + + // With Hyperspace disabled, verify there are shuffle and sort nodes as expected. + spark.disableHyperspace() + val dfWithHyperspaceDisabled = query() + var shuffleNodes = dfWithHyperspaceDisabled.queryExecution.executedPlan.collect { + case s: ShuffleExchangeExec => s + } + assert(shuffleNodes.size == 2) + var sortNodes = dfWithHyperspaceDisabled.queryExecution.executedPlan.collect { + case s: SortExec => s + } + assert(sortNodes.size == 2) + + // With Hyperspace enabled, verify bucketing works as expected. This is reflected in + // shuffle nodes being eliminated. + spark.enableHyperspace() + val dfWithHyperspaceEnabled = query() + shuffleNodes = dfWithHyperspaceEnabled.queryExecution.executedPlan.collect { + case s: ShuffleExchangeExec => s + } + assert(shuffleNodes.isEmpty) + + // SortExec is expected to be present because there are multiple files per bucket. + sortNodes = dfWithHyperspaceEnabled.queryExecution.executedPlan.collect { + case s: SortExec => s + } + assert(sortNodes.size == 2) + } + } + + test("Test for isHyperspaceEnabled().") { + assert(!spark.isHyperspaceEnabled(), "Hyperspace must be disabled by default.") + spark.enableHyperspace() + assert(spark.isHyperspaceEnabled()) + spark.disableHyperspace() + assert(!spark.isHyperspaceEnabled()) + } + + test("Validate index usage after incremental refresh with some source data file deleted.") { + withTempPathAsString { testPath => + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { + // Save a copy of source data files. + val dataColumns = Seq("c1", "c2", "c3", "c4", "c5") + SampleData.save(spark, testPath, dataColumns) + + // Create index on original source data files. + val df = spark.read.parquet(testPath) + val indexConfig = IndexConfig("filterIndex", Seq("c3"), Seq("c1")) + hyperspace.createIndex(df, indexConfig) + + // Verify index usage for index version (v=0). + def query(): DataFrame = + spark.read.parquet(testPath).filter("c3 == 'facebook'").select("c3", "c1") + + verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName)) + + // Delete some source data file. + TestUtils.deleteFiles(testPath, "*parquet", 1) + + // Verify index is not used. + spark.enableHyperspace() + val planRootPaths = getAllRootPaths(query().queryExecution.optimizedPlan) + spark.disableHyperspace() + assert(planRootPaths.equals(Seq(PathUtils.makeAbsolute(testPath)))) + + // Refresh the index to remove deleted source data file records from index. + hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_INCREMENTAL) + + // Verify index usage on latest version of index (v=1) after refresh. + verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName, Seq(1))) + } + } + } + + test( + "Verify JoinIndexRule utilizes indexes correctly after incremental refresh when some file " + + "gets deleted and some appended to source data.") { + withTempPathAsString { testPath => + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { // Setup. Create data. 
val indexConfig = IndexConfig("index", Seq("c2"), Seq("c4")) import spark.implicits._ @@ -425,6 +542,9 @@ class E2EHyperspaceRulesTests extends HyperspaceSuite with SQLHelper { // Create index. hyperspace.createIndex(df, indexConfig) + // Delete some source data file. + TestUtils.deleteFiles(testPath, "*parquet", 1) + // Append to original data. SampleData.testData .toDF("c1", "c2", "c3", "c4", "c5") @@ -434,7 +554,7 @@ class E2EHyperspaceRulesTests extends HyperspaceSuite with SQLHelper { .parquet(testPath) // Refresh index. - hyperspace.refreshIndex(indexConfig.indexName) + hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_INCREMENTAL) // Create a join query. val leftDf = spark.read.parquet(testPath) @@ -449,97 +569,19 @@ class E2EHyperspaceRulesTests extends HyperspaceSuite with SQLHelper { // Verify indexes are used, and all index files are picked. verifyIndexUsage( query, - getIndexFilesPath(indexConfig.indexName, Seq(0, 1)) ++ - getIndexFilesPath(indexConfig.indexName, Seq(0, 1))) + getIndexFilesPath(indexConfig.indexName, Seq(1, 2)) ++ + getIndexFilesPath(indexConfig.indexName, Seq(1, 2))) - // With Hyperspace disabled, verify there are shuffle and sort nodes as expected. + // Verify correctness of results. spark.disableHyperspace() val dfWithHyperspaceDisabled = query() - var shuffleNodes = dfWithHyperspaceDisabled.queryExecution.executedPlan.collect { - case s: ShuffleExchangeExec => s - } - assert(shuffleNodes.size == 2) - var sortNodes = dfWithHyperspaceDisabled.queryExecution.executedPlan.collect { - case s: SortExec => s - } - assert(sortNodes.size == 2) - - // With Hyperspace enabled, verify bucketing works as expected. This is reflected in - // shuffle nodes being eliminated. spark.enableHyperspace() val dfWithHyperspaceEnabled = query() - shuffleNodes = dfWithHyperspaceEnabled.queryExecution.executedPlan.collect { - case s: ShuffleExchangeExec => s - } - assert(shuffleNodes.isEmpty) - - // SortExec is expected to be present because there are multiple files per bucket. - sortNodes = dfWithHyperspaceEnabled.queryExecution.executedPlan.collect { - case s: SortExec => s - } - assert(sortNodes.size == 2) + checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled) } } } - test("Test for isHyperspaceEnabled().") { - assert(!spark.isHyperspaceEnabled(), "Hyperspace must be disabled by default.") - spark.enableHyperspace() - assert(spark.isHyperspaceEnabled()) - spark.disableHyperspace() - assert(!spark.isHyperspaceEnabled()) - } - - test("Validate index usage after refresh with some source data file deleted.") { - withSQLConf( - IndexConstants.INDEX_LINEAGE_ENABLED -> "true", - IndexConstants.REFRESH_DELETE_ENABLED -> "true") { - - // Save a copy of source data files. - val location = testDir + "ixRefreshTest" - val dataPath = new Path(location, "*parquet") - val dataColumns = Seq("c1", "c2", "c3", "c4", "c5") - SampleData.save(spark, location, dataColumns) - - // Create index on original source data files. - val df = spark.read.parquet(location) - val indexConfig = IndexConfig("filterIndex", Seq("c3"), Seq("c1")) - hyperspace.createIndex(df, indexConfig) - - // Verify index usage for index version (v=0). - def query1(): DataFrame = - spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1") - - verifyIndexUsage(query1, getIndexFilesPath(indexConfig.indexName)) - - // Delete some source data file. 
- val dataFileNames = dataPath - .getFileSystem(new Configuration) - .globStatus(dataPath) - .map(_.getPath) - - assert(dataFileNames.nonEmpty) - val fileToDelete = dataFileNames.head - FileUtils.delete(fileToDelete) - - def query2(): DataFrame = - spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1") - - // Verify index is not used. - spark.enableHyperspace() - val planRootPaths = getAllRootPaths(query2().queryExecution.optimizedPlan) - spark.disableHyperspace() - assert(planRootPaths.equals(Seq(PathUtils.makeAbsolute(location)))) - - // Refresh the index to remove deleted source data file records from index. - hyperspace.refreshIndex(indexConfig.indexName) - - // Verify index usage on latest version of index (v=1) after refresh. - verifyIndexUsage(query2, getIndexFilesPath(indexConfig.indexName, Seq(1))) - FileUtils.delete(dataPath) - } - } - /** * Check that if the query plan has the expected rootPaths. * diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexCollectionManagerTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexCollectionManagerTest.scala index 8b86e658e..895f6ae16 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexCollectionManagerTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexCollectionManagerTest.scala @@ -23,6 +23,7 @@ import org.mockito.Mockito.{mock, when} import com.microsoft.hyperspace.{HyperspaceException, SparkInvolvedSuite} import com.microsoft.hyperspace.actions.Constants +import com.microsoft.hyperspace.index.IndexConstants.{REFRESH_MODE_FULL, REFRESH_MODE_INCREMENTAL} class IndexCollectionManagerTest extends SparkFunSuite with SparkInvolvedSuite { private val indexSystemPath = "src/test/resources/indexLocation" @@ -52,8 +53,7 @@ class IndexCollectionManagerTest extends SparkFunSuite with SparkInvolvedSuite { .Columns(Seq("RGUID"), Seq("Date")), "", 10)), - Content( - Directory(s"$indexPath/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0")), + Content(Directory(s"$indexPath/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0")), Source(SparkPlan(sourcePlanProperties)), Map()) entry.state = Constants.States.ACTIVE @@ -132,8 +132,14 @@ class IndexCollectionManagerTest extends SparkFunSuite with SparkInvolvedSuite { intercept[HyperspaceException](indexCollectionManager.restore("idx4")) } - test("refresh() throws exception if index is not found") { + test("refresh() with mode = 'full' throws exception if index is not found") { when(mockFileSystem.exists(new Path(indexSystemPath, "idx4"))).thenReturn(false) - intercept[HyperspaceException](indexCollectionManager.refresh("idx4")) + intercept[HyperspaceException](indexCollectionManager.refresh("idx4", REFRESH_MODE_FULL)) + } + + test("refresh() with mode = 'incremental' throws exception if index is not found") { + when(mockFileSystem.exists(new Path(indexSystemPath, "idx4"))).thenReturn(false) + intercept[HyperspaceException]( + indexCollectionManager.refresh("idx4", REFRESH_MODE_INCREMENTAL)) } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala index ec1d9298b..380fd6c35 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala @@ -23,10 +23,10 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRela import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.types.{IntegerType, 
StringType, StructField, StructType} -import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, MockEventLogger, SampleData} +import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, SampleData} import com.microsoft.hyperspace.TestUtils.copyWithState import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.telemetry.RefreshAppendActionEvent +import com.microsoft.hyperspace.index.IndexConstants.{REFRESH_MODE_FULL, REFRESH_MODE_INCREMENTAL} import com.microsoft.hyperspace.util.{FileUtils, PathUtils} class IndexManagerTests extends HyperspaceSuite with SQLHelper { @@ -226,7 +226,7 @@ class IndexManagerTests extends HyperspaceSuite with SQLHelper { .options(option) .format(format) .save(refreshTestLocation) - hyperspace.refreshIndex(indexConfig.indexName) + hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_FULL) val newIndexLocation = s"$systemPath/index_$format" indexCount = spark.read .parquet(newIndexLocation + @@ -252,81 +252,45 @@ class IndexManagerTests extends HyperspaceSuite with SQLHelper { } } - test("Verify refresh-incremental (append-only) is a no-op if no new files found.") { + test("Verify incremental refresh (append-only) should index only newly appended data.") { withTempPathAsString { testPath => - withSQLConf(IndexConstants.REFRESH_APPEND_ENABLED -> "true") { - // Setup. Create sample data and index. - val indexConfig = IndexConfig(s"index", Seq("RGUID"), Seq("imprs")) - import spark.implicits._ - SampleData.testData - .toDF("Date", "RGUID", "Query", "imprs", "clicks") - .limit(10) - .write - .parquet(testPath) - val df = spark.read.parquet(testPath) - hyperspace.createIndex(df, indexConfig) - val indexPath = PathUtils.makeAbsolute(s"$systemPath/${indexConfig.indexName}") - val logManager = IndexLogManagerFactoryImpl.create(indexPath) - val latestId = logManager.getLatestId().get - - MockEventLogger.reset() - hyperspace.refreshIndex(indexConfig.indexName) - // Check that no new log files were created in this operation. - assert(latestId == logManager.getLatestId().get) - - // Check emitted events. - MockEventLogger.emittedEvents match { - case Seq( - RefreshAppendActionEvent(_, _, "Operation started."), - RefreshAppendActionEvent(_, _, msg)) => - assert(msg.contains("Refresh append aborted as no appended source data files found.")) - case _ => fail() - } - } - } - } - - test("Verify refresh-incremental (append-only) should index only newly appended data.") { - withTempPathAsString { testPath => - withSQLConf(IndexConstants.REFRESH_APPEND_ENABLED -> "true") { - // Setup. Create sample data and index. - val indexConfig = IndexConfig(s"index", Seq("RGUID"), Seq("imprs")) - import spark.implicits._ - SampleData.testData - .toDF("Date", "RGUID", "Query", "imprs", "clicks") - .limit(10) - .write - .parquet(testPath) - val df = spark.read.parquet(testPath) - hyperspace.createIndex(df, indexConfig) - var indexCount = - spark.read - .parquet(s"$systemPath/index" + - s"/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0") - .count() - assert(indexCount == 10) - // Check if latest log file is updated with newly created index files. - validateMetadata("index", Set("v__=0")) - - // Change original data. - SampleData.testData - .toDF("Date", "RGUID", "Query", "imprs", "clicks") - .limit(3) - .write - .mode("append") - .parquet(testPath) - hyperspace.refreshIndex(indexConfig.indexName) - indexCount = spark.read + // Setup. Create sample data and index. 
+ val indexConfig = IndexConfig(s"index", Seq("RGUID"), Seq("imprs")) + import spark.implicits._ + SampleData.testData + .toDF("Date", "RGUID", "Query", "imprs", "clicks") + .limit(10) + .write + .parquet(testPath) + val df = spark.read.parquet(testPath) + hyperspace.createIndex(df, indexConfig) + var indexCount = + spark.read .parquet(s"$systemPath/index" + - s"/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=1") + s"/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0") .count() - - // Check if index got updated. - assert(indexCount == 3) - - // Check if latest log file is updated with newly created index files. - validateMetadata("index", Set("v__=0", "v__=1")) - } + assert(indexCount == 10) + // Check if latest log file is updated with newly created index files. + validateMetadata("index", Set("v__=0")) + + // Change original data. + SampleData.testData + .toDF("Date", "RGUID", "Query", "imprs", "clicks") + .limit(3) + .write + .mode("append") + .parquet(testPath) + hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_INCREMENTAL) + indexCount = spark.read + .parquet(s"$systemPath/index" + + s"/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=1") + .count() + + // Check if index got updated. + assert(indexCount == 3) + + // Check if latest log file is updated with newly created index files. + validateMetadata("index", Set("v__=0", "v__=1")) } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala index 074bb5ec4..5e7b8d88d 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala @@ -20,8 +20,10 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.sql.{AnalysisException, QueryTest} -import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, MockEventLogger, SampleData} -import com.microsoft.hyperspace.telemetry.RefreshDeleteActionEvent +import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, MockEventLogger, SampleData, TestUtils} +import com.microsoft.hyperspace.actions.{RefreshAppendAction, RefreshDeleteAction} +import com.microsoft.hyperspace.index.IndexConstants.REFRESH_MODE_INCREMENTAL +import com.microsoft.hyperspace.telemetry.{RefreshAppendActionEvent, RefreshDeleteActionEvent} import com.microsoft.hyperspace.util.{FileUtils, PathUtils} /** @@ -51,7 +53,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { FileUtils.delete(systemPath) } - test("Validate refresh index when some file gets deleted from the source data.") { + test("Validate incremental refresh index when some file gets deleted from the source data.") { // Save test data non-partitioned. SampleData.save( spark, @@ -68,9 +70,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { val partitionedDataDF = spark.read.parquet(partitionedDataPath) Seq(nonPartitionedDataPath, partitionedDataPath).foreach { loc => - withSQLConf( - IndexConstants.INDEX_LINEAGE_ENABLED -> "true", - IndexConstants.REFRESH_DELETE_ENABLED -> "true") { + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { withIndex(indexConfig.indexName) { val dfToIndex = if (loc.equals(nonPartitionedDataPath)) nonPartitionedDataDF else partitionedDataDF @@ -78,9 +78,9 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { // Delete one source data file. 
val deletedFile = if (loc.equals(nonPartitionedDataPath)) { - deleteDataFile(nonPartitionedDataPath) + deleteOneDataFile(nonPartitionedDataPath) } else { - deleteDataFile(partitionedDataPath, true) + deleteOneDataFile(partitionedDataPath, true) } // Validate only index records whose lineage is the deleted file are removed. @@ -89,7 +89,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { val originalIndexWithoutDeletedFile = originalIndexDF .filter(s"""${IndexConstants.DATA_FILE_NAME_COLUMN} != "$deletedFile"""") - hyperspace.refreshIndex(indexConfig.indexName) + hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_INCREMENTAL) val refreshedIndexDF = spark.read.parquet(s"$systemPath/${indexConfig.indexName}/" + s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=1") @@ -101,7 +101,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { } test( - "Validate refresh index (to handle deletes from the source data) " + + "Validate incremental refresh index (to handle deletes from the source data) " + "fails as expected on an index without lineage.") { SampleData.save( spark, @@ -109,14 +109,13 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { Seq("Date", "RGUID", "Query", "imprs", "clicks")) val nonPartitionedDataDF = spark.read.parquet(nonPartitionedDataPath) - withSQLConf( - IndexConstants.INDEX_LINEAGE_ENABLED -> "false", - IndexConstants.REFRESH_DELETE_ENABLED -> "true") { + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "false") { hyperspace.createIndex(nonPartitionedDataDF, indexConfig) - deleteDataFile(nonPartitionedDataPath) + deleteOneDataFile(nonPartitionedDataPath) - val ex = intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName)) + val ex = intercept[HyperspaceException]( + hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_INCREMENTAL)) assert( ex.getMessage.contains(s"Index refresh (to handle deleted source data) is " + "only supported on an index with lineage.")) @@ -124,23 +123,21 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { } test( - "Validate refresh index (to handle deletes from the source data) " + - "is a no-op if no source data file is deleted.") { + "Validate incremental refresh index is a no-op if no source data file is deleted or " + + "appended.") { SampleData.save( spark, nonPartitionedDataPath, Seq("Date", "RGUID", "Query", "imprs", "clicks")) val nonPartitionedDataDF = spark.read.parquet(nonPartitionedDataPath) - withSQLConf( - IndexConstants.INDEX_LINEAGE_ENABLED -> "true", - IndexConstants.REFRESH_DELETE_ENABLED -> "true") { + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { hyperspace.createIndex(nonPartitionedDataDF, indexConfig) val latestId = logManager(indexConfig.indexName).getLatestId().get MockEventLogger.reset() - hyperspace.refreshIndex(indexConfig.indexName) + hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_INCREMENTAL) // Check that no new log files were created in this operation. 
assert(latestId == logManager(indexConfig.indexName).getLatestId().get) @@ -148,20 +145,21 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { MockEventLogger.emittedEvents match { case Seq( RefreshDeleteActionEvent(_, _, "Operation started."), - RefreshDeleteActionEvent(_, _, msg)) => - assert(msg.contains("Refresh delete aborted as no deleted source data file found.")) + RefreshDeleteActionEvent(_, _, msg1), + RefreshAppendActionEvent(_, _, "Operation started."), + RefreshAppendActionEvent(_, _, msg2)) => + assert(msg1.contains("Refresh delete aborted as no deleted source data file found.")) + assert(msg2.contains("Refresh append aborted as no appended source data files found.")) case _ => fail() } } } test( - "Validate refresh index (to handle deletes from the source data) " + + "Validate incremental refresh index (to handle deletes from the source data) " + "fails as expected when all source data files are deleted.") { Seq(true, false).foreach { deleteDataFolder => - withSQLConf( - IndexConstants.INDEX_LINEAGE_ENABLED -> "true", - IndexConstants.REFRESH_DELETE_ENABLED -> "true") { + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { SampleData.save( spark, nonPartitionedDataPath, @@ -173,7 +171,8 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { if (deleteDataFolder) { FileUtils.delete(new Path(nonPartitionedDataPath)) - val ex = intercept[AnalysisException](hyperspace.refreshIndex(indexConfig.indexName)) + val ex = intercept[AnalysisException]( + hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_INCREMENTAL)) assert(ex.getMessage.contains("Path does not exist")) } else { @@ -184,7 +183,8 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { .foreach(p => FileUtils.delete(p.getPath)) val ex = - intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName)) + intercept[HyperspaceException]( + hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_INCREMENTAL)) assert(ex.getMessage.contains("Invalid plan for creating an index.")) } FileUtils.delete(new Path(nonPartitionedDataPath)) @@ -194,7 +194,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { } test( - "Validate refresh index (to handle deletes from the source data) " + + "Validate incremental refresh index (to handle deletes from the source data) " + "fails as expected when file info for an existing source data file changes.") { SampleData.save( spark, @@ -202,19 +202,18 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { Seq("Date", "RGUID", "Query", "imprs", "clicks")) val nonPartitionedDataDF = spark.read.parquet(nonPartitionedDataPath) - withSQLConf( - IndexConstants.INDEX_LINEAGE_ENABLED -> "true", - IndexConstants.REFRESH_DELETE_ENABLED -> "true") { + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { hyperspace.createIndex(nonPartitionedDataDF, indexConfig) // Replace a source data file with a new file with same name but different properties. - val deletedFile = deleteDataFile(nonPartitionedDataPath) + val deletedFile = deleteOneDataFile(nonPartitionedDataPath) FileUtils.createFile( deletedFile.getFileSystem(new Configuration), deletedFile, "I am some random content :).") - val ex = intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName)) + val ex = intercept[HyperspaceException]( + hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_INCREMENTAL)) assert( ex.getMessage.contains("Index refresh (to handle deleted source data) aborted. 
" + "Existing source data file info is changed")) @@ -222,22 +221,17 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { } test( - "Validate refresh delete action updates appended and deleted files in metadata as " + + "Validate RefreshDeleteAction updates appended and deleted files in metadata as " + "expected, when some file gets deleted and some appended to source data.") { withTempPathAsString { testPath => - withSQLConf( - IndexConstants.INDEX_LINEAGE_ENABLED -> "true", - IndexConstants.REFRESH_DELETE_ENABLED -> "true") { + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { withIndex(indexConfig.indexName) { - SampleData.save( - spark, - testPath, - Seq("Date", "RGUID", "Query", "imprs", "clicks")) + SampleData.save(spark, testPath, Seq("Date", "RGUID", "Query", "imprs", "clicks")) val df = spark.read.parquet(testPath) hyperspace.createIndex(df, indexConfig) // Delete one source data file. - deleteDataFile(testPath) + deleteOneDataFile(testPath) val oldFiles = listFiles(testPath).toSet @@ -250,7 +244,12 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { .mode("append") .parquet(testPath) - hyperspace.refreshIndex(indexConfig.indexName) + val indexPath = PathUtils.makeAbsolute(s"$systemPath/${indexConfig.indexName}") + new RefreshDeleteAction( + spark, + IndexLogManagerFactoryImpl.create(indexPath), + IndexDataManagerFactoryImpl.create(indexPath)) + .run() // Verify "deletedFiles" is cleared and "appendedFiles" is updated after refresh. val indexLogEntry = getLatestStableLog(indexConfig.indexName) @@ -265,19 +264,61 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { } test( - "Validate refresh append action updates appended and deleted files in metadata as" + + "Validate RefreshAppendAction updates appended and deleted files in metadata as" + "expected, when some file gets deleted and some appended to source data.") { withTempPathAsString { testPath => - withSQLConf(IndexConstants.REFRESH_APPEND_ENABLED -> "true") { + withIndex(indexConfig.indexName) { + SampleData.save(spark, testPath, Seq("Date", "RGUID", "Query", "imprs", "clicks")) + val df = spark.read.parquet(testPath) + hyperspace.createIndex(df, indexConfig) + + val oldFiles = listFiles(testPath).toSet + + // Delete one source data file. + deleteOneDataFile(testPath) + + // Add some new data to source. + import spark.implicits._ + SampleData.testData + .take(3) + .toDF("Date", "RGUID", "Query", "imprs", "clicks") + .write + .mode("append") + .parquet(testPath) + + val indexPath = PathUtils.makeAbsolute(s"$systemPath/${indexConfig.indexName}") + new RefreshAppendAction( + spark, + IndexLogManagerFactoryImpl.create(indexPath), + IndexDataManagerFactoryImpl.create(indexPath)) + .run() + + // Verify "appendedFiles" is cleared and "deletedFiles" is updated after refresh. + val indexLogEntry = getLatestStableLog(indexConfig.indexName) + assert(indexLogEntry.appendedFiles.isEmpty) + + val latestFiles = listFiles(testPath).toSet + assert(indexLogEntry.deletedFiles.toSet.equals(oldFiles -- latestFiles)) + } + } + } + + test( + "Validate incremental refresh index when some file gets deleted and some appended to " + + "source data.") { + withTempPathAsString { testPath => + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { withIndex(indexConfig.indexName) { + // Save test data non-partitioned. 
SampleData.save(spark, testPath, Seq("Date", "RGUID", "Query", "imprs", "clicks")) val df = spark.read.parquet(testPath) hyperspace.createIndex(df, indexConfig) - - val oldFiles = listFiles(testPath).toSet + val countOriginal = df.count() // Delete one source data file. - deleteDataFile(testPath) + deleteOneDataFile(testPath) + val countAfterDelete = spark.read.parquet(testPath).count() + assert(countAfterDelete < countOriginal) // Add some new data to source. import spark.implicits._ @@ -288,14 +329,19 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { .mode("append") .parquet(testPath) - hyperspace.refreshIndex(indexConfig.indexName) + val countAfterAppend = spark.read.parquet(testPath).count() + assert(countAfterDelete + 3 == countAfterAppend) - // Verify "appendedFiles" is cleared and "deletedFiles" is updated after refresh. - val indexLogEntry = getLatestStableLog(indexConfig.indexName) - assert(indexLogEntry.appendedFiles.isEmpty) + hyperspace.refreshIndex(indexConfig.indexName, REFRESH_MODE_INCREMENTAL) - val latestFiles = listFiles(testPath).toSet - assert(indexLogEntry.deletedFiles.toSet.equals(oldFiles -- latestFiles)) + // Check if refreshed index is updated appropriately. + val indexDf = spark.read + .parquet(s"$systemPath/${indexConfig.indexName}/" + + s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=1") + .union(spark.read.parquet(s"$systemPath/${indexConfig.indexName}/" + + s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=2")) + + assert(indexDf.count() == countAfterAppend) } } } @@ -308,23 +354,9 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { * @param isPartitioned Is data folder partitioned or not. * @return Path to the deleted file. */ - private def deleteDataFile(path: String, isPartitioned: Boolean = false): Path = { - val dataPath = if (isPartitioned) { - new Path(s"$path/*/*", "*parquet") - } else { - new Path(path, "*parquet") - } - - val dataFileNames = dataPath - .getFileSystem(new Configuration) - .globStatus(dataPath) - .map(_.getPath) - - assert(dataFileNames.nonEmpty) - val fileToDelete = dataFileNames.head - FileUtils.delete(fileToDelete) - - fileToDelete + private def deleteOneDataFile(path: String, isPartitioned: Boolean = false): Path = { + val dataPath = if (isPartitioned) s"$path/*/*" else path + TestUtils.deleteFiles(dataPath, "*parquet", 1).head } private def logManager(indexName: String): IndexLogManager = {