diff --git a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala
index ffc066076..3c72f52ae 100644
--- a/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala
+++ b/src/main/scala/com/microsoft/hyperspace/actions/RefreshActionBase.scala
@@ -102,16 +102,8 @@ private[actions] abstract class RefreshActionBase(
   protected lazy val deletedFiles: Seq[FileInfo] = {
     val relation = previousIndexLogEntry.relations.head
     val originalFiles = relation.data.properties.content.fileInfos
-    val delFiles = originalFiles -- currentFiles
-    val delFileNames = delFiles.map(_.name)
-    // Remove duplicate deleted file names in the previous log entry.
-    val prevDeletedFiles =
-      previousIndexLogEntry.deletedFiles.filterNot(f => delFileNames.contains(f.name))
-
-    // TODO: Add test for the scenario where existing deletedFiles and newly deleted
-    // files are updated. https://github.com/microsoft/hyperspace/issues/195.
-    delFiles.toSeq ++ prevDeletedFiles
+    (originalFiles -- currentFiles).toSeq
   }
 
   /**
@@ -144,15 +136,7 @@ private[actions] abstract class RefreshActionBase(
   protected lazy val appendedFiles: Seq[FileInfo] = {
     val relation = previousIndexLogEntry.relations.head
     val originalFiles = relation.data.properties.content.fileInfos
-    val newFiles = currentFiles -- originalFiles
-    val newFileNames = newFiles.map(_.name)
-
-    // Remove duplicate appended file names in the previous log entry.
-    val prevAppendedFiles =
-      previousIndexLogEntry.appendedFiles.filterNot(f => newFileNames.contains(f.name))
-    // TODO: Add test for the scenario where existing appendedFiles and newly appended
-    // files are updated. https://github.com/microsoft/hyperspace/issues/195.
-    newFiles.toSeq ++ prevAppendedFiles
+    (currentFiles -- originalFiles).toSeq
   }
 }
diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
index 6b04b7331..cd4570612 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
@@ -326,6 +326,16 @@ object LogicalPlanFingerprint {
   case class Properties(signatures: Seq[Signature])
 }
 
+/**
+ * Captures any updates to the source data (HDFS) since the derived dataset was created.
+ *
+ * @param appendedFiles Files appended to the source data.
+ * @param deletedFiles Files deleted from the source data.
+ */
+case class Update(
+    appendedFiles: Option[Content] = None,
+    deletedFiles: Option[Content] = None)
+
 // IndexLogEntry-specific Hdfs that represents the source data.
 case class Hdfs(properties: Hdfs.Properties) {
   val kind = "HDFS"
@@ -335,16 +345,22 @@ object Hdfs {
   /**
    * Hdfs file properties.
    * @param content Content object representing Hdfs file based data source.
-   * @param appendedFiles Appended files since the last time derived dataset was updated.
-   * @param deletedFiles Deleted files since the last time derived dataset was updated.
+   * @param update Captures any updates since 'content' was created.
    */
-  case class Properties(
-      content: Content,
-      appendedFiles: Option[Content] = None,
-      deletedFiles: Option[Content] = None)
+  case class Properties(content: Content, update: Option[Update] = None)
 }
 
-// IndexLogEntry-specific Relation that represents the source relation.
+/**
+ * IndexLogEntry-specific Relation that represents the source relation.
+ *
+ * @param rootPaths List of root paths for the source relation.
+ * @param data Source data for the relation.
+ * Hdfs.properties.content captures the source data from which the derived dataset was created.
+ * Hdfs.properties.update captures any updates since the derived dataset was created.
+ * @param dataSchemaJson Schema of the source relation in JSON format.
+ * @param fileFormat File format name.
+ * @param options Options to read the source relation.
+ */
 case class Relation(
     rootPaths: Seq[String],
     data: Hdfs,
@@ -394,17 +410,22 @@ case class IndexLogEntry(
     relations.head.data.properties.content.fileInfos
   }
 
-  // FileInfo's 'name' contains the full path to the file.
-  def deletedFiles: Set[FileInfo] = {
-    relations.head.data.properties.deletedFiles.map(_.fileInfos).getOrElse(Set())
+  def sourceUpdate: Option[Update] = {
+    relations.head.data.properties.update
   }
 
   // FileInfo's 'name' contains the full path to the file.
   def appendedFiles: Set[FileInfo] = {
-    relations.head.data.properties.appendedFiles.map(_.fileInfos).getOrElse(Set())
+    sourceUpdate.flatMap(_.appendedFiles).map(_.fileInfos).getOrElse(Set())
+  }
+
+  // FileInfo's 'name' contains the full path to the file.
+  def deletedFiles: Set[FileInfo] = {
+    sourceUpdate.flatMap(_.deletedFiles).map(_.fileInfos).getOrElse(Set())
   }
 
-  def withAppendedAndDeletedFiles(
+  def copyWithUpdate(
+      latestFingerprint: LogicalPlanFingerprint,
       appended: Seq[FileInfo],
       deleted: Seq[FileInfo]): IndexLogEntry = {
     def toFileStatus(f: FileInfo) = {
@@ -414,12 +435,15 @@ case class IndexLogEntry(
     source = source.copy(
       plan = source.plan.copy(
         properties = source.plan.properties.copy(
+          fingerprint = latestFingerprint,
           relations = Seq(
             relations.head.copy(
               data = relations.head.data.copy(
                 properties = relations.head.data.properties.copy(
-                  appendedFiles = Content.fromLeafFiles(appended.map(toFileStatus)),
-                  deletedFiles = Content.fromLeafFiles(deleted.map(toFileStatus))))))))))
+                  update = Some(
+                    Update(
+                      appendedFiles = Content.fromLeafFiles(appended.map(toFileStatus)),
+                      deletedFiles = Content.fromLeafFiles(deleted.map(toFileStatus))))))))))))
   }
 
   def bucketSpec: BucketSpec =
diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala
index 7d3229cde..cd44b674a 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala
@@ -126,22 +126,24 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
        |          "properties" : { }
        |        }
        |      },
-       |      "deletedFiles" : {
-       |        "root" : {
-       |          "name" : "",
-       |          "files" : [ {
-       |            "name" : "f1",
-       |            "size" : 10,
-       |            "modifiedTime" : 10
-       |          }],
-       |          "subDirs" : [ ]
+       |      "update" : {
+       |        "deletedFiles" : {
+       |          "root" : {
+       |            "name" : "",
+       |            "files" : [ {
+       |              "name" : "f1",
+       |              "size" : 10,
+       |              "modifiedTime" : 10
+       |            }],
+       |            "subDirs" : [ ]
+       |          },
+       |          "fingerprint" : {
+       |            "kind" : "NoOp",
+       |            "properties" : { }
+       |          }
        |        },
-       |        "fingerprint" : {
-       |          "kind" : "NoOp",
-       |          "properties" : { }
-       |        }
-       |      },
-       |      "appendedFiles" : null
+       |        "appendedFiles" : null
+       |      }
        |    },
        |    "kind" : "HDFS"
        |  },
@@ -176,19 +178,38 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     StructType(Array(StructField("RGUID", StringType), StructField("Date", StringType)))
 
     val expectedSourcePlanProperties = SparkPlan.Properties(
-      Seq(Relation(
-        Seq("rootpath"),
-        Hdfs(Hdfs.Properties(Content(
-          Directory("", Seq(FileInfo("f1", 100L, 100L), FileInfo("f2", 200L, 200L)), Seq())),
-          None,
-          Some(Content(Directory("", Seq(FileInfo("f1", 10, 10))))))),
-        "schema",
-        "type",
-        Map())),
+      Seq(
+        Relation(
+          Seq("rootpath"),
+          Hdfs(
+            Hdfs.Properties(
+              Content(
+                Directory(
+                  "",
+                  Seq(FileInfo("f1", 100L, 100L), FileInfo("f2", 200L, 200L)),
+                  Seq()
+                )
+              ),
+              Some(
+                Update(
+                  None,
+                  Some(Content(Directory("", Seq(FileInfo("f1", 10, 10)))))
+                )
+              )
+            )
+          ),
+          "schema",
+          "type",
+          Map()
+        )
+      ),
       null,
       null,
       LogicalPlanFingerprint(
-        LogicalPlanFingerprint.Properties(Seq(Signature("provider", "signatureValue")))))
+        LogicalPlanFingerprint
+          .Properties(Seq(Signature("provider", "signatureValue")))
+      )
+    )
 
     val expected = IndexLogEntry(
       "indexName",
diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala
index cc9504ceb..71c2ae7f1 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/IndexManagerTests.scala
@@ -489,7 +489,7 @@ class IndexManagerTests extends HyperspaceSuite with SQLHelper {
       assert(indexFiles.forall(_.getName.startsWith("part-0")))
       assert(indexLog.state.equals("ACTIVE"))
       // Check all files belong to the provided index versions only.
-      assert(indexFiles.map(_.getParent.getName).toSet.equals(indexVersions))
+      assert(indexFiles.map(_.getParent.getName).toSet === indexVersions)
     }
 
   private def expectedIndex(
diff --git a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala
index dc6948d22..cfdc4f04f 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/rules/RuleUtilsTest.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFil
 import org.apache.spark.sql.types.{IntegerType, StringType}
 
 import com.microsoft.hyperspace.actions.Constants
-import com.microsoft.hyperspace.index.{FileInfo, IndexCollectionManager, IndexConfig, IndexConstants}
+import com.microsoft.hyperspace.index.{FileInfo, IndexCollectionManager, IndexConfig, IndexConstants, LogicalPlanFingerprint, Signature}
 import com.microsoft.hyperspace.index.IndexConstants.INDEX_HYBRID_SCAN_ENABLED
 import com.microsoft.hyperspace.util.{FileUtils, PathUtils}
 
@@ -285,9 +285,13 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper {
     "RuleUtils.getCandidateIndexes: Verify indexes with non-empty 'deletedFiles' or " +
       "'appendedFiles' are not usable indexes if hybrid scan is disabled.") {
     withSQLConf(INDEX_HYBRID_SCAN_ENABLED -> "false") {
+      val fingerprint = LogicalPlanFingerprint(
+        LogicalPlanFingerprint.Properties(Seq(Signature("signatureProvider", "dfSignature"))))
       val entry1 = createIndexLogEntry("t1iTest", Seq(t1c1), Seq(t1c3), t1ProjectNode)
-      val entry2 = entry1.withAppendedAndDeletedFiles(Seq(), Seq(FileInfo("file:/dir/f1", 1, 1)))
-      val entry3 = entry1.withAppendedAndDeletedFiles(Seq(FileInfo("file:/dir/f2", 1, 1)), Seq())
-      // IndexLogEntry.withAppendedAndDeletedFiles doesn't copy LogEntry's fields.
+      val entry2 =
+        entry1.copyWithUpdate(fingerprint, Seq(), Seq(FileInfo("file:/dir/f1", 1, 1)))
+      val entry3 =
+        entry1.copyWithUpdate(fingerprint, Seq(FileInfo("file:/dir/f2", 1, 1)), Seq())
+      // IndexLogEntry.copyWithUpdate doesn't copy LogEntry's fields.
       // Thus, set the 'state' to ACTIVE manually so that these entries are considered
       // in RuleUtils.getCandidateIndexes.
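
Reviewer note: after this change, RefreshActionBase's deletedFiles and appendedFiles are plain set differences against the files recorded in the entry's content, instead of being merged with the lists carried over from the previous log entry (which is why the de-duplication code and the issue #195 TODOs could be dropped). A minimal sketch of the resulting semantics; the FileInfo values below are illustrative, not taken from this diff:

  import com.microsoft.hyperspace.index.FileInfo

  // Files recorded in the previous entry's content vs. files on disk now.
  val originalFiles = Set(FileInfo("file:/data/f1", 10L, 10L), FileInfo("file:/data/f2", 20L, 20L))
  val currentFiles = Set(FileInfo("file:/data/f2", 20L, 20L), FileInfo("file:/data/f3", 30L, 30L))

  // Deleted: present when the index was built, gone now (f1).
  val deleted = (originalFiles -- currentFiles).toSeq
  // Appended: on disk now, unknown to the index (f3).
  val appended = (currentFiles -- originalFiles).toSeq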
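
Reviewer note: a minimal usage sketch of the reshaped log-entry API; `entry` stands for any existing IndexLogEntry, and the fingerprint and file values are illustrative:

  import com.microsoft.hyperspace.index.{FileInfo, IndexLogEntry, LogicalPlanFingerprint, Signature}

  def recordSourceChanges(entry: IndexLogEntry): IndexLogEntry = {
    // Fingerprint of the source plan at the time the update is captured.
    val latestFingerprint = LogicalPlanFingerprint(
      LogicalPlanFingerprint.Properties(Seq(Signature("provider", "newSignature"))))

    // copyWithUpdate replaces withAppendedAndDeletedFiles: it nests both file
    // lists under Hdfs.Properties.update and stamps the latest fingerprint.
    val updated = entry.copyWithUpdate(
      latestFingerprint,
      appended = Seq(FileInfo("file:/data/f3", 30L, 30L)),
      deleted = Seq(FileInfo("file:/data/f1", 10L, 10L)))

    // The accessors now read through sourceUpdate.
    assert(updated.appendedFiles.map(_.name) == Set("file:/data/f3"))
    assert(updated.deletedFiles.map(_.name) == Set("file:/data/f1"))
    updated
  }

As the test comment above notes, copyWithUpdate does not copy LogEntry-level fields such as state, so callers that need an ACTIVE entry must set that separately.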