From 8cefec8e687adc0e9751769e8aeb0675cf9b1978 Mon Sep 17 00:00:00 2001 From: sezruby Date: Fri, 30 Oct 2020 11:10:21 +0900 Subject: [PATCH 1/6] Add Update structure in IndexLogEntry --- .../actions/RefreshActionBase.scala | 20 +---- .../actions/RefreshQuickAction.scala | 79 +++++++++++++++++++ .../hyperspace/index/IndexLogEntry.scala | 43 +++++++--- .../telemetry/HyperspaceEvent.scala | 11 +++ .../hyperspace/index/IndexLogEntryTest.scala | 75 ++++++++++++------ .../hyperspace/index/IndexManagerTests.scala | 2 +- .../hyperspace/index/RefreshIndexTests.scala | 63 ++++++++++++++- .../index/rules/RuleUtilsTest.scala | 8 +- 8 files changed, 240 insertions(+), 61 deletions(-) create mode 100644 src/main/scala/com/microsoft/hyperspace/actions/RefreshQuickAction.scala diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala index ffc066076..3c72f52ae 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala @@ -102,16 +102,8 @@ private[actions] abstract class RefreshActionBase( protected lazy val deletedFiles: Seq[FileInfo] = { val relation = previousIndexLogEntry.relations.head val originalFiles = relation.data.properties.content.fileInfos - val delFiles = originalFiles -- currentFiles - val delFileNames = delFiles.map(_.name) - // Remove duplicate deleted file names in the previous log entry. - val prevDeletedFiles = - previousIndexLogEntry.deletedFiles.filterNot(f => delFileNames.contains(f.name)) - - // TODO: Add test for the scenario where existing deletedFiles and newly deleted - // files are updated. https://github.com/microsoft/hyperspace/issues/195. - delFiles.toSeq ++ prevDeletedFiles + (originalFiles -- currentFiles).toSeq } /** @@ -144,15 +136,7 @@ private[actions] abstract class RefreshActionBase( protected lazy val appendedFiles: Seq[FileInfo] = { val relation = previousIndexLogEntry.relations.head val originalFiles = relation.data.properties.content.fileInfos - val newFiles = currentFiles -- originalFiles - val newFileNames = newFiles.map(_.name) - - // Remove duplicate appended file names in the previous log entry. - val prevAppendedFiles = - previousIndexLogEntry.appendedFiles.filterNot(f => newFileNames.contains(f.name)) - // TODO: Add test for the scenario where existing appendedFiles and newly appended - // files are updated. https://github.com/microsoft/hyperspace/issues/195. - newFiles.toSeq ++ prevAppendedFiles + (currentFiles -- originalFiles).toSeq } } diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshQuickAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshQuickAction.scala new file mode 100644 index 000000000..92d8a6bd5 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshQuickAction.scala @@ -0,0 +1,79 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.actions
+
+import org.apache.spark.sql.SparkSession
+
+import com.microsoft.hyperspace.HyperspaceException
+import com.microsoft.hyperspace.index._
+import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshQuickActionEvent}
+
+/**
+ * Action to refresh index metadata only, with newly appended files and deleted files.
+ *
+ * @param spark SparkSession.
+ * @param logManager Index LogManager for index being refreshed.
+ * @param dataManager Index DataManager for index being refreshed.
+ */
+class RefreshQuickAction(
+    spark: SparkSession,
+    logManager: IndexLogManager,
+    dataManager: IndexDataManager)
+    extends RefreshActionBase(spark, logManager, dataManager) {
+  final override def op(): Unit = {
+    logInfo(
+      "Refresh index is updating metadata only, with " + deletedFiles.size +
+        " deleted file(s) and " + appendedFiles.size + " appended file(s).")
+  }
+
+  /**
+   * Validate index is in active state for refreshing and there are some changes
+   * in source data file(s).
+   */
+  final override def validate(): Unit = {
+    super.validate()
+
+    if (appendedFiles.isEmpty && deletedFiles.isEmpty) {
+      throw NoChangesException("Refresh quick aborted as no source data change found.")
+    }
+
+    // To handle deleted files, lineage column is required for the index.
+    if (deletedFiles.nonEmpty && !previousIndexLogEntry.hasLineageColumn(spark)) {
+      throw HyperspaceException(
+        "Index refresh (to handle deleted source data) is " +
+          "only supported on an index with lineage.")
+    }
+  }
+
+  override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
+    RefreshQuickActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
+  }
+
+  /**
+   * Create a log entry with metadata update for appended files and deleted files.
+   *
+   * @return Refreshed index log entry.
+   */
+  override def logEntry: LogEntry = {
+    val signatureProvider = LogicalPlanSignatureProvider.create()
+    val latestSignature = signatureProvider.signature(df.queryExecution.optimizedPlan).get
+    previousIndexLogEntry.copyWithUpdate(
+      Seq(Signature(signatureProvider.name, latestSignature)),
+      appendedFiles,
+      deletedFiles)
+  }
+}
diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
index 6b04b7331..a7c0618c6 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
@@ -326,6 +326,19 @@ object LogicalPlanFingerprint {
   case class Properties(signatures: Seq[Signature])
 }
 
+/**
+ * Updated source data since the last time the derived dataset was updated. Note that the index
+ * data does not cover appendedFiles and deletedFiles.
+ *
+ * @param latestSignatures Latest signatures including appendedFiles and deletedFiles.
+ * @param appendedFiles Appended files.
+ * @param deletedFiles Deleted files.
+ */
+case class Update(
+  latestSignatures: Seq[Signature],
+  appendedFiles: Option[Content] = None,
+  deletedFiles: Option[Content] = None)
+
 // IndexLogEntry-specific Hdfs that represents the source data.
 case class Hdfs(properties: Hdfs.Properties) {
   val kind = "HDFS"
@@ -335,13 +348,9 @@ object Hdfs {
   /**
    * Hdfs file properties.
    * @param content Content object representing Hdfs file based data source.
-   * @param appendedFiles Appended files since the last time derived dataset was updated.
- * @param deletedFiles Deleted files since the last time derived dataset was updated. + * @param update Source data update since the last time derived dataset was updated. */ - case class Properties( - content: Content, - appendedFiles: Option[Content] = None, - deletedFiles: Option[Content] = None) + case class Properties(content: Content, update: Option[Update] = None) } // IndexLogEntry-specific Relation that represents the source relation. @@ -394,17 +403,22 @@ case class IndexLogEntry( relations.head.data.properties.content.fileInfos } - // FileInfo's 'name' contains the full path to the file. - def deletedFiles: Set[FileInfo] = { - relations.head.data.properties.deletedFiles.map(_.fileInfos).getOrElse(Set()) + def sourceUpdate: Option[Update] = { + relations.head.data.properties.update } // FileInfo's 'name' contains the full path to the file. def appendedFiles: Set[FileInfo] = { - relations.head.data.properties.appendedFiles.map(_.fileInfos).getOrElse(Set()) + sourceUpdate.flatMap(_.appendedFiles).map(_.fileInfos).getOrElse(Set()) + } + + // FileInfo's 'name' contains the full path to the file. + def deletedFiles: Set[FileInfo] = { + sourceUpdate.flatMap(_.deletedFiles).map(_.fileInfos).getOrElse(Set()) } - def withAppendedAndDeletedFiles( + def copyWithUpdate( + latestSignature: Seq[Signature], appended: Seq[FileInfo], deleted: Seq[FileInfo]): IndexLogEntry = { def toFileStatus(f: FileInfo) = { @@ -418,8 +432,11 @@ case class IndexLogEntry( relations.head.copy( data = relations.head.data.copy( properties = relations.head.data.properties.copy( - appendedFiles = Content.fromLeafFiles(appended.map(toFileStatus)), - deletedFiles = Content.fromLeafFiles(deleted.map(toFileStatus)))))))))) + update = Some( + Update( + latestSignature, + appendedFiles = Content.fromLeafFiles(appended.map(toFileStatus)), + deletedFiles = Content.fromLeafFiles(deleted.map(toFileStatus)))))))))))) } def bucketSpec: BucketSpec = diff --git a/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala b/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala index 59812ddc6..8957a1db8 100644 --- a/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala +++ b/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala @@ -116,6 +116,17 @@ case class CancelActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: St case class RefreshIncrementalActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String) extends HyperspaceIndexCRUDEvent +/** + * Index Refresh Event for quick mode. Emitted when refresh is called on an index + * with "quick" mode to update index metadata only for appended/deleted source data files. + * + * @param appInfo AppInfo for spark application. + * @param index Related index. + * @param message Message about event. + */ +case class RefreshQuickActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String) + extends HyperspaceIndexCRUDEvent + /** * Index Optimize Event for index files. 
* diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala index 7d3229cde..fcdb69883 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala @@ -126,22 +126,27 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter | "properties" : { } | } | }, - | "deletedFiles" : { - | "root" : { - | "name" : "", - | "files" : [ { - | "name" : "f1", - | "size" : 10, - | "modifiedTime" : 10 - | }], - | "subDirs" : [ ] + | "update" : { + | "latestSignatures" : [ { + | "provider" : "provider", + | "value" : "signature" } ], + | "deletedFiles" : { + | "root" : { + | "name" : "", + | "files" : [ { + | "name" : "f1", + | "size" : 10, + | "modifiedTime" : 10 + | }], + | "subDirs" : [ ] + | }, + | "fingerprint" : { + | "kind" : "NoOp", + | "properties" : { } + | } | }, - | "fingerprint" : { - | "kind" : "NoOp", - | "properties" : { } - | } - | }, - | "appendedFiles" : null + | "appendedFiles" : null + | } | }, | "kind" : "HDFS" | }, @@ -176,19 +181,39 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter StructType(Array(StructField("RGUID", StringType), StructField("Date", StringType))) val expectedSourcePlanProperties = SparkPlan.Properties( - Seq(Relation( - Seq("rootpath"), - Hdfs(Hdfs.Properties(Content( - Directory("", Seq(FileInfo("f1", 100L, 100L), FileInfo("f2", 200L, 200L)), Seq())), - None, - Some(Content(Directory("", Seq(FileInfo("f1", 10, 10))))))), - "schema", - "type", - Map())), + Seq( + Relation( + Seq("rootpath"), + Hdfs( + Hdfs.Properties( + Content( + Directory( + "", + Seq(FileInfo("f1", 100L, 100L), FileInfo("f2", 200L, 200L)), + Seq() + ) + ), + Some( + Update( + Seq(Signature("provider", "signature")), + None, + Some(Content(Directory("", Seq(FileInfo("f1", 10, 10))))) + ) + ) + ) + ), + "schema", + "type", + Map() + ) + ), null, null, LogicalPlanFingerprint( - LogicalPlanFingerprint.Properties(Seq(Signature("provider", "signatureValue"))))) + LogicalPlanFingerprint + .Properties(Seq(Signature("provider", "signatureValue"))) + ) + ) val expected = IndexLogEntry( "indexName", diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala index cc9504ceb..71c2ae7f1 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala @@ -489,7 +489,7 @@ class IndexManagerTests extends HyperspaceSuite with SQLHelper { assert(indexFiles.forall(_.getName.startsWith("part-0"))) assert(indexLog.state.equals("ACTIVE")) // Check all files belong to the provided index versions only. 
-    assert(indexFiles.map(_.getParent.getName).toSet.equals(indexVersions))
+    assert(indexFiles.map(_.getParent.getName).toSet === indexVersions)
   }
 
   private def expectedIndex(
diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
index 241c4e5aa..76cf907f1 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest}
 
 import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, MockEventLogger, SampleData, TestUtils}
 import com.microsoft.hyperspace.TestUtils.logManager
-import com.microsoft.hyperspace.actions.RefreshIncrementalAction
+import com.microsoft.hyperspace.actions.{RefreshIncrementalAction, RefreshQuickAction}
 import com.microsoft.hyperspace.index.IndexConstants.REFRESH_MODE_INCREMENTAL
 import com.microsoft.hyperspace.telemetry.RefreshIncrementalActionEvent
 import com.microsoft.hyperspace.util.{FileUtils, PathUtils}
@@ -385,6 +385,67 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite {
     }
   }
 
+  test(
+    "Validate RefreshQuickAction updates appended and deleted files in metadata as " +
+      "expected, when some file gets deleted and some appended to source data.") {
+    withTempPathAsString { testPath =>
+      withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") {
+        withIndex(indexConfig.indexName) {
+          SampleData.save(spark, testPath, Seq("Date", "RGUID", "Query", "imprs", "clicks"))
+          val df = spark.read.parquet(testPath)
+          hyperspace.createIndex(df, indexConfig)
+
+          val oldFiles = listFiles(testPath).toSet
+
+          // Delete one source data file.
+          deleteOneDataFile(testPath)
+
+          // Add some new data to source.
+          import spark.implicits._
+          SampleData.testData
+            .take(3)
+            .toDF("Date", "RGUID", "Query", "imprs", "clicks")
+            .write
+            .mode("append")
+            .parquet(testPath)
+
+          val prevIndexLogEntry = getLatestStableLog(indexConfig.indexName)
+
+          val indexPath = PathUtils.makeAbsolute(s"$systemPath/${indexConfig.indexName}")
+          new RefreshQuickAction(
+            spark,
+            IndexLogManagerFactoryImpl.create(indexPath),
+            IndexDataManagerFactoryImpl.create(indexPath))
+            .run()
+
+          val indexLogEntry = getLatestStableLog(indexConfig.indexName)
+          val latestFiles = listFiles(testPath).toSet
+          val expectedDeletedFiles = oldFiles -- latestFiles
+          val expectedAppendedFiles = latestFiles -- oldFiles
+
+          val signatureProvider = LogicalPlanSignatureProvider.create()
+          val latestDf = spark.read.parquet(testPath)
+          val expectedLatestSignature =
+            signatureProvider.signature(latestDf.queryExecution.optimizedPlan).get
+
+          // Check `Update` is collected properly.
+          assert(indexLogEntry.sourceUpdate.isDefined)
+          assert(
+            indexLogEntry.sourceUpdate.get.latestSignatures.head.value
+              == expectedLatestSignature)
+          assert(indexLogEntry.appendedFiles === expectedAppendedFiles)
+          assert(indexLogEntry.deletedFiles === expectedDeletedFiles)
+
+          // Check index data files and source data files are not updated.
+          assert(
+            indexLogEntry.relations.head.data.properties.content.fileInfos
+              === prevIndexLogEntry.relations.head.data.properties.content.fileInfos)
+          assert(indexLogEntry.content.fileInfos === prevIndexLogEntry.content.fileInfos)
+        }
+      }
+    }
+  }
+
   /**
    * Delete one file from a given path.
* diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala index dc6948d22..e4d81cfb5 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFil import org.apache.spark.sql.types.{IntegerType, StringType} import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.index.{FileInfo, IndexCollectionManager, IndexConfig, IndexConstants} +import com.microsoft.hyperspace.index.{FileInfo, IndexCollectionManager, IndexConfig, IndexConstants, Signature} import com.microsoft.hyperspace.index.IndexConstants.INDEX_HYBRID_SCAN_ENABLED import com.microsoft.hyperspace.util.{FileUtils, PathUtils} @@ -286,8 +286,10 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { "'appendedFiles' are not usable indexes if hybrid scan is disabled.") { withSQLConf(INDEX_HYBRID_SCAN_ENABLED -> "false") { val entry1 = createIndexLogEntry("t1iTest", Seq(t1c1), Seq(t1c3), t1ProjectNode) - val entry2 = entry1.withAppendedAndDeletedFiles(Seq(), Seq(FileInfo("file:/dir/f1", 1, 1))) - val entry3 = entry1.withAppendedAndDeletedFiles(Seq(FileInfo("file:/dir/f2", 1, 1)), Seq()) + val entry2 = + entry1.copyWithUpdate(Seq(Signature("", "")), Seq(), Seq(FileInfo("file:/dir/f1", 1, 1))) + val entry3 = + entry1.copyWithUpdate(Seq(Signature("", "")), Seq(FileInfo("file:/dir/f2", 1, 1)), Seq()) // IndexLogEntry.withAppendedAndDeletedFiles doesn't copy LogEntry's fields. // Thus, set the 'state' to ACTIVE manually so that these entries are considered // in RuleUtils.getCandidateIndexes. 
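
For context, a minimal sketch (not part of the patch series) of how the RefreshQuickAction introduced in PATCH 1/6 is driven, mirroring the new test added to RefreshIndexTests above; 'spark' is an active SparkSession and 'indexPath' is assumed to point at the index's directory under the Hyperspace system path:

    import com.microsoft.hyperspace.actions.RefreshQuickAction
    import com.microsoft.hyperspace.index.{IndexDataManagerFactoryImpl, IndexLogManagerFactoryImpl}

    // Quick refresh only rewrites the index metadata log: validate() aborts if the
    // source data is unchanged, and logEntry records appended/deleted files through
    // copyWithUpdate. The index data files themselves are left untouched.
    new RefreshQuickAction(
      spark,
      IndexLogManagerFactoryImpl.create(indexPath),
      IndexDataManagerFactoryImpl.create(indexPath))
      .run()
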
From 6e25dc4e6d67e21191de87ee4a4ee411b8490e93 Mon Sep 17 00:00:00 2001
From: sezruby
Date: Sun, 1 Nov 2020 20:28:55 +0900
Subject: [PATCH 2/6] Address review comments: replace latest signatures in
 Update with a fingerprint

---
 .../hyperspace/actions/RefreshQuickAction.scala | 12 +++++++-----
 .../hyperspace/index/IndexLogEntry.scala | 16 ++++++++++------
 .../hyperspace/index/IndexLogEntryTest.scala | 4 +++-
 .../hyperspace/index/RefreshIndexTests.scala | 2 +-
 .../hyperspace/index/rules/RuleUtilsTest.scala | 8 +++++---
 5 files changed, 26 insertions(+), 16 deletions(-)

diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshQuickAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshQuickAction.scala
index 92d8a6bd5..1d7cb302b 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshQuickAction.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshQuickAction.scala
@@ -70,10 +70,12 @@ class RefreshQuickAction(
    */
   override def logEntry: LogEntry = {
     val signatureProvider = LogicalPlanSignatureProvider.create()
-    val latestSignature = signatureProvider.signature(df.queryExecution.optimizedPlan).get
-    previousIndexLogEntry.copyWithUpdate(
-      Seq(Signature(signatureProvider.name, latestSignature)),
-      appendedFiles,
-      deletedFiles)
+    val latestFingerprint = LogicalPlanFingerprint(
+      LogicalPlanFingerprint.Properties(
+        Seq(
+          Signature(
+            signatureProvider.name,
+            signatureProvider.signature(df.queryExecution.optimizedPlan).get))))
+    previousIndexLogEntry.copyWithUpdate(latestFingerprint, appendedFiles, deletedFiles)
   }
 }
diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
index a7c0618c6..16f12d59d 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
@@ -327,15 +327,19 @@ object LogicalPlanFingerprint {
 }
 
 /**
- * Updated source data since the last time the derived dataset was updated. Note that the index
- * data does not cover appendedFiles and deletedFiles.
+ * Captures any HDFS updates.
  *
- * @param latestSignatures Latest signatures including appendedFiles and deletedFiles.
+ * @note 'fingerprint' shouldn't be tied to [[LogicalPlanFingerprint]], but it's a free-form
+ *       class, meaning it has "kind" and "properties" so that different classes can be plugged
+ *       in. Thus, 'fingerprint' is still a generic field in terms of the metadata log
+ *       specification.
+ *
+ * @param fingerprint Fingerprint of the update.
  * @param appendedFiles Appended files.
 * @param deletedFiles Deleted files.
 */
 case class Update(
-  latestSignatures: Seq[Signature],
+  fingerprint: LogicalPlanFingerprint,
   appendedFiles: Option[Content] = None,
   deletedFiles: Option[Content] = None)
 
@@ -348,7 +352,7 @@ object Hdfs {
   /**
    * Hdfs file properties.
    * @param content Content object representing Hdfs file based data source.
-   * @param update Source data update since the last time derived dataset was updated.
+   * @param update Captures any updates since 'content' was created.
*/ case class Properties(content: Content, update: Option[Update] = None) } @@ -418,7 +422,7 @@ case class IndexLogEntry( } def copyWithUpdate( - latestSignature: Seq[Signature], + latestSignature: LogicalPlanFingerprint, appended: Seq[FileInfo], deleted: Seq[FileInfo]): IndexLogEntry = { def toFileStatus(f: FileInfo) = { diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala index fcdb69883..bc418718a 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala @@ -195,7 +195,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter ), Some( Update( - Seq(Signature("provider", "signature")), + LogicalPlanFingerprint( + LogicalPlanFingerprint.Properties( + Seq(Signature("signatureProvider", "dfSignature")))), None, Some(Content(Directory("", Seq(FileInfo("f1", 10, 10))))) ) diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala index 76cf907f1..0ae8aad87 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala @@ -431,7 +431,7 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite { // Check `Update` is collected properly. assert(indexLogEntry.sourceUpdate.isDefined) assert( - indexLogEntry.sourceUpdate.get.latestSignatures.head.value + indexLogEntry.sourceUpdate.get.fingerprint.properties.signatures.head.value == expectedLatestSignature) assert(indexLogEntry.appendedFiles === expectedAppendedFiles) assert(indexLogEntry.deletedFiles === expectedDeletedFiles) diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala index e4d81cfb5..cfdc4f04f 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFil import org.apache.spark.sql.types.{IntegerType, StringType} import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.index.{FileInfo, IndexCollectionManager, IndexConfig, IndexConstants, Signature} +import com.microsoft.hyperspace.index.{FileInfo, IndexCollectionManager, IndexConfig, IndexConstants, LogicalPlanFingerprint, Signature} import com.microsoft.hyperspace.index.IndexConstants.INDEX_HYBRID_SCAN_ENABLED import com.microsoft.hyperspace.util.{FileUtils, PathUtils} @@ -285,11 +285,13 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper { "RuleUtils.getCandidateIndexes: Verify indexes with non-empty 'deletedFiles' or " + "'appendedFiles' are not usable indexes if hybrid scan is disabled.") { withSQLConf(INDEX_HYBRID_SCAN_ENABLED -> "false") { + val fingerprint = LogicalPlanFingerprint( + LogicalPlanFingerprint.Properties(Seq(Signature("signatureProvider", "dfSignature")))) val entry1 = createIndexLogEntry("t1iTest", Seq(t1c1), Seq(t1c3), t1ProjectNode) val entry2 = - entry1.copyWithUpdate(Seq(Signature("", "")), Seq(), Seq(FileInfo("file:/dir/f1", 1, 1))) + entry1.copyWithUpdate(fingerprint, Seq(), Seq(FileInfo("file:/dir/f1", 1, 1))) val entry3 = - entry1.copyWithUpdate(Seq(Signature("", "")), 
Seq(FileInfo("file:/dir/f2", 1, 1)), Seq()) + entry1.copyWithUpdate(fingerprint, Seq(FileInfo("file:/dir/f2", 1, 1)), Seq()) // IndexLogEntry.withAppendedAndDeletedFiles doesn't copy LogEntry's fields. // Thus, set the 'state' to ACTIVE manually so that these entries are considered // in RuleUtils.getCandidateIndexes. From 40d63498f10617ed7c6fee8599694e137f536be1 Mon Sep 17 00:00:00 2001 From: sezruby Date: Sun, 1 Nov 2020 20:45:37 +0900 Subject: [PATCH 3/6] Remove RefreshQuickAction + fix test --- .../actions/RefreshQuickAction.scala | 81 ------------------- .../telemetry/HyperspaceEvent.scala | 11 --- .../hyperspace/index/IndexLogEntryTest.scala | 11 ++- .../hyperspace/index/RefreshIndexTests.scala | 63 +-------------- 4 files changed, 8 insertions(+), 158 deletions(-) delete mode 100644 src/main/scala/com/microsoft/hyperspace/actions/RefreshQuickAction.scala diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshQuickAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshQuickAction.scala deleted file mode 100644 index 1d7cb302b..000000000 --- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshQuickAction.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (2020) The Hyperspace Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.microsoft.hyperspace.actions - -import org.apache.spark.sql.SparkSession - -import com.microsoft.hyperspace.HyperspaceException -import com.microsoft.hyperspace.index._ -import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshQuickActionEvent} - -/** - * Action to refresh indexes metadata only with newly appended files and deleted files. - * - * @param spark SparkSession. - * @param logManager Index LogManager for index being refreshed. - * @param dataManager Index DataManager for index being refreshed. - */ -class RefreshQuickAction( - spark: SparkSession, - logManager: IndexLogManager, - dataManager: IndexDataManager) - extends RefreshActionBase(spark, logManager, dataManager) { - final override def op(): Unit = { - logInfo( - "Refresh index is updating metadata only with " + deletedFiles.size + " of" + - "deleted files and " + appendedFiles.size + " of appended files.") - } - - /** - * Validate index is in active state for refreshing and there are some changes - * in source data file(s). - */ - final override def validate(): Unit = { - super.validate() - - if (appendedFiles.isEmpty && deletedFiles.isEmpty) { - throw NoChangesException("Refresh incremental aborted as no source data change found.") - } - - // To handle deleted files, lineage column is required for the index. 
- if (deletedFiles.nonEmpty && !previousIndexLogEntry.hasLineageColumn(spark)) { - throw HyperspaceException( - "Index refresh (to handle deleted source data) is " + - "only supported on an index with lineage.") - } - } - - override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = { - RefreshQuickActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message) - } - - /** - * Create a log entry with metadata update for appended files and deleted files. - * - * @return Refreshed index log entry. - */ - override def logEntry: LogEntry = { - val signatureProvider = LogicalPlanSignatureProvider.create() - val latestFingerprint = LogicalPlanFingerprint( - LogicalPlanFingerprint.Properties( - Seq( - Signature( - signatureProvider.name, - signatureProvider.signature(df.queryExecution.optimizedPlan).get)))) - previousIndexLogEntry.copyWithUpdate(latestFingerprint, appendedFiles, deletedFiles) - } -} diff --git a/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala b/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala index 8957a1db8..59812ddc6 100644 --- a/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala +++ b/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala @@ -116,17 +116,6 @@ case class CancelActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: St case class RefreshIncrementalActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String) extends HyperspaceIndexCRUDEvent -/** - * Index Refresh Event for quick mode. Emitted when refresh is called on an index - * with "quick" mode to update index metadata only for appended/deleted source data files. - * - * @param appInfo AppInfo for spark application. - * @param index Related index. - * @param message Message about event. - */ -case class RefreshQuickActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String) - extends HyperspaceIndexCRUDEvent - /** * Index Optimize Event for index files. 
  *
diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala
index bc418718a..899275c9b 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala
@@ -127,9 +127,12 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     |   }
     | },
     | "update" : {
-    |   "latestSignatures" : [ {
-    |     "provider" : "provider",
-    |     "value" : "signature" } ],
+    |   "fingerprint" : {
+    |     "properties" : {
+    |       "signatures" : [ {
+    |         "provider" : "provider",
+    |         "value" : "signatureValue"
+    |       } ] } },
     |   "deletedFiles" : {
@@ -197,7 +200,7 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
             Update(
               LogicalPlanFingerprint(
                 LogicalPlanFingerprint.Properties(
-                  Seq(Signature("signatureProvider", "dfSignature")))),
+                  Seq(Signature("provider", "signatureValue")))),
               None,
               Some(Content(Directory("", Seq(FileInfo("f1", 10, 10)))))
diff --git a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
index 0ae8aad87..241c4e5aa 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/RefreshIndexTests.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest}
 
 import com.microsoft.hyperspace.{Hyperspace, HyperspaceException, MockEventLogger, SampleData, TestUtils}
 import com.microsoft.hyperspace.TestUtils.logManager
-import com.microsoft.hyperspace.actions.{RefreshIncrementalAction, RefreshQuickAction}
+import com.microsoft.hyperspace.actions.RefreshIncrementalAction
 import com.microsoft.hyperspace.index.IndexConstants.REFRESH_MODE_INCREMENTAL
 import com.microsoft.hyperspace.telemetry.RefreshIncrementalActionEvent
 import com.microsoft.hyperspace.util.{FileUtils, PathUtils}
@@ -385,67 +385,6 @@ class RefreshIndexTests extends QueryTest with HyperspaceSuite {
     }
   }
 
-  test(
-    "Validate RefreshQuickAction updates appended and deleted files in metadata as " +
-      "expected, when some file gets deleted and some appended to source data.") {
-    withTempPathAsString { testPath =>
-      withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") {
-        withIndex(indexConfig.indexName) {
-          SampleData.save(spark, testPath, Seq("Date", "RGUID", "Query", "imprs", "clicks"))
-          val df = spark.read.parquet(testPath)
-          hyperspace.createIndex(df, indexConfig)
-
-          val oldFiles = listFiles(testPath).toSet
-
-          // Delete one source data file.
-          deleteOneDataFile(testPath)
-
-          // Add some new data to source.
- import spark.implicits._ - SampleData.testData - .take(3) - .toDF("Date", "RGUID", "Query", "imprs", "clicks") - .write - .mode("append") - .parquet(testPath) - - val prevIndexLogEntry = getLatestStableLog(indexConfig.indexName) - - val indexPath = PathUtils.makeAbsolute(s"$systemPath/${indexConfig.indexName}") - new RefreshQuickAction( - spark, - IndexLogManagerFactoryImpl.create(indexPath), - IndexDataManagerFactoryImpl.create(indexPath)) - .run() - - val indexLogEntry = getLatestStableLog(indexConfig.indexName) - val latestFiles = listFiles(testPath).toSet - val expectedDeletedFiles = oldFiles -- latestFiles - val expectedAppendedFiles = latestFiles -- oldFiles - - val signatureProvider = LogicalPlanSignatureProvider.create() - val latestDf = spark.read.parquet(testPath) - val expectedLatestSignature = - signatureProvider.signature(latestDf.queryExecution.optimizedPlan).get - - // Check `Update` is collected properly. - assert(indexLogEntry.sourceUpdate.isDefined) - assert( - indexLogEntry.sourceUpdate.get.fingerprint.properties.signatures.head.value - == expectedLatestSignature) - assert(indexLogEntry.appendedFiles === expectedAppendedFiles) - assert(indexLogEntry.deletedFiles === expectedDeletedFiles) - - // Check index data files and source data files are not updated. - assert( - indexLogEntry.relations.head.data.properties.content.fileInfos - === prevIndexLogEntry.relations.head.data.properties.content.fileInfos) - assert(indexLogEntry.content.fileInfos === prevIndexLogEntry.content.fileInfos) - } - } - } - } - /** * Delete one file from a given path. * From dc8a4a1e90ae10bc486eb0b6aa486bb74e393c3e Mon Sep 17 00:00:00 2001 From: sezruby Date: Sun, 1 Nov 2020 20:59:08 +0900 Subject: [PATCH 4/6] comment update --- .../com/microsoft/hyperspace/index/IndexLogEntry.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 16f12d59d..526fff058 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -357,7 +357,15 @@ object Hdfs { case class Properties(content: Content, update: Option[Update] = None) } -// IndexLogEntry-specific Relation that represents the source relation. +/** + * IndexLogEntry-specific Relation that represents the source relation. + * + * @param rootPaths List of root paths for the source relation. + * @param data Source data since last time derived dataset was updated. + * @param dataSchemaJson Schema in json format. + * @param fileFormat File format name. + * @param options Options to read the source relation. + */ case class Relation( rootPaths: Seq[String], data: Hdfs, From a94a8dffa38b596b889c3c296db2a6516920afff Mon Sep 17 00:00:00 2001 From: sezruby Date: Sun, 1 Nov 2020 23:15:27 +0900 Subject: [PATCH 5/6] indent --- .../com/microsoft/hyperspace/index/IndexLogEntry.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 526fff058..e3ede80cd 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -339,9 +339,9 @@ object LogicalPlanFingerprint { * @param deletedFiles Deleted files. 
  */
 case class Update(
-  fingerprint: LogicalPlanFingerprint,
-  appendedFiles: Option[Content] = None,
-  deletedFiles: Option[Content] = None)
+    fingerprint: LogicalPlanFingerprint,
+    appendedFiles: Option[Content] = None,
+    deletedFiles: Option[Content] = None)
 
 // IndexLogEntry-specific Hdfs that represents the source data.
 case class Hdfs(properties: Hdfs.Properties) {
   val kind = "HDFS"

From 5a83b8de6343e737615bcb775c59e006e895efae Mon Sep 17 00:00:00 2001
From: sezruby
Date: Mon, 2 Nov 2020 12:59:22 +0900
Subject: [PATCH 6/6] Remove fingerprint from Update; update the plan
 fingerprint instead

---
 .../hyperspace/index/IndexLogEntry.scala | 15 +++++----------
 .../hyperspace/index/IndexLogEntryTest.scala | 9 ---------
 2 files changed, 5 insertions(+), 19 deletions(-)

diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
index e3ede80cd..cd4570612 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
@@ -329,17 +329,10 @@ object LogicalPlanFingerprint {
 /**
  * Captures any HDFS updates.
  *
- * @note 'fingerprint' shouldn't be tied to [[LogicalPlanFingerprint]], but it's a free-form
- *       class, meaning it has "kind" and "properties" so that different classes can be plugged
- *       in. Thus, 'fingerprint' is still a generic field in terms of the metadata log
- *       specification.
- *
- * @param fingerprint Fingerprint of the update.
  * @param appendedFiles Appended files.
  * @param deletedFiles Deleted files.
  */
 case class Update(
-    fingerprint: LogicalPlanFingerprint,
     appendedFiles: Option[Content] = None,
     deletedFiles: Option[Content] = None)
 
@@ -361,7 +354,9 @@ object Hdfs {
 /**
  * IndexLogEntry-specific Relation that represents the source relation.
  *
  * @param rootPaths List of root paths for the source relation.
- * @param data Source data since last time derived dataset was updated.
+ * @param data Source data for the relation.
+ *        Hdfs.properties.content captures the source data from which the derived dataset was created.
+ *        Hdfs.properties.update captures any updates since the derived dataset was created.
 * @param dataSchemaJson Schema in json format.
 * @param fileFormat File format name.
 * @param options Options to read the source relation.
@@ -430,7 +425,7 @@ case class IndexLogEntry( } def copyWithUpdate( - latestSignature: LogicalPlanFingerprint, + latestFingerprint: LogicalPlanFingerprint, appended: Seq[FileInfo], deleted: Seq[FileInfo]): IndexLogEntry = { def toFileStatus(f: FileInfo) = { @@ -440,13 +435,13 @@ case class IndexLogEntry( source = source.copy( plan = source.plan.copy( properties = source.plan.properties.copy( + fingerprint = latestFingerprint, relations = Seq( relations.head.copy( data = relations.head.data.copy( properties = relations.head.data.properties.copy( update = Some( Update( - latestSignature, appendedFiles = Content.fromLeafFiles(appended.map(toFileStatus)), deletedFiles = Content.fromLeafFiles(deleted.map(toFileStatus)))))))))))) } diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala index 899275c9b..cd44b674a 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala @@ -127,12 +127,6 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter | } | }, | "update" : { - | "fingerprint" : { - | "properties" : { - | "signatures" : [ { - | "provider" : "provider", - | "value" : "signatureValue" - | } ] } }, | "deletedFiles" : { | "root" : { | "name" : "", @@ -198,9 +192,6 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter ), Some( Update( - LogicalPlanFingerprint( - LogicalPlanFingerprint.Properties( - Seq(Signature("provider", "signatureValue")))), None, Some(Content(Directory("", Seq(FileInfo("f1", 10, 10))))) )
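
For reference, a minimal sketch (not part of the patch series) of how a caller invokes copyWithUpdate after PATCH 6/6, following the logEntry code removed in PATCH 3/6; 'df', 'previousIndexLogEntry', 'appendedFiles', and 'deletedFiles' are assumed to be in scope as in RefreshActionBase:

    // Fingerprint the current source data from the optimized plan.
    val signatureProvider = LogicalPlanSignatureProvider.create()
    val latestFingerprint = LogicalPlanFingerprint(
      LogicalPlanFingerprint.Properties(
        Seq(Signature(
          signatureProvider.name,
          signatureProvider.signature(df.queryExecution.optimizedPlan).get))))

    // After PATCH 6/6, copyWithUpdate stamps this fingerprint on the source plan and
    // records only the appended/deleted files in the relation's Update metadata.
    val refreshedEntry =
      previousIndexLogEntry.copyWithUpdate(latestFingerprint, appendedFiles, deletedFiles)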