From b0771bf997ee6244a690ebc393cc1a7f79bea1d5 Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Wed, 7 Oct 2020 20:39:29 -0700 Subject: [PATCH 01/29] initial commit --- .../microsoft/hyperspace/HyperspaceException.scala | 2 ++ .../com/microsoft/hyperspace/actions/Action.scala | 5 ++++- .../hyperspace/actions/RefreshAppendAction.scala | 4 ++-- .../hyperspace/actions/RefreshDeleteAction.scala | 11 ++++++----- .../hyperspace/index/IndexManagerTests.scala | 11 +++++++---- .../hyperspace/index/RefreshIndexTests.scala | 11 ++++++++--- 6 files changed, 29 insertions(+), 15 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/HyperspaceException.scala b/src/main/scala/com/microsoft/hyperspace/HyperspaceException.scala index e397983e7..09ce0dba6 100644 --- a/src/main/scala/com/microsoft/hyperspace/HyperspaceException.scala +++ b/src/main/scala/com/microsoft/hyperspace/HyperspaceException.scala @@ -17,3 +17,5 @@ package com.microsoft.hyperspace case class HyperspaceException(msg: String) extends Exception(msg) + +case class NoChangesDetected(msg: String) extends Exception(msg) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/Action.scala b/src/main/scala/com/microsoft/hyperspace/actions/Action.scala index 5106b7cc9..fb767e2e5 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/Action.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/Action.scala @@ -18,7 +18,7 @@ package com.microsoft.hyperspace.actions import org.apache.spark.internal.Logging -import com.microsoft.hyperspace.{ActiveSparkSession, HyperspaceException} +import com.microsoft.hyperspace.{ActiveSparkSession, HyperspaceException, NoChangesDetected} import com.microsoft.hyperspace.index.{IndexLogManager, LogEntry} import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, HyperspaceEventLogging} @@ -94,6 +94,9 @@ trait Action extends HyperspaceEventLogging with Logging with ActiveSparkSession end() logEvent(event(appInfo, message = "Operation Succeeded.")) } catch { + case e: NoChangesDetected => + logEvent(event(appInfo, message = s"No-op Operation Recorded: ${e.getMessage}.")) + logWarning(e.msg) case e: Exception => logEvent(event(appInfo, message = s"Operation Failed: ${e.getMessage}.")) throw e diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala index 2dbf6fc8d..4a975bd87 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala @@ -19,7 +19,7 @@ package com.microsoft.hyperspace.actions import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} -import com.microsoft.hyperspace.HyperspaceException +import com.microsoft.hyperspace.NoChangesDetected import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshAppendActionEvent} @@ -57,7 +57,7 @@ class RefreshAppendAction( super.validate() if (appendedFiles.isEmpty) { - throw HyperspaceException("Refresh aborted as no appended source data files found.") + throw NoChangesDetected("Refresh aborted as no appended source data files found.") } } diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala index 9668fc8d3..e5de40ec4 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala @@ -21,7 +21,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ -import com.microsoft.hyperspace.HyperspaceException +import com.microsoft.hyperspace.{HyperspaceException, NoChangesDetected} import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshDeleteActionEvent} @@ -57,15 +57,16 @@ class RefreshDeleteAction( */ final override def validate(): Unit = { super.validate() + + if (deletedFiles.isEmpty) { + throw NoChangesDetected("Refresh aborted as no deleted source data file found.") + } + if (!previousIndexLogEntry.hasLineageColumn(spark)) { throw HyperspaceException( "Index refresh (to handle deleted source data) is " + "only supported on an index with lineage.") } - - if (deletedFiles.isEmpty) { - throw HyperspaceException("Refresh aborted as no deleted source data file found.") - } } /** diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala index abc6ed3f5..c58c4b0ed 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala @@ -264,10 +264,13 @@ class IndexManagerTests extends HyperspaceSuite with SQLHelper { .parquet(testPath) val df = spark.read.parquet(testPath) hyperspace.createIndex(df, indexConfig) - val ex = intercept[HyperspaceException] { - hyperspace.refreshIndex(indexConfig.indexName) - } - assert(ex.msg.equals("Refresh aborted as no appended source data files found.")) + val indexPath = PathUtils.makeAbsolute(s"$systemPath/${indexConfig.indexName}") + val logManager = IndexLogManagerFactoryImpl.create(indexPath) + val latestId = logManager.getLatestId().get + + hyperspace.refreshIndex(indexConfig.indexName) + // Check that no new log files were created in this operation. + assert(latestId == logManager.getLatestId().get) } } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala index 90bd8453a..7f30e981e 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala @@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.{AnalysisException, QueryTest} import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, SampleData} -import com.microsoft.hyperspace.util.FileUtils +import com.microsoft.hyperspace.util.{FileUtils, PathUtils} /** * Unit E2E test cases for RefreshIndex. @@ -137,8 +137,13 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { IndexConstants.REFRESH_DELETE_ENABLED -> "true") { hyperspace.createIndex(nonPartitionedDataDF, indexConfig) - val ex = intercept[HyperspaceException](hyperspace.refreshIndex(indexConfig.indexName)) - assert(ex.getMessage.contains("Refresh aborted as no deleted source data file found.")) + val indexPath = PathUtils.makeAbsolute(s"$systemPath/${indexConfig.indexName}") + val logManager = IndexLogManagerFactoryImpl.create(indexPath) + val latestId = logManager.getLatestId().get + + hyperspace.refreshIndex(indexConfig.indexName) + // Check that no new log files were created in this operation. + assert(latestId == logManager.getLatestId().get) } } From 37c0dc82a83927d32c7420a00ac5e14984b7b045 Mon Sep 17 00:00:00 2001 From: Apoorve Dave <66283785+apoorvedave1@users.noreply.github.com> Date: Wed, 7 Oct 2020 21:32:57 -0700 Subject: [PATCH 02/29] test name fix --- .../com/microsoft/hyperspace/index/IndexManagerTests.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala index c58c4b0ed..6508e3b59 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala @@ -251,7 +251,7 @@ class IndexManagerTests extends HyperspaceSuite with SQLHelper { } } - test("Verify refresh-incremental (append-only) throws exception if no new files found.") { + test("Verify refresh-incremental (append-only) is a no-op if no new files found.") { withTempPathAsString { testPath => withSQLConf(IndexConstants.REFRESH_APPEND_ENABLED -> "true") { // Setup. Create sample data and index. From 739d474a39ca2c51e7606cdef513a6b4db373a11 Mon Sep 17 00:00:00 2001 From: Apoorve Dave <66283785+apoorvedave1@users.noreply.github.com> Date: Wed, 7 Oct 2020 21:33:55 -0700 Subject: [PATCH 03/29] Update RefreshIndexTests.scala --- .../com/microsoft/hyperspace/index/RefreshIndexTests.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala index 7f30e981e..51a8eb6d0 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala @@ -125,7 +125,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { test( "Validate refresh index (to handle deletes from the source data) " + - "is aborted if no source data file is deleted.") { + "is a no-op if no source data file is deleted.") { SampleData.save( spark, nonPartitionedDataPath, From ad2e9dfafed11426fe64a7d6944067159f367a5a Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Wed, 7 Oct 2020 22:03:13 -0700 Subject: [PATCH 04/29] initial commit part 2 --- .../actions/RefreshActionBase.scala | 54 +++++++++++++++++++ .../actions/RefreshAppendAction.scala | 25 ++------- .../actions/RefreshDeleteAction.scala | 36 ++----------- .../hyperspace/index/IndexLogEntry.scala | 48 +++++++++++++++++ 4 files changed, 109 insertions(+), 54 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala index 621e17d6f..36041f821 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala @@ -16,7 +16,9 @@ package com.microsoft.hyperspace.actions +import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} import org.apache.spark.sql.types.{DataType, StructType} import com.microsoft.hyperspace.HyperspaceException @@ -76,4 +78,56 @@ private[actions] abstract class RefreshActionBase( s"Current index state is ${previousIndexLogEntry.state}") } } + + /** + * Compare list of source data files from previous IndexLogEntry to list + * of current source data files, validate fileInfo for existing files and + * identify deleted source data files. + */ + protected lazy val deletedFiles: Seq[String] = { + val rels = previousIndexLogEntry.relations + val originalFiles = rels.head.data.properties.content.fileInfos + val currentFiles = rels.head.rootPaths + .flatMap { p => + Content + .fromDirectory(path = new Path(p), throwIfNotExists = true) + .fileInfos + } + .map(f => f.name -> f) + .toMap + + var delFiles = Seq[String]() + originalFiles.foreach { f => + currentFiles.get(f.name) match { + case Some(v) => + if (!f.equals(v)) { + throw HyperspaceException( + "Index refresh (to handle deleted source data) aborted. " + + s"Existing source data file info is changed (file: ${f.name}).") + } + case None => delFiles :+= f.name + } + } + + delFiles ++ previousIndexLogEntry.deletedFiles + } + + protected lazy val appendedFiles = { + val relation = previousIndexLogEntry.relations.head + + // TODO: improve this to take last modified time of files into account. + // https://github.com/microsoft/hyperspace/issues/182 + val originalFiles = relation.data.properties.content.files.map(_.toString) + + val allFiles = df.queryExecution.optimizedPlan.collect { + case LogicalRelation( + HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), + _, + _, + _) => + location.allFiles().map(_.getPath.toString) + }.flatten + + allFiles.diff(originalFiles) ++ previousIndexLogEntry.appendedFiles + } } diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala index 4a975bd87..2ae1eda1d 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala @@ -17,7 +17,6 @@ package com.microsoft.hyperspace.actions import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} import com.microsoft.hyperspace.NoChangesDetected import com.microsoft.hyperspace.index._ @@ -61,25 +60,6 @@ class RefreshAppendAction( } } - private lazy val appendedFiles = { - val relation = previousIndexLogEntry.relations.head - - // TODO: improve this to take last modified time of files into account. - // https://github.com/microsoft/hyperspace/issues/182 - val indexedFiles = relation.data.properties.content.files.map(_.toString) - - val allFiles = df.queryExecution.optimizedPlan.collect { - case LogicalRelation( - HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), - _, - _, - _) => - location.allFiles().map(_.getPath.toString) - }.flatten - - allFiles.diff(indexedFiles) - } - private lazy val dfWithAppendedFiles = { val relation = previousIndexLogEntry.relations.head // Create a df with only diff files from original list of files. @@ -109,7 +89,10 @@ class RefreshAppendAction( // https://github.com/microsoft/hyperspace/issues/183 // New entry. - entry.copy(content = mergedContent) + entry + .copy(content = mergedContent) + .withNewDeletedFiles(deletedFiles) + .withAdditionalAppendedFiles(Seq()) } override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = { diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala index e5de40ec4..bda07a465 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala @@ -16,7 +16,6 @@ package com.microsoft.hyperspace.actions -import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ @@ -57,7 +56,6 @@ class RefreshDeleteAction( */ final override def validate(): Unit = { super.validate() - if (deletedFiles.isEmpty) { throw NoChangesDetected("Refresh aborted as no deleted source data file found.") } @@ -91,36 +89,8 @@ class RefreshDeleteAction( indexConfig.indexedColumns) } - /** - * Compare list of source data files from previous IndexLogEntry to list - * of current source data files, validate fileInfo for existing files and - * identify deleted source data files. - */ - private lazy val deletedFiles: Seq[String] = { - val rels = previousIndexLogEntry.relations - val originalFiles = rels.head.data.properties.content.fileInfos - val currentFiles = rels.head.rootPaths - .flatMap { p => - Content - .fromDirectory(path = new Path(p), throwIfNotExists = true) - .fileInfos - } - .map(f => f.name -> f) - .toMap - - var delFiles = Seq[String]() - originalFiles.foreach { f => - currentFiles.get(f.name) match { - case Some(v) => - if (!f.equals(v)) { - throw HyperspaceException( - "Index refresh (to handle deleted source data) aborted. " + - s"Existing source data file info is changed (file: ${f.name}).") - } - case None => delFiles :+= f.name - } - } - - delFiles + override def logEntry: LogEntry = { + val entry = getIndexLogEntry(spark, df, indexConfig, indexDataPath) + entry.withNewDeletedFiles(Seq()).withAdditionalAppendedFiles(appendedFiles) } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 1bbbb0ba4..5dbc2b329 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -397,6 +397,54 @@ case class IndexLogEntry( relations.head.data.properties.appendedFiles } + def withAdditionalAppendedFiles(files: Seq[String]): IndexLogEntry = { + copy( + source = source.copy( + plan = source.plan.copy( + properties = source.plan.properties.copy( + relations = Seq( + relations.head.copy( + data = relations.head.data.copy( + properties = relations.head.data.properties.copy( + appendedFiles = relations.head.data.properties.appendedFiles ++ files)))))))) + } + + def withAdditionalDeletedFiles(files: Seq[String]): IndexLogEntry = { + copy( + source = source.copy( + plan = source.plan.copy( + properties = source.plan.properties.copy( + relations = Seq( + relations.head.copy( + data = relations.head.data.copy( + properties = relations.head.data.properties.copy( + deletedFiles = relations.head.data.properties.deletedFiles ++ files)))))))) + } + + def withNewAppendedFiles(files: Seq[String]): IndexLogEntry = { + copy( + source = source.copy( + plan = source.plan.copy( + properties = source.plan.properties.copy( + relations = Seq( + relations.head.copy( + data = relations.head.data.copy( + properties = relations.head.data.properties.copy( + appendedFiles = files)))))))) + } + + def withNewDeletedFiles(files: Seq[String]): IndexLogEntry = { + copy( + source = source.copy( + plan = source.plan.copy( + properties = source.plan.properties.copy( + relations = Seq( + relations.head.copy( + data = relations.head.data.copy( + properties = relations.head.data.properties.copy( + deletedFiles = files)))))))) + } + def bucketSpec: BucketSpec = BucketSpec( numBuckets = numBuckets, From 4d27fe2a70609ab29edf34a21d5bf6335914f51a Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Thu, 8 Oct 2020 12:22:26 -0700 Subject: [PATCH 05/29] review comments --- .../hyperspace/HyperspaceException.scala | 2 -- .../microsoft/hyperspace/actions/Action.scala | 12 ++++++------ .../actions/NoChangesException.scala | 19 +++++++++++++++++++ .../actions/RefreshAppendAction.scala | 3 +-- .../actions/RefreshDeleteAction.scala | 4 ++-- .../com/microsoft/hyperspace/TestUtils.scala | 17 +++++++++++++++++ .../hyperspace/index/IndexManagerTests.scala | 18 ++++++++++++++++-- .../hyperspace/index/RefreshIndexTests.scala | 17 +++++++++++++++-- 8 files changed, 76 insertions(+), 16 deletions(-) create mode 100644 src/main/scala/com/microsoft/hyperspace/actions/NoChangesException.scala diff --git a/src/main/scala/com/microsoft/hyperspace/HyperspaceException.scala b/src/main/scala/com/microsoft/hyperspace/HyperspaceException.scala index 09ce0dba6..e397983e7 100644 --- a/src/main/scala/com/microsoft/hyperspace/HyperspaceException.scala +++ b/src/main/scala/com/microsoft/hyperspace/HyperspaceException.scala @@ -17,5 +17,3 @@ package com.microsoft.hyperspace case class HyperspaceException(msg: String) extends Exception(msg) - -case class NoChangesDetected(msg: String) extends Exception(msg) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/Action.scala b/src/main/scala/com/microsoft/hyperspace/actions/Action.scala index fb767e2e5..72c992417 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/Action.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/Action.scala @@ -18,7 +18,7 @@ package com.microsoft.hyperspace.actions import org.apache.spark.internal.Logging -import com.microsoft.hyperspace.{ActiveSparkSession, HyperspaceException, NoChangesDetected} +import com.microsoft.hyperspace.{ActiveSparkSession, HyperspaceException} import com.microsoft.hyperspace.index.{IndexLogManager, LogEntry} import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, HyperspaceEventLogging} @@ -84,7 +84,7 @@ trait Action extends HyperspaceEventLogging with Logging with ActiveSparkSession val appInfo = AppInfo(sparkContext.sparkUser, sparkContext.applicationId, sparkContext.appName) try { - logEvent(event(appInfo, "Operation Started.")) + logEvent(event(appInfo, "Operation started.")) validate() begin() @@ -92,13 +92,13 @@ trait Action extends HyperspaceEventLogging with Logging with ActiveSparkSession op() end() - logEvent(event(appInfo, message = "Operation Succeeded.")) + logEvent(event(appInfo, message = "Operation succeeded.")) } catch { - case e: NoChangesDetected => - logEvent(event(appInfo, message = s"No-op Operation Recorded: ${e.getMessage}.")) + case e: NoChangesException => + logEvent(event(appInfo, message = s"No-op operation recorded: ${e.getMessage}.")) logWarning(e.msg) case e: Exception => - logEvent(event(appInfo, message = s"Operation Failed: ${e.getMessage}.")) + logEvent(event(appInfo, message = s"Operation failed: ${e.getMessage}.")) throw e } } diff --git a/src/main/scala/com/microsoft/hyperspace/actions/NoChangesException.scala b/src/main/scala/com/microsoft/hyperspace/actions/NoChangesException.scala new file mode 100644 index 000000000..4202bed02 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/actions/NoChangesException.scala @@ -0,0 +1,19 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.actions + +private[actions] case class NoChangesException(msg: String) extends Exception(msg) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala index 4a975bd87..e6b613d59 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala @@ -19,7 +19,6 @@ package com.microsoft.hyperspace.actions import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex} -import com.microsoft.hyperspace.NoChangesDetected import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshAppendActionEvent} @@ -57,7 +56,7 @@ class RefreshAppendAction( super.validate() if (appendedFiles.isEmpty) { - throw NoChangesDetected("Refresh aborted as no appended source data files found.") + throw NoChangesException("Refresh append aborted as no appended source data files found.") } } diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala index e5de40ec4..e39b625ec 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala @@ -21,7 +21,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ -import com.microsoft.hyperspace.{HyperspaceException, NoChangesDetected} +import com.microsoft.hyperspace.HyperspaceException import com.microsoft.hyperspace.index._ import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshDeleteActionEvent} @@ -59,7 +59,7 @@ class RefreshDeleteAction( super.validate() if (deletedFiles.isEmpty) { - throw NoChangesDetected("Refresh aborted as no deleted source data file found.") + throw NoChangesException("Refresh delete aborted as no deleted source data file found.") } if (!previousIndexLogEntry.hasLineageColumn(spark)) { diff --git a/src/test/scala/com/microsoft/hyperspace/TestUtils.scala b/src/test/scala/com/microsoft/hyperspace/TestUtils.scala index 3f99e4354..a0ae38992 100644 --- a/src/test/scala/com/microsoft/hyperspace/TestUtils.scala +++ b/src/test/scala/com/microsoft/hyperspace/TestUtils.scala @@ -19,6 +19,7 @@ package com.microsoft.hyperspace import org.apache.hadoop.fs.Path import com.microsoft.hyperspace.index.IndexLogEntry +import com.microsoft.hyperspace.telemetry.{EventLogger, HyperspaceEvent} object TestUtils { def copyWithState(index: IndexLogEntry, state: String): IndexLogEntry = { @@ -45,3 +46,19 @@ object TestUtils { } } } + +/** + * This class can be used to test emitted events from Hyperspace actions. + */ +private[index] class MockEventLogger extends EventLogger { + import com.microsoft.hyperspace.MockEventLogger.emittedEvents + // Reset events for `this` action. + emittedEvents = Seq() + + override def logEvent(event: HyperspaceEvent): Unit = { + emittedEvents :+= event + } +} +private[index] object MockEventLogger { + var emittedEvents: Seq[HyperspaceEvent] = Seq() +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala index c58c4b0ed..f68a475fa 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala @@ -23,9 +23,11 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRela import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} -import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, SampleData} +import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, MockEventLogger, SampleData} import com.microsoft.hyperspace.TestUtils.copyWithState import com.microsoft.hyperspace.actions.Constants +import com.microsoft.hyperspace.telemetry.{CreateActionEvent, RefreshAppendActionEvent} +import com.microsoft.hyperspace.telemetry.Constants.HYPERSPACE_EVENT_LOGGER_CLASS_KEY import com.microsoft.hyperspace.util.{FileUtils, PathUtils} class IndexManagerTests extends HyperspaceSuite with SQLHelper { @@ -253,7 +255,9 @@ class IndexManagerTests extends HyperspaceSuite with SQLHelper { test("Verify refresh-incremental (append-only) throws exception if no new files found.") { withTempPathAsString { testPath => - withSQLConf(IndexConstants.REFRESH_APPEND_ENABLED -> "true") { + withSQLConf( + IndexConstants.REFRESH_APPEND_ENABLED -> "true", + HYPERSPACE_EVENT_LOGGER_CLASS_KEY -> "com.microsoft.hyperspace.index.MockEventLogger") { // Setup. Create sample data and index. val indexConfig = IndexConfig(s"index", Seq("RGUID"), Seq("imprs")) import spark.implicits._ @@ -271,6 +275,16 @@ class IndexManagerTests extends HyperspaceSuite with SQLHelper { hyperspace.refreshIndex(indexConfig.indexName) // Check that no new log files were created in this operation. assert(latestId == logManager.getLatestId().get) + + // Check emitted events. + MockEventLogger.emittedEvents match { + case Seq( + _: CreateActionEvent, + _: CreateActionEvent, + _: RefreshAppendActionEvent, + _: RefreshAppendActionEvent) => // pass + case _ => fail() + } } } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala index 7f30e981e..c26147a25 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala @@ -20,7 +20,9 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.sql.{AnalysisException, QueryTest} -import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, SampleData} +import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, MockEventLogger, SampleData} +import com.microsoft.hyperspace.telemetry.{CreateActionEvent, RefreshDeleteActionEvent} +import com.microsoft.hyperspace.telemetry.Constants.HYPERSPACE_EVENT_LOGGER_CLASS_KEY import com.microsoft.hyperspace.util.{FileUtils, PathUtils} /** @@ -134,7 +136,8 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { withSQLConf( IndexConstants.INDEX_LINEAGE_ENABLED -> "true", - IndexConstants.REFRESH_DELETE_ENABLED -> "true") { + IndexConstants.REFRESH_DELETE_ENABLED -> "true", + HYPERSPACE_EVENT_LOGGER_CLASS_KEY -> "com.microsoft.hyperspace.index.MockEventLogger") { hyperspace.createIndex(nonPartitionedDataDF, indexConfig) val indexPath = PathUtils.makeAbsolute(s"$systemPath/${indexConfig.indexName}") @@ -144,6 +147,16 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { hyperspace.refreshIndex(indexConfig.indexName) // Check that no new log files were created in this operation. assert(latestId == logManager.getLatestId().get) + + // Check emitted events. + MockEventLogger.emittedEvents match { + case Seq( + _: CreateActionEvent, + _: CreateActionEvent, + _: RefreshDeleteActionEvent, + _: RefreshDeleteActionEvent) => // pass + case _ => fail() + } } } From 87ff7b72552f9af0d184eb974f32db4ab76b420b Mon Sep 17 00:00:00 2001 From: Apoorve Dave <66283785+apoorvedave1@users.noreply.github.com> Date: Thu, 8 Oct 2020 12:42:12 -0700 Subject: [PATCH 06/29] doc --- .../hyperspace/actions/NoChangesException.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/NoChangesException.scala b/src/main/scala/com/microsoft/hyperspace/actions/NoChangesException.scala index 4202bed02..e7bafedb9 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/NoChangesException.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/NoChangesException.scala @@ -16,4 +16,14 @@ package com.microsoft.hyperspace.actions +/** + * This exception represents a No-op required from Hyperspace. Use this exception when a + * hyperspace action is not necessary for index maintenance. + * For example, if the data source has not changed since the last time an index was created on it, + * we don't need to do anything when user calls a `refreshIndex()`. + * + * Hyperspace actions silently catch this exception and do not fail the application. + * + * @param msg Error message. + */ private[actions] case class NoChangesException(msg: String) extends Exception(msg) From e31f17138689c4ff54a9e4cfe05a042e1587267f Mon Sep 17 00:00:00 2001 From: Apoorve Dave <66283785+apoorvedave1@users.noreply.github.com> Date: Thu, 8 Oct 2020 13:16:45 -0700 Subject: [PATCH 07/29] build failure fix --- src/test/scala/com/microsoft/hyperspace/TestUtils.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/TestUtils.scala b/src/test/scala/com/microsoft/hyperspace/TestUtils.scala index a0ae38992..5cb3d43e0 100644 --- a/src/test/scala/com/microsoft/hyperspace/TestUtils.scala +++ b/src/test/scala/com/microsoft/hyperspace/TestUtils.scala @@ -50,7 +50,7 @@ object TestUtils { /** * This class can be used to test emitted events from Hyperspace actions. */ -private[index] class MockEventLogger extends EventLogger { +class MockEventLogger extends EventLogger { import com.microsoft.hyperspace.MockEventLogger.emittedEvents // Reset events for `this` action. emittedEvents = Seq() @@ -59,6 +59,7 @@ private[index] class MockEventLogger extends EventLogger { emittedEvents :+= event } } -private[index] object MockEventLogger { + +object MockEventLogger { var emittedEvents: Seq[HyperspaceEvent] = Seq() } From 47c6730bb2acc1bb5ac1da1585080fee583c7982 Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Thu, 8 Oct 2020 13:32:41 -0700 Subject: [PATCH 08/29] add unit test for "appended"files update --- .../com/microsoft/hyperspace/TestUtils.scala | 5 +- .../hyperspace/index/RefreshIndexTests.scala | 60 +++++++++++++++++-- 2 files changed, 59 insertions(+), 6 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/TestUtils.scala b/src/test/scala/com/microsoft/hyperspace/TestUtils.scala index a0ae38992..5cb3d43e0 100644 --- a/src/test/scala/com/microsoft/hyperspace/TestUtils.scala +++ b/src/test/scala/com/microsoft/hyperspace/TestUtils.scala @@ -50,7 +50,7 @@ object TestUtils { /** * This class can be used to test emitted events from Hyperspace actions. */ -private[index] class MockEventLogger extends EventLogger { +class MockEventLogger extends EventLogger { import com.microsoft.hyperspace.MockEventLogger.emittedEvents // Reset events for `this` action. emittedEvents = Seq() @@ -59,6 +59,7 @@ private[index] class MockEventLogger extends EventLogger { emittedEvents :+= event } } -private[index] object MockEventLogger { + +object MockEventLogger { var emittedEvents: Seq[HyperspaceEvent] = Seq() } diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala index 869027000..b4d234c39 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala @@ -140,13 +140,11 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { HYPERSPACE_EVENT_LOGGER_CLASS_KEY -> "com.microsoft.hyperspace.index.MockEventLogger") { hyperspace.createIndex(nonPartitionedDataDF, indexConfig) - val indexPath = PathUtils.makeAbsolute(s"$systemPath/${indexConfig.indexName}") - val logManager = IndexLogManagerFactoryImpl.create(indexPath) - val latestId = logManager.getLatestId().get + val latestId = logManager(indexConfig.indexName).getLatestId().get hyperspace.refreshIndex(indexConfig.indexName) // Check that no new log files were created in this operation. - assert(latestId == logManager.getLatestId().get) + assert(latestId == logManager(indexConfig.indexName).getLatestId().get) // Check emitted events. MockEventLogger.emittedEvents match { @@ -226,6 +224,49 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { } } + test( + "Validate refresh delete action updates appended files as expected," + + "when some file gets deleted and some appended to source data.") { + withSQLConf( + IndexConstants.INDEX_LINEAGE_ENABLED -> "true", + IndexConstants.REFRESH_DELETE_ENABLED -> "true") { + withIndex(indexConfig.indexName) { + SampleData.save( + spark, + nonPartitionedDataPath, + Seq("Date", "RGUID", "Query", "imprs", "clicks")) + val df = spark.read.parquet(nonPartitionedDataPath) + hyperspace.createIndex(df, indexConfig) + + // Delete one source data file. + deleteDataFile(nonPartitionedDataPath) + + val oldFileList = fileList(nonPartitionedDataPath).toSet + + // Add some new data to source. + import spark.implicits._ + SampleData.testData + .take(3) + .toDF("Date", "RGUID", "Query", "imprs", "clicks") + .write + .mode("append") + .parquet(nonPartitionedDataPath) + + hyperspace.refreshIndex(indexConfig.indexName) + + // Check if refreshed index metadata has "appendedFiles" and "deletedFiles" updated. + val entry = logManager(indexConfig.indexName).getLatestStableLog() + assert(entry.isDefined) + assert(entry.get.isInstanceOf[IndexLogEntry]) + val indexLogEntry = entry.get.asInstanceOf[IndexLogEntry] + assert(indexLogEntry.deletedFiles.isEmpty) + + val newFileList = fileList(nonPartitionedDataPath).toSet + assert(indexLogEntry.appendedFiles.toSet.equals(newFileList -- oldFileList)) + } + } + } + /** * Delete one file from a given path. * @@ -251,4 +292,15 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { fileToDelete } + + private def logManager(indexName: String): IndexLogManager = { + val indexPath = PathUtils.makeAbsolute(s"$systemPath/$indexName") + IndexLogManagerFactoryImpl.create(indexPath) + } + + private def fileList(path: String): Seq[String] = { + val absolutePath = PathUtils.makeAbsolute(path) + val fs = absolutePath.getFileSystem(new Configuration) + fs.listStatus(absolutePath).toSeq.map(_.getPath.toString) + } } From 7b4a6a8aadaabbd350b2b10ec760a2921f416754 Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Thu, 8 Oct 2020 13:36:43 -0700 Subject: [PATCH 09/29] test fix --- .../com/microsoft/hyperspace/index/IndexManagerTests.scala | 2 +- .../com/microsoft/hyperspace/index/RefreshIndexTests.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala index 0deea9bad..48a451a1d 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala @@ -257,7 +257,7 @@ class IndexManagerTests extends HyperspaceSuite with SQLHelper { withTempPathAsString { testPath => withSQLConf( IndexConstants.REFRESH_APPEND_ENABLED -> "true", - HYPERSPACE_EVENT_LOGGER_CLASS_KEY -> "com.microsoft.hyperspace.index.MockEventLogger") { + HYPERSPACE_EVENT_LOGGER_CLASS_KEY -> "com.microsoft.hyperspace.MockEventLogger") { // Setup. Create sample data and index. val indexConfig = IndexConfig(s"index", Seq("RGUID"), Seq("imprs")) import spark.implicits._ diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala index 869027000..8d93d9c8b 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala @@ -137,7 +137,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { withSQLConf( IndexConstants.INDEX_LINEAGE_ENABLED -> "true", IndexConstants.REFRESH_DELETE_ENABLED -> "true", - HYPERSPACE_EVENT_LOGGER_CLASS_KEY -> "com.microsoft.hyperspace.index.MockEventLogger") { + HYPERSPACE_EVENT_LOGGER_CLASS_KEY -> "com.microsoft.hyperspace.MockEventLogger") { hyperspace.createIndex(nonPartitionedDataDF, indexConfig) val indexPath = PathUtils.makeAbsolute(s"$systemPath/${indexConfig.indexName}") From f8766c0ff3f46e520e1bc6546fd70870435002ea Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Thu, 8 Oct 2020 14:01:39 -0700 Subject: [PATCH 10/29] review comments --- .../com/microsoft/hyperspace/actions/Action.scala | 4 ++-- .../hyperspace/actions/NoChangesException.scala | 2 +- .../scala/com/microsoft/hyperspace/TestUtils.scala | 6 +++++- .../hyperspace/index/IndexManagerTests.scala | 11 ++++++----- .../hyperspace/index/RefreshIndexTests.scala | 11 ++++++----- 5 files changed, 20 insertions(+), 14 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/Action.scala b/src/main/scala/com/microsoft/hyperspace/actions/Action.scala index 72c992417..eb1143cde 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/Action.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/Action.scala @@ -95,10 +95,10 @@ trait Action extends HyperspaceEventLogging with Logging with ActiveSparkSession logEvent(event(appInfo, message = "Operation succeeded.")) } catch { case e: NoChangesException => - logEvent(event(appInfo, message = s"No-op operation recorded: ${e.getMessage}.")) + logEvent(event(appInfo, message = s"No-op operation recorded: ${e.getMessage}")) logWarning(e.msg) case e: Exception => - logEvent(event(appInfo, message = s"Operation failed: ${e.getMessage}.")) + logEvent(event(appInfo, message = s"Operation failed: ${e.getMessage}")) throw e } } diff --git a/src/main/scala/com/microsoft/hyperspace/actions/NoChangesException.scala b/src/main/scala/com/microsoft/hyperspace/actions/NoChangesException.scala index e7bafedb9..7f16ece31 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/NoChangesException.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/NoChangesException.scala @@ -22,7 +22,7 @@ package com.microsoft.hyperspace.actions * For example, if the data source has not changed since the last time an index was created on it, * we don't need to do anything when user calls a `refreshIndex()`. * - * Hyperspace actions silently catch this exception and do not fail the application. + * [[Action.run]] will silently catch this exception and will not fail the application. * * @param msg Error message. */ diff --git a/src/test/scala/com/microsoft/hyperspace/TestUtils.scala b/src/test/scala/com/microsoft/hyperspace/TestUtils.scala index 5cb3d43e0..5324eb8c3 100644 --- a/src/test/scala/com/microsoft/hyperspace/TestUtils.scala +++ b/src/test/scala/com/microsoft/hyperspace/TestUtils.scala @@ -18,6 +18,7 @@ package com.microsoft.hyperspace import org.apache.hadoop.fs.Path +import com.microsoft.hyperspace.MockEventLogger.reset import com.microsoft.hyperspace.index.IndexLogEntry import com.microsoft.hyperspace.telemetry.{EventLogger, HyperspaceEvent} @@ -53,7 +54,7 @@ object TestUtils { class MockEventLogger extends EventLogger { import com.microsoft.hyperspace.MockEventLogger.emittedEvents // Reset events for `this` action. - emittedEvents = Seq() + reset() override def logEvent(event: HyperspaceEvent): Unit = { emittedEvents :+= event @@ -61,5 +62,8 @@ class MockEventLogger extends EventLogger { } object MockEventLogger { + def reset(): Unit = { + emittedEvents = Seq() + } var emittedEvents: Seq[HyperspaceEvent] = Seq() } diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala index 48a451a1d..822cb3a2c 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala @@ -26,8 +26,8 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructT import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, MockEventLogger, SampleData} import com.microsoft.hyperspace.TestUtils.copyWithState import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.telemetry.{CreateActionEvent, RefreshAppendActionEvent} import com.microsoft.hyperspace.telemetry.Constants.HYPERSPACE_EVENT_LOGGER_CLASS_KEY +import com.microsoft.hyperspace.telemetry.RefreshAppendActionEvent import com.microsoft.hyperspace.util.{FileUtils, PathUtils} class IndexManagerTests extends HyperspaceSuite with SQLHelper { @@ -272,6 +272,7 @@ class IndexManagerTests extends HyperspaceSuite with SQLHelper { val logManager = IndexLogManagerFactoryImpl.create(indexPath) val latestId = logManager.getLatestId().get + MockEventLogger.reset() hyperspace.refreshIndex(indexConfig.indexName) // Check that no new log files were created in this operation. assert(latestId == logManager.getLatestId().get) @@ -279,10 +280,10 @@ class IndexManagerTests extends HyperspaceSuite with SQLHelper { // Check emitted events. MockEventLogger.emittedEvents match { case Seq( - _: CreateActionEvent, - _: CreateActionEvent, - _: RefreshAppendActionEvent, - _: RefreshAppendActionEvent) => // pass + RefreshAppendActionEvent(_, _, "Operation started."), + RefreshAppendActionEvent(_, _, msg)) + if msg.equals("No-op operation recorded: Refresh append aborted as" + + " no appended source data files found.") => // pass case _ => fail() } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala index 8d93d9c8b..7319328d3 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala @@ -21,8 +21,8 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.{AnalysisException, QueryTest} import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, MockEventLogger, SampleData} -import com.microsoft.hyperspace.telemetry.{CreateActionEvent, RefreshDeleteActionEvent} import com.microsoft.hyperspace.telemetry.Constants.HYPERSPACE_EVENT_LOGGER_CLASS_KEY +import com.microsoft.hyperspace.telemetry.RefreshDeleteActionEvent import com.microsoft.hyperspace.util.{FileUtils, PathUtils} /** @@ -144,6 +144,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { val logManager = IndexLogManagerFactoryImpl.create(indexPath) val latestId = logManager.getLatestId().get + MockEventLogger.reset() hyperspace.refreshIndex(indexConfig.indexName) // Check that no new log files were created in this operation. assert(latestId == logManager.getLatestId().get) @@ -151,10 +152,10 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { // Check emitted events. MockEventLogger.emittedEvents match { case Seq( - _: CreateActionEvent, - _: CreateActionEvent, - _: RefreshDeleteActionEvent, - _: RefreshDeleteActionEvent) => // pass + RefreshDeleteActionEvent(_, _, "Operation started."), + RefreshDeleteActionEvent(_, _, msg)) + if msg.equals("No-op operation recorded: Refresh delete aborted as " + + "no deleted source data file found.") => // pass case _ => fail() } } From 58f13392273e62e9697c076be42e3654c12a2559 Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Thu, 8 Oct 2020 15:58:06 -0700 Subject: [PATCH 11/29] set logger at session initialization in test cases --- .../com/microsoft/hyperspace/index/IndexManagerTests.scala | 5 ++--- .../com/microsoft/hyperspace/index/RefreshIndexTests.scala | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala index 822cb3a2c..2fae0575f 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala @@ -40,6 +40,7 @@ class IndexManagerTests extends HyperspaceSuite with SQLHelper { override def beforeAll(): Unit = { super.beforeAll() + spark.conf.set(HYPERSPACE_EVENT_LOGGER_CLASS_KEY, "com.microsoft.hyperspace.MockEventLogger") FileUtils.delete(new Path(sampleParquetDataLocation)) SampleData.save( @@ -255,9 +256,7 @@ class IndexManagerTests extends HyperspaceSuite with SQLHelper { test("Verify refresh-incremental (append-only) is a no-op if no new files found.") { withTempPathAsString { testPath => - withSQLConf( - IndexConstants.REFRESH_APPEND_ENABLED -> "true", - HYPERSPACE_EVENT_LOGGER_CLASS_KEY -> "com.microsoft.hyperspace.MockEventLogger") { + withSQLConf(IndexConstants.REFRESH_APPEND_ENABLED -> "true") { // Setup. Create sample data and index. val indexConfig = IndexConfig(s"index", Seq("RGUID"), Seq("imprs")) import spark.implicits._ diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala index 7319328d3..68db70ce6 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala @@ -38,7 +38,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { override def beforeAll(): Unit = { super.beforeAll() - + spark.conf.set(HYPERSPACE_EVENT_LOGGER_CLASS_KEY, "com.microsoft.hyperspace.MockEventLogger") hyperspace = new Hyperspace(spark) FileUtils.delete(new Path(testDir)) } @@ -136,8 +136,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { withSQLConf( IndexConstants.INDEX_LINEAGE_ENABLED -> "true", - IndexConstants.REFRESH_DELETE_ENABLED -> "true", - HYPERSPACE_EVENT_LOGGER_CLASS_KEY -> "com.microsoft.hyperspace.MockEventLogger") { + IndexConstants.REFRESH_DELETE_ENABLED -> "true") { hyperspace.createIndex(nonPartitionedDataDF, indexConfig) val indexPath = PathUtils.makeAbsolute(s"$systemPath/${indexConfig.indexName}") From 2c55d8632e7ee71555abd7b2ab29f51a58ed0b96 Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Thu, 8 Oct 2020 17:31:05 -0700 Subject: [PATCH 12/29] move mock logger setting to beginning of all SparkInvolvedSuite tests --- .../scala/com/microsoft/hyperspace/SparkInvolvedSuite.scala | 3 +++ .../com/microsoft/hyperspace/index/IndexManagerTests.scala | 1 - .../com/microsoft/hyperspace/index/RefreshIndexTests.scala | 1 - 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/SparkInvolvedSuite.scala b/src/test/scala/com/microsoft/hyperspace/SparkInvolvedSuite.scala index c22391e90..a480316bc 100644 --- a/src/test/scala/com/microsoft/hyperspace/SparkInvolvedSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/SparkInvolvedSuite.scala @@ -21,6 +21,8 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.SparkSession import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} +import com.microsoft.hyperspace.telemetry.Constants.HYPERSPACE_EVENT_LOGGER_CLASS_KEY + trait SparkInvolvedSuite extends BeforeAndAfterAll with BeforeAndAfter { self: SparkFunSuite => @@ -31,6 +33,7 @@ trait SparkInvolvedSuite extends BeforeAndAfterAll with BeforeAndAfter { protected lazy val spark: SparkSession = SparkSession .builder() .master(s"local[$numParallelism]") + .config(HYPERSPACE_EVENT_LOGGER_CLASS_KEY, "com.microsoft.hyperspace.MockEventLogger") .appName(suiteName) .getOrCreate() diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala index 2fae0575f..76e6f27d5 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala @@ -40,7 +40,6 @@ class IndexManagerTests extends HyperspaceSuite with SQLHelper { override def beforeAll(): Unit = { super.beforeAll() - spark.conf.set(HYPERSPACE_EVENT_LOGGER_CLASS_KEY, "com.microsoft.hyperspace.MockEventLogger") FileUtils.delete(new Path(sampleParquetDataLocation)) SampleData.save( diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala index 68db70ce6..1f0889c96 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala @@ -38,7 +38,6 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { override def beforeAll(): Unit = { super.beforeAll() - spark.conf.set(HYPERSPACE_EVENT_LOGGER_CLASS_KEY, "com.microsoft.hyperspace.MockEventLogger") hyperspace = new Hyperspace(spark) FileUtils.delete(new Path(testDir)) } From 138e96acb7561073ad7ce48f15ce1cc714a2e842 Mon Sep 17 00:00:00 2001 From: Apoorve Dave <66283785+apoorvedave1@users.noreply.github.com> Date: Thu, 8 Oct 2020 17:33:18 -0700 Subject: [PATCH 13/29] refactoring Co-authored-by: Terry Kim --- src/test/scala/com/microsoft/hyperspace/TestUtils.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/test/scala/com/microsoft/hyperspace/TestUtils.scala b/src/test/scala/com/microsoft/hyperspace/TestUtils.scala index 5324eb8c3..94f90644c 100644 --- a/src/test/scala/com/microsoft/hyperspace/TestUtils.scala +++ b/src/test/scala/com/microsoft/hyperspace/TestUtils.scala @@ -62,8 +62,9 @@ class MockEventLogger extends EventLogger { } object MockEventLogger { + var emittedEvents: Seq[HyperspaceEvent] = Seq() + def reset(): Unit = { emittedEvents = Seq() } - var emittedEvents: Seq[HyperspaceEvent] = Seq() } From 767513f35b24b375f2ea346e037694adba657594 Mon Sep 17 00:00:00 2001 From: Apoorve Dave <66283785+apoorvedave1@users.noreply.github.com> Date: Thu, 8 Oct 2020 17:34:02 -0700 Subject: [PATCH 14/29] explicit assert in test Co-authored-by: Terry Kim --- .../com/microsoft/hyperspace/index/IndexManagerTests.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala index 76e6f27d5..43c6987c5 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala @@ -278,10 +278,9 @@ class IndexManagerTests extends HyperspaceSuite with SQLHelper { // Check emitted events. MockEventLogger.emittedEvents match { case Seq( - RefreshAppendActionEvent(_, _, "Operation started."), - RefreshAppendActionEvent(_, _, msg)) - if msg.equals("No-op operation recorded: Refresh append aborted as" + - " no appended source data files found.") => // pass + RefreshDeleteActionEvent(_, _, "Operation started."), + RefreshDeleteActionEvent(_, _, msg)) => + assert(msg.contains("Refresh delete aborted as no deleted source data file found")) case _ => fail() } } From 7338de5e501de2d2b5f17627ac4f4e499302d53c Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Thu, 8 Oct 2020 17:39:48 -0700 Subject: [PATCH 15/29] review comets --- src/test/scala/com/microsoft/hyperspace/TestUtils.scala | 2 +- .../com/microsoft/hyperspace/index/RefreshIndexTests.scala | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/TestUtils.scala b/src/test/scala/com/microsoft/hyperspace/TestUtils.scala index 5324eb8c3..9e853cf97 100644 --- a/src/test/scala/com/microsoft/hyperspace/TestUtils.scala +++ b/src/test/scala/com/microsoft/hyperspace/TestUtils.scala @@ -53,7 +53,7 @@ object TestUtils { */ class MockEventLogger extends EventLogger { import com.microsoft.hyperspace.MockEventLogger.emittedEvents - // Reset events for `this` action. + // Reset events. reset() override def logEvent(event: HyperspaceEvent): Unit = { diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala index 1f0889c96..aca55617c 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala @@ -151,9 +151,8 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { MockEventLogger.emittedEvents match { case Seq( RefreshDeleteActionEvent(_, _, "Operation started."), - RefreshDeleteActionEvent(_, _, msg)) - if msg.equals("No-op operation recorded: Refresh delete aborted as " + - "no deleted source data file found.") => // pass + RefreshDeleteActionEvent(_, _, msg)) => + assert(msg.contains("Refresh delete aborted as no deleted source data file found.")) case _ => fail() } } From 29ca9b3b289148b125e14848646ae97001fcfc38 Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Thu, 8 Oct 2020 17:53:06 -0700 Subject: [PATCH 16/29] test bug fixes --- .../com/microsoft/hyperspace/index/IndexManagerTests.scala | 7 +++---- .../com/microsoft/hyperspace/index/RefreshIndexTests.scala | 1 - 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala index 43c6987c5..ec1d9298b 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructT import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, MockEventLogger, SampleData} import com.microsoft.hyperspace.TestUtils.copyWithState import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.telemetry.Constants.HYPERSPACE_EVENT_LOGGER_CLASS_KEY import com.microsoft.hyperspace.telemetry.RefreshAppendActionEvent import com.microsoft.hyperspace.util.{FileUtils, PathUtils} @@ -278,9 +277,9 @@ class IndexManagerTests extends HyperspaceSuite with SQLHelper { // Check emitted events. MockEventLogger.emittedEvents match { case Seq( - RefreshDeleteActionEvent(_, _, "Operation started."), - RefreshDeleteActionEvent(_, _, msg)) => - assert(msg.contains("Refresh delete aborted as no deleted source data file found")) + RefreshAppendActionEvent(_, _, "Operation started."), + RefreshAppendActionEvent(_, _, msg)) => + assert(msg.contains("Refresh append aborted as no appended source data files found.")) case _ => fail() } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala index aca55617c..bca72bfc2 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala @@ -21,7 +21,6 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.{AnalysisException, QueryTest} import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, MockEventLogger, SampleData} -import com.microsoft.hyperspace.telemetry.Constants.HYPERSPACE_EVENT_LOGGER_CLASS_KEY import com.microsoft.hyperspace.telemetry.RefreshDeleteActionEvent import com.microsoft.hyperspace.util.{FileUtils, PathUtils} From 9d26ddcf417d6c06f5e2e12bfb209ea70b13ebe2 Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Thu, 8 Oct 2020 18:08:42 -0700 Subject: [PATCH 17/29] whitespace fix --- src/test/scala/com/microsoft/hyperspace/TestUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/com/microsoft/hyperspace/TestUtils.scala b/src/test/scala/com/microsoft/hyperspace/TestUtils.scala index d345c98c8..78b8f6ede 100644 --- a/src/test/scala/com/microsoft/hyperspace/TestUtils.scala +++ b/src/test/scala/com/microsoft/hyperspace/TestUtils.scala @@ -63,7 +63,7 @@ class MockEventLogger extends EventLogger { object MockEventLogger { var emittedEvents: Seq[HyperspaceEvent] = Seq() - + def reset(): Unit = { emittedEvents = Seq() } From b43b29e1a554b2ff802f7b2bbc1d2bb2e31eb3e7 Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Thu, 8 Oct 2020 19:07:45 -0700 Subject: [PATCH 18/29] add refresh-append test for updating delted files --- .../hyperspace/index/RefreshIndexTests.scala | 44 ++++++++++++++++++- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala index 2456da1c7..df170ce25 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala @@ -222,8 +222,8 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { } test( - "Validate refresh delete action updates appended files as expected," + - "when some file gets deleted and some appended to source data.") { + "Validate refresh delete action updates appended and deleted files in metadata as" + + "expected, when some file gets deleted and some appended to source data.") { withSQLConf( IndexConstants.INDEX_LINEAGE_ENABLED -> "true", IndexConstants.REFRESH_DELETE_ENABLED -> "true") { @@ -264,6 +264,46 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { } } + test( + "Validate refresh append action updates appended and deleted files in metadata as" + + "expected, when some file gets deleted and some appended to source data.") { + withTempPathAsString { testPath => + withSQLConf(IndexConstants.REFRESH_APPEND_ENABLED -> "true") { + withIndex(indexConfig.indexName) { + SampleData.save(spark, testPath, Seq("Date", "RGUID", "Query", "imprs", "clicks")) + val df = spark.read.parquet(testPath) + hyperspace.createIndex(df, indexConfig) + + val oldFileList = fileList(testPath).toSet + + // Delete one source data file. + deleteDataFile(testPath) + + // Add some new data to source. + import spark.implicits._ + SampleData.testData + .take(3) + .toDF("Date", "RGUID", "Query", "imprs", "clicks") + .write + .mode("append") + .parquet(testPath) + + hyperspace.refreshIndex(indexConfig.indexName) + + // Check if refreshed index metadata has "appendedFiles" and "deletedFiles" updated. + val entry = logManager(indexConfig.indexName).getLatestStableLog() + assert(entry.isDefined) + assert(entry.get.isInstanceOf[IndexLogEntry]) + val indexLogEntry = entry.get.asInstanceOf[IndexLogEntry] + assert(indexLogEntry.appendedFiles.isEmpty) + + val newFileList = fileList(testPath).toSet + assert(indexLogEntry.deletedFiles.toSet.equals(oldFileList -- newFileList)) + } + } + } + } + /** * Delete one file from a given path. * From 8f55e66d28a4d59ecd2e909f35fe79035d69ded3 Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Fri, 9 Oct 2020 10:02:40 -0700 Subject: [PATCH 19/29] bug fix in updating index log entry after refresh append and deleted --- .../actions/RefreshAppendAction.scala | 5 +-- .../actions/RefreshDeleteAction.scala | 2 +- .../hyperspace/index/IndexLogEntry.scala | 40 +------------------ 3 files changed, 4 insertions(+), 43 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala index 8e846ff0b..a5471a0e2 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala @@ -88,10 +88,7 @@ class RefreshAppendAction( // https://github.com/microsoft/hyperspace/issues/183 // New entry. - entry - .copy(content = mergedContent) - .withNewDeletedFiles(deletedFiles) - .withAdditionalAppendedFiles(Seq()) + entry.copy(content = mergedContent).withAppendedAndDeletedFiles(Seq(), deletedFiles) } override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = { diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala index 3dd49a036..015bed62d 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshDeleteAction.scala @@ -91,6 +91,6 @@ class RefreshDeleteAction( override def logEntry: LogEntry = { val entry = getIndexLogEntry(spark, df, indexConfig, indexDataPath) - entry.withNewDeletedFiles(Seq()).withAdditionalAppendedFiles(appendedFiles) + entry.withAppendedAndDeletedFiles(appendedFiles, Seq()) } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 5dbc2b329..073aa7473 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -397,7 +397,7 @@ case class IndexLogEntry( relations.head.data.properties.appendedFiles } - def withAdditionalAppendedFiles(files: Seq[String]): IndexLogEntry = { + def withAppendedAndDeletedFiles(appended: Seq[String], deleted: Seq[String]): IndexLogEntry = { copy( source = source.copy( plan = source.plan.copy( @@ -406,43 +406,7 @@ case class IndexLogEntry( relations.head.copy( data = relations.head.data.copy( properties = relations.head.data.properties.copy( - appendedFiles = relations.head.data.properties.appendedFiles ++ files)))))))) - } - - def withAdditionalDeletedFiles(files: Seq[String]): IndexLogEntry = { - copy( - source = source.copy( - plan = source.plan.copy( - properties = source.plan.properties.copy( - relations = Seq( - relations.head.copy( - data = relations.head.data.copy( - properties = relations.head.data.properties.copy( - deletedFiles = relations.head.data.properties.deletedFiles ++ files)))))))) - } - - def withNewAppendedFiles(files: Seq[String]): IndexLogEntry = { - copy( - source = source.copy( - plan = source.plan.copy( - properties = source.plan.properties.copy( - relations = Seq( - relations.head.copy( - data = relations.head.data.copy( - properties = relations.head.data.properties.copy( - appendedFiles = files)))))))) - } - - def withNewDeletedFiles(files: Seq[String]): IndexLogEntry = { - copy( - source = source.copy( - plan = source.plan.copy( - properties = source.plan.properties.copy( - relations = Seq( - relations.head.copy( - data = relations.head.data.copy( - properties = relations.head.data.properties.copy( - deletedFiles = files)))))))) + appendedFiles = appended, deletedFiles = deleted)))))))) } def bucketSpec: BucketSpec = From 49f34de162e39e3a138e4e4925cb94222745fb0c Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Fri, 9 Oct 2020 11:52:28 -0700 Subject: [PATCH 20/29] remove comment --- .../com/microsoft/hyperspace/actions/RefreshAppendAction.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala index a5471a0e2..def6d9f08 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshAppendAction.scala @@ -84,9 +84,6 @@ class RefreshAppendAction( // Merge new index files with old index files. val mergedContent = Content(previousIndexLogEntry.content.root.merge(entry.content.root)) - // TODO: update "appended" and "deleted" list in log entry. - // https://github.com/microsoft/hyperspace/issues/183 - // New entry. entry.copy(content = mergedContent).withAppendedAndDeletedFiles(Seq(), deletedFiles) } From 3872fd9ca717028b2875c98ca9a1407c72e4c2ac Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Fri, 9 Oct 2020 16:51:02 -0700 Subject: [PATCH 21/29] initial commit --- .../com/microsoft/hyperspace/index/rules/RuleUtils.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala index aeb7eb4ed..f47bce98c 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/rules/RuleUtils.scala @@ -107,7 +107,10 @@ object RuleUtils { indexes.filter(index => index.created && isHybridScanCandidate(index, filesByRelations.flatten)) } else { - indexes.filter(index => index.created && signatureValid(index)) + indexes.filter( + index => + index.created && signatureValid(index) && + index.deletedFiles.isEmpty && index.appendedFiles.isEmpty) } } From f5927ac21856d02a0300439090522567f2b1ad1f Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Fri, 9 Oct 2020 19:24:26 -0700 Subject: [PATCH 22/29] review comments --- .../actions/RefreshActionBase.scala | 13 +++ .../hyperspace/index/RefreshIndexTests.scala | 96 ++++++++++--------- 2 files changed, 63 insertions(+), 46 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala index 36041f821..e91ed79ad 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala @@ -83,6 +83,8 @@ private[actions] abstract class RefreshActionBase( * Compare list of source data files from previous IndexLogEntry to list * of current source data files, validate fileInfo for existing files and * identify deleted source data files. + * Finally, append the previously known deleted files to the result. These + * are the files for which the index was never updated in the past. */ protected lazy val deletedFiles: Seq[String] = { val rels = previousIndexLogEntry.relations @@ -109,9 +111,18 @@ private[actions] abstract class RefreshActionBase( } } + // TODO: Add test for the scenario where existing deletedFiles and newly deleted + // files are updated. https://github.com/microsoft/hyperspace/issues/195. delFiles ++ previousIndexLogEntry.deletedFiles } + /** + * Compare list of source data files from previous IndexLogEntry to list + * of current source data files, validate fileInfo for existing files and + * identify newly appended source data files. + * Finally, append the previously known appended files to the result. These + * are the files for which index was never updated in the past. + */ protected lazy val appendedFiles = { val relation = previousIndexLogEntry.relations.head @@ -128,6 +139,8 @@ private[actions] abstract class RefreshActionBase( location.allFiles().map(_.getPath.toString) }.flatten + // TODO: Add test for the scenario where existing appendedFiles and newly appended + // files are updated. https://github.com/microsoft/hyperspace/issues/195. allFiles.diff(originalFiles) ++ previousIndexLogEntry.appendedFiles } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala index df170ce25..df3b92813 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala @@ -222,44 +222,44 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { } test( - "Validate refresh delete action updates appended and deleted files in metadata as" + + "Validate refresh delete action updates appended and deleted files in metadata as " + "expected, when some file gets deleted and some appended to source data.") { - withSQLConf( - IndexConstants.INDEX_LINEAGE_ENABLED -> "true", - IndexConstants.REFRESH_DELETE_ENABLED -> "true") { - withIndex(indexConfig.indexName) { - SampleData.save( - spark, - nonPartitionedDataPath, - Seq("Date", "RGUID", "Query", "imprs", "clicks")) - val df = spark.read.parquet(nonPartitionedDataPath) - hyperspace.createIndex(df, indexConfig) - - // Delete one source data file. - deleteDataFile(nonPartitionedDataPath) - - val oldFileList = fileList(nonPartitionedDataPath).toSet - - // Add some new data to source. - import spark.implicits._ - SampleData.testData - .take(3) - .toDF("Date", "RGUID", "Query", "imprs", "clicks") - .write - .mode("append") - .parquet(nonPartitionedDataPath) - - hyperspace.refreshIndex(indexConfig.indexName) - - // Check if refreshed index metadata has "appendedFiles" and "deletedFiles" updated. - val entry = logManager(indexConfig.indexName).getLatestStableLog() - assert(entry.isDefined) - assert(entry.get.isInstanceOf[IndexLogEntry]) - val indexLogEntry = entry.get.asInstanceOf[IndexLogEntry] - assert(indexLogEntry.deletedFiles.isEmpty) - - val newFileList = fileList(nonPartitionedDataPath).toSet - assert(indexLogEntry.appendedFiles.toSet.equals(newFileList -- oldFileList)) + withTempPathAsString { nonPartitionedDataPath => + withSQLConf( + IndexConstants.INDEX_LINEAGE_ENABLED -> "true", + IndexConstants.REFRESH_DELETE_ENABLED -> "true") { + withIndex(indexConfig.indexName) { + SampleData.save( + spark, + nonPartitionedDataPath, + Seq("Date", "RGUID", "Query", "imprs", "clicks")) + val df = spark.read.parquet(nonPartitionedDataPath) + hyperspace.createIndex(df, indexConfig) + + // Delete one source data file. + deleteDataFile(nonPartitionedDataPath) + + val oldFiles = listFiles(nonPartitionedDataPath).toSet + + // Add some new data to source. + import spark.implicits._ + SampleData.testData + .take(3) + .toDF("Date", "RGUID", "Query", "imprs", "clicks") + .write + .mode("append") + .parquet(nonPartitionedDataPath) + + hyperspace.refreshIndex(indexConfig.indexName) + + // Verify "deletedFiles" is cleared and "appendedFiles" is updated after refresh. + val indexLogEntry = getLatestStableLog(indexConfig.indexName) + val latestFiles = listFiles(nonPartitionedDataPath).toSet + + assert(indexLogEntry.deletedFiles.isEmpty) + assert((oldFiles -- latestFiles).isEmpty) + assert(indexLogEntry.appendedFiles.toSet.equals(latestFiles -- oldFiles)) + } } } } @@ -274,7 +274,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { val df = spark.read.parquet(testPath) hyperspace.createIndex(df, indexConfig) - val oldFileList = fileList(testPath).toSet + val oldFiles = listFiles(testPath).toSet // Delete one source data file. deleteDataFile(testPath) @@ -290,15 +290,12 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { hyperspace.refreshIndex(indexConfig.indexName) - // Check if refreshed index metadata has "appendedFiles" and "deletedFiles" updated. - val entry = logManager(indexConfig.indexName).getLatestStableLog() - assert(entry.isDefined) - assert(entry.get.isInstanceOf[IndexLogEntry]) - val indexLogEntry = entry.get.asInstanceOf[IndexLogEntry] + // Verify "appendedFiles" is cleared and "deletedFiles" is updated after refresh. + val indexLogEntry = getLatestStableLog(indexConfig.indexName) assert(indexLogEntry.appendedFiles.isEmpty) - val newFileList = fileList(testPath).toSet - assert(indexLogEntry.deletedFiles.toSet.equals(oldFileList -- newFileList)) + val latestFiles = listFiles(testPath).toSet + assert(indexLogEntry.deletedFiles.toSet.equals(oldFiles -- latestFiles)) } } } @@ -335,9 +332,16 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { IndexLogManagerFactoryImpl.create(indexPath) } - private def fileList(path: String): Seq[String] = { + private def listFiles(path: String): Seq[String] = { val absolutePath = PathUtils.makeAbsolute(path) val fs = absolutePath.getFileSystem(new Configuration) fs.listStatus(absolutePath).toSeq.map(_.getPath.toString) } + + private def getLatestStableLog(indexName: String): IndexLogEntry = { + val entry = logManager(indexName).getLatestStableLog() + assert(entry.isDefined) + assert(entry.get.isInstanceOf[IndexLogEntry]) + entry.get.asInstanceOf[IndexLogEntry] + } } From 521edcb589178ece4dca475d871eda5c0be42779 Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Fri, 9 Oct 2020 19:30:20 -0700 Subject: [PATCH 23/29] rename test path --- .../hyperspace/index/RefreshIndexTests.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala index df3b92813..074bb5ec4 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala @@ -224,22 +224,22 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { test( "Validate refresh delete action updates appended and deleted files in metadata as " + "expected, when some file gets deleted and some appended to source data.") { - withTempPathAsString { nonPartitionedDataPath => + withTempPathAsString { testPath => withSQLConf( IndexConstants.INDEX_LINEAGE_ENABLED -> "true", IndexConstants.REFRESH_DELETE_ENABLED -> "true") { withIndex(indexConfig.indexName) { SampleData.save( spark, - nonPartitionedDataPath, + testPath, Seq("Date", "RGUID", "Query", "imprs", "clicks")) - val df = spark.read.parquet(nonPartitionedDataPath) + val df = spark.read.parquet(testPath) hyperspace.createIndex(df, indexConfig) // Delete one source data file. - deleteDataFile(nonPartitionedDataPath) + deleteDataFile(testPath) - val oldFiles = listFiles(nonPartitionedDataPath).toSet + val oldFiles = listFiles(testPath).toSet // Add some new data to source. import spark.implicits._ @@ -248,13 +248,13 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { .toDF("Date", "RGUID", "Query", "imprs", "clicks") .write .mode("append") - .parquet(nonPartitionedDataPath) + .parquet(testPath) hyperspace.refreshIndex(indexConfig.indexName) // Verify "deletedFiles" is cleared and "appendedFiles" is updated after refresh. val indexLogEntry = getLatestStableLog(indexConfig.indexName) - val latestFiles = listFiles(nonPartitionedDataPath).toSet + val latestFiles = listFiles(testPath).toSet assert(indexLogEntry.deletedFiles.isEmpty) assert((oldFiles -- latestFiles).isEmpty) From 2080edb865e5e49a4475b160410a4dcd16b8a920 Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Fri, 9 Oct 2020 21:00:55 -0700 Subject: [PATCH 24/29] add test for "deletedFiles" in entry --- .../index/rules/HyperspaceRuleTestSuite.scala | 6 ++-- .../index/rules/RuleUtilsTest.scala | 32 +++++++++++++++++-- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala index 4b5c29b98..5a0133e7b 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala @@ -34,7 +34,9 @@ trait HyperspaceRuleTestSuite extends HyperspaceSuite { name: String, indexCols: Seq[AttributeReference], includedCols: Seq[AttributeReference], - plan: LogicalPlan): IndexLogEntry = { + plan: LogicalPlan, + appendedFiles: Seq[String] = Seq(), + deletedFiles: Seq[String] = Seq()): IndexLogEntry = { val signClass = new RuleTestHelper.TestSignatureProvider().getClass.getName LogicalPlanSignatureProvider.create(signClass).signature(plan) match { @@ -43,7 +45,7 @@ trait HyperspaceRuleTestSuite extends HyperspaceSuite { Seq( Relation( Seq("dummy"), - Hdfs(Properties(Content(Directory("/")))), + Hdfs(Properties(Content(Directory("/")), appendedFiles, deletedFiles)), "schema", "format", Map())), diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala index 757592ca2..1e4ffe992 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -25,8 +25,8 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFil import org.apache.spark.sql.types.{IntegerType, StringType} import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.index.{IndexCollectionManager, IndexConfig, IndexConstants} -import com.microsoft.hyperspace.util.{FileUtils, PathUtils} +import com.microsoft.hyperspace.index.{IndexCollectionManager, IndexConfig, IndexConstants, LogicalPlanSignatureProvider} +import com.microsoft.hyperspace.util.{FileUtils, HyperspaceConf, PathUtils} class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { override val systemPath = PathUtils.makeAbsolute("src/test/resources/ruleUtilsTest") @@ -280,6 +280,34 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { } } + test( + "Verify no indexes are matched if signature matches but hybrid scan is disabled and" + + " 'deletedFiles' is non-empty.") { + assert(!HyperspaceConf.hybridScanEnabled(spark)) + // Here's an index where the singature is in sync with the latest data, but "deletedFiles" + // list is non-empty. + val entry = + createIndex("t1iTest", Seq(t1c1), Seq(t1c3), t1ProjectNode, deletedFiles = Seq("f1")) + + // Assert that signature of this new index matches with the latest data. + // Below is the logic for signature calculation is picked from RuleUtils class. + assert( + entry.signature.value.equals( + LogicalPlanSignatureProvider + .create(entry.signature.provider) + .signature(t1ProjectNode) + .get)) + + val indexManager = IndexCollectionManager(spark) + val allIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE)) + + val usableIndexes = RuleUtils.getCandidateIndexes(spark, allIndexes, t1ProjectNode) + assert(usableIndexes.length === 3) + // Verify that even if signature matched, this index is not picked because of non-empty + // "deleted" files. + assert(!usableIndexes.exists(_.name.equals("t1iTest"))) + } + private def validateLogicalRelation(plan: LogicalPlan, expected: LogicalRelation): Unit = { val r = RuleUtils.getLogicalRelation(plan) assert(r.isDefined) From 3af06eba0cc2858de0b847143b616ad7ffa81e52 Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Fri, 9 Oct 2020 21:17:02 -0700 Subject: [PATCH 25/29] unit tests added --- .../index/rules/RuleUtilsTest.scala | 34 +++++++++++++++++-- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala index 1e4ffe992..4e5c01dd8 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -281,8 +281,8 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { } test( - "Verify no indexes are matched if signature matches but hybrid scan is disabled and" + - " 'deletedFiles' is non-empty.") { + "RuleUtils.getCandidateIndexes: Verify no indexes are matched even if signature matches but " + + "hybrid scan is disabled and 'deletedFiles' is non-empty.") { assert(!HyperspaceConf.hybridScanEnabled(spark)) // Here's an index where the singature is in sync with the latest data, but "deletedFiles" // list is non-empty. @@ -300,9 +300,37 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { val indexManager = IndexCollectionManager(spark) val allIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE)) + assert(allIndexes.exists(_.name.equals("t1iTest"))) + + val usableIndexes = RuleUtils.getCandidateIndexes(spark, allIndexes, t1ProjectNode) + // Verify that even if signature matched, this index is not picked because of non-empty + // "deleted" files. + assert(!usableIndexes.exists(_.name.equals("t1iTest"))) + } + + test( + "RuleUtils.getCandidateIndexes: Verify no indexes are matched even if signature matches but " + + "hybrid scan is disabled and 'appendedFiles' is non-empty.") { + assert(!HyperspaceConf.hybridScanEnabled(spark)) + // Here's an index where the singature is in sync with the latest data, but "deletedFiles" + // list is non-empty. + val entry = + createIndex("t1iTest", Seq(t1c1), Seq(t1c3), t1ProjectNode, appendedFiles = Seq("f1")) + + // Assert that signature of this new index matches with the latest data. + // Below is the logic for signature calculation is picked from RuleUtils class. + assert( + entry.signature.value.equals( + LogicalPlanSignatureProvider + .create(entry.signature.provider) + .signature(t1ProjectNode) + .get)) + + val indexManager = IndexCollectionManager(spark) + val allIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE)) + assert(allIndexes.exists(_.name.equals("t1iTest"))) val usableIndexes = RuleUtils.getCandidateIndexes(spark, allIndexes, t1ProjectNode) - assert(usableIndexes.length === 3) // Verify that even if signature matched, this index is not picked because of non-empty // "deleted" files. assert(!usableIndexes.exists(_.name.equals("t1iTest"))) From 315b36dd4dc45224db25a5c1c5455de6e28e5494 Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Fri, 9 Oct 2020 23:35:50 -0700 Subject: [PATCH 26/29] review comments, test fixes --- .../index/rules/FilterIndexRuleTest.scala | 4 +- .../index/rules/HyperspaceRuleTestSuite.scala | 2 +- .../index/rules/JoinIndexRuleTest.scala | 20 +++--- .../index/rules/RuleUtilsTest.scala | 69 ++++--------------- 4 files changed, 26 insertions(+), 69 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala index 5e66afd4c..44d628666 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala @@ -51,10 +51,10 @@ class FilterIndexRuleTest extends HyperspaceRuleTestSuite { scanNode = LogicalRelation(relation, Seq(c1, c2, c3, c4), None, false) val indexPlan = Project(Seq(c1, c2, c3), scanNode) - createIndex(indexName1, Seq(c3, c2), Seq(c1), indexPlan) + createIndexLogEntry(indexName1, Seq(c3, c2), Seq(c1), indexPlan) val index2Plan = Project(Seq(c1, c2, c3, c4), scanNode) - createIndex(indexName2, Seq(c4, c2), Seq(c1, c3), index2Plan) + createIndexLogEntry(indexName2, Seq(c4, c2), Seq(c1, c3), index2Plan) } before { diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala index 5a0133e7b..56a144142 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala @@ -30,7 +30,7 @@ import com.microsoft.hyperspace.index.Hdfs.Properties trait HyperspaceRuleTestSuite extends HyperspaceSuite { private val filenames = Seq("f1.parquet", "f2.parquet") - def createIndex( + def createIndexLogEntry( name: String, indexCols: Seq[AttributeReference], includedCols: Seq[AttributeReference], diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala index c5a51eea1..fb31302b2 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/JoinIndexRuleTest.scala @@ -90,11 +90,11 @@ class JoinIndexRuleTest extends HyperspaceRuleTestSuite with SQLHelper { // +- Filter isnotnull(t2c1#4) // +- Relation[t2c1#4,t2c2#5,t2c3#6,t2c4#7] parquet - createIndex("t1i1", Seq(t1c1), Seq(t1c3), t1ProjectNode) - createIndex("t1i2", Seq(t1c1, t1c2), Seq(t1c3), t1ProjectNode) - createIndex("t1i3", Seq(t1c2), Seq(t1c3), t1ProjectNode) - createIndex("t2i1", Seq(t2c1), Seq(t2c3), t2ProjectNode) - createIndex("t2i2", Seq(t2c1, t2c2), Seq(t2c3), t2ProjectNode) + createIndexLogEntry("t1i1", Seq(t1c1), Seq(t1c3), t1ProjectNode) + createIndexLogEntry("t1i2", Seq(t1c1, t1c2), Seq(t1c3), t1ProjectNode) + createIndexLogEntry("t1i3", Seq(t1c2), Seq(t1c3), t1ProjectNode) + createIndexLogEntry("t2i1", Seq(t2c1), Seq(t2c3), t2ProjectNode) + createIndexLogEntry("t2i2", Seq(t2c1, t2c2), Seq(t2c3), t2ProjectNode) } before { @@ -208,8 +208,10 @@ class JoinIndexRuleTest extends HyperspaceRuleTestSuite with SQLHelper { { // Test: should update plan if index exists to cover all implicit columns - val t1TestIndex = createIndex("t1Idx", Seq(t1c1), Seq(t1c2, t1c3, t1c4), t1FilterNode) - val t2TestIndex = createIndex("t2Idx", Seq(t2c1), Seq(t2c2, t2c3, t2c4), t2FilterNode) + val t1TestIndex = + createIndexLogEntry("t1Idx", Seq(t1c1), Seq(t1c2, t1c3, t1c4), t1FilterNode) + val t2TestIndex = + createIndexLogEntry("t2Idx", Seq(t2c1), Seq(t2c2, t2c3, t2c4), t2FilterNode) // clear cache so the new indexes gets added to it clearCache() @@ -406,9 +408,7 @@ class JoinIndexRuleTest extends HyperspaceRuleTestSuite with SQLHelper { // Mark the relation that the rule is applied and verify the plan does not change. val newPlan = plan transform { case r @ LogicalRelation(h: HadoopFsRelation, _, _, _) => - r.copy( - relation = - h.copy(options = Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark)) + r.copy(relation = h.copy(options = Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark)) } assert(JoinIndexRule(newPlan).equals(newPlan)) } diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala index 4e5c01dd8..e4d186c8c 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -20,12 +20,12 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, IsNotNull} import org.apache.spark.sql.catalyst.plans.{JoinType, SQLHelper} -import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project, RepartitionByExpression} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation, NoopCache} import org.apache.spark.sql.types.{IntegerType, StringType} import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.index.{IndexCollectionManager, IndexConfig, IndexConstants, LogicalPlanSignatureProvider} +import com.microsoft.hyperspace.index.{IndexCollectionManager, IndexConfig, IndexConstants} import com.microsoft.hyperspace.util.{FileUtils, HyperspaceConf, PathUtils} class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { @@ -79,11 +79,11 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { // +- Filter isnotnull(t2c1#4) // +- Relation[t2c1#4,t2c2#5,t2c3#6,t2c4#7] parquet - createIndex("t1i1", Seq(t1c1), Seq(t1c3), t1ProjectNode) - createIndex("t1i2", Seq(t1c1, t1c2), Seq(t1c3), t1ProjectNode) - createIndex("t1i3", Seq(t1c2), Seq(t1c3), t1ProjectNode) - createIndex("t2i1", Seq(t2c1), Seq(t2c3), t2ProjectNode) - createIndex("t2i2", Seq(t2c1, t2c2), Seq(t2c3), t2ProjectNode) + createIndexLogEntry("t1i1", Seq(t1c1), Seq(t1c3), t1ProjectNode) + createIndexLogEntry("t1i2", Seq(t1c1, t1c2), Seq(t1c3), t1ProjectNode) + createIndexLogEntry("t1i3", Seq(t1c2), Seq(t1c3), t1ProjectNode) + createIndexLogEntry("t2i1", Seq(t2c1), Seq(t2c3), t2ProjectNode) + createIndexLogEntry("t2i2", Seq(t2c1, t2c2), Seq(t2c3), t2ProjectNode) } test("Verify indexes are matched by signature correctly.") { @@ -284,56 +284,13 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { "RuleUtils.getCandidateIndexes: Verify no indexes are matched even if signature matches but " + "hybrid scan is disabled and 'deletedFiles' is non-empty.") { assert(!HyperspaceConf.hybridScanEnabled(spark)) - // Here's an index where the singature is in sync with the latest data, but "deletedFiles" - // list is non-empty. - val entry = - createIndex("t1iTest", Seq(t1c1), Seq(t1c3), t1ProjectNode, deletedFiles = Seq("f1")) - - // Assert that signature of this new index matches with the latest data. - // Below is the logic for signature calculation is picked from RuleUtils class. - assert( - entry.signature.value.equals( - LogicalPlanSignatureProvider - .create(entry.signature.provider) - .signature(t1ProjectNode) - .get)) - val indexManager = IndexCollectionManager(spark) - val allIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE)) - assert(allIndexes.exists(_.name.equals("t1iTest"))) - - val usableIndexes = RuleUtils.getCandidateIndexes(spark, allIndexes, t1ProjectNode) - // Verify that even if signature matched, this index is not picked because of non-empty - // "deleted" files. - assert(!usableIndexes.exists(_.name.equals("t1iTest"))) - } - - test( - "RuleUtils.getCandidateIndexes: Verify no indexes are matched even if signature matches but " + - "hybrid scan is disabled and 'appendedFiles' is non-empty.") { - assert(!HyperspaceConf.hybridScanEnabled(spark)) - // Here's an index where the singature is in sync with the latest data, but "deletedFiles" - // list is non-empty. - val entry = - createIndex("t1iTest", Seq(t1c1), Seq(t1c3), t1ProjectNode, appendedFiles = Seq("f1")) - - // Assert that signature of this new index matches with the latest data. - // Below is the logic for signature calculation is picked from RuleUtils class. - assert( - entry.signature.value.equals( - LogicalPlanSignatureProvider - .create(entry.signature.provider) - .signature(t1ProjectNode) - .get)) - - val indexManager = IndexCollectionManager(spark) - val allIndexes = indexManager.getIndexes(Seq(Constants.States.ACTIVE)) - assert(allIndexes.exists(_.name.equals("t1iTest"))) - - val usableIndexes = RuleUtils.getCandidateIndexes(spark, allIndexes, t1ProjectNode) - // Verify that even if signature matched, this index is not picked because of non-empty - // "deleted" files. - assert(!usableIndexes.exists(_.name.equals("t1iTest"))) + val entry1 = createIndexLogEntry("t1iTest", Seq(t1c1), Seq(t1c3), t1ProjectNode) + val entry2 = entry1.withAppendedAndDeletedFiles(Seq(), Seq("f1")) + val entry3 = entry1.withAppendedAndDeletedFiles(Seq("f2"), Seq()) + val usableIndexes = + RuleUtils.getCandidateIndexes(spark, Seq(entry1, entry2, entry3), t1ProjectNode) + assert(usableIndexes.equals(Seq(entry1))) } private def validateLogicalRelation(plan: LogicalPlan, expected: LogicalRelation): Unit = { From 42de97a4b1ae4643ba12196cac6ba16b84488b3e Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Fri, 9 Oct 2020 23:39:33 -0700 Subject: [PATCH 27/29] rephrase test name --- .../com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala index e4d186c8c..d9c24c191 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -281,8 +281,8 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { } test( - "RuleUtils.getCandidateIndexes: Verify no indexes are matched even if signature matches but " + - "hybrid scan is disabled and 'deletedFiles' is non-empty.") { + "RuleUtils.getCandidateIndexes: Verify indexes with non-empty 'deletedFiles' or " + + "'appendedFiles' are not usable indexes if hybrid scan is disabled.") { assert(!HyperspaceConf.hybridScanEnabled(spark)) val entry1 = createIndexLogEntry("t1iTest", Seq(t1c1), Seq(t1c3), t1ProjectNode) From 6d1f0fcc89747d62ce806afbe7913a793b5fcf89 Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Fri, 9 Oct 2020 23:40:43 -0700 Subject: [PATCH 28/29] autoformat result --- .../hyperspace/index/rules/FilterIndexRuleTest.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala index 44d628666..dadee7dab 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/FilterIndexRuleTest.scala @@ -134,9 +134,7 @@ class FilterIndexRuleTest extends HyperspaceRuleTestSuite { // Mark the relation that the rule is applied and verify the plan does not change. val newPlan = plan transform { case r @ LogicalRelation(h: HadoopFsRelation, _, _, _) => - r.copy( - relation = - h.copy(options = Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark)) + r.copy(relation = h.copy(options = Map(IndexConstants.INDEX_RELATION_IDENTIFIER))(spark)) } assert(FilterIndexRule(newPlan).equals(newPlan)) } From 03f23da184c5bf71f384972853556543a0f82c3b Mon Sep 17 00:00:00 2001 From: Apoorve Dave Date: Sat, 10 Oct 2020 00:16:25 -0700 Subject: [PATCH 29/29] review comments --- .../index/rules/HyperspaceRuleTestSuite.scala | 2 +- .../index/rules/RuleUtilsTest.scala | 19 ++++++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala index 56a144142..d4f4f4471 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/HyperspaceRuleTestSuite.scala @@ -71,7 +71,7 @@ trait HyperspaceRuleTestSuite extends HyperspaceSuite { val logManager = new IndexLogManagerImpl(getIndexRootPath(name)) indexLogEntry.state = Constants.States.ACTIVE - logManager.writeLog(0, indexLogEntry) + assert(logManager.writeLog(0, indexLogEntry)) indexLogEntry case None => throw HyperspaceException("Invalid plan for index dataFrame.") diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala index d9c24c191..1932bd081 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -26,7 +26,8 @@ import org.apache.spark.sql.types.{IntegerType, StringType} import com.microsoft.hyperspace.actions.Constants import com.microsoft.hyperspace.index.{IndexCollectionManager, IndexConfig, IndexConstants} -import com.microsoft.hyperspace.util.{FileUtils, HyperspaceConf, PathUtils} +import com.microsoft.hyperspace.index.IndexConstants.INDEX_HYBRID_SCAN_ENABLED +import com.microsoft.hyperspace.util.{FileUtils, PathUtils} class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { override val systemPath = PathUtils.makeAbsolute("src/test/resources/ruleUtilsTest") @@ -283,14 +284,14 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { test( "RuleUtils.getCandidateIndexes: Verify indexes with non-empty 'deletedFiles' or " + "'appendedFiles' are not usable indexes if hybrid scan is disabled.") { - assert(!HyperspaceConf.hybridScanEnabled(spark)) - - val entry1 = createIndexLogEntry("t1iTest", Seq(t1c1), Seq(t1c3), t1ProjectNode) - val entry2 = entry1.withAppendedAndDeletedFiles(Seq(), Seq("f1")) - val entry3 = entry1.withAppendedAndDeletedFiles(Seq("f2"), Seq()) - val usableIndexes = - RuleUtils.getCandidateIndexes(spark, Seq(entry1, entry2, entry3), t1ProjectNode) - assert(usableIndexes.equals(Seq(entry1))) + withSQLConf(INDEX_HYBRID_SCAN_ENABLED -> "false") { + val entry1 = createIndexLogEntry("t1iTest", Seq(t1c1), Seq(t1c3), t1ProjectNode) + val entry2 = entry1.withAppendedAndDeletedFiles(Seq(), Seq("f1")) + val entry3 = entry1.withAppendedAndDeletedFiles(Seq("f2"), Seq()) + val usableIndexes = + RuleUtils.getCandidateIndexes(spark, Seq(entry1, entry2, entry3), t1ProjectNode) + assert(usableIndexes.equals(Seq(entry1))) + } } private def validateLogicalRelation(plan: LogicalPlan, expected: LogicalRelation): Unit = {