This repository was archived by the owner on Jun 14, 2024. It is now read-only.
Merged
@@ -102,16 +102,8 @@ private[actions] abstract class RefreshActionBase(
protected lazy val deletedFiles: Seq[FileInfo] = {
val relation = previousIndexLogEntry.relations.head
val originalFiles = relation.data.properties.content.fileInfos
-val delFiles = originalFiles -- currentFiles
-val delFileNames = delFiles.map(_.name)
-
-// Remove duplicate deleted file names in the previous log entry.
-val prevDeletedFiles =
-  previousIndexLogEntry.deletedFiles.filterNot(f => delFileNames.contains(f.name))
-
-// TODO: Add test for the scenario where existing deletedFiles and newly deleted
-// files are updated. https://github.com/microsoft/hyperspace/issues/195.
-delFiles.toSeq ++ prevDeletedFiles
+(originalFiles -- currentFiles).toSeq
}

/**
@@ -144,15 +136,7 @@ private[actions] abstract class RefreshActionBase(
protected lazy val appendedFiles: Seq[FileInfo] = {
val relation = previousIndexLogEntry.relations.head
val originalFiles = relation.data.properties.content.fileInfos
-val newFiles = currentFiles -- originalFiles
-val newFileNames = newFiles.map(_.name)
-
-// Remove duplicate appended file names in the previous log entry.
-val prevAppendedFiles =
-  previousIndexLogEntry.appendedFiles.filterNot(f => newFileNames.contains(f.name))
-
-// TODO: Add test for the scenario where existing appendedFiles and newly appended
-// files are updated. https://github.com/microsoft/hyperspace/issues/195.
-newFiles.toSeq ++ prevAppendedFiles
+(currentFiles -- originalFiles).toSeq
}
}
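With the update lists now persisted in the log entry itself (see the Update changes below), the refresh action can recompute both lists from scratch as a plain set difference against the snapshot. A minimal standalone sketch of those semantics, not Hyperspace code, assuming FileInfo is a case class compared by value as the tests below suggest:

case class FileInfo(name: String, size: Long, modifiedTime: Long)

val originalFiles = Set(FileInfo("f1", 10L, 10L), FileInfo("f2", 20L, 20L))
val currentFiles = Set(FileInfo("f2", 25L, 30L), FileInfo("f3", 5L, 5L))

// Deleted: in the snapshot but no longer on disk in identical form.
val deletedFiles = (originalFiles -- currentFiles).toSeq
// Appended: on disk but absent from the snapshot.
val appendedFiles = (currentFiles -- originalFiles).toSeq

// Note that "f2" lands in both lists: value equality covers size and
// modifiedTime, so an in-place modification reads as a delete plus an append.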
52 changes: 38 additions & 14 deletions src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
@@ -326,6 +326,16 @@ object LogicalPlanFingerprint
case class Properties(signatures: Seq[Signature])
}

+/**
+ * Captures any HDFS updates.
+ *
+ * @param appendedFiles Appended files.
+ * @param deletedFiles Deleted files.
+ */
+case class Update(
+    appendedFiles: Option[Content] = None,
+    deletedFiles: Option[Content] = None)
+
// IndexLogEntry-specific Hdfs that represents the source data.
case class Hdfs(properties: Hdfs.Properties) {
val kind = "HDFS"
@@ -335,16 +345,22 @@ object Hdfs {
/**
* Hdfs file properties.
* @param content Content object representing Hdfs file based data source.
- * @param appendedFiles Appended files since the last time derived dataset was updated.
- * @param deletedFiles Deleted files since the last time derived dataset was updated.
+ * @param update Captures any updates since 'content' was created.
*/
-case class Properties(
-    content: Content,
-    appendedFiles: Option[Content] = None,
-    deletedFiles: Option[Content] = None)
+case class Properties(content: Content, update: Option[Update] = None)
}
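For orientation, a hedged sketch of the new shape; constructor usage mirrors the updated test file further down, and the named arguments and defaults are assumptions. The snapshot taken at creation time stays in content, while later appends and deletes ride along in the optional nested Update:

// Snapshot of the source data at derived-dataset creation time.
val snapshot = Content(
  Directory("", Seq(FileInfo("f1", 100L, 100L), FileInfo("f2", 200L, 200L)), Seq()))

// "f1" was deleted after the snapshot; nothing has been appended yet.
val props = Hdfs.Properties(
  content = snapshot,
  update = Some(
    Update(
      appendedFiles = None,
      deletedFiles = Some(Content(Directory("", Seq(FileInfo("f1", 100L, 100L))))))))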

-// IndexLogEntry-specific Relation that represents the source relation.
+/**
+ * IndexLogEntry-specific Relation that represents the source relation.
+ *
+ * @param rootPaths List of root paths for the source relation.
+ * @param data Source data for the relation.
+ *             Hdfs.properties.content captures the source data from which the derived
+ *             dataset was created. Hdfs.properties.update captures any updates since
+ *             the derived dataset was created.
+ * @param dataSchemaJson Schema in json format.
+ * @param fileFormat File format name.
+ * @param options Options to read the source relation.
+ */
case class Relation(
rootPaths: Seq[String],
data: Hdfs,
@@ -394,17 +410,22 @@ case class IndexLogEntry(
relations.head.data.properties.content.fileInfos
}

-// FileInfo's 'name' contains the full path to the file.
-def deletedFiles: Set[FileInfo] = {
-  relations.head.data.properties.deletedFiles.map(_.fileInfos).getOrElse(Set())
+def sourceUpdate: Option[Update] = {
+  relations.head.data.properties.update
}

// FileInfo's 'name' contains the full path to the file.
def appendedFiles: Set[FileInfo] = {
-  relations.head.data.properties.appendedFiles.map(_.fileInfos).getOrElse(Set())
+  sourceUpdate.flatMap(_.appendedFiles).map(_.fileInfos).getOrElse(Set())
}

+// FileInfo's 'name' contains the full path to the file.
+def deletedFiles: Set[FileInfo] = {
+  sourceUpdate.flatMap(_.deletedFiles).map(_.fileInfos).getOrElse(Set())
+}
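A short, hypothetical usage sketch (the helper is not part of the PR): callers go through sourceUpdate instead of reaching into relations.head.data.properties, and both file lists degrade to empty sets when no update has been recorded.

// Hypothetical helper built on the new accessors.
def hasSourceChanges(entry: IndexLogEntry): Boolean =
  entry.appendedFiles.nonEmpty || entry.deletedFiles.nonEmpty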
@imback82 (Contributor) commented on Nov 1, 2020:
Can the following be updated or not (this can be done as a follow-up PR)?

// index.deletedFiles and index.appendedFiles should be non-empty until Hybrid Scan
// handles the lists properly. Otherwise, as the source file list of each index entry
// (entry.allSourceFileInfo) also contains the appended and deleted files, we cannot
// get the actual appended files and deleted files correctly.
indexes.filter(
index =>
index.created && index.deletedFiles.isEmpty && index.appendedFiles.isEmpty &&
isHybridScanCandidate(index, filesByRelations.flatten))

@sezruby (Collaborator, Author) replied on Nov 1, 2020:
It's required until the following are done:

  • RefreshQuickAction & refresh mode API
  • getCandidateIndex fix
    • compare with the latest signature if it exists
    • if so, tag hybrid scan required = true
  • transformPlanToUseIndex & hybrid scan fix
    • use appendedFiles and deletedFiles if they exist


-def withAppendedAndDeletedFiles(
+def copyWithUpdate(
+    latestFingerprint: LogicalPlanFingerprint,
appended: Seq[FileInfo],
deleted: Seq[FileInfo]): IndexLogEntry = {
def toFileStatus(f: FileInfo) = {
@@ -414,12 +435,15 @@ case class IndexLogEntry(
source = source.copy(
plan = source.plan.copy(
properties = source.plan.properties.copy(
+fingerprint = latestFingerprint,
relations = Seq(
relations.head.copy(
data = relations.head.data.copy(
properties = relations.head.data.properties.copy(
-appendedFiles = Content.fromLeafFiles(appended.map(toFileStatus)),
-deletedFiles = Content.fromLeafFiles(deleted.map(toFileStatus))))))))))
+update = Some(
+  Update(
+    appendedFiles = Content.fromLeafFiles(appended.map(toFileStatus)),
+    deletedFiles = Content.fromLeafFiles(deleted.map(toFileStatus))))))))))))
}
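A hedged usage sketch of the renamed method; the argument values are illustrative, and in the real flow the fingerprint would come from a signature provider and the file lists from diffing directory listings. Unlike the old withAppendedAndDeletedFiles, copyWithUpdate also stamps the latest fingerprint, so the recorded update and the plan signature stay consistent:

val refreshed = entry.copyWithUpdate(
  latestFingerprint = LogicalPlanFingerprint(
    LogicalPlanFingerprint.Properties(Seq(Signature("provider", "newSignature")))),
  appended = Seq(FileInfo("file:/dir/f2", 1, 1)),
  deleted = Seq(FileInfo("file:/dir/f1", 1, 1)))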

def bucketSpec: BucketSpec =
@@ -126,22 +126,24 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
| "properties" : { }
| }
| },
-| "deletedFiles" : {
-| "root" : {
-| "name" : "",
-| "files" : [ {
-| "name" : "f1",
-| "size" : 10,
-| "modifiedTime" : 10
-| }],
-| "subDirs" : [ ]
-| },
-| "fingerprint" : {
-| "kind" : "NoOp",
-| "properties" : { }
-| }
-| },
-| "appendedFiles" : null
+| "update" : {
+| "deletedFiles" : {
+| "root" : {
+| "name" : "",
+| "files" : [ {
+| "name" : "f1",
+| "size" : 10,
+| "modifiedTime" : 10
+| }],
+| "subDirs" : [ ]
+| },
+| "fingerprint" : {
+| "kind" : "NoOp",
+| "properties" : { }
+| }
+| },
+| "appendedFiles" : null
+| }
| },
| "kind" : "HDFS"
| },
@@ -176,19 +178,38 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
StructType(Array(StructField("RGUID", StringType), StructField("Date", StringType)))

val expectedSourcePlanProperties = SparkPlan.Properties(
-Seq(Relation(
-  Seq("rootpath"),
-  Hdfs(Hdfs.Properties(Content(
-    Directory("", Seq(FileInfo("f1", 100L, 100L), FileInfo("f2", 200L, 200L)), Seq())),
-    None,
-    Some(Content(Directory("", Seq(FileInfo("f1", 10, 10))))))),
-  "schema",
-  "type",
-  Map())),
+Seq(
+  Relation(
+    Seq("rootpath"),
+    Hdfs(
+      Hdfs.Properties(
+        Content(
+          Directory(
+            "",
+            Seq(FileInfo("f1", 100L, 100L), FileInfo("f2", 200L, 200L)),
+            Seq()
+          )
+        ),
+        Some(
+          Update(
+            None,
+            Some(Content(Directory("", Seq(FileInfo("f1", 10, 10)))))
+          )
+        )
+      )
+    ),
+    "schema",
+    "type",
+    Map()
+  )
+),
null,
null,
LogicalPlanFingerprint(
-  LogicalPlanFingerprint.Properties(Seq(Signature("provider", "signatureValue")))))
+  LogicalPlanFingerprint
+    .Properties(Seq(Signature("provider", "signatureValue")))
+  )
+)

val expected = IndexLogEntry(
"indexName",
@@ -489,7 +489,7 @@ class IndexManagerTests extends HyperspaceSuite with SQLHelper {
assert(indexFiles.forall(_.getName.startsWith("part-0")))
assert(indexLog.state.equals("ACTIVE"))
// Check all files belong to the provided index versions only.
-assert(indexFiles.map(_.getParent.getName).toSet.equals(indexVersions))
+assert(indexFiles.map(_.getParent.getName).toSet === indexVersions)
}
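Aside on the assertion change: swapping .equals for ScalaTest's === checks the same condition but improves the failure message. A tiny sketch with hypothetical values, assuming standard ScalaTest Assertions are in scope:

val actual = Set("v__=0")             // hypothetical version directory names
val expected = Set("v__=0", "v__=1")

// On failure, === reports both operands, e.g.
// "Set(v__=0) did not equal Set(v__=0, v__=1)", whereas a bare
// .equals inside assert yields a less informative boolean failure.
assert(actual === expected)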

private def expectedIndex(
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFil
import org.apache.spark.sql.types.{IntegerType, StringType}

import com.microsoft.hyperspace.actions.Constants
-import com.microsoft.hyperspace.index.{FileInfo, IndexCollectionManager, IndexConfig, IndexConstants}
+import com.microsoft.hyperspace.index.{FileInfo, IndexCollectionManager, IndexConfig, IndexConstants, LogicalPlanFingerprint, Signature}
import com.microsoft.hyperspace.index.IndexConstants.INDEX_HYBRID_SCAN_ENABLED
import com.microsoft.hyperspace.util.{FileUtils, PathUtils}

@@ -285,9 +285,13 @@ class RuleUtilsTest extends HyperspaceRuleTestSuite with SQLHelper {
"RuleUtils.getCandidateIndexes: Verify indexes with non-empty 'deletedFiles' or " +
"'appendedFiles' are not usable indexes if hybrid scan is disabled.") {
withSQLConf(INDEX_HYBRID_SCAN_ENABLED -> "false") {
+val fingerprint = LogicalPlanFingerprint(
+  LogicalPlanFingerprint.Properties(Seq(Signature("signatureProvider", "dfSignature"))))
val entry1 = createIndexLogEntry("t1iTest", Seq(t1c1), Seq(t1c3), t1ProjectNode)
-val entry2 = entry1.withAppendedAndDeletedFiles(Seq(), Seq(FileInfo("file:/dir/f1", 1, 1)))
-val entry3 = entry1.withAppendedAndDeletedFiles(Seq(FileInfo("file:/dir/f2", 1, 1)), Seq())
+val entry2 =
+  entry1.copyWithUpdate(fingerprint, Seq(), Seq(FileInfo("file:/dir/f1", 1, 1)))
+val entry3 =
+  entry1.copyWithUpdate(fingerprint, Seq(FileInfo("file:/dir/f2", 1, 1)), Seq())
// IndexLogEntry.withAppendedAndDeletedFiles doesn't copy LogEntry's fields.
// Thus, set the 'state' to ACTIVE manually so that these entries are considered
// in RuleUtils.getCandidateIndexes.