diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala
index cf8562055..621e17d6f 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala
@@ -63,7 +63,7 @@ private[actions] abstract class RefreshActionBase(
     IndexConfig(previousIndexLogEntry.name, ddColumns.indexed, ddColumns.included)
   }
 
-  final override def logEntry: LogEntry = getIndexLogEntry(spark, df, indexConfig, indexDataPath)
+  override def logEntry: LogEntry = getIndexLogEntry(spark, df, indexConfig, indexDataPath)
 
   final override val transientState: String = REFRESHING
 
diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala
index 9668fc8d3..8b06ef6c5 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala
@@ -16,7 +16,6 @@
 
 package com.microsoft.hyperspace.actions
 
-import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions._
@@ -44,26 +43,16 @@ class RefreshDeleteAction(
     spark: SparkSession,
     logManager: IndexLogManager,
     dataManager: IndexDataManager)
-    extends RefreshActionBase(spark, logManager, dataManager)
+    extends RefreshDeleteActionBase(spark, logManager, dataManager)
     with Logging {
 
   final override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
     RefreshDeleteActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
   }
 
-  /**
-   * Validate index has lineage column and it is in active state for refreshing and
-   * there are some deleted source data file(s).
-   */
-  final override def validate(): Unit = {
+  override def validate(): Unit = {
     super.validate()
-    if (!previousIndexLogEntry.hasLineageColumn(spark)) {
-      throw HyperspaceException(
-        "Index refresh (to handle deleted source data) is " +
-          "only supported on an index with lineage.")
-    }
-
-    if (deletedFiles.isEmpty) {
+    if (sourceFilesDiff._1.isEmpty) {
       throw HyperspaceException("Refresh aborted as no deleted source data file found.")
     }
   }
@@ -76,12 +65,12 @@ class RefreshDeleteAction(
   final override def op(): Unit = {
     logInfo(
       "Refresh index is updating index by removing index entries " +
-        s"corresponding to ${deletedFiles.length} deleted source data files.")
+        s"corresponding to ${sourceFilesDiff._1.length} deleted source data files.")
 
     val refreshDF = spark.read
       .parquet(previousIndexLogEntry.content.files.map(_.toString): _*)
-      .filter(!col(s"${IndexConstants.DATA_FILE_NAME_COLUMN}").isin(deletedFiles: _*))
+      .filter(!col(s"${IndexConstants.DATA_FILE_NAME_COLUMN}").isin(sourceFilesDiff._1: _*))
 
     refreshDF.write.saveWithBuckets(
       refreshDF,
@@ -89,37 +78,4 @@ class RefreshDeleteAction(
       previousIndexLogEntry.numBuckets,
       indexConfig.indexedColumns)
   }
-
-  /**
-   * Compare list of source data files from previous IndexLogEntry to list
-   * of current source data files, validate fileInfo for existing files and
-   * identify deleted source data files.
-   */
-  private lazy val deletedFiles: Seq[String] = {
-    val rels = previousIndexLogEntry.relations
-    val originalFiles = rels.head.data.properties.content.fileInfos
-    val currentFiles = rels.head.rootPaths
-      .flatMap { p =>
-        Content
-          .fromDirectory(path = new Path(p), throwIfNotExists = true)
-          .fileInfos
-      }
-      .map(f => f.name -> f)
-      .toMap
-
-    var delFiles = Seq[String]()
-    originalFiles.foreach { f =>
-      currentFiles.get(f.name) match {
-        case Some(v) =>
-          if (!f.equals(v)) {
-            throw HyperspaceException(
-              "Index refresh (to handle deleted source data) aborted. " +
-                s"Existing source data file info is changed (file: ${f.name}).")
-          }
-        case None => delFiles :+= f.name
-      }
-    }
-
-    delFiles
-  }
 }
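Note on the refactoring above: `RefreshDeleteAction` no longer computes the deleted-file list itself; it now reads `sourceFilesDiff._1` from the `RefreshDeleteActionBase` class introduced in the next file, where `sourceFilesDiff` returns a `(deleted, appended)` pair. In isolation, the computation looks like the sketch below. `FileMeta` and `diffSourceFiles` are hypothetical stand-ins (Hyperspace's real `FileInfo` carries name, size, and modified time), and the error mirrors the `HyperspaceException` thrown when an indexed file changed in place:

```scala
// Standalone sketch of the (deleted, appended) diff that sourceFilesDiff
// encapsulates. FileMeta is a hypothetical stand-in for Hyperspace's FileInfo.
case class FileMeta(name: String, size: Long, modifiedTime: Long)

def diffSourceFiles(
    original: Seq[FileMeta],
    current: Map[String, FileMeta]): (Seq[String], Seq[String]) = {
  val deleted = original.flatMap { f =>
    current.get(f.name) match {
      case Some(v) if f != v =>
        // An indexed file was modified in place: refuse to refresh.
        sys.error(s"source file info changed: ${f.name}")
      case Some(_) => None      // still present and unchanged
      case None => Some(f.name) // gone from disk: deleted
    }
  }
  // Anything on disk that the previous log entry never recorded was appended.
  val appended = (current.keySet diff original.map(_.name).toSet).toSeq
  (deleted, appended)
}

// Example:
// diffSourceFiles(
//   Seq(FileMeta("f1", 100, 100), FileMeta("f2", 200, 200)),
//   Map("f2" -> FileMeta("f2", 200, 200), "f3" -> FileMeta("f3", 300, 300)))
// returns (Seq("f1"), Seq("f3"))
```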
diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala
new file mode 100644
index 000000000..8a1de5991
--- /dev/null
+++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala
@@ -0,0 +1,75 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.actions
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.SparkSession
+
+import com.microsoft.hyperspace.HyperspaceException
+import com.microsoft.hyperspace.index.{Content, IndexDataManager, IndexLogManager}
+
+private[actions] abstract class RefreshDeleteActionBase(
+    spark: SparkSession,
+    logManager: IndexLogManager,
+    dataManager: IndexDataManager)
+    extends RefreshActionBase(spark, logManager, dataManager) {
+
+  /**
+   * Validate that the index has a lineage column and is in active state for refreshing.
+   */
+  override def validate(): Unit = {
+    super.validate()
+    if (!previousIndexLogEntry.hasLineageColumn(spark)) {
+      throw HyperspaceException(
+        "Index refresh to update source content in index metadata is " +
+          "only supported on an index with lineage.")
+    }
+  }
+
+  /**
+   * Compare the list of source data files from the previous IndexLogEntry to the
+   * list of current source data files, validate the fileInfo of existing files, and
+   * identify deleted and appended source data files.
+   */
+  protected lazy val sourceFilesDiff: (Seq[String], Seq[String]) = {
+    val rels = previousIndexLogEntry.relations
+    val originalFiles = rels.head.data.properties.content.fileInfos
+    val currentFiles = rels.head.rootPaths
+      .flatMap { p =>
+        Content
+          .fromDirectory(path = new Path(p), throwIfNotExists = true)
+          .fileInfos
+      }
+      .map(f => f.name -> f)
+      .toMap
+
+    var delFiles = Seq[String]()
+    originalFiles.foreach { f =>
+      currentFiles.get(f.name) match {
+        case Some(v) =>
+          if (!f.equals(v)) {
+            throw HyperspaceException(
+              "Index refresh (to handle deleted or appended source data) aborted. " +
+                s"Existing source data file info is changed (file: ${f.name}).")
+          }
+        case None => delFiles :+= f.name
+      }
+    }
+
+    (delFiles, (currentFiles.keySet diff originalFiles.map(_.name)).toSeq)
+  }
+}
diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshSourceContentAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshSourceContentAction.scala
new file mode 100644
index 000000000..0488f82da
--- /dev/null
+++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshSourceContentAction.scala
@@ -0,0 +1,114 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.actions
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import com.microsoft.hyperspace.HyperspaceException
+import com.microsoft.hyperspace.index._
+import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshSourceContentActionEvent}
+
+/**
+ * Refresh index by updating the list of deleted and appended source data files
+ * and the index signature in index metadata.
+ * If some source data file(s) were deleted or appended after the previous version
+ * of the index was built, this Action refreshes the index as follows:
+ *  1. Deleted and appended source data files are identified.
+ *  2. A new index fingerprint is computed w.r.t. the latest source data files. This
+ *     captures both deleted source data files and appended ones.
+ *  3. The IndexLogEntry is updated by modifying the list of deleted and appended
+ *     source data files and the index fingerprint computed in the above steps.
+ *
+ * @param spark SparkSession.
+ * @param logManager Index LogManager for index being refreshed.
+ * @param dataManager Index DataManager for index being refreshed.
+ */
+class RefreshSourceContentAction(
+    spark: SparkSession,
+    logManager: IndexLogManager,
+    dataManager: IndexDataManager)
+    extends RefreshDeleteActionBase(spark, logManager, dataManager)
+    with Logging {
+
+  private lazy val newDeletedFiles: Seq[String] =
+    sourceFilesDiff._1 diff previousIndexLogEntry.deletedFiles
+
+  private lazy val newAppendedFiles: Seq[String] =
+    sourceFilesDiff._2 diff previousIndexLogEntry.appendedFiles
+
+  final override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
+    RefreshSourceContentActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
+  }
+
+  override def validate(): Unit = {
+    super.validate()
+    if (newDeletedFiles.isEmpty && newAppendedFiles.isEmpty) {
+      throw HyperspaceException(
+        "Refresh aborted as no new deleted or appended source data file found.")
+    }
+  }
+
+  final override def op(): Unit = {
+    logInfo(
+      "Refresh index is updating index metadata by adding " +
+        s"${newDeletedFiles.length} new files to the list of deleted source data files and " +
+        s"${newAppendedFiles.length} new files to the list of appended source data files.")
+  }
+
+  /**
+   * Compute a new index fingerprint using the latest source data files and create
+   * a new IndexLogEntry with the updated list of deleted and appended source data
+   * files and the new index fingerprint.
+   *
+   * @return Updated IndexLogEntry.
+   */
+  final override def logEntry: LogEntry = {
+    // Compute index fingerprint using current source data files.
+    val signatureProvider = LogicalPlanSignatureProvider.create()
+    val newSignature = signatureProvider.signature(df.queryExecution.optimizedPlan) match {
+      case Some(s) =>
+        LogicalPlanFingerprint(
+          LogicalPlanFingerprint.Properties(Seq(Signature(signatureProvider.name, s))))
+
+      case None => throw HyperspaceException("Invalid source plan found during index refresh.")
+    }
+
+    // Grab nested structures from the previous IndexLogEntry.
+    val source = previousIndexLogEntry.source
+    val plan = source.plan
+    val planProps = plan.properties
+    val relation = planProps.relations.head
+    val data = relation.data
+    val dataProps = data.properties
+    val deleted = dataProps.deleted
+    val appended = dataProps.appended
+
+    // Create a new IndexLogEntry by updating the deleted and appended file lists
+    // and the fingerprint.
+    previousIndexLogEntry.copy(
+      source = source.copy(
+        plan = plan.copy(
+          properties = planProps.copy(
+            fingerprint = newSignature,
+            relations = Seq(
+              relation.copy(
+                data = data.copy(
+                  properties = dataProps.copy(
+                    deleted = deleted ++ newDeletedFiles,
+                    appended = appended ++ newAppendedFiles))))))))
+  }
+}
diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala
index 6d3dc6493..156c08cd6 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala
@@ -69,6 +69,8 @@ class IndexCollectionManager(
     val dataManager = indexDataManagerFactory.create(indexPath)
     if (HyperspaceConf.refreshDeleteEnabled(spark)) {
       new RefreshDeleteAction(spark, logManager, dataManager).run()
+    } else if (HyperspaceConf.refreshSourceContentEnabled(spark)) {
+      new RefreshSourceContentAction(spark, logManager, dataManager).run()
     } else {
       new RefreshAction(spark, logManager, dataManager).run()
     }
diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
index d1a163d49..3ddf2ee0e 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
@@ -57,4 +57,7 @@ object IndexConstants {
 
   val REFRESH_DELETE_ENABLED = "spark.hyperspace.index.refresh.delete.enabled"
   val REFRESH_DELETE_ENABLED_DEFAULT = "false"
+
+  val REFRESH_SOURCE_CONTENT_ENABLED = "spark.hyperspace.index.refresh.source.content.enabled"
+  val REFRESH_SOURCE_CONTENT_ENABLED_DEFAULT = "false"
 }
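For reference, this is how the new mode is selected from a session, per the dispatch above: `RefreshDeleteAction` takes precedence if both flags are set, and the plain rebuild (`RefreshAction`) remains the default. A minimal sketch, assuming a `SparkSession` named `spark` is in scope; the index name is illustrative:

```scala
import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.IndexConstants

val hyperspace = new Hyperspace(spark)

// Metadata-only refresh: routes to RefreshSourceContentAction.
spark.conf.set(IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED, "true")
hyperspace.refreshIndex("filterIndex")
```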
diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
index 23a8e8980..962a8361a 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
@@ -325,7 +325,7 @@ case class Hdfs(properties: Hdfs.Properties) {
   val kind = "HDFS"
 }
 
 object Hdfs {
-  case class Properties(content: Content)
+  case class Properties(content: Content, deleted: Seq[String] = Nil, appended: Seq[String] = Nil)
 }
 
 // IndexLogEntry-specific Relation that represents the source relation.
@@ -410,6 +410,14 @@ case class IndexLogEntry(
     sourcePlanSignatures.head
   }
 
+  def deletedFiles: Seq[String] = {
+    relations.head.data.properties.deleted
+  }
+
+  def appendedFiles: Seq[String] = {
+    relations.head.data.properties.appended
+  }
+
   def hasLineageColumn(spark: SparkSession): Boolean = {
     ResolverUtils
       .resolve(spark, IndexConstants.DATA_FILE_NAME_COLUMN, schema.fieldNames)
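Two details worth noting in this change. First, because `deleted` and `appended` default to `Nil`, existing `Hdfs.Properties` call sites keep compiling, and log entries written before this change should still deserialize, assuming the JSON (de)serializer used for the log honors case-class defaults for missing fields. Second, `RefreshSourceContentAction.logEntry` updates these leaf fields with the standard nested-`copy` idiom for immutable case classes; a self-contained sketch with hypothetical stand-ins:

```scala
// Hypothetical stand-ins for Hdfs.Properties and its enclosing structures.
case class Props(deleted: Seq[String] = Nil, appended: Seq[String] = Nil)
case class Data(properties: Props)
case class Entry(data: Data)

val entry = Entry(Data(Props())) // defaults: no deleted/appended files

// Nested copy: every level is rebuilt, only the leaf fields change.
val updated = entry.copy(
  data = entry.data.copy(
    properties = entry.data.properties.copy(
      deleted = entry.data.properties.deleted :+ "file:/rootpath/f1",
      appended = entry.data.properties.appended :+ "file:/rootpath/f3")))

assert(updated.data.properties.deleted == Seq("file:/rootpath/f1"))
assert(entry.data.properties.deleted.isEmpty) // original is untouched
```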
diff --git a/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala b/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala
index 386d50d9d..7c638d306 100644
--- a/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala
+++ b/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala
@@ -116,6 +116,17 @@ case class CancelActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: St
 case class RefreshDeleteActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String)
     extends HyperspaceIndexCRUDEvent
 
+/**
+ * Index refresh event for deleted and appended source files. Emitted when refresh is called
+ * on an index to update index metadata according to the latest source data files.
+ *
+ * @param appInfo AppInfo for spark application.
+ * @param index Related index.
+ * @param message Message about event.
+ */
+case class RefreshSourceContentActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String)
+    extends HyperspaceIndexCRUDEvent
+
 /**
  * Index usage event. This event is emitted when an index is picked instead of original data
  * source by one of the hyperspace rules.
diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
index f2b338332..c5b0ef82c 100644
--- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
+++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
@@ -38,4 +38,12 @@ object HyperspaceConf {
         IndexConstants.REFRESH_DELETE_ENABLED_DEFAULT)
       .toBoolean
   }
+
+  def refreshSourceContentEnabled(spark: SparkSession): Boolean = {
+    spark.conf
+      .get(
+        IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED,
+        IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED_DEFAULT)
+      .toBoolean
+  }
 }
diff --git a/src/test/scala/com/microsoft/hyperspace/SampleData.scala b/src/test/scala/com/microsoft/hyperspace/SampleData.scala
index 9b82e4561..0cb47387f 100644
--- a/src/test/scala/com/microsoft/hyperspace/SampleData.scala
+++ b/src/test/scala/com/microsoft/hyperspace/SampleData.scala
@@ -43,9 +43,9 @@ object SampleData {
     val df = testData.toDF(columns: _*)
     partitionColumns match {
       case Some(pcs) =>
-        df.write.partitionBy(pcs: _*).parquet(path)
+        df.write.mode("overwrite").partitionBy(pcs: _*).parquet(path)
       case None =>
-        df.write.parquet(path)
+        df.write.mode("overwrite").parquet(path)
     }
   }
 }
diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala
index 4fbc7aa56..d97e7a4bf 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala
@@ -419,49 +419,104 @@ class E2EHyperspaceRulesTests extends HyperspaceSuite with SQLHelper {
     withSQLConf(
       IndexConstants.INDEX_LINEAGE_ENABLED -> "true",
       IndexConstants.REFRESH_DELETE_ENABLED -> "true") {
+      withTempDir { inputDir =>
+        // Save a copy of source data files.
+        val location = inputDir.toString
+        val dataPath = new Path(location, "*parquet")
+        val dataColumns = Seq("c1", "c2", "c3", "c4", "c5")
+        SampleData.save(spark, location, dataColumns)
+
+        // Create index on original source data files.
+        val df = spark.read.parquet(location)
+        val indexConfig = IndexConfig("filterIndex", Seq("c3"), Seq("c1"))
+        hyperspace.createIndex(df, indexConfig)
+
+        // Verify index usage for index version (v=0).
+        def query1(): DataFrame =
+          spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1")
+
+        verifyIndexUsage(query1, getIndexFilesPath(indexConfig.indexName))
+
+        // Delete some source data file.
+        val dataFileNames = dataPath
+          .getFileSystem(new Configuration)
+          .globStatus(dataPath)
+          .map(_.getPath)
+
+        assert(dataFileNames.nonEmpty)
+        val fileToDelete = dataFileNames.head
+        FileUtils.delete(fileToDelete)
+
+        def query2(): DataFrame =
+          spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1")
+
+        // Verify index is not used.
+        spark.enableHyperspace()
+        val planRootPaths = getAllRootPaths(query2().queryExecution.optimizedPlan)
+        spark.disableHyperspace()
+        assert(planRootPaths.equals(Seq(PathUtils.makeAbsolute(location))))
+
+        // Refresh the index to remove deleted source data file records from index.
+        hyperspace.refreshIndex(indexConfig.indexName)
+
+        // Verify index usage on latest version of index (v=1) after refresh.
+        verifyIndexUsage(query2, getIndexFilesPath(indexConfig.indexName, 1))
+      }
+    }
+  }
 
-      // Save a copy of source data files.
-      val location = testDir + "ixRefreshTest"
-      val dataPath = new Path(location, "*parquet")
-      val dataColumns = Seq("c1", "c2", "c3", "c4", "c5")
-      SampleData.save(spark, location, dataColumns)
-
-      // Create index on original source data files.
-      val df = spark.read.parquet(location)
-      val indexConfig = IndexConfig("filterIndex", Seq("c3"), Seq("c1"))
-      hyperspace.createIndex(df, indexConfig)
-
-      // Verify index usage for index version (v=0).
-      def query1(): DataFrame =
-        spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1")
-
-      verifyIndexUsage(query1, getIndexFilesPath(indexConfig.indexName))
-
-      // Delete some source data file.
-      val dataFileNames = dataPath
-        .getFileSystem(new Configuration)
-        .globStatus(dataPath)
-        .map(_.getPath)
-
-      assert(dataFileNames.nonEmpty)
-      val fileToDelete = dataFileNames.head
-      FileUtils.delete(fileToDelete)
-
-      def query2(): DataFrame =
-        spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1")
-
-      // Verify index is not used.
-      spark.enableHyperspace()
-      val planRootPaths = getAllRootPaths(query2().queryExecution.optimizedPlan)
-      spark.disableHyperspace()
-      assert(planRootPaths.equals(Seq(PathUtils.makeAbsolute(location))))
-
-      // Refresh the index to remove deleted source data file records from index.
-      hyperspace.refreshIndex(indexConfig.indexName)
-
-      // Verify index usage on latest version of index (v=1) after refresh.
-      verifyIndexUsage(query2, getIndexFilesPath(indexConfig.indexName, 1))
-      FileUtils.delete(dataPath)
+  test("Validate index usage for enforce delete on read after some source data file is deleted.") {
+    withSQLConf(
+      IndexConstants.INDEX_LINEAGE_ENABLED -> "true",
+      IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED -> "true") {
+      withTempDir { inputDir =>
+        // Save a copy of source data files.
+        val location = inputDir.toString
+        val dataPath = new Path(location, "*parquet")
+        val dataColumns = Seq("c1", "c2", "c3", "c4", "c5")
+        SampleData.save(spark, location, dataColumns)
+
+        // Create index on original source data files.
+        val df = spark.read.parquet(location)
+        val indexConfig = IndexConfig("filterIndex", Seq("c3"), Seq("c1"))
+        hyperspace.createIndex(df, indexConfig)
+
+        // Verify index usage.
+        def query1(): DataFrame =
+          spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1")
+
+        verifyIndexUsage(query1, getIndexFilesPath(indexConfig.indexName))
+
+        // Delete one source data file.
+        val dataFileNames = dataPath
+          .getFileSystem(new Configuration)
+          .globStatus(dataPath)
+          .map(_.getPath)
+
+        assert(dataFileNames.nonEmpty)
+        val fileToDelete = dataFileNames.head
+        FileUtils.delete(fileToDelete)
+
+        def query2(): DataFrame =
+          spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1")
+
+        // Verify index is not used after source data file is deleted.
+        spark.enableHyperspace()
+        val planRootPaths = getAllRootPaths(query2().queryExecution.optimizedPlan)
+        spark.disableHyperspace()
+        assert(planRootPaths.equals(Seq(PathUtils.makeAbsolute(location))))
+
+        // Refresh index to update its metadata.
+        hyperspace.refreshIndex(indexConfig.indexName)
+
+        // Verify index is used after refresh fixes fingerprint in index metadata.
+        // TODO: Replace check with verifyIndexUsage() once delete on read covers query path.
+        spark.enableHyperspace()
+        assert(
+          queryPlanHasExpectedRootPaths(
+            query2().queryExecution.optimizedPlan,
+            getIndexFilesPath(indexConfig.indexName)))
+      }
     }
   }
diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala
index 85541c3e4..1deed4c20 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala
@@ -125,7 +125,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
         |        "kind" : "NoOp",
         |        "properties" : { }
         |      }
-        |    }
+        |    },
+        |    "deleted" : ["file:/rootpath/f1"],
+        |    "appended" : ["file:/rootpath/f3"]
         |  },
         |  "kind" : "HDFS"
         |},
@@ -163,7 +165,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
       Seq(Relation(
         Seq("rootpath"),
         Hdfs(Hdfs.Properties(Content(
-          Directory("", Seq(FileInfo("f1", 100L, 100L), FileInfo("f2", 200L, 200L)), Seq())))),
+          Directory("", Seq(FileInfo("f1", 100L, 100L), FileInfo("f2", 200L, 200L)), Seq())),
+          Seq("file:/rootpath/f1"),
+          Seq("file:/rootpath/f3"))),
         "schema",
         "type",
         Map())),
diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
index 90bd8453a..7ae5c411d 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
@@ -118,7 +118,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite {
       val ex = intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName))
       assert(
-        ex.getMessage.contains(s"Index refresh (to handle deleted source data) is " +
+        ex.getMessage.contains(s"Index refresh to update source content in index metadata is " +
          "only supported on an index with lineage."))
     }
   }
 
@@ -149,33 +149,32 @@
     withSQLConf(
       IndexConstants.INDEX_LINEAGE_ENABLED -> "true",
       IndexConstants.REFRESH_DELETE_ENABLED -> "true") {
-      SampleData.save(
-        spark,
-        nonPartitionedDataPath,
-        Seq("Date", "RGUID", "Query", "imprs", "clicks"))
-      val nonPartitionedDataDF = spark.read.parquet(nonPartitionedDataPath)
+      withTempDir { inputDir =>
+        val inputDataPath = inputDir.toString
+        SampleData.save(spark, inputDataPath, Seq("Date", "RGUID", "Query", "imprs", "clicks"))
+        val nonPartitionedDataDF = spark.read.parquet(inputDataPath)
 
-      hyperspace.createIndex(nonPartitionedDataDF, indexConfig)
-
-      if (deleteDataFolder) {
-        FileUtils.delete(new Path(nonPartitionedDataPath))
+        hyperspace.createIndex(nonPartitionedDataDF, indexConfig)
 
-        val ex = intercept[AnalysisException](hyperspace.refreshIndex(indexConfig.indexName))
-        assert(ex.getMessage.contains("Path does not exist"))
+        if (deleteDataFolder) {
+          FileUtils.delete(new Path(inputDataPath))
 
-      } else {
-        val dataPath = new Path(nonPartitionedDataPath, "*parquet")
-        dataPath
-          .getFileSystem(new Configuration)
-          .globStatus(dataPath)
-          .foreach(p => FileUtils.delete(p.getPath))
+          val ex = intercept[AnalysisException](hyperspace.refreshIndex(indexConfig.indexName))
+          assert(ex.getMessage.contains("Path does not exist"))
 
-        val ex =
-          intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName))
-        assert(ex.getMessage.contains("Invalid plan for creating an index."))
+        } else {
+          val dataPath = new Path(inputDataPath, "*parquet")
+          dataPath
+            .getFileSystem(new Configuration)
+            .globStatus(dataPath)
+            .foreach(p => FileUtils.delete(p.getPath))
+
+          val ex =
+            intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName))
+          assert(ex.getMessage.contains("Invalid plan for creating an index."))
+        }
+        FileUtils.delete(systemPath)
       }
-      FileUtils.delete(new Path(nonPartitionedDataPath))
-      FileUtils.delete(systemPath)
     }
   }
@@ -203,8 +202,186 @@
       val ex = intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName))
       assert(
-        ex.getMessage.contains("Index refresh (to handle deleted source data) aborted. " +
-          "Existing source data file info is changed"))
+        ex.getMessage.contains(
+          "Index refresh (to handle deleted or appended source data) aborted. " +
+            "Existing source data file info is changed"))
+    }
+  }
+
+  test("Validate refresh IndexLogEntry for deleted and appended source data files.") {
+    // Save test data non-partitioned.
+    SampleData.save(
+      spark,
+      nonPartitionedDataPath,
+      Seq("Date", "RGUID", "Query", "imprs", "clicks"))
+    val nonPartitionedDataDF = spark.read.parquet(nonPartitionedDataPath)
+
+    withSQLConf(
+      IndexConstants.INDEX_LINEAGE_ENABLED -> "true",
+      IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED -> "true") {
+      withIndex(indexConfig.indexName) {
+        hyperspace.createIndex(nonPartitionedDataDF, indexConfig)
+        val ixManager = Hyperspace.getContext(spark).indexCollectionManager
+        val originalIndex = ixManager.getIndexes()
+        assert(originalIndex.length == 1)
+        assert(originalIndex.head.deletedFiles.isEmpty)
+        assert(originalIndex.head.appendedFiles.isEmpty)
+
+        // Delete one source data file.
+        val deletedFile1 = deleteDataFile(nonPartitionedDataPath)
+
+        // Append one new source data file.
+        val appendedFile1 = new Path(deletedFile1.getParent, "newFile1")
+        FileUtils.createFile(
+          appendedFile1.getFileSystem(new Configuration),
+          appendedFile1,
+          "I am some random content for a new file :).")
+
+        // Refresh index and validate updated IndexLogEntry.
+        hyperspace.refreshIndex(indexConfig.indexName)
+        val refreshedIndex1 = ixManager.getIndexes()
+        assert(refreshedIndex1.length == 1)
+        assert(refreshedIndex1.head.deletedFiles.equals(Seq(deletedFile1.toString)))
+        assert(refreshedIndex1.head.appendedFiles.equals(Seq(appendedFile1.toString)))
+
+        // Make sure index fingerprint is changed.
+        assert(!originalIndex.head.signature.equals(refreshedIndex1.head.signature))
+
+        // Delete another source data file.
+        val deletedFile2 = deleteDataFile(nonPartitionedDataPath)
+
+        // Append another new source data file.
+        val appendedFile2 = new Path(deletedFile2.getParent, "newFile2")
+        FileUtils.createFile(
+          appendedFile2.getFileSystem(new Configuration),
+          appendedFile2,
+          "I am some random content for yet another new file :).")
+
+        // Refresh index and validate updated IndexLogEntry.
+        // `deleted` files should contain both of the deleted source data files.
+        // `appended` files should contain both of the appended source data files.
+        hyperspace.refreshIndex(indexConfig.indexName)
+        val refreshedIndex2 = ixManager.getIndexes()
+        assert(refreshedIndex2.length == 1)
+        assert(
+          refreshedIndex2.head.deletedFiles
+            .equals(Seq(deletedFile1.toString, deletedFile2.toString)))
+        assert(
+          refreshedIndex2.head.appendedFiles
+            .equals(Seq(appendedFile1.toString, appendedFile2.toString)))
+
+        // Make sure index fingerprint is changed.
+        assert(!originalIndex.head.signature.equals(refreshedIndex2.head.signature))
+        assert(!refreshedIndex1.head.signature.equals(refreshedIndex2.head.signature))
+      }
+    }
+  }
+
+  test(
+    "Validate refresh IndexLogEntry for deleted and appended source data files " +
+      "does not add duplicate deleted or appended files.") {
+    // Save test data non-partitioned.
+    SampleData.save(
+      spark,
+      nonPartitionedDataPath,
+      Seq("Date", "RGUID", "Query", "imprs", "clicks"))
+    val nonPartitionedDataDF = spark.read.parquet(nonPartitionedDataPath)
+
+    withSQLConf(
+      IndexConstants.INDEX_LINEAGE_ENABLED -> "true",
+      IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED -> "true") {
+      withIndex(indexConfig.indexName) {
+        hyperspace.createIndex(nonPartitionedDataDF, indexConfig)
+        val ixManager = Hyperspace.getContext(spark).indexCollectionManager
+        val originalIndex = ixManager.getIndexes()
+        assert(originalIndex.length == 1)
+        assert(originalIndex.head.deletedFiles.isEmpty)
+        assert(originalIndex.head.appendedFiles.isEmpty)
+
+        // Delete one source data file.
+        val deletedFile = deleteDataFile(nonPartitionedDataPath)
+
+        // Append one new source data file.
+        val appendedFile = new Path(deletedFile.getParent, "newFile")
+        FileUtils.createFile(
+          appendedFile.getFileSystem(new Configuration),
+          appendedFile,
+          "I am some random content for a new file :).")
+
+        // Refresh index and validate updated IndexLogEntry.
+        hyperspace.refreshIndex(indexConfig.indexName)
+        val refreshedIndex = ixManager.getIndexes()
+        assert(refreshedIndex.head.deletedFiles.equals(Seq(deletedFile.toString)))
+
+        // Refresh index again and validate it fails as expected.
+        val ex = intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName))
+        assert(
+          ex.getMessage.contains(
+            "Refresh aborted as no new deleted or appended source data file found."))
+      }
+    }
+  }
+
+  test(
+    "Validate refresh index (to handle deletes from the source data) " +
+      "updates index files correctly when called on an index with deleted files.") {
+    // Save test data non-partitioned.
+    SampleData.save(
+      spark,
+      nonPartitionedDataPath,
+      Seq("Date", "RGUID", "Query", "imprs", "clicks"))
+    val nonPartitionedDataDF = spark.read.parquet(nonPartitionedDataPath)
+
+    withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") {
+      withIndex(indexConfig.indexName) {
+        spark.conf.set(IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED, "true")
+        hyperspace.createIndex(nonPartitionedDataDF, indexConfig)
+        val ixManager = Hyperspace.getContext(spark).indexCollectionManager
+        val originalIndex = ixManager.getIndexes()
+        assert(originalIndex.length == 1)
+        assert(originalIndex.head.deletedFiles.isEmpty)
+
+        // Delete one source data file.
+        val deletedFile = deleteDataFile(nonPartitionedDataPath)
+
+        val originalIndexDF = spark.read.parquet(
+          s"$systemPath/${indexConfig.indexName}/" +
+            s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0")
+        val originalIndexWithoutDeletedFile = originalIndexDF
+          .filter(s"""${IndexConstants.DATA_FILE_NAME_COLUMN} != "$deletedFile"""")
+
+        // Refresh index and validate updated IndexLogEntry.
+        hyperspace.refreshIndex(indexConfig.indexName)
+        val refreshedIndex1 = ixManager.getIndexes()
+        assert(refreshedIndex1.length == 1)
+        assert(refreshedIndex1.head.deletedFiles.equals(Seq(deletedFile.toString)))
+
+        // Change refresh configurations and refresh index to update index files.
+        spark.conf.set(IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED, "false")
+        spark.conf.set(IndexConstants.REFRESH_DELETE_ENABLED, "true")
+        hyperspace.refreshIndex(indexConfig.indexName)
+        val refreshedIndex2 = ixManager.getIndexes()
+        assert(refreshedIndex2.length == 1)
+
+        // Now that index entries for deleted source data files are removed
+        // from index files, `deleted` files list should be reset and empty.
+        assert(refreshedIndex2.head.deletedFiles.isEmpty)
+
+        // Verify index signature in latest version is different from
+        // original version (before any source data file deletion).
+        assert(!refreshedIndex2.head.signature.equals(originalIndex.head.signature))
+
+        // Verify index signature in latest version is the same as previous version
+        // (created by refresh for enforce delete on read), as they both have same
+        // set of source data files.
+        assert(refreshedIndex2.head.signature.equals(refreshedIndex1.head.signature))
+
+        // Make sure only index entries from deleted files are removed from index files.
+        val refreshedIndexDF = spark.read.parquet(
+          s"$systemPath/${indexConfig.indexName}/" +
+            s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=1")
+        checkAnswer(originalIndexWithoutDeletedFile, refreshedIndexDF)
+      }
     }
   }
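Taken together, the last test above encodes the intended two-phase workflow: a cheap metadata-only refresh first, then an index-rewriting refresh once the cost is acceptable. As a user-level sketch (index name and session setup illustrative; lineage must have been enabled when the index was created):

```scala
import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.IndexConstants

val hyperspace = new Hyperspace(spark)

// Phase 1: metadata-only. Index rows are untouched, but the IndexLogEntry
// now lists the deleted/appended source files and carries a new fingerprint.
spark.conf.set(IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED, "true")
hyperspace.refreshIndex("filterIndex")

// Phase 2: rewrite index files, dropping rows whose lineage column points at
// deleted source files. Afterwards the `deleted` list in metadata is empty again.
spark.conf.set(IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED, "false")
spark.conf.set(IndexConstants.REFRESH_DELETE_ENABLED, "true")
hyperspace.refreshIndex("filterIndex")
```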