From 55f146b08e22db43373025376ab506906078be52 Mon Sep 17 00:00:00 2001
From: Pouria Pirzadeh
Date: Fri, 18 Sep 2020 19:25:58 -0700
Subject: [PATCH 01/13] Metadata changes for enforce delete on read

---
 .../actions/DeleteOnReadAction.scala          | 61 ++++++++++++++
 .../actions/RefreshActionBase.scala           |  2 +-
 .../actions/RefreshDeleteAction.scala         | 54 +------------
 .../actions/RefreshDeleteActionBase.scala     | 80 +++++++++++++++++++
 .../index/IndexCollectionManager.scala        |  2 +
 .../hyperspace/index/IndexConstants.scala     |  3 +
 .../hyperspace/index/IndexLogEntry.scala      |  2 +-
 .../telemetry/HyperspaceEvent.scala           |  3 +
 .../hyperspace/util/HyperspaceConf.scala      |  8 ++
 .../hyperspace/index/RefreshIndexTests.scala  | 35 ++++++++
 10 files changed, 195 insertions(+), 55 deletions(-)
 create mode 100644 src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala
 create mode 100644 src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala

diff --git a/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala
new file mode 100644
index 000000000..9b98f67cb
--- /dev/null
+++ b/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala
@@ -0,0 +1,61 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.actions
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import com.microsoft.hyperspace.index._
+import com.microsoft.hyperspace.telemetry.{AppInfo, DeleteOnReadActionEvent, HyperspaceEvent}
+
+class DeleteOnReadAction(
+    spark: SparkSession,
+    logManager: IndexLogManager,
+    dataManager: IndexDataManager)
+  extends RefreshDeleteActionBase(spark, logManager, dataManager) with Logging {
+
+  final override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
+    DeleteOnReadActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
+  }
+
+  final override def op(): Unit = {
+    logInfo(
+      "Refresh index is updating index metadata by adding " +
+        s"${deletedFiles.length} deleted files to list of excluded source data files.")
+  }
+
+  final override def logEntry: LogEntry = {
+    // Grab nested structures from previous IndexLogEntry.
+    val source = previousIndexLogEntry.source
+    val plan = source.plan
+    val planProps = plan.properties
+    val relation = planProps.relations.head
+    val data = relation.data
+    val dataProps = data.properties
+    val excluded = dataProps.excluded
+
+    // Instantiate a new IndexLogEntry by appending deleted files to list of excluded files.
+    previousIndexLogEntry.copy(
+      source = source.copy(
+        plan = plan.copy(
+          properties = planProps.copy(
+            relations = Seq(
+              relation.copy(
+                data = data.copy(
+                  properties = dataProps.copy(excluded = excluded ++ deletedFiles))))))))
+  }
+}
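For readers unfamiliar with this update style: logEntry above rebuilds the whole nested metadata tree because IndexLogEntry is immutable. A minimal sketch of the same pattern, using hypothetical simplified case classes rather than the real Hyperspace model:

    // Hypothetical simplified model (not the real IndexLogEntry classes),
    // isolating the nested case-class copy pattern: rebuild the immutable
    // tree, changing only the innermost `excluded` field. Like the action,
    // this assumes a single relation and keeps only the head.
    case class Props(excluded: Seq[String])
    case class Data(properties: Props)
    case class Rel(data: Data)
    case class Plan(relations: Seq[Rel])
    case class Entry(plan: Plan)

    def appendExcluded(entry: Entry, deleted: Seq[String]): Entry = {
      val rel = entry.plan.relations.head
      val props = rel.data.properties
      entry.copy(plan = entry.plan.copy(relations = Seq(
        rel.copy(data = rel.data.copy(
          properties = props.copy(excluded = props.excluded ++ deleted))))))
    }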
diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala
index cf8562055..621e17d6f 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala
@@ -63,7 +63,7 @@ private[actions] abstract class RefreshActionBase(
     IndexConfig(previousIndexLogEntry.name, ddColumns.indexed, ddColumns.included)
   }
 
-  final override def logEntry: LogEntry = getIndexLogEntry(spark, df, indexConfig, indexDataPath)
+  override def logEntry: LogEntry = getIndexLogEntry(spark, df, indexConfig, indexDataPath)
 
   final override val transientState: String = REFRESHING
 
diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala
index 9668fc8d3..c85fb96c4 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala
@@ -16,12 +16,10 @@
 
 package com.microsoft.hyperspace.actions
 
-import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions._
 
-import com.microsoft.hyperspace.HyperspaceException
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
 import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshDeleteActionEvent}
@@ -44,30 +42,13 @@ class RefreshDeleteAction(
     spark: SparkSession,
     logManager: IndexLogManager,
     dataManager: IndexDataManager)
-    extends RefreshActionBase(spark, logManager, dataManager)
+    extends RefreshDeleteActionBase(spark, logManager, dataManager)
     with Logging {
 
   final override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
     RefreshDeleteActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
   }
 
-  /**
-   * Validate index has lineage column and it is in active state for refreshing and
-   * there are some deleted source data file(s).
-   */
-  final override def validate(): Unit = {
-    super.validate()
-    if (!previousIndexLogEntry.hasLineageColumn(spark)) {
-      throw HyperspaceException(
-        "Index refresh (to handle deleted source data) is " +
-          "only supported on an index with lineage.")
-    }
-
-    if (deletedFiles.isEmpty) {
-      throw HyperspaceException("Refresh aborted as no deleted source data file found.")
-    }
-  }
-
   /**
    * For an index with lineage, find all the source data files which have been deleted,
    * and use index records' lineage to mark and remove index entries which belong to
@@ -89,37 +70,4 @@ class RefreshDeleteAction(
       previousIndexLogEntry.numBuckets,
       indexConfig.indexedColumns)
   }
-
-  /**
-   * Compare list of source data files from previous IndexLogEntry to list
-   * of current source data files, validate fileInfo for existing files and
-   * identify deleted source data files.
-   */
- */ - private lazy val deletedFiles: Seq[String] = { - val rels = previousIndexLogEntry.relations - val originalFiles = rels.head.data.properties.content.fileInfos - val currentFiles = rels.head.rootPaths - .flatMap { p => - Content - .fromDirectory(path = new Path(p), throwIfNotExists = true) - .fileInfos - } - .map(f => f.name -> f) - .toMap - - var delFiles = Seq[String]() - originalFiles.foreach { f => - currentFiles.get(f.name) match { - case Some(v) => - if (!f.equals(v)) { - throw HyperspaceException( - "Index refresh (to handle deleted source data) aborted. " + - s"Existing source data file info is changed (file: ${f.name}).") - } - case None => delFiles :+= f.name - } - } - - delFiles - } } diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala new file mode 100644 index 000000000..cf8941d08 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala @@ -0,0 +1,80 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.actions + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.SparkSession + +import com.microsoft.hyperspace.HyperspaceException +import com.microsoft.hyperspace.index.{Content, IndexDataManager, IndexLogManager} + +private[actions] abstract class RefreshDeleteActionBase( + spark: SparkSession, + logManager: IndexLogManager, + dataManager: IndexDataManager) + extends RefreshActionBase(spark, logManager, dataManager) { + + /** + * Validate index has lineage column and it is in active state for refreshing and + * there are some deleted source data file(s). + */ + override def validate(): Unit = { + super.validate() + if (!previousIndexLogEntry.hasLineageColumn(spark)) { + throw HyperspaceException( + "Index refresh (to handle deleted source data) is " + + "only supported on an index with lineage.") + } + + if (deletedFiles.isEmpty) { + throw HyperspaceException("Refresh aborted as no deleted source data file found.") + } + } + + /** + * Compare list of source data files from previous IndexLogEntry to list + * of current source data files, validate fileInfo for existing files and + * identify deleted source data files. + */ + protected lazy val deletedFiles: Seq[String] = { + val rels = previousIndexLogEntry.relations + val originalFiles = rels.head.data.properties.content.fileInfos + val currentFiles = rels.head.rootPaths + .flatMap { p => + Content + .fromDirectory(path = new Path(p), throwIfNotExists = true) + .fileInfos + } + .map(f => f.name -> f) + .toMap + + var delFiles = Seq[String]() + originalFiles.foreach { f => + currentFiles.get(f.name) match { + case Some(v) => + if (!f.equals(v)) { + throw HyperspaceException( + "Index refresh (to handle deleted source data) aborted. 
" + + s"Existing source data file info is changed (file: ${f.name}).") + } + case None => delFiles :+= f.name + } + } + + delFiles + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala index 6d3dc6493..79da726f5 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala @@ -69,6 +69,8 @@ class IndexCollectionManager( val dataManager = indexDataManagerFactory.create(indexPath) if (HyperspaceConf.refreshDeleteEnabled(spark)) { new RefreshDeleteAction(spark, logManager, dataManager).run() + } else if (HyperspaceConf.enforceDeleteOnReadEnabled(spark)) { + new DeleteOnReadAction(spark, logManager, dataManager).run() } else { new RefreshAction(spark, logManager, dataManager).run() } diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala index d1a163d49..557dd223a 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala @@ -57,4 +57,7 @@ object IndexConstants { val REFRESH_DELETE_ENABLED = "spark.hyperspace.index.refresh.delete.enabled" val REFRESH_DELETE_ENABLED_DEFAULT = "false" + + val ENFORCE_DELETE_ON_READ_ENABLED = "spark.hyperspace.index.enforce.delete.on.read.enabled" + val ENFORCE_DELETE_ON_READ_ENABLED_DEFAULT = "false" } diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index e99b10de1..8ceb55799 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -324,7 +324,7 @@ case class Hdfs(properties: Hdfs.Properties) { val kind = "HDFS" } object Hdfs { - case class Properties(content: Content) + case class Properties(content: Content, excluded: Seq[String] = Seq[String]()) } // IndexLogEntry-specific Relation that represents the source relation. diff --git a/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala b/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala index 386d50d9d..fd79265e4 100644 --- a/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala +++ b/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala @@ -116,6 +116,9 @@ case class CancelActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: St case class RefreshDeleteActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String) extends HyperspaceIndexCRUDEvent +case class DeleteOnReadActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String) + extends HyperspaceIndexCRUDEvent + /** * Index usage event. This event is emitted when an index is picked instead of original data * source by one of the hyperspace rules. 
diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
index f2b338332..dd6d5e6a3 100644
--- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
+++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
@@ -38,4 +38,12 @@ object HyperspaceConf {
         IndexConstants.REFRESH_DELETE_ENABLED_DEFAULT)
       .toBoolean
   }
+
+  def enforceDeleteOnReadEnabled(spark: SparkSession): Boolean = {
+    spark.conf
+      .get(
+        IndexConstants.ENFORCE_DELETE_ON_READ_ENABLED,
+        IndexConstants.ENFORCE_DELETE_ON_READ_ENABLED_DEFAULT)
+      .toBoolean
+  }
 }
diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
index 90bd8453a..628a4a800 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
@@ -208,6 +208,41 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite {
     }
   }
 
+  test(
+    "Validate refresh index (to handle deletes from the source data) " +
+      "for enforce delete on read.") {
+    // Save test data non-partitioned.
+    SampleData.save(
+      spark,
+      nonPartitionedDataPath,
+      Seq("Date", "RGUID", "Query", "imprs", "clicks"))
+    val nonPartitionedDataDF = spark.read.parquet(nonPartitionedDataPath)
+
+    withSQLConf(
+      IndexConstants.INDEX_LINEAGE_ENABLED -> "true",
+      IndexConstants.ENFORCE_DELETE_ON_READ_ENABLED -> "true") {
+      withIndex(indexConfig.indexName) {
+        hyperspace.createIndex(nonPartitionedDataDF, indexConfig)
+        val ixManager = Hyperspace.getContext(spark).indexCollectionManager
+        val originalIndex = ixManager.getIndexes()
+        assert(originalIndex.length == 1)
+        assert(
+          originalIndex.head.source.plan.properties.relations.head.data.properties.excluded.isEmpty)
+
+        // Delete one source data file.
+        val deletedFile = deleteDataFile(nonPartitionedDataPath)
+
+        hyperspace.refreshIndex(indexConfig.indexName)
+        val refreshedIndex = ixManager.getIndexes()
+        assert(refreshedIndex.length == 1)
+        assert(
+          refreshedIndex.head.source.plan.properties.relations.head.data.properties.excluded
+            .equals(Seq(deletedFile.toString)))
+
+      }
+    }
+  }
+
   /**
    * Delete one file from a given path.
   *
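The test above relies on a deleteDataFile(path) helper whose body falls outside this hunk. A plausible sketch of what it must do — pick one parquet file under the path, delete it, and return its Path (an assumption for illustration, not the verbatim helper):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    def deleteDataFile(dataPath: String): Path = {
      // Glob the parquet files, delete the first one, return its path.
      val glob = new Path(dataPath, "*parquet")
      val files = glob.getFileSystem(new Configuration).globStatus(glob).map(_.getPath)
      assert(files.nonEmpty)
      val toDelete = files.head
      toDelete.getFileSystem(new Configuration).delete(toDelete, false)
      toDelete
    }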
From e9e6e3359d8f989117bd43fcff5f904a6c35488e Mon Sep 17 00:00:00 2001
From: Pouria Pirzadeh
Date: Sat, 19 Sep 2020 12:12:09 -0700
Subject: [PATCH 02/13] fix IndexLogEntry test

---
 .../com/microsoft/hyperspace/index/IndexLogEntryTest.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala
index 85541c3e4..24c9dd003 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala
@@ -125,7 +125,8 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
         |        "kind" : "NoOp",
         |        "properties" : { }
         |      }
-        |    }
+        |    },
+        |    "excluded" : []
         |  },
         |  "kind" : "HDFS"
         |},

From 777b57eca80c2d871b15ccdac12e822e289a0bf4 Mon Sep 17 00:00:00 2001
From: Pouria Pirzadeh
Date: Sat, 19 Sep 2020 13:06:29 -0700
Subject: [PATCH 03/13] add signature update to enforce delete on read

---
 .../actions/DeleteOnReadAction.scala          | 12 ++++
 .../index/E2EHyperspaceRulesTests.scala       | 57 +++++++++++++++++++
 .../hyperspace/index/RefreshIndexTests.scala  |  4 ++
 3 files changed, 73 insertions(+)

diff --git a/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala
index 9b98f67cb..ea1e98c48 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala
@@ -19,6 +19,7 @@ package com.microsoft.hyperspace.actions
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 
+import com.microsoft.hyperspace.HyperspaceException
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.telemetry.{AppInfo, DeleteOnReadActionEvent, HyperspaceEvent}
 
@@ -39,6 +40,16 @@ class DeleteOnReadAction(
   }
 
   final override def logEntry: LogEntry = {
+    // Compute index fingerprint using current source data file.
+    val signatureProvider = LogicalPlanSignatureProvider.create()
+    val newSignature = signatureProvider.signature(df.queryExecution.optimizedPlan) match {
+      case Some(s) =>
+        LogicalPlanFingerprint(
+          LogicalPlanFingerprint.Properties(Seq(Signature(signatureProvider.name, s))))
+
+      case None => throw HyperspaceException("Invalid source plan found during index refresh.")
+    }
+
     // Grab nested structures from previous IndexLogEntry.
     val source = previousIndexLogEntry.source
     val plan = source.plan
@@ -53,6 +64,7 @@ class DeleteOnReadAction(
         source = source.copy(
           plan = plan.copy(
             properties = planProps.copy(
+              fingerprint = newSignature,
               relations = Seq(
                 relation.copy(
                   data = data.copy(
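The fingerprint recomputation added here is what lets the index be picked again at query time: Hyperspace matches an index to a plan by comparing the plan's computed signature against the one stored in the metadata. A sketch of that check (assuming df is the current source DataFrame and storedSignature is the value kept in the index metadata — both assumed to be in scope):

    // usable is true only when the freshly computed plan signature equals
    // the stored one; signature(...) returns Option[String].
    val provider = LogicalPlanSignatureProvider.create()
    val usable: Boolean = provider
      .signature(df.queryExecution.optimizedPlan)
      .contains(storedSignature)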
diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala
index 944cf3b42..90f7e8576 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala
@@ -454,6 +454,63 @@ class E2EHyperspaceRulesTests extends HyperspaceSuite with SQLHelper {
     }
   }
 
+  test(
+    "Validate index usage for enforce delete on read " +
+      "after some source data file is deleted.") {
+    withSQLConf(
+      IndexConstants.INDEX_LINEAGE_ENABLED -> "true",
+      IndexConstants.ENFORCE_DELETE_ON_READ_ENABLED -> "true") {
+
+      // Save a copy of source data files.
+      val location = testDir + "ixRefreshTest"
+      val dataPath = new Path(location, "*parquet")
+      val dataColumns = Seq("c1", "c2", "c3", "c4", "c5")
+      SampleData.save(spark, location, dataColumns)
+
+      // Create index on original source data files.
+      val df = spark.read.parquet(location)
+      val indexConfig = IndexConfig("filterIndex", Seq("c3"), Seq("c1"))
+      hyperspace.createIndex(df, indexConfig)
+
+      // Verify index usage.
+      def query1(): DataFrame =
+        spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1")
+
+      verifyIndexUsage(query1, getIndexFilesPath(indexConfig.indexName))
+
+      // Delete one source data file.
+      val dataFileNames = dataPath
+        .getFileSystem(new Configuration)
+        .globStatus(dataPath)
+        .map(_.getPath)
+
+      assert(dataFileNames.nonEmpty)
+      val fileToDelete = dataFileNames.head
+      FileUtils.delete(fileToDelete)
+
+      def query2(): DataFrame =
+        spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1")
+
+      // Verify index is not used after source data file is deleted.
+      spark.enableHyperspace()
+      val planRootPaths = getAllRootPaths(query2().queryExecution.optimizedPlan)
+      spark.disableHyperspace()
+      assert(planRootPaths.equals(Seq(PathUtils.makeAbsolute(location))))
+
+      // Refresh index to update its metadata.
+      hyperspace.refreshIndex(indexConfig.indexName)
+
+      // Verify index is used after refresh fixes fingerprint in index metadata.
+      // TODO Replace check with verifyIndexUsage() once EnforceDeleteOnRead covers query path.
+      spark.enableHyperspace()
+      assert(
+        queryPlanHasExpectedRootPaths(
+          query2().queryExecution.optimizedPlan,
+          getIndexFilesPath(indexConfig.indexName)))
+      FileUtils.delete(dataPath)
+    }
+  }
+
   /**
    * Check that if the query plan has the expected rootPaths.
    *
diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
index 628a4a800..b3608ffeb 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
@@ -232,6 +232,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite {
         // Delete one source data file.
         val deletedFile = deleteDataFile(nonPartitionedDataPath)
 
+        // Refresh index and validate updated IndexLogEntry.
         hyperspace.refreshIndex(indexConfig.indexName)
         val refreshedIndex = ixManager.getIndexes()
         assert(refreshedIndex.length == 1)
@@ -239,6 +240,9 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite {
           refreshedIndex.head.source.plan.properties.relations.head.data.properties.excluded
             .equals(Seq(deletedFile.toString)))
 
+        assert(
+          !originalIndex.head.source.plan.properties.fingerprint
+            .equals(refreshedIndex.head.source.plan.properties.fingerprint))
       }
     }
   }

From fcc3952f1d7f229dda29c5f64f2bfbbab3bd30c6 Mon Sep 17 00:00:00 2001
From: Pouria Pirzadeh
Date: Sat, 19 Sep 2020 13:29:41 -0700
Subject: [PATCH 04/13] fix test cleanup

---
 .../microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala
index 90f7e8576..618133bca 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala
@@ -450,7 +450,7 @@ class E2EHyperspaceRulesTests extends HyperspaceSuite with SQLHelper {
 
       // Verify index usage on latest version of index (v=1) after refresh.
       verifyIndexUsage(query2, getIndexFilesPath(indexConfig.indexName, 1))
-      FileUtils.delete(dataPath)
+      FileUtils.delete(new Path(location))
     }
   }
 
@@ -507,7 +507,7 @@ class E2EHyperspaceRulesTests extends HyperspaceSuite with SQLHelper {
       assert(
         queryPlanHasExpectedRootPaths(
           query2().queryExecution.optimizedPlan,
           getIndexFilesPath(indexConfig.indexName)))
-      FileUtils.delete(dataPath)
+      FileUtils.delete(new Path(location))
     }
   }
 
From 8a36e604a2c2746474bc4adba10fb39ef08147bb Mon Sep 17 00:00:00 2001
From: Pouria Pirzadeh
Date: Mon, 21 Sep 2020 11:06:32 -0700
Subject: [PATCH 05/13] changes in excluded files update

---
 .../actions/DeleteOnReadAction.scala          |  5 ++--
 .../hyperspace/index/RefreshIndexTests.scala  | 30 +++++++++++++++----
 2 files changed, 27 insertions(+), 8 deletions(-)

diff --git a/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala
index ea1e98c48..89ed55ad2 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala
@@ -59,7 +59,7 @@ class DeleteOnReadAction(
     val dataProps = data.properties
     val excluded = dataProps.excluded
 
-    // Instantiate a new IndexLogEntry by appending deleted files to list of excluded files.
+    // Instantiate a new IndexLogEntry by updating excluded files and fingerprint.
     previousIndexLogEntry.copy(
       source = source.copy(
         plan = plan.copy(
@@ -68,6 +68,7 @@ class DeleteOnReadAction(
             relations = Seq(
               relation.copy(
                 data = data.copy(
-                  properties = dataProps.copy(excluded = excluded ++ deletedFiles))))))))
+                  properties = dataProps.copy(
+                    excluded = excluded ++ (deletedFiles diff excluded)))))))))
   }
diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
index b3608ffeb..7c8d5102c 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
@@ -230,19 +230,37 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite {
           originalIndex.head.source.plan.properties.relations.head.data.properties.excluded.isEmpty)
 
         // Delete one source data file.
-        val deletedFile = deleteDataFile(nonPartitionedDataPath)
+        val deletedFile1 = deleteDataFile(nonPartitionedDataPath)
 
         // Refresh index and validate updated IndexLogEntry.
         hyperspace.refreshIndex(indexConfig.indexName)
-        val refreshedIndex = ixManager.getIndexes()
-        assert(refreshedIndex.length == 1)
+        val refreshedIndex1 = ixManager.getIndexes()
         assert(
-          refreshedIndex.head.source.plan.properties.relations.head.data.properties.excluded
-            .equals(Seq(deletedFile.toString)))
+          refreshedIndex1.head.source.plan.properties.relations.head.data.properties.excluded
+            .equals(Seq(deletedFile1.toString)))
 
+        // Make sure index fingerprint is changed.
         assert(
           !originalIndex.head.source.plan.properties.fingerprint
-            .equals(refreshedIndex.head.source.plan.properties.fingerprint))
+            .equals(refreshedIndex1.head.source.plan.properties.fingerprint))
+
+        // Delete another source data file.
+        val deletedFile2 = deleteDataFile(nonPartitionedDataPath)
+
+        // Refresh index and validate updated IndexLogEntry.
+        // `excluded` files should contain both deleted source data files.
+        hyperspace.refreshIndex(indexConfig.indexName)
+        val refreshedIndex2 = ixManager.getIndexes()
+        assert(
+          refreshedIndex2.head.source.plan.properties.relations.head.data.properties.excluded
+            .equals(Seq(deletedFile1.toString, deletedFile2.toString)))
+
+        // Make sure index fingerprint is changed.
+        assert(
+          !originalIndex.head.source.plan.properties.fingerprint
+            .equals(refreshedIndex2.head.source.plan.properties.fingerprint))
+        assert(
+          !refreshedIndex1.head.source.plan.properties.fingerprint
+            .equals(refreshedIndex2.head.source.plan.properties.fingerprint))
       }
     }
   }
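The `diff` guard introduced in this commit keeps files that are already excluded from being appended a second time. Its semantics in isolation:

    // Seq#diff removes elements already present in the other sequence.
    val excluded = Seq("f1", "f2")
    val deletedFiles = Seq("f2", "f3")
    val updated = excluded ++ (deletedFiles diff excluded)
    // updated == Seq("f1", "f2", "f3"); "f2" is not duplicated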
From 821ccd5f1e4720a50782fd2771ebb0b7f27ce496 Mon Sep 17 00:00:00 2001
From: Pouria Pirzadeh
Date: Mon, 21 Sep 2020 11:24:25 -0700
Subject: [PATCH 06/13] changes in refresh delete validate

---
 .../actions/RefreshDeleteActionBase.scala     |  4 +-
 .../hyperspace/index/RefreshIndexTests.scala  | 39 +++++++++++++++++++
 2 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala
index cf8941d08..15a6eaeb0 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala
@@ -40,7 +40,9 @@ private[actions] abstract class RefreshDeleteActionBase(
         "only supported on an index with lineage.")
     }
 
-    if (deletedFiles.isEmpty) {
+    if (deletedFiles.isEmpty || deletedFiles.toSet.equals(
+        previousIndexLogEntry.source.plan.properties.
+          relations.head.data.properties.excluded.toSet)) {
       throw HyperspaceException("Refresh aborted as no deleted source data file found.")
     }
   }
diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
index 7c8d5102c..67d458e60 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
@@ -265,6 +265,45 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite {
     }
   }
 
+  test(
+    "Validate refresh index (to handle deletes from the source data) " +
+      "for enforce delete on read does not add duplicate excluded files.") {
+    // Save test data non-partitioned.
+    SampleData.save(
+      spark,
+      nonPartitionedDataPath,
+      Seq("Date", "RGUID", "Query", "imprs", "clicks"))
+    val nonPartitionedDataDF = spark.read.parquet(nonPartitionedDataPath)
+
+    withSQLConf(
+      IndexConstants.INDEX_LINEAGE_ENABLED -> "true",
+      IndexConstants.ENFORCE_DELETE_ON_READ_ENABLED -> "true") {
+      withIndex(indexConfig.indexName) {
+        hyperspace.createIndex(nonPartitionedDataDF, indexConfig)
+        val ixManager = Hyperspace.getContext(spark).indexCollectionManager
+        val originalIndex = ixManager.getIndexes()
+        assert(originalIndex.length == 1)
+        assert(
+          originalIndex.head.source.plan.properties.relations.head.data.properties.excluded.isEmpty)
+
+        // Delete one source data file.
+        val deletedFile1 = deleteDataFile(nonPartitionedDataPath)
+
+        // Refresh index and validate updated IndexLogEntry.
+        hyperspace.refreshIndex(indexConfig.indexName)
+        val refreshedIndex1 = ixManager.getIndexes()
+        assert(
+          refreshedIndex1.head.source.plan.properties.relations.head.data.properties.excluded
+            .equals(Seq(deletedFile1.toString)))
+
+        // Refresh index again and validate it fails as expected.
+        val ex = intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName))
+        assert(
+          ex.getMessage.contains("Refresh aborted as no deleted source data file found."))
+      }
+    }
+  }
+
   /**
    * Delete one file from a given path.
    *

From f27a8a732d31254924348aa7ae8e895ccb413bcc Mon Sep 17 00:00:00 2001
From: Pouria Pirzadeh
Date: Thu, 24 Sep 2020 10:52:56 -0700
Subject: [PATCH 07/13] minor code refactor in IndexLogEntry

---
 .../actions/RefreshDeleteActionBase.scala     |  3 +--
 .../hyperspace/index/IndexLogEntry.scala      |  5 +++++
 .../hyperspace/index/RefreshIndexTests.scala  | 19 ++++++-------------
 3 files changed, 12 insertions(+), 15 deletions(-)

diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala
index 15a6eaeb0..886a80b9f 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala
@@ -41,8 +41,7 @@ private[actions] abstract class RefreshDeleteActionBase(
     }
 
     if (deletedFiles.isEmpty || deletedFiles.toSet.equals(
-        previousIndexLogEntry.source.plan.properties.
-          relations.head.data.properties.excluded.toSet)) {
+        previousIndexLogEntry.excludedFiles.toSet)) {
       throw HyperspaceException("Refresh aborted as no deleted source data file found.")
     }
   }
diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
index 8ceb55799..1fad92879 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
@@ -403,6 +403,11 @@ case class IndexLogEntry(
     sourcePlanSignatures.head
   }
 
+  def excludedFiles: Seq[String] = {
+    // Only one relation is currently supported (See relations).
+    relations.head.data.properties.excluded
+  }
+
   def hasLineageColumn(spark: SparkSession): Boolean = {
     ResolverUtils
       .resolve(spark, IndexConstants.DATA_FILE_NAME_COLUMN, schema.fieldNames)
diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
index 67d458e60..e191f9668 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
@@ -226,8 +226,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite {
         val ixManager = Hyperspace.getContext(spark).indexCollectionManager
         val originalIndex = ixManager.getIndexes()
         assert(originalIndex.length == 1)
-        assert(
-          originalIndex.head.source.plan.properties.relations.head.data.properties.excluded.isEmpty)
+        assert(originalIndex.head.excludedFiles.isEmpty)
 
         // Delete one source data file.
         val deletedFile1 = deleteDataFile(nonPartitionedDataPath)
@@ -235,9 +234,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite {
         // Refresh index and validate updated IndexLogEntry.
         hyperspace.refreshIndex(indexConfig.indexName)
         val refreshedIndex1 = ixManager.getIndexes()
-        assert(
-          refreshedIndex1.head.source.plan.properties.relations.head.data.properties.excluded
-            .equals(Seq(deletedFile1.toString)))
+        assert(refreshedIndex1.head.excludedFiles.equals(Seq(deletedFile1.toString)))
 
         // Make sure index fingerprint is changed.
         assert(
@@ -251,7 +248,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite {
         hyperspace.refreshIndex(indexConfig.indexName)
         val refreshedIndex2 = ixManager.getIndexes()
         assert(
-          refreshedIndex2.head.source.plan.properties.relations.head.data.properties.excluded
+          refreshedIndex2.head.excludedFiles
             .equals(Seq(deletedFile1.toString, deletedFile2.toString)))
 
         // Make sure index fingerprint is changed.
@@ -283,8 +280,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite {
         val ixManager = Hyperspace.getContext(spark).indexCollectionManager
         val originalIndex = ixManager.getIndexes()
         assert(originalIndex.length == 1)
-        assert(
-          originalIndex.head.source.plan.properties.relations.head.data.properties.excluded.isEmpty)
+        assert(originalIndex.head.excludedFiles.isEmpty)
 
         // Delete one source data file.
         val deletedFile1 = deleteDataFile(nonPartitionedDataPath)
@@ -292,14 +288,11 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite {
         // Refresh index and validate updated IndexLogEntry.
         hyperspace.refreshIndex(indexConfig.indexName)
         val refreshedIndex1 = ixManager.getIndexes()
-        assert(
-          refreshedIndex1.head.source.plan.properties.relations.head.data.properties.excluded
-            .equals(Seq(deletedFile1.toString)))
+        assert(refreshedIndex1.head.excludedFiles.equals(Seq(deletedFile1.toString)))
 
         // Refresh index again and validate it fails as expected.
         val ex = intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName))
-        assert(
-          ex.getMessage.contains("Refresh aborted as no deleted source data file found."))
+        assert(ex.getMessage.contains("Refresh aborted as no deleted source data file found."))
       }
     }
   }

From bf21c8d9be51ba705a62d68409ecaca9eaffac39 Mon Sep 17 00:00:00 2001
From: Pouria Pirzadeh
Date: Thu, 24 Sep 2020 11:24:44 -0700
Subject: [PATCH 08/13] add code comments

---
 .../actions/DeleteOnReadAction.scala          | 28 +++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)

diff --git a/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala
index 89ed55ad2..c5f140498 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala
@@ -23,11 +23,28 @@ import com.microsoft.hyperspace.HyperspaceException
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.telemetry.{AppInfo, DeleteOnReadActionEvent, HyperspaceEvent}
 
+/**
+ * Refresh index by updating list of excluded source data files and index signature
+ * in index metadata.
+ * Note this Refresh Action only fixes an index metadata w.r.t deleted source data files
+ * and does not consider new source data files (if any).
+ * If some original source data file(s) are removed between previous version of index and
+ * now, this Action refreshes index as follows:
+ * 1. Deleted source data files are identified.
+ * 2. New index fingerprint is computed w.r.t latest source data files.
+ * 3. IndexLogEntry is updated by modifying list of excluded source data files and
+ *    index fingerprint, computed in above steps.
+ *
+ * @param spark SparkSession.
+ * @param logManager Index LogManager for index being refreshed.
+ * @param dataManager Index DataManager for index being refreshed.
+ */
 class DeleteOnReadAction(
     spark: SparkSession,
     logManager: IndexLogManager,
     dataManager: IndexDataManager)
-  extends RefreshDeleteActionBase(spark, logManager, dataManager) with Logging {
+  extends RefreshDeleteActionBase(spark, logManager, dataManager)
+  with Logging {
 
   final override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
     DeleteOnReadActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
   }
 
+  /**
+   * Compute new index fingerprint using latest source data files and create
+   * new IndexLogEntry with updated list of excluded source data files and
+   * new index fingerprint.
+   *
+   * @return updated IndexLogEntry.
+   */
   final override def logEntry: LogEntry = {
     // Compute index fingerprint using current source data file.
     val signatureProvider = LogicalPlanSignatureProvider.create()
-    // Instantiate a new IndexLogEntry by updating excluded files and fingerprint.
+    // Create a new IndexLogEntry by updating excluded files and fingerprint.
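The scaladoc added in this commit describes the whole metadata-only flow. As session-level usage it amounts to the following (a sketch; names follow the tests in this series, and the comments restate the documented steps rather than quote the implementation):

    spark.conf.set(IndexConstants.ENFORCE_DELETE_ON_READ_ENABLED, "true")
    hyperspace.refreshIndex(indexConfig.indexName)
    // validate(): index has lineage and there are newly deleted source files
    // logEntry:   fingerprint recomputed; deleted files appended to `excluded`
    // op():       logs only -- no index data files are rewritten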
     previousIndexLogEntry.copy(

From a0479826e28858f88f40af4c986270a6182739d9 Mon Sep 17 00:00:00 2001
From: Pouria Pirzadeh
Date: Fri, 25 Sep 2020 16:53:57 -0700
Subject: [PATCH 09/13] Changes in enforce delete on read and its tests

---
 .../actions/DeleteOnReadAction.scala          |  9 ++-
 .../actions/RefreshDeleteActionBase.scala     |  7 +-
 .../index/E2EHyperspaceRulesTests.scala       |  4 +-
 .../hyperspace/index/RefreshIndexTests.scala  | 80 ++++++++++++++++---
 4 files changed, 83 insertions(+), 17 deletions(-)

diff --git a/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala
index c5f140498..14b3ca148 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala
@@ -50,6 +50,13 @@ class DeleteOnReadAction(
     DeleteOnReadActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
   }
 
+  override def validate(): Unit = {
+    super.validate()
+    if (deletedFiles.toSet.equals(previousIndexLogEntry.excludedFiles.toSet)) {
+      throw HyperspaceException("Refresh aborted as no new deleted source data file found.")
+    }
+  }
+
   final override def op(): Unit = {
     logInfo(
       "Refresh index is updating index metadata by adding " +
@@ -61,7 +68,7 @@ class DeleteOnReadAction(
    * new IndexLogEntry with updated list of excluded source data files and
    * new index fingerprint.
    *
-   * @return updated IndexLogEntry.
+   * @return Updated IndexLogEntry.
    */
   final override def logEntry: LogEntry = {
     // Compute index fingerprint using current source data file.
diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala
index 886a80b9f..fcd8839af 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala
@@ -29,8 +29,8 @@ private[actions] abstract class RefreshDeleteActionBase(
   extends RefreshActionBase(spark, logManager, dataManager) {
 
   /**
-   * Validate index has lineage column and it is in active state for refreshing and
-   * there are some deleted source data file(s).
+   * Validate index has lineage column, and it is in active state for refreshing,
+   * and there are some deleted source data file(s).
    */
   override def validate(): Unit = {
     super.validate()
@@ -40,8 +40,7 @@ private[actions] abstract class RefreshDeleteActionBase(
         "only supported on an index with lineage.")
     }
 
-    if (deletedFiles.isEmpty || deletedFiles.toSet.equals(
-        previousIndexLogEntry.excludedFiles.toSet)) {
+    if (deletedFiles.isEmpty) {
       throw HyperspaceException("Refresh aborted as no deleted source data file found.")
     }
   }
diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala
index 618133bca..b35f88b11 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala
@@ -410,7 +410,7 @@ class E2EHyperspaceRulesTests extends HyperspaceSuite with SQLHelper {
       IndexConstants.REFRESH_DELETE_ENABLED -> "true") {
 
       // Save a copy of source data files.
-      val location = testDir + "ixRefreshTest"
+      val location = testDir + "refreshDeleteTest"
       val dataPath = new Path(location, "*parquet")
       val dataColumns = Seq("c1", "c2", "c3", "c4", "c5")
       SampleData.save(spark, location, dataColumns)
@@ -462,7 +462,7 @@ class E2EHyperspaceRulesTests extends HyperspaceSuite with SQLHelper {
       IndexConstants.ENFORCE_DELETE_ON_READ_ENABLED -> "true") {
 
       // Save a copy of source data files.
-      val location = testDir + "ixRefreshTest"
+      val location = testDir + "deleteOnReadTest"
       val dataPath = new Path(location, "*parquet")
       val dataColumns = Seq("c1", "c2", "c3", "c4", "c5")
       SampleData.save(spark, location, dataColumns)
diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
index e191f9668..de4421060 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
@@ -234,12 +234,11 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite {
         // Refresh index and validate updated IndexLogEntry.
         hyperspace.refreshIndex(indexConfig.indexName)
         val refreshedIndex1 = ixManager.getIndexes()
+        assert(refreshedIndex1.length == 1)
         assert(refreshedIndex1.head.excludedFiles.equals(Seq(deletedFile1.toString)))
 
         // Make sure index fingerprint is changed.
-        assert(
-          !originalIndex.head.source.plan.properties.fingerprint
-            .equals(refreshedIndex1.head.source.plan.properties.fingerprint))
+        assert(!originalIndex.head.signature.equals(refreshedIndex1.head.signature))
 
         // Delete another source data file.
         val deletedFile2 = deleteDataFile(nonPartitionedDataPath)
         // Refresh index and validate updated IndexLogEntry.
         // `excluded` files should contain both deleted source data files.
         hyperspace.refreshIndex(indexConfig.indexName)
         val refreshedIndex2 = ixManager.getIndexes()
+        assert(refreshedIndex2.length == 1)
         assert(
           refreshedIndex2.head.excludedFiles
             .equals(Seq(deletedFile1.toString, deletedFile2.toString)))
 
         // Make sure index fingerprint is changed.
-        assert(
-          !originalIndex.head.source.plan.properties.fingerprint
-            .equals(refreshedIndex2.head.source.plan.properties.fingerprint))
-        assert(
-          !refreshedIndex1.head.source.plan.properties.fingerprint
-            .equals(refreshedIndex2.head.source.plan.properties.fingerprint))
+        assert(!originalIndex.head.signature.equals(refreshedIndex2.head.signature))
+        assert(!refreshedIndex1.head.signature.equals(refreshedIndex2.head.signature))
       }
     }
   }
 
         // Refresh index again and validate it fails as expected.
         val ex = intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName))
-        assert(ex.getMessage.contains("Refresh aborted as no deleted source data file found."))
+        assert(
+          ex.getMessage.contains("Refresh aborted as no new deleted source data file found."))
       }
     }
   }
 
+  test(
+    "Validate refresh index (to handle deletes from the source data) " +
+      "updates index files correctly when called on an index with excluded files.") {
+    // Save test data non-partitioned.
+    SampleData.save(
+      spark,
+      nonPartitionedDataPath,
+      Seq("Date", "RGUID", "Query", "imprs", "clicks"))
+    val nonPartitionedDataDF = spark.read.parquet(nonPartitionedDataPath)
+
+    withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") {
+      withIndex(indexConfig.indexName) {
+        spark.conf.set(IndexConstants.ENFORCE_DELETE_ON_READ_ENABLED, "true")
+        hyperspace.createIndex(nonPartitionedDataDF, indexConfig)
+        val ixManager = Hyperspace.getContext(spark).indexCollectionManager
+        val originalIndex = ixManager.getIndexes()
+        assert(originalIndex.length == 1)
+        assert(originalIndex.head.excludedFiles.isEmpty)
+
+        // Delete one source data file.
+        val deletedFile = deleteDataFile(nonPartitionedDataPath)
+
+        val originalIndexDF = spark.read.parquet(
+          s"$systemPath/${indexConfig.indexName}/" +
+            s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=0")
+        val originalIndexWithoutDeletedFile = originalIndexDF
+          .filter(s"""${IndexConstants.DATA_FILE_NAME_COLUMN} != "$deletedFile"""")
+
+        // Refresh index and validate updated IndexLogEntry.
+        hyperspace.refreshIndex(indexConfig.indexName)
+        val refreshedIndex1 = ixManager.getIndexes()
+        assert(refreshedIndex1.length == 1)
+        assert(refreshedIndex1.head.excludedFiles.equals(Seq(deletedFile.toString)))
+
+        // Change refresh configurations and refresh index to update index files.
+        spark.conf.set(IndexConstants.ENFORCE_DELETE_ON_READ_ENABLED, "false")
+        spark.conf.set(IndexConstants.REFRESH_DELETE_ENABLED, "true")
+        hyperspace.refreshIndex(indexConfig.indexName)
+        val refreshedIndex2 = ixManager.getIndexes()
+        assert(refreshedIndex2.length == 1)
+
+        // Now that index entries for deleted source data files are removed
+        // from index files, `excluded` files list should be reset and empty.
+        assert(refreshedIndex2.head.excludedFiles.isEmpty)
+
+        // Verify index signature in latest version is different from
+        // original version (before any source data file deletion).
+        assert(!refreshedIndex2.head.signature.equals(originalIndex.head.signature))
+
+        // Verify index signature in latest version is the same as previous version
+        // (created by refresh for enforce delete on read), as they both have same
+        // set of source data files.
+        assert(refreshedIndex2.head.signature.equals(refreshedIndex1.head.signature))
+
+        // Make sure only index entries from deleted files are removed from index files.
+        val refreshedIndexDF = spark.read.parquet(
+          s"$systemPath/${indexConfig.indexName}/" +
+            s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=1")
+        checkAnswer(originalIndexWithoutDeletedFile, refreshedIndexDF)
+      }
+    }
+  }
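PATCH 10 below renames the action to RefreshForDeleteOnReadAction and moves the flag to a new key. After that change, opting in would look like this (a sketch; the index name is the one used in the E2E tests):

    // Old key from PATCH 01 ("...enforce.delete.on.read.enabled") is replaced.
    spark.conf.set("spark.hyperspace.index.refresh.deleteOnRead.enabled", "true")
    hyperspace.refreshIndex("filterIndex") // runs RefreshForDeleteOnReadAction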
From 2720656e48a6253517b34c63553c2150a24a55d7 Mon Sep 17 00:00:00 2001
From: Pouria Pirzadeh
Date: Mon, 28 Sep 2020 16:35:42 -0700
Subject: [PATCH 10/13] Action rename and misc test fixes for delete on read

---
 ...ala => RefreshForDeleteOnReadAction.scala} |  20 +-
 .../index/IndexCollectionManager.scala        |   4 +-
 .../hyperspace/index/IndexConstants.scala     |   4 +-
 .../hyperspace/index/IndexLogEntry.scala      |   3 +-
 .../telemetry/HyperspaceEvent.scala           |  14 +-
 .../hyperspace/util/HyperspaceConf.scala      |   6 +-
 .../com/microsoft/hyperspace/SampleData.scala |   4 +-
 .../index/E2EHyperspaceRulesTests.scala       | 188 +++++++++---------
 .../hyperspace/index/IndexLogEntryTest.scala  |   5 +-
 .../hyperspace/index/RefreshIndexTests.scala  |  60 +++---
 10 files changed, 161 insertions(+), 147 deletions(-)
 rename src/main/scala/com/microsoft/hyperspace/actions/{DeleteOnReadAction.scala => RefreshForDeleteOnReadAction.scala} (85%)

diff --git a/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshForDeleteOnReadAction.scala
similarity index 85%
rename from src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala
rename to src/main/scala/com/microsoft/hyperspace/actions/RefreshForDeleteOnReadAction.scala
index 14b3ca148..c2d9f3617 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/DeleteOnReadAction.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshForDeleteOnReadAction.scala
@@ -21,38 +21,40 @@ import org.apache.spark.sql.SparkSession
 
 import com.microsoft.hyperspace.HyperspaceException
 import com.microsoft.hyperspace.index._
-import com.microsoft.hyperspace.telemetry.{AppInfo, DeleteOnReadActionEvent, HyperspaceEvent}
+import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshForDeleteOnReadActionEvent}
 
 /**
  * Refresh index by updating list of excluded source data files and index signature
  * in index metadata.
- * Note this Refresh Action only fixes an index metadata w.r.t deleted source data files
- * and does not consider new source data files (if any).
  * If some original source data file(s) are removed between previous version of index and
 * now, this Action refreshes index as follows:
  * 1. Deleted source data files are identified.
- * 2. New index fingerprint is computed w.r.t latest source data files.
+ * 2. New index fingerprint is computed w.r.t latest source data files. This captures
+ *    both deleted source data files and new source data files (if any).
  * 3. IndexLogEntry is updated by modifying list of excluded source data files and
- *    index fingerprint, computed in above steps.
+ *    index fingerprint, computed in above steps.
 *
  * @param spark SparkSession.
  * @param logManager Index LogManager for index being refreshed.
  * @param dataManager Index DataManager for index being refreshed.
  */
-class DeleteOnReadAction(
+class RefreshForDeleteOnReadAction(
     spark: SparkSession,
     logManager: IndexLogManager,
     dataManager: IndexDataManager)
   extends RefreshDeleteActionBase(spark, logManager, dataManager)
   with Logging {
 
+  private lazy val newExcludedFiles: Seq[String] =
+    deletedFiles diff previousIndexLogEntry.excludedFiles
+
   final override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
-    DeleteOnReadActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
+    RefreshForDeleteOnReadActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
   }
 
   override def validate(): Unit = {
     super.validate()
-    if (deletedFiles.toSet.equals(previousIndexLogEntry.excludedFiles.toSet)) {
+    if (newExcludedFiles.isEmpty) {
       throw HyperspaceException("Refresh aborted as no new deleted source data file found.")
     }
   }
                 relation.copy(
                   data = data.copy(
                     properties = dataProps.copy(
-                      excluded = excluded ++ (deletedFiles diff excluded)))))))))
+                      excluded = excluded ++ newExcludedFiles))))))))
   }
 }
diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala
index 79da726f5..7af52eef7 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala
@@ -69,8 +69,8 @@ class IndexCollectionManager(
       val dataManager = indexDataManagerFactory.create(indexPath)
       if (HyperspaceConf.refreshDeleteEnabled(spark)) {
         new RefreshDeleteAction(spark, logManager, dataManager).run()
-      } else if (HyperspaceConf.enforceDeleteOnReadEnabled(spark)) {
-        new DeleteOnReadAction(spark, logManager, dataManager).run()
+      } else if (HyperspaceConf.refreshForDeleteOnReadEnabled(spark)) {
+        new RefreshForDeleteOnReadAction(spark, logManager, dataManager).run()
       } else {
         new RefreshAction(spark, logManager, dataManager).run()
       }
diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
index 557dd223a..f965a4268 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
@@ -58,6 +58,6 @@ object IndexConstants {
   val REFRESH_DELETE_ENABLED = "spark.hyperspace.index.refresh.delete.enabled"
   val REFRESH_DELETE_ENABLED_DEFAULT = "false"
 
-  val ENFORCE_DELETE_ON_READ_ENABLED = "spark.hyperspace.index.enforce.delete.on.read.enabled"
-  val ENFORCE_DELETE_ON_READ_ENABLED_DEFAULT = "false"
+  val REFRESH_FOR_DELETE_ON_READ_ENABLED = "spark.hyperspace.index.refresh.deleteOnRead.enabled"
+  val REFRESH_FOR_DELETE_ON_READ_ENABLED_DEFAULT = "false"
 }
diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
index eba8fed24..d5b9d11a4 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
@@ -325,7 +325,7 @@ case class Hdfs(properties: Hdfs.Properties) {
   val kind = "HDFS"
 }
 
 object Hdfs {
-  case class Properties(content: Content, excluded: Seq[String] = Seq[String]())
+  case class Properties(content: Content, excluded: Seq[String] = Nil)
 }
 
 // IndexLogEntry-specific Relation that represents the source relation.
@@ -411,7 +411,6 @@ case class IndexLogEntry(
   }
 
   def excludedFiles: Seq[String] = {
-    // Only one relation is currently supported (See relations).
     relations.head.data.properties.excluded
   }
 
diff --git a/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala b/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala
index fd79265e4..671e9952d 100644
--- a/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala
+++ b/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala
@@ -116,7 +116,19 @@ case class CancelActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: St
 case class RefreshDeleteActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String)
     extends HyperspaceIndexCRUDEvent
 
-case class DeleteOnReadActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String)
+/**
+ * Index Refresh Event for deleted source files. Emitted when refresh is called on an index
+ * with config flag set to update index metadata to enforce delete during read by excluding
+ * deleted source data files when index is used at query time.
+ *
+ * @param appInfo AppInfo for spark application.
+ * @param index Related index.
+ * @param message Message about event.
+ */
+case class RefreshForDeleteOnReadActionEvent(
+    appInfo: AppInfo,
+    index: IndexLogEntry,
+    message: String)
     extends HyperspaceIndexCRUDEvent
 
 /**
diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
index dd6d5e6a3..5aa86ea97 100644
--- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
+++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
@@ -39,11 +39,11 @@ object HyperspaceConf {
       .toBoolean
   }
 
-  def enforceDeleteOnReadEnabled(spark: SparkSession): Boolean = {
+  def refreshForDeleteOnReadEnabled(spark: SparkSession): Boolean = {
     spark.conf
       .get(
-        IndexConstants.ENFORCE_DELETE_ON_READ_ENABLED,
-        IndexConstants.ENFORCE_DELETE_ON_READ_ENABLED_DEFAULT)
+        IndexConstants.REFRESH_FOR_DELETE_ON_READ_ENABLED,
+        IndexConstants.REFRESH_FOR_DELETE_ON_READ_ENABLED_DEFAULT)
       .toBoolean
   }
 }
diff --git a/src/test/scala/com/microsoft/hyperspace/SampleData.scala b/src/test/scala/com/microsoft/hyperspace/SampleData.scala
index 9b82e4561..0cb47387f 100644
--- a/src/test/scala/com/microsoft/hyperspace/SampleData.scala
+++ b/src/test/scala/com/microsoft/hyperspace/SampleData.scala
@@ -43,9 +43,9 @@ object SampleData {
     val df = testData.toDF(columns: _*)
     partitionColumns match {
       case Some(pcs) =>
-        df.write.partitionBy(pcs: _*).parquet(path)
+        df.write.mode("overwrite").partitionBy(pcs: _*).parquet(path)
       case None =>
-        df.write.parquet(path)
+        df.write.mode("overwrite").parquet(path)
     }
   }
 }
diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala
index 8ac801440..d3243af11 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala
@@ -419,104 +419,102 @@ class E2EHyperspaceRulesTests extends HyperspaceSuite with SQLHelper {
     withSQLConf(
       IndexConstants.INDEX_LINEAGE_ENABLED -> "true",
       IndexConstants.REFRESH_DELETE_ENABLED -> "true") {
-
-      // Save a copy of source data files.
-      val location = testDir + "refreshDeleteTest"
-      val dataPath = new Path(location, "*parquet")
-      val dataColumns = Seq("c1", "c2", "c3", "c4", "c5")
-      SampleData.save(spark, location, dataColumns)
-
-      // Create index on original source data files.
-      val df = spark.read.parquet(location)
-      val indexConfig = IndexConfig("filterIndex", Seq("c3"), Seq("c1"))
-      hyperspace.createIndex(df, indexConfig)
-
-      // Verify index usage for index version (v=0).
-      def query1(): DataFrame =
-        spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1")
-
-      verifyIndexUsage(query1, getIndexFilesPath(indexConfig.indexName))
-
-      // Delete some source data file.
-      val dataFileNames = dataPath
-        .getFileSystem(new Configuration)
-        .globStatus(dataPath)
-        .map(_.getPath)
-
-      assert(dataFileNames.nonEmpty)
-      val fileToDelete = dataFileNames.head
-      FileUtils.delete(fileToDelete)
-
-      def query2(): DataFrame =
-        spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1")
-
-      // Verify index is not used.
-      spark.enableHyperspace()
-      val planRootPaths = getAllRootPaths(query2().queryExecution.optimizedPlan)
-      spark.disableHyperspace()
-      assert(planRootPaths.equals(Seq(PathUtils.makeAbsolute(location))))
-
-      // Refresh the index to remove deleted source data file records from index.
-      hyperspace.refreshIndex(indexConfig.indexName)
-
-      // Verify index usage on latest version of index (v=1) after refresh.
-      verifyIndexUsage(query2, getIndexFilesPath(indexConfig.indexName, 1))
-      FileUtils.delete(new Path(location))
+      withTempDir { inputDir =>
+        // Save a copy of source data files.
+        val location = inputDir.toString
+        val dataPath = new Path(location, "*parquet")
+        val dataColumns = Seq("c1", "c2", "c3", "c4", "c5")
+        SampleData.save(spark, location, dataColumns)
+
+        // Create index on original source data files.
+        val df = spark.read.parquet(location)
+        val indexConfig = IndexConfig("filterIndex", Seq("c3"), Seq("c1"))
+        hyperspace.createIndex(df, indexConfig)
+
+        // Verify index usage for index version (v=0).
+        def query1(): DataFrame =
+          spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1")
+
+        verifyIndexUsage(query1, getIndexFilesPath(indexConfig.indexName))
+
+        // Delete some source data file.
+        val dataFileNames = dataPath
+          .getFileSystem(new Configuration)
+          .globStatus(dataPath)
+          .map(_.getPath)
+
+        assert(dataFileNames.nonEmpty)
+        val fileToDelete = dataFileNames.head
+        FileUtils.delete(fileToDelete)
+
+        def query2(): DataFrame =
+          spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1")
+
+        // Verify index is not used.
+        spark.enableHyperspace()
+        val planRootPaths = getAllRootPaths(query2().queryExecution.optimizedPlan)
+        spark.disableHyperspace()
+        assert(planRootPaths.equals(Seq(PathUtils.makeAbsolute(location))))
+
+        // Refresh the index to remove deleted source data file records from index.
+        hyperspace.refreshIndex(indexConfig.indexName)
+
+        // Verify index usage on latest version of index (v=1) after refresh.
+        verifyIndexUsage(query2, getIndexFilesPath(indexConfig.indexName, 1))
+      }
     }
   }
 
-  test(
-    "Validate index usage for enforce delete on read " +
-      "after some source data file is deleted.") {
+  test("Validate index usage for enforce delete on read after some source data file is deleted.") {
     withSQLConf(
       IndexConstants.INDEX_LINEAGE_ENABLED -> "true",
-      IndexConstants.ENFORCE_DELETE_ON_READ_ENABLED -> "true") {
-
-      // Save a copy of source data files.
- val location = testDir + "deleteOnReadTest" - val dataPath = new Path(location, "*parquet") - val dataColumns = Seq("c1", "c2", "c3", "c4", "c5") - SampleData.save(spark, location, dataColumns) - - // Create index on original source data files. - val df = spark.read.parquet(location) - val indexConfig = IndexConfig("filterIndex", Seq("c3"), Seq("c1")) - hyperspace.createIndex(df, indexConfig) - - // Verify index usage. - def query1(): DataFrame = - spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1") - - verifyIndexUsage(query1, getIndexFilesPath(indexConfig.indexName)) - - // Delete one source data file. - val dataFileNames = dataPath - .getFileSystem(new Configuration) - .globStatus(dataPath) - .map(_.getPath) - - assert(dataFileNames.nonEmpty) - val fileToDelete = dataFileNames.head - FileUtils.delete(fileToDelete) - - def query2(): DataFrame = - spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1") - - // Verify index is not used after source data file is deleted. - spark.enableHyperspace() - val planRootPaths = getAllRootPaths(query2().queryExecution.optimizedPlan) - spark.disableHyperspace() - assert(planRootPaths.equals(Seq(PathUtils.makeAbsolute(location)))) - - // Refresh index to update its metadata. - hyperspace.refreshIndex(indexConfig.indexName) - - // Verify index is used after refresh fixes fingerprint in index metadata. - // TODO Replace check with verifyIndexUsage() once EnforceDeleteOnRead covers query path. - spark.enableHyperspace() - assert( - queryPlanHasExpectedRootPaths( - query2().queryExecution.optimizedPlan, - getIndexFilesPath(indexConfig.indexName))) - FileUtils.delete(new Path(location)) + IndexConstants.REFRESH_FOR_DELETE_ON_READ_ENABLED -> "true") { + withTempDir { inputDir => + // Save a copy of source data files. + val location = inputDir.toString + val dataPath = new Path(location, "*parquet") + val dataColumns = Seq("c1", "c2", "c3", "c4", "c5") + SampleData.save(spark, location, dataColumns) + + // Create index on original source data files. + val df = spark.read.parquet(location) + val indexConfig = IndexConfig("filterIndex", Seq("c3"), Seq("c1")) + hyperspace.createIndex(df, indexConfig) + + // Verify index usage. + def query1(): DataFrame = + spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1") + + verifyIndexUsage(query1, getIndexFilesPath(indexConfig.indexName)) + + // Delete one source data file. + val dataFileNames = dataPath + .getFileSystem(new Configuration) + .globStatus(dataPath) + .map(_.getPath) + + assert(dataFileNames.nonEmpty) + val fileToDelete = dataFileNames.head + FileUtils.delete(fileToDelete) + + def query2(): DataFrame = + spark.read.parquet(location).filter("c3 == 'facebook'").select("c3", "c1") + + // Verify index is not used after source data file is deleted. + spark.enableHyperspace() + val planRootPaths = getAllRootPaths(query2().queryExecution.optimizedPlan) + spark.disableHyperspace() + assert(planRootPaths.equals(Seq(PathUtils.makeAbsolute(location)))) + + // Refresh index to update its metadata. + hyperspace.refreshIndex(indexConfig.indexName) + + // Verify index is used after refresh fixes fingerprint in index metadata. + // TODO: Replace check with verifyIndexUsage() once delete on read covers query path. 
+ spark.enableHyperspace() + assert( + queryPlanHasExpectedRootPaths( + query2().queryExecution.optimizedPlan, + getIndexFilesPath(indexConfig.indexName))) + } } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala index 24c9dd003..5722c8a7a 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala @@ -126,7 +126,7 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter | "properties" : { } | } | }, - | "excluded" : [] + | "excluded" : ["file:/rootpath/f1"] | }, | "kind" : "HDFS" | }, @@ -164,7 +164,8 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter Seq(Relation( Seq("rootpath"), Hdfs(Hdfs.Properties(Content( - Directory("", Seq(FileInfo("f1", 100L, 100L), FileInfo("f2", 200L, 200L)), Seq())))), + Directory("", Seq(FileInfo("f1", 100L, 100L), FileInfo("f2", 200L, 200L)), Seq())), + Seq("file:/rootpath/f1"))), "schema", "type", Map())), diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala index de4421060..b7dc16263 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala @@ -149,33 +149,35 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { withSQLConf( IndexConstants.INDEX_LINEAGE_ENABLED -> "true", IndexConstants.REFRESH_DELETE_ENABLED -> "true") { - SampleData.save( - spark, - nonPartitionedDataPath, - Seq("Date", "RGUID", "Query", "imprs", "clicks")) - val nonPartitionedDataDF = spark.read.parquet(nonPartitionedDataPath) + withTempDir { inputDir => + val inputDataPath = inputDir.toString + SampleData.save( + spark, + inputDataPath, + Seq("Date", "RGUID", "Query", "imprs", "clicks")) + val nonPartitionedDataDF = spark.read.parquet(inputDataPath) - hyperspace.createIndex(nonPartitionedDataDF, indexConfig) - - if (deleteDataFolder) { - FileUtils.delete(new Path(nonPartitionedDataPath)) + hyperspace.createIndex(nonPartitionedDataDF, indexConfig) - val ex = intercept[AnalysisException](hyperspace.refreshIndex(indexConfig.indexName)) - assert(ex.getMessage.contains("Path does not exist")) + if (deleteDataFolder) { + FileUtils.delete(new Path(inputDataPath)) - } else { - val dataPath = new Path(nonPartitionedDataPath, "*parquet") - dataPath - .getFileSystem(new Configuration) - .globStatus(dataPath) - .foreach(p => FileUtils.delete(p.getPath)) + val ex = intercept[AnalysisException](hyperspace.refreshIndex(indexConfig.indexName)) + assert(ex.getMessage.contains("Path does not exist")) - val ex = - intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName)) - assert(ex.getMessage.contains("Invalid plan for creating an index.")) + } else { + val dataPath = new Path(inputDataPath, "*parquet") + dataPath + .getFileSystem(new Configuration) + .globStatus(dataPath) + .foreach(p => FileUtils.delete(p.getPath)) + + val ex = + intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName)) + assert(ex.getMessage.contains("Invalid plan for creating an index.")) + } + FileUtils.delete(systemPath) } - FileUtils.delete(new Path(nonPartitionedDataPath)) - FileUtils.delete(systemPath) } } } @@ -220,7 +222,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { withSQLConf( 
IndexConstants.INDEX_LINEAGE_ENABLED -> "true", - IndexConstants.ENFORCE_DELETE_ON_READ_ENABLED -> "true") { + IndexConstants.REFRESH_FOR_DELETE_ON_READ_ENABLED -> "true") { withIndex(indexConfig.indexName) { hyperspace.createIndex(nonPartitionedDataDF, indexConfig) val ixManager = Hyperspace.getContext(spark).indexCollectionManager @@ -270,7 +272,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { withSQLConf( IndexConstants.INDEX_LINEAGE_ENABLED -> "true", - IndexConstants.ENFORCE_DELETE_ON_READ_ENABLED -> "true") { + IndexConstants.REFRESH_FOR_DELETE_ON_READ_ENABLED -> "true") { withIndex(indexConfig.indexName) { hyperspace.createIndex(nonPartitionedDataDF, indexConfig) val ixManager = Hyperspace.getContext(spark).indexCollectionManager @@ -279,12 +281,12 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { assert(originalIndex.head.excludedFiles.isEmpty) // Delete one source data file. - val deletedFile1 = deleteDataFile(nonPartitionedDataPath) + val deletedFile = deleteDataFile(nonPartitionedDataPath) // Refresh index and validate updated IndexLogEntry. hyperspace.refreshIndex(indexConfig.indexName) - val refreshedIndex1 = ixManager.getIndexes() - assert(refreshedIndex1.head.excludedFiles.equals(Seq(deletedFile1.toString))) + val refreshedIndex = ixManager.getIndexes() + assert(refreshedIndex.head.excludedFiles.equals(Seq(deletedFile.toString))) // Refresh index again and validate it fails as expected. val ex = intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName)) @@ -306,7 +308,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { withIndex(indexConfig.indexName) { - spark.conf.set(IndexConstants.ENFORCE_DELETE_ON_READ_ENABLED, "true") + spark.conf.set(IndexConstants.REFRESH_FOR_DELETE_ON_READ_ENABLED, "true") hyperspace.createIndex(nonPartitionedDataDF, indexConfig) val ixManager = Hyperspace.getContext(spark).indexCollectionManager val originalIndex = ixManager.getIndexes() @@ -329,7 +331,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { assert(refreshedIndex1.head.excludedFiles.equals(Seq(deletedFile.toString))) // Change refresh configurations and refresh index to update index files. 
-        spark.conf.set(IndexConstants.ENFORCE_DELETE_ON_READ_ENABLED, "false")
+        spark.conf.set(IndexConstants.REFRESH_FOR_DELETE_ON_READ_ENABLED, "false")
         spark.conf.set(IndexConstants.REFRESH_DELETE_ENABLED, "true")
         hyperspace.refreshIndex(indexConfig.indexName)
         val refreshedIndex2 = ixManager.getIndexes()

From e93217fcb30a02a96dfec9e0810cc0788c82b34a Mon Sep 17 00:00:00 2001
From: Pouria Pirzadeh
Date: Tue, 29 Sep 2020 17:37:54 -0700
Subject: [PATCH 11/13] Add appended files to index metadata and rename action

---
 .../actions/RefreshDeleteAction.scala         | 12 +-
 .../actions/RefreshDeleteActionBase.scala     | 17 +--
 .../RefreshForDeleteOnReadAction.scala        | 107 ------------------
 .../index/IndexCollectionManager.scala        | 4 +-
 .../hyperspace/index/IndexConstants.scala     | 4 +-
 .../hyperspace/index/IndexLogEntry.scala      | 6 +-
 .../telemetry/HyperspaceEvent.scala           | 10 +-
 .../hyperspace/util/HyperspaceConf.scala      | 6 +-
 .../index/E2EHyperspaceRulesTests.scala       | 2 +-
 .../hyperspace/index/IndexLogEntryTest.scala  | 6 +-
 .../hyperspace/index/RefreshIndexTests.scala  | 60 +++++++---
 11 files changed, 79 insertions(+), 155 deletions(-)
 delete mode 100644 src/main/scala/com/microsoft/hyperspace/actions/RefreshForDeleteOnReadAction.scala

diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala
index c85fb96c4..8b06ef6c5 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala
@@ -20,6 +20,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions._
 
+import com.microsoft.hyperspace.HyperspaceException
 import com.microsoft.hyperspace.index._
 import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
 import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshDeleteActionEvent}
@@ -49,6 +50,13 @@ class RefreshDeleteAction(
     RefreshDeleteActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
   }
 
+  override def validate(): Unit = {
+    super.validate()
+    if (sourceFilesDiff._1.isEmpty) {
+      throw HyperspaceException("Refresh aborted as no deleted source data file found.")
+    }
+  }
+
   /**
    * For an index with lineage, find all the source data files which have been deleted,
    * and use index records' lineage to mark and remove index entries which belong to
@@ -57,12 +65,12 @@ class RefreshDeleteAction(
   final override def op(): Unit = {
     logInfo(
       "Refresh index is updating index by removing index entries " +
-        s"corresponding to ${deletedFiles.length} deleted source data files.")
+        s"corresponding to ${sourceFilesDiff._1.length} deleted source data files.")
 
     val refreshDF =
       spark.read
         .parquet(previousIndexLogEntry.content.files.map(_.toString): _*)
-        .filter(!col(s"${IndexConstants.DATA_FILE_NAME_COLUMN}").isin(deletedFiles: _*))
+        .filter(!col(s"${IndexConstants.DATA_FILE_NAME_COLUMN}").isin(sourceFilesDiff._1: _*))
 
     refreshDF.write.saveWithBuckets(
       refreshDF,
diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala
index fcd8839af..3b3b7981a 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala
@@ -29,28 +29,23 @@ private[actions] abstract class RefreshDeleteActionBase(
     extends RefreshActionBase(spark, logManager, dataManager) {
 
   /**
-   * Validate index has lineage column, and it is in active state for refreshing,
-   * and there are some deleted source data file(s).
+   * Validate index has lineage column, and it is in active state for refreshing.
    */
   override def validate(): Unit = {
     super.validate()
     if (!previousIndexLogEntry.hasLineageColumn(spark)) {
       throw HyperspaceException(
-        "Index refresh (to handle deleted source data) is " +
+        "Index refresh (to handle deleted or appended source data) is " +
           "only supported on an index with lineage.")
     }
-
-    if (deletedFiles.isEmpty) {
-      throw HyperspaceException("Refresh aborted as no deleted source data file found.")
-    }
   }
 
   /**
    * Compare list of source data files from previous IndexLogEntry to list
    * of current source data files, validate fileInfo for existing files and
-   * identify deleted source data files.
+   * identify deleted and appended source data files.
    */
-  protected lazy val deletedFiles: Seq[String] = {
+  protected lazy val sourceFilesDiff: (Seq[String], Seq[String]) = {
     val rels = previousIndexLogEntry.relations
     val originalFiles = rels.head.data.properties.content.fileInfos
     val currentFiles = rels.head.rootPaths
@@ -68,13 +63,13 @@ private[actions] abstract class RefreshDeleteActionBase(
         case Some(v) =>
           if (!f.equals(v)) {
             throw HyperspaceException(
-              "Index refresh (to handle deleted source data) aborted. " +
+              "Index refresh (to handle deleted or appended source data) aborted. " +
                 s"Existing source data file info is changed (file: ${f.name}).")
           }
         case None => delFiles :+= f.name
       }
     }
 
-    delFiles
+    (delFiles, (currentFiles.keySet diff originalFiles.map(_.name)).toSeq)
   }
 }
diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshForDeleteOnReadAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshForDeleteOnReadAction.scala
deleted file mode 100644
index c2d9f3617..000000000
--- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshForDeleteOnReadAction.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (2020) The Hyperspace Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.microsoft.hyperspace.actions
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
-
-import com.microsoft.hyperspace.HyperspaceException
-import com.microsoft.hyperspace.index._
-import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshForDeleteOnReadActionEvent}
-
-/**
- * Refresh index by updating list of excluded source data files and index signature
- * in index metadata.
- * If some original source data file(s) are removed between previous version of index and
- * now, this Action refreshes index as follows:
- * 1. Deleted source data files are identified.
- * 2. New index fingerprint is computed w.r.t latest source data files. This captures
- * both deleted source data files and new source data files (if any).
- * 3. IndexLogEntry is updated by modifying list of excluded source data files and
- * index fingerprint, computed in above steps.
- *
- * @param spark SparkSession.
- * @param logManager Index LogManager for index being refreshed.
- * @param dataManager Index DataManager for index being refreshed.
- */
-class RefreshForDeleteOnReadAction(
-    spark: SparkSession,
-    logManager: IndexLogManager,
-    dataManager: IndexDataManager)
-  extends RefreshDeleteActionBase(spark, logManager, dataManager)
-  with Logging {
-
-  private lazy val newExcludedFiles: Seq[String] =
-    deletedFiles diff previousIndexLogEntry.excludedFiles
-
-  final override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
-    RefreshForDeleteOnReadActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
-  }
-
-  override def validate(): Unit = {
-    super.validate()
-    if (newExcludedFiles.isEmpty) {
-      throw HyperspaceException("Refresh aborted as no new deleted source data file found.")
-    }
-  }
-
-  final override def op(): Unit = {
-    logInfo(
-      "Refresh index is updating index metadata by adding " +
-        s"${deletedFiles.length} deleted files to list of excluded source data files.")
-  }
-
-  /**
-   * Compute new index fingerprint using latest source data files and create
-   * new IndexLogEntry with updated list of excluded source data files and
-   * new index fingerprint.
-   *
-   * @return Updated IndexLogEntry.
-   */
-  final override def logEntry: LogEntry = {
-    // Compute index fingerprint using current source data file.
-    val signatureProvider = LogicalPlanSignatureProvider.create()
-    val newSignature = signatureProvider.signature(df.queryExecution.optimizedPlan) match {
-      case Some(s) =>
-        LogicalPlanFingerprint(
-          LogicalPlanFingerprint.Properties(Seq(Signature(signatureProvider.name, s))))
-
-      case None => throw HyperspaceException("Invalid source plan found during index refresh.")
-    }
-
-    // Grab nested structures from previous IndexLogEntry.
-    val source = previousIndexLogEntry.source
-    val plan = source.plan
-    val planProps = plan.properties
-    val relation = planProps.relations.head
-    val data = relation.data
-    val dataProps = data.properties
-    val excluded = dataProps.excluded
-
-    // Create a new IndexLogEntry by updating excluded files and fingerprint.
-    previousIndexLogEntry.copy(
-      source = source.copy(
-        plan = plan.copy(
-          properties = planProps.copy(
-            fingerprint = newSignature,
-            relations = Seq(
-              relation.copy(
-                data = data.copy(
-                  properties = dataProps.copy(
-                    excluded = excluded ++ newExcludedFiles))))))))
-  }
-}
diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala
index 7af52eef7..6a77c8289 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala
@@ -69,8 +69,8 @@ class IndexCollectionManager(
       val dataManager = indexDataManagerFactory.create(indexPath)
       if (HyperspaceConf.refreshDeleteEnabled(spark)) {
         new RefreshDeleteAction(spark, logManager, dataManager).run()
-      } else if (HyperspaceConf.refreshForDeleteOnReadEnabled(spark)) {
-        new RefreshForDeleteOnReadAction(spark, logManager, dataManager).run()
+      } else if (HyperspaceConf.refreshLogEntryEnabled(spark)) {
+        new RefreshLogEntryAction(spark, logManager, dataManager).run()
       } else {
         new RefreshAction(spark, logManager, dataManager).run()
       }
diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
index f965a4268..db898e689 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala
@@ -58,6 +58,6 @@ object IndexConstants {
   val REFRESH_DELETE_ENABLED = "spark.hyperspace.index.refresh.delete.enabled"
   val REFRESH_DELETE_ENABLED_DEFAULT = "false"
 
-  val REFRESH_FOR_DELETE_ON_READ_ENABLED = "spark.hyperspace.index.refresh.deleteOnRead.enabled"
-  val REFRESH_FOR_DELETE_ON_READ_ENABLED_DEFAULT = "false"
+  val REFRESH_LOGENTRY_ACTION_ENABLED = "spark.hyperspace.index.refresh.logentry.enabled"
+  val REFRESH_LOGENTRY_ACTION_ENABLED_DEFAULT = "false"
 }
diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
index d5b9d11a4..f324b1634 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
@@ -325,7 +325,7 @@ case class Hdfs(properties: Hdfs.Properties) {
   val kind = "HDFS"
 }
 object Hdfs {
-  case class Properties(content: Content, excluded: Seq[String] = Nil)
+  case class Properties(content: Content, excluded: Seq[String] = Nil, appended: Seq[String] = Nil)
 }
 
 // IndexLogEntry-specific Relation that represents the source relation.
@@ -414,6 +414,10 @@ case class IndexLogEntry(
     relations.head.data.properties.excluded
   }
 
+  def appendedFiles: Seq[String] = {
+    relations.head.data.properties.appended
+  }
+
   def hasLineageColumn(spark: SparkSession): Boolean = {
     ResolverUtils
       .resolve(spark, IndexConstants.DATA_FILE_NAME_COLUMN, schema.fieldNames)
diff --git a/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala b/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala
index 671e9952d..0703d3352 100644
--- a/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala
+++ b/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala
@@ -117,18 +117,14 @@ case class RefreshDeleteActionEvent(appInfo: AppInfo, index: IndexLogEntry, mess
   extends HyperspaceIndexCRUDEvent
 
 /**
- * Index Refresh Event for deleted source files. Emitted when refresh is called on an index
- * with config flag set to update index metadata to enforce delete during read by excluding
- * deleted source data files when index is used at query time.
+ * Index Refresh Event for deleted and appended source files. Emitted when refresh is called
+ * on an index to update index metadata according to latest source data files.
  *
  * @param appInfo AppInfo for spark application.
  * @param index Related index.
  * @param message Message about event.
  */
-case class RefreshForDeleteOnReadActionEvent(
-    appInfo: AppInfo,
-    index: IndexLogEntry,
-    message: String)
+case class RefreshLogEntryActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String)
   extends HyperspaceIndexCRUDEvent
 
 /**
diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
index 5aa86ea97..1041d3f5b 100644
--- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
+++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala
@@ -39,11 +39,11 @@ object HyperspaceConf {
       .toBoolean
   }
 
-  def refreshForDeleteOnReadEnabled(spark: SparkSession): Boolean = {
+  def refreshLogEntryEnabled(spark: SparkSession): Boolean = {
     spark.conf
       .get(
-        IndexConstants.REFRESH_FOR_DELETE_ON_READ_ENABLED,
-        IndexConstants.REFRESH_FOR_DELETE_ON_READ_ENABLED_DEFAULT)
+        IndexConstants.REFRESH_LOGENTRY_ACTION_ENABLED,
+        IndexConstants.REFRESH_LOGENTRY_ACTION_ENABLED_DEFAULT)
       .toBoolean
   }
 }
diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala
index d3243af11..8c723fe70 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala
@@ -468,7 +468,7 @@ class E2EHyperspaceRulesTests extends HyperspaceSuite with SQLHelper {
   test("Validate index usage for enforce delete on read after some source data file is deleted.") {
     withSQLConf(
       IndexConstants.INDEX_LINEAGE_ENABLED -> "true",
-      IndexConstants.REFRESH_FOR_DELETE_ON_READ_ENABLED -> "true") {
+      IndexConstants.REFRESH_LOGENTRY_ACTION_ENABLED -> "true") {
       withTempDir { inputDir =>
         // Save a copy of source data files.
val location = inputDir.toString diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala index 5722c8a7a..98172197f 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala @@ -126,7 +126,8 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter | "properties" : { } | } | }, - | "excluded" : ["file:/rootpath/f1"] + | "excluded" : ["file:/rootpath/f1"], + | "appended" : ["file:/rootpath/f3"] | }, | "kind" : "HDFS" | }, @@ -165,7 +166,8 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter Seq("rootpath"), Hdfs(Hdfs.Properties(Content( Directory("", Seq(FileInfo("f1", 100L, 100L), FileInfo("f2", 200L, 200L)), Seq())), - Seq("file:/rootpath/f1"))), + Seq("file:/rootpath/f1"), + Seq("file:/rootpath/f3"))), "schema", "type", Map())), diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala index b7dc16263..2d9b1e48e 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala @@ -118,7 +118,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { val ex = intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName)) assert( - ex.getMessage.contains(s"Index refresh (to handle deleted source data) is " + + ex.getMessage.contains(s"Index refresh (to handle deleted or appended source data) is " + "only supported on an index with lineage.")) } } @@ -151,10 +151,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { IndexConstants.REFRESH_DELETE_ENABLED -> "true") { withTempDir { inputDir => val inputDataPath = inputDir.toString - SampleData.save( - spark, - inputDataPath, - Seq("Date", "RGUID", "Query", "imprs", "clicks")) + SampleData.save(spark, inputDataPath, Seq("Date", "RGUID", "Query", "imprs", "clicks")) val nonPartitionedDataDF = spark.read.parquet(inputDataPath) hyperspace.createIndex(nonPartitionedDataDF, indexConfig) @@ -205,14 +202,13 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { val ex = intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName)) assert( - ex.getMessage.contains("Index refresh (to handle deleted source data) aborted. " + - "Existing source data file info is changed")) + ex.getMessage.contains( + "Index refresh (to handle deleted or appended source data) aborted. " + + "Existing source data file info is changed")) } } - test( - "Validate refresh index (to handle deletes from the source data) " + - "for enforce delete on read.") { + test("Validate refresh IndexLogEntry for deleted and appended source data files.") { // Save test data non-partitioned. 
SampleData.save( spark, @@ -222,36 +218,57 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { withSQLConf( IndexConstants.INDEX_LINEAGE_ENABLED -> "true", - IndexConstants.REFRESH_FOR_DELETE_ON_READ_ENABLED -> "true") { + IndexConstants.REFRESH_LOGENTRY_ACTION_ENABLED -> "true") { withIndex(indexConfig.indexName) { hyperspace.createIndex(nonPartitionedDataDF, indexConfig) val ixManager = Hyperspace.getContext(spark).indexCollectionManager val originalIndex = ixManager.getIndexes() assert(originalIndex.length == 1) assert(originalIndex.head.excludedFiles.isEmpty) + assert(originalIndex.head.appendedFiles.isEmpty) // Delete one source data file. val deletedFile1 = deleteDataFile(nonPartitionedDataPath) + // Append one new source data file. + val appendedFile1 = new Path(deletedFile1.getParent, "newFile1") + FileUtils.createFile( + appendedFile1.getFileSystem(new Configuration), + appendedFile1, + "I am some random content for a new file :).") + // Refresh index and validate updated IndexLogEntry. hyperspace.refreshIndex(indexConfig.indexName) val refreshedIndex1 = ixManager.getIndexes() assert(refreshedIndex1.length == 1) assert(refreshedIndex1.head.excludedFiles.equals(Seq(deletedFile1.toString))) + assert(refreshedIndex1.head.appendedFiles.equals(Seq(appendedFile1.toString))) // Make sure index fingerprint is changed. assert(!originalIndex.head.signature.equals(refreshedIndex1.head.signature)) // Delete another source data file. val deletedFile2 = deleteDataFile(nonPartitionedDataPath) + + // Append another new source data file. + val appendedFile2 = new Path(deletedFile2.getParent, "newFile2") + FileUtils.createFile( + appendedFile2.getFileSystem(new Configuration), + appendedFile2, + "I am some random content for yet another new file :).") + // Refresh index and validate updated IndexLogEntry. // `excluded` files should contain both deleted source data files. + // `appened` files should contain both appended source data files. hyperspace.refreshIndex(indexConfig.indexName) val refreshedIndex2 = ixManager.getIndexes() assert(refreshedIndex2.length == 1) assert( refreshedIndex2.head.excludedFiles .equals(Seq(deletedFile1.toString, deletedFile2.toString))) + assert( + refreshedIndex2.head.appendedFiles + .equals(Seq(appendedFile1.toString, appendedFile2.toString))) // Make sure index fingerprint is changed. assert(!originalIndex.head.signature.equals(refreshedIndex2.head.signature)) @@ -261,8 +278,8 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { } test( - "Validate refresh index (to handle deletes from the source data) " + - "for enforce delete on read does not add duplicate excluded files.") { + "Validate refresh IndexLogEntry for deleted and appended source data files " + + "does not add duplicate excluded or appended files.") { // Save test data non-partitioned. SampleData.save( spark, @@ -272,17 +289,25 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { withSQLConf( IndexConstants.INDEX_LINEAGE_ENABLED -> "true", - IndexConstants.REFRESH_FOR_DELETE_ON_READ_ENABLED -> "true") { + IndexConstants.REFRESH_LOGENTRY_ACTION_ENABLED -> "true") { withIndex(indexConfig.indexName) { hyperspace.createIndex(nonPartitionedDataDF, indexConfig) val ixManager = Hyperspace.getContext(spark).indexCollectionManager val originalIndex = ixManager.getIndexes() assert(originalIndex.length == 1) assert(originalIndex.head.excludedFiles.isEmpty) + assert(originalIndex.head.appendedFiles.isEmpty) // Delete one source data file. 
val deletedFile = deleteDataFile(nonPartitionedDataPath) + // Append one new source data file. + val appendedFile = new Path(deletedFile.getParent, "newFile") + FileUtils.createFile( + appendedFile.getFileSystem(new Configuration), + appendedFile, + "I am some random content for a new file :).") + // Refresh index and validate updated IndexLogEntry. hyperspace.refreshIndex(indexConfig.indexName) val refreshedIndex = ixManager.getIndexes() @@ -291,7 +316,8 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { // Refresh index again and validate it fails as expected. val ex = intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName)) assert( - ex.getMessage.contains("Refresh aborted as no new deleted source data file found.")) + ex.getMessage.contains( + "Refresh aborted as no new deleted or appended source data file found.")) } } } @@ -308,7 +334,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { withIndex(indexConfig.indexName) { - spark.conf.set(IndexConstants.REFRESH_FOR_DELETE_ON_READ_ENABLED, "true") + spark.conf.set(IndexConstants.REFRESH_LOGENTRY_ACTION_ENABLED, "true") hyperspace.createIndex(nonPartitionedDataDF, indexConfig) val ixManager = Hyperspace.getContext(spark).indexCollectionManager val originalIndex = ixManager.getIndexes() @@ -331,7 +357,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { assert(refreshedIndex1.head.excludedFiles.equals(Seq(deletedFile.toString))) // Change refresh configurations and refresh index to update index files. - spark.conf.set(IndexConstants.REFRESH_FOR_DELETE_ON_READ_ENABLED, "false") + spark.conf.set(IndexConstants.REFRESH_LOGENTRY_ACTION_ENABLED, "false") spark.conf.set(IndexConstants.REFRESH_DELETE_ENABLED, "true") hyperspace.refreshIndex(indexConfig.indexName) val refreshedIndex2 = ixManager.getIndexes() From 91a136f4d43a19ae4f6e90387d119ee4bbb7c68b Mon Sep 17 00:00:00 2001 From: Pouria Pirzadeh Date: Wed, 30 Sep 2020 10:24:19 -0700 Subject: [PATCH 12/13] add RefreshLogEntryAction --- .../actions/RefreshLogEntryAction.scala | 113 ++++++++++++++++++ 1 file changed, 113 insertions(+) create mode 100644 src/main/scala/com/microsoft/hyperspace/actions/RefreshLogEntryAction.scala diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshLogEntryAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshLogEntryAction.scala new file mode 100644 index 000000000..86581d8e4 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshLogEntryAction.scala @@ -0,0 +1,113 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.microsoft.hyperspace.actions + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession + +import com.microsoft.hyperspace.HyperspaceException +import com.microsoft.hyperspace.index._ +import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshLogEntryActionEvent} + +/** + * Refresh index by updating list of excluded and appended source data files and + * index signature in index metadata. + * If some original source data file(s) are removed or appended between previous + * version of index and now, this Action refreshes index as follows: + * 1. Deleted and appended source data files are identified. + * 2. New index fingerprint is computed w.r.t latest source data files. This captures + * both deleted source data files and appended ones. + * 3. IndexLogEntry is updated by modifying list of excluded and appended source data + * files and index fingerprint, computed in above steps. + * + * @param spark SparkSession. + * @param logManager Index LogManager for index being refreshed. + * @param dataManager Index DataManager for index being refreshed. + */ +class RefreshLogEntryAction( + spark: SparkSession, + logManager: IndexLogManager, + dataManager: IndexDataManager) + extends RefreshDeleteActionBase(spark, logManager, dataManager) + with Logging { + + private lazy val newExcludedFiles: Seq[String] = + sourceFilesDiff._1 diff previousIndexLogEntry.excludedFiles + + private lazy val newAppendedFiles: Seq[String] = + sourceFilesDiff._2 diff previousIndexLogEntry.appendedFiles + + final override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = { + RefreshLogEntryActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message) + } + + override def validate(): Unit = { + super.validate() + if (newExcludedFiles.isEmpty && newAppendedFiles.isEmpty) { + throw HyperspaceException( + "Refresh aborted as no new deleted or appended source data file found.") + } + } + + final override def op(): Unit = { + logInfo("Refresh index is updating index metadata by adding " + + s"${newExcludedFiles.length} new deleted files to list of excluded source data files and " + + s"${newAppendedFiles.length} new appended files to list of appended source data files.") + } + + /** + * Compute new index fingerprint using latest source data files and create + * new IndexLogEntry with updated list of excluded and appended source data + * files and new index fingerprint. + * + * @return Updated IndexLogEntry. + */ + final override def logEntry: LogEntry = { + // Compute index fingerprint using current source data file. + val signatureProvider = LogicalPlanSignatureProvider.create() + val newSignature = signatureProvider.signature(df.queryExecution.optimizedPlan) match { + case Some(s) => + LogicalPlanFingerprint( + LogicalPlanFingerprint.Properties(Seq(Signature(signatureProvider.name, s)))) + + case None => throw HyperspaceException("Invalid source plan found during index refresh.") + } + + // Grab nested structures from previous IndexLogEntry. + val source = previousIndexLogEntry.source + val plan = source.plan + val planProps = plan.properties + val relation = planProps.relations.head + val data = relation.data + val dataProps = data.properties + val excluded = dataProps.excluded + val appended = dataProps.appended + + // Create a new IndexLogEntry by updating excluded files and fingerprint. 
+    previousIndexLogEntry.copy(
+      source = source.copy(
+        plan = plan.copy(
+          properties = planProps.copy(
+            fingerprint = newSignature,
+            relations = Seq(
+              relation.copy(
+                data = data.copy(
+                  properties = dataProps.copy(
+                    excluded = excluded ++ newExcludedFiles,
+                    appended = appended ++ newAppendedFiles))))))))
+  }
+}

From a6f6fc05ff1d12cddc57a95006432825fa443eae Mon Sep 17 00:00:00 2001
From: Pouria Pirzadeh
Date: Wed, 30 Sep 2020 16:01:30 -0700
Subject: [PATCH 13/13] Action and config renames

---
 .../actions/RefreshDeleteActionBase.scala     | 2 +-
 ...scala => RefreshSourceContentAction.scala} | 33 ++++++++---------
 .../index/IndexCollectionManager.scala        | 4 +--
 .../hyperspace/index/IndexConstants.scala     | 4 +--
 .../hyperspace/index/IndexLogEntry.scala      | 6 ++--
 .../telemetry/HyperspaceEvent.scala           | 2 +-
 .../hyperspace/util/HyperspaceConf.scala      | 6 ++--
 .../index/E2EHyperspaceRulesTests.scala       | 2 +-
 .../hyperspace/index/IndexLogEntryTest.scala  | 2 +-
 .../hyperspace/index/RefreshIndexTests.scala  | 36 +++++++++----------
 10 files changed, 49 insertions(+), 48 deletions(-)
 rename src/main/scala/com/microsoft/hyperspace/actions/{RefreshLogEntryAction.scala => RefreshSourceContentAction.scala} (76%)

diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala
index 3b3b7981a..8a1de5991 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteActionBase.scala
@@ -35,7 +35,7 @@ private[actions] abstract class RefreshDeleteActionBase(
     super.validate()
     if (!previousIndexLogEntry.hasLineageColumn(spark)) {
       throw HyperspaceException(
-        "Index refresh (to handle deleted or appended source data) is " +
+        "Index refresh to update source content in index metadata is " +
          "only supported on an index with lineage.")
     }
   }
diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshLogEntryAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshSourceContentAction.scala
similarity index 76%
rename from src/main/scala/com/microsoft/hyperspace/actions/RefreshLogEntryAction.scala
rename to src/main/scala/com/microsoft/hyperspace/actions/RefreshSourceContentAction.scala
index 86581d8e4..0488f82da 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshLogEntryAction.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshSourceContentAction.scala
@@ -21,57 +21,58 @@ import org.apache.spark.sql.SparkSession
 
 import com.microsoft.hyperspace.HyperspaceException
 import com.microsoft.hyperspace.index._
-import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshLogEntryActionEvent}
+import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshSourceContentActionEvent}
 
 /**
- * Refresh index by updating list of excluded and appended source data files and
+ * Refresh index by updating list of deleted and appended source data files and
  * index signature in index metadata.
- * If some original source data file(s) are removed or appended between previous
+ * If some original source data file(s) are deleted or appended between previous
  * version of index and now, this Action refreshes index as follows:
  * 1. Deleted and appended source data files are identified.
  * 2. New index fingerprint is computed w.r.t latest source data files. This captures
  * both deleted source data files and appended ones.
- * 3. IndexLogEntry is updated by modifying list of excluded and appended source data
+ * 3. IndexLogEntry is updated by modifying list of deleted and appended source data
+ * files and index fingerprint, computed in above steps.
  *
  * @param spark SparkSession.
  * @param logManager Index LogManager for index being refreshed.
  * @param dataManager Index DataManager for index being refreshed.
  */
-class RefreshLogEntryAction(
+class RefreshSourceContentAction(
     spark: SparkSession,
     logManager: IndexLogManager,
     dataManager: IndexDataManager)
   extends RefreshDeleteActionBase(spark, logManager, dataManager)
   with Logging {
 
-  private lazy val newExcludedFiles: Seq[String] =
-    sourceFilesDiff._1 diff previousIndexLogEntry.excludedFiles
+  private lazy val newDeletedFiles: Seq[String] =
+    sourceFilesDiff._1 diff previousIndexLogEntry.deletedFiles
 
   private lazy val newAppendedFiles: Seq[String] =
     sourceFilesDiff._2 diff previousIndexLogEntry.appendedFiles
 
   final override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
-    RefreshLogEntryActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
+    RefreshSourceContentActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
   }
 
   override def validate(): Unit = {
     super.validate()
-    if (newExcludedFiles.isEmpty && newAppendedFiles.isEmpty) {
+    if (newDeletedFiles.isEmpty && newAppendedFiles.isEmpty) {
       throw HyperspaceException(
         "Refresh aborted as no new deleted or appended source data file found.")
     }
   }
 
   final override def op(): Unit = {
-    logInfo("Refresh index is updating index metadata by adding " +
-      s"${newExcludedFiles.length} new deleted files to list of excluded source data files and " +
-      s"${newAppendedFiles.length} new appended files to list of appended source data files.")
+    logInfo(
+      "Refresh index is updating index metadata by adding " +
+        s"${newDeletedFiles.length} new files to the list of deleted source data files and " +
+        s"${newAppendedFiles.length} new files to the list of appended source data files.")
   }
 
   /**
    * Compute new index fingerprint using latest source data files and create
-   * new IndexLogEntry with updated list of excluded and appended source data
+   * new IndexLogEntry with updated list of deleted and appended source data
    * files and new index fingerprint.
    *
    * @return Updated IndexLogEntry.
@@ -94,10 +95,10 @@ class RefreshLogEntryAction(
     val relation = planProps.relations.head
     val data = relation.data
     val dataProps = data.properties
-    val excluded = dataProps.excluded
+    val deleted = dataProps.deleted
     val appended = dataProps.appended
 
-    // Create a new IndexLogEntry by updating excluded files and fingerprint.
+    // Create a new IndexLogEntry by updating deleted files and fingerprint.
previousIndexLogEntry.copy( source = source.copy( plan = plan.copy( @@ -107,7 +108,7 @@ class RefreshLogEntryAction( relation.copy( data = data.copy( properties = dataProps.copy( - excluded = excluded ++ newExcludedFiles, + deleted = deleted ++ newDeletedFiles, appended = appended ++ newAppendedFiles)))))))) } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala index 6a77c8289..156c08cd6 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala @@ -69,8 +69,8 @@ class IndexCollectionManager( val dataManager = indexDataManagerFactory.create(indexPath) if (HyperspaceConf.refreshDeleteEnabled(spark)) { new RefreshDeleteAction(spark, logManager, dataManager).run() - } else if (HyperspaceConf.refreshLogEntryEnabled(spark)) { - new RefreshLogEntryAction(spark, logManager, dataManager).run() + } else if (HyperspaceConf.refreshSourceContentEnabled(spark)) { + new RefreshSourceContentAction(spark, logManager, dataManager).run() } else { new RefreshAction(spark, logManager, dataManager).run() } diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala index db898e689..3ddf2ee0e 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala @@ -58,6 +58,6 @@ object IndexConstants { val REFRESH_DELETE_ENABLED = "spark.hyperspace.index.refresh.delete.enabled" val REFRESH_DELETE_ENABLED_DEFAULT = "false" - val REFRESH_LOGENTRY_ACTION_ENABLED = "spark.hyperspace.index.refresh.logentry.enabled" - val REFRESH_LOGENTRY_ACTION_ENABLED_DEFAULT = "false" + val REFRESH_SOURCE_CONTENT_ENABLED = "spark.hyperspace.index.refresh.source.content.enabled" + val REFRESH_SOURCE_CONTENT_ENABLED_DEFAULT = "false" } diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index f324b1634..962a8361a 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -325,7 +325,7 @@ case class Hdfs(properties: Hdfs.Properties) { val kind = "HDFS" } object Hdfs { - case class Properties(content: Content, excluded: Seq[String] = Nil, appended: Seq[String] = Nil) + case class Properties(content: Content, deleted: Seq[String] = Nil, appended: Seq[String] = Nil) } // IndexLogEntry-specific Relation that represents the source relation. @@ -410,8 +410,8 @@ case class IndexLogEntry( sourcePlanSignatures.head } - def excludedFiles: Seq[String] = { - relations.head.data.properties.excluded + def deletedFiles: Seq[String] = { + relations.head.data.properties.deleted } def appendedFiles: Seq[String] = { diff --git a/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala b/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala index 0703d3352..7c638d306 100644 --- a/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala +++ b/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala @@ -124,7 +124,7 @@ case class RefreshDeleteActionEvent(appInfo: AppInfo, index: IndexLogEntry, mess * @param index Related index. * @param message Message about event. 
*/ -case class RefreshLogEntryActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String) +case class RefreshSourceContentActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String) extends HyperspaceIndexCRUDEvent /** diff --git a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala index 1041d3f5b..c5b0ef82c 100644 --- a/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala +++ b/src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala @@ -39,11 +39,11 @@ object HyperspaceConf { .toBoolean } - def refreshLogEntryEnabled(spark: SparkSession): Boolean = { + def refreshSourceContentEnabled(spark: SparkSession): Boolean = { spark.conf .get( - IndexConstants.REFRESH_LOGENTRY_ACTION_ENABLED, - IndexConstants.REFRESH_LOGENTRY_ACTION_ENABLED_DEFAULT) + IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED, + IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED_DEFAULT) .toBoolean } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala index 8c723fe70..d97e7a4bf 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTests.scala @@ -468,7 +468,7 @@ class E2EHyperspaceRulesTests extends HyperspaceSuite with SQLHelper { test("Validate index usage for enforce delete on read after some source data file is deleted.") { withSQLConf( IndexConstants.INDEX_LINEAGE_ENABLED -> "true", - IndexConstants.REFRESH_LOGENTRY_ACTION_ENABLED -> "true") { + IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED -> "true") { withTempDir { inputDir => // Save a copy of source data files. 
val location = inputDir.toString diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala index 98172197f..1deed4c20 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala @@ -126,7 +126,7 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter | "properties" : { } | } | }, - | "excluded" : ["file:/rootpath/f1"], + | "deleted" : ["file:/rootpath/f1"], | "appended" : ["file:/rootpath/f3"] | }, | "kind" : "HDFS" diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala index 2d9b1e48e..7ae5c411d 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala @@ -118,7 +118,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { val ex = intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName)) assert( - ex.getMessage.contains(s"Index refresh (to handle deleted or appended source data) is " + + ex.getMessage.contains(s"Index refresh to update source content in index metadata is " + "only supported on an index with lineage.")) } } @@ -218,13 +218,13 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { withSQLConf( IndexConstants.INDEX_LINEAGE_ENABLED -> "true", - IndexConstants.REFRESH_LOGENTRY_ACTION_ENABLED -> "true") { + IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED -> "true") { withIndex(indexConfig.indexName) { hyperspace.createIndex(nonPartitionedDataDF, indexConfig) val ixManager = Hyperspace.getContext(spark).indexCollectionManager val originalIndex = ixManager.getIndexes() assert(originalIndex.length == 1) - assert(originalIndex.head.excludedFiles.isEmpty) + assert(originalIndex.head.deletedFiles.isEmpty) assert(originalIndex.head.appendedFiles.isEmpty) // Delete one source data file. @@ -241,7 +241,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { hyperspace.refreshIndex(indexConfig.indexName) val refreshedIndex1 = ixManager.getIndexes() assert(refreshedIndex1.length == 1) - assert(refreshedIndex1.head.excludedFiles.equals(Seq(deletedFile1.toString))) + assert(refreshedIndex1.head.deletedFiles.equals(Seq(deletedFile1.toString))) assert(refreshedIndex1.head.appendedFiles.equals(Seq(appendedFile1.toString))) // Make sure index fingerprint is changed. @@ -258,13 +258,13 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { "I am some random content for yet another new file :).") // Refresh index and validate updated IndexLogEntry. - // `excluded` files should contain both deleted source data files. - // `appened` files should contain both appended source data files. + // `deleted` files should contain both of the deleted source data files. + // `appended` files should contain both of the appended source data files. 
hyperspace.refreshIndex(indexConfig.indexName) val refreshedIndex2 = ixManager.getIndexes() assert(refreshedIndex2.length == 1) assert( - refreshedIndex2.head.excludedFiles + refreshedIndex2.head.deletedFiles .equals(Seq(deletedFile1.toString, deletedFile2.toString))) assert( refreshedIndex2.head.appendedFiles @@ -279,7 +279,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { test( "Validate refresh IndexLogEntry for deleted and appended source data files " + - "does not add duplicate excluded or appended files.") { + "does not add duplicate deleted or appended files.") { // Save test data non-partitioned. SampleData.save( spark, @@ -289,13 +289,13 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { withSQLConf( IndexConstants.INDEX_LINEAGE_ENABLED -> "true", - IndexConstants.REFRESH_LOGENTRY_ACTION_ENABLED -> "true") { + IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED -> "true") { withIndex(indexConfig.indexName) { hyperspace.createIndex(nonPartitionedDataDF, indexConfig) val ixManager = Hyperspace.getContext(spark).indexCollectionManager val originalIndex = ixManager.getIndexes() assert(originalIndex.length == 1) - assert(originalIndex.head.excludedFiles.isEmpty) + assert(originalIndex.head.deletedFiles.isEmpty) assert(originalIndex.head.appendedFiles.isEmpty) // Delete one source data file. @@ -311,7 +311,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { // Refresh index and validate updated IndexLogEntry. hyperspace.refreshIndex(indexConfig.indexName) val refreshedIndex = ixManager.getIndexes() - assert(refreshedIndex.head.excludedFiles.equals(Seq(deletedFile.toString))) + assert(refreshedIndex.head.deletedFiles.equals(Seq(deletedFile.toString))) // Refresh index again and validate it fails as expected. val ex = intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName)) @@ -324,7 +324,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { test( "Validate refresh index (to handle deletes from the source data) " + - "updates index files correctly when called on an index with excluded files.") { + "updates index files correctly when called on an index with deleted files.") { // Save test data non-partitioned. SampleData.save( spark, @@ -334,12 +334,12 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { withIndex(indexConfig.indexName) { - spark.conf.set(IndexConstants.REFRESH_LOGENTRY_ACTION_ENABLED, "true") + spark.conf.set(IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED, "true") hyperspace.createIndex(nonPartitionedDataDF, indexConfig) val ixManager = Hyperspace.getContext(spark).indexCollectionManager val originalIndex = ixManager.getIndexes() assert(originalIndex.length == 1) - assert(originalIndex.head.excludedFiles.isEmpty) + assert(originalIndex.head.deletedFiles.isEmpty) // Delete one source data file. val deletedFile = deleteDataFile(nonPartitionedDataPath) @@ -354,18 +354,18 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { hyperspace.refreshIndex(indexConfig.indexName) val refreshedIndex1 = ixManager.getIndexes() assert(refreshedIndex1.length == 1) - assert(refreshedIndex1.head.excludedFiles.equals(Seq(deletedFile.toString))) + assert(refreshedIndex1.head.deletedFiles.equals(Seq(deletedFile.toString))) // Change refresh configurations and refresh index to update index files. 
- spark.conf.set(IndexConstants.REFRESH_LOGENTRY_ACTION_ENABLED, "false") + spark.conf.set(IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED, "false") spark.conf.set(IndexConstants.REFRESH_DELETE_ENABLED, "true") hyperspace.refreshIndex(indexConfig.indexName) val refreshedIndex2 = ixManager.getIndexes() assert(refreshedIndex2.length == 1) // Now that index entries for deleted source data files are removed - // from index files, `excluded` files list should be reset and empty. - assert(refreshedIndex2.head.excludedFiles.isEmpty) + // from index files, `deleted` files list should be reset and empty. + assert(refreshedIndex2.head.deletedFiles.isEmpty) // Verify index signature in latest version is different from // original version (before any source data file deletion).
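
For reference, a minimal usage sketch of the two refresh paths this series wires up in IndexCollectionManager. It assumes an active SparkSession `spark` and an existing index named "filterIndex" that was created with IndexConstants.INDEX_LINEAGE_ENABLED set to "true"; the session and index names are illustrative, not taken from the patches:

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.IndexConstants

// Usage sketch under the assumptions above; not part of the patch series.
val hyperspace = new Hyperspace(spark)

// Metadata-only refresh: RefreshSourceContentAction records deleted and appended
// source data files in the IndexLogEntry and recomputes the index fingerprint,
// without rewriting any index data files.
spark.conf.set(IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED, "true")
hyperspace.refreshIndex("filterIndex")

// Full delete-handling refresh: RefreshDeleteAction rewrites the index data,
// dropping rows whose lineage column points at deleted source files. Note that
// IndexCollectionManager checks REFRESH_DELETE_ENABLED before
// REFRESH_SOURCE_CONTENT_ENABLED, so the delete flag wins when both are set.
spark.conf.set(IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED, "false")
spark.conf.set(IndexConstants.REFRESH_DELETE_ENABLED, "true")
hyperspace.refreshIndex("filterIndex")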