@@ -63,7 +63,7 @@ private[actions] abstract class RefreshActionBase(
IndexConfig(previousIndexLogEntry.name, ddColumns.indexed, ddColumns.included)
}

final override def logEntry: LogEntry = getIndexLogEntry(spark, df, indexConfig, indexDataPath)
override def logEntry: LogEntry = getIndexLogEntry(spark, df, indexConfig, indexDataPath)

final override val transientState: String = REFRESHING

@@ -16,7 +16,6 @@

package com.microsoft.hyperspace.actions

import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
@@ -44,26 +43,16 @@ class RefreshDeleteAction(
spark: SparkSession,
logManager: IndexLogManager,
dataManager: IndexDataManager)
extends RefreshActionBase(spark, logManager, dataManager)
extends RefreshDeleteActionBase(spark, logManager, dataManager)
with Logging {

final override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
RefreshDeleteActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
}

@pirz (Contributor, Author) commented on Sep 19, 2020:
To Reviewers: `validate` is moved to the (new) `RefreshDeleteActionBase` class.

Contributor:
Cool, thanks for letting us know.

/**
* Validate that the index has a lineage column, that it is in active state for refreshing,
* and that at least one source data file has been deleted.
*/
final override def validate(): Unit = {
override def validate(): Unit = {
super.validate()
if (!previousIndexLogEntry.hasLineageColumn(spark)) {
throw HyperspaceException(
"Index refresh (to handle deleted source data) is " +
"only supported on an index with lineage.")
}

if (deletedFiles.isEmpty) {
if (sourceFilesDiff._1.isEmpty) {
throw HyperspaceException("Refresh aborted as no deleted source data file found.")
}
}
@@ -76,50 +65,17 @@ class RefreshDeleteAction(
final override def op(): Unit = {
logInfo(
"Refresh index is updating index by removing index entries " +
s"corresponding to ${deletedFiles.length} deleted source data files.")
s"corresponding to ${sourceFilesDiff._1.length} deleted source data files.")

val refreshDF =
spark.read
.parquet(previousIndexLogEntry.content.files.map(_.toString): _*)
.filter(!col(s"${IndexConstants.DATA_FILE_NAME_COLUMN}").isin(deletedFiles: _*))
.filter(!col(s"${IndexConstants.DATA_FILE_NAME_COLUMN}").isin(sourceFilesDiff._1: _*))

refreshDF.write.saveWithBuckets(
refreshDF,
indexDataPath.toString,
previousIndexLogEntry.numBuckets,
indexConfig.indexedColumns)
}

/**
* Compare list of source data files from previous IndexLogEntry to list
* of current source data files, validate fileInfo for existing files and
* identify deleted source data files.
*/
private lazy val deletedFiles: Seq[String] = {
val rels = previousIndexLogEntry.relations
val originalFiles = rels.head.data.properties.content.fileInfos
val currentFiles = rels.head.rootPaths
.flatMap { p =>
Content
.fromDirectory(path = new Path(p), throwIfNotExists = true)
.fileInfos
}
.map(f => f.name -> f)
.toMap

var delFiles = Seq[String]()
originalFiles.foreach { f =>
currentFiles.get(f.name) match {
case Some(v) =>
if (!f.equals(v)) {
throw HyperspaceException(
"Index refresh (to handle deleted source data) aborted. " +
s"Existing source data file info is changed (file: ${f.name}).")
}
case None => delFiles :+= f.name
}
}

delFiles
}
}
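For orientation, here is a minimal usage sketch of how this refresh path would be exercised end to end, assuming the public `Hyperspace` entry point and an existing index named "myIndex" (both names are illustrative and not part of this diff):

```scala
import org.apache.spark.sql.SparkSession
import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.IndexConstants

// Illustrative only: route refreshIndex through the delete-aware action.
val spark = SparkSession.builder().master("local[*]").appName("refresh-delete-sketch").getOrCreate()
spark.conf.set(IndexConstants.REFRESH_DELETE_ENABLED, "true")

// Assumes an index named "myIndex" was created earlier with lineage enabled.
new Hyperspace(spark).refreshIndex("myIndex")
```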
@@ -0,0 +1,75 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.actions

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.index.{Content, IndexDataManager, IndexLogManager}

private[actions] abstract class RefreshDeleteActionBase(
@pirz (Contributor, Author) commented on Sep 19, 2020:
To Reviewers: This class is a simple code refactor. The `validate` and `deletedFiles` defs are copied here from the `RefreshDeleteAction` class, as they are shared between the `RefreshDeleteAction` and `DeleteOnReadAction` classes. The code is the same as before, except for `validate`, which now has an extra check.

@pirz (Contributor, Author) added:

To Reviewers: This class needs a rename, as its children no longer deal only with delete-related items. Any suggestions?

spark: SparkSession,
logManager: IndexLogManager,
dataManager: IndexDataManager)
extends RefreshActionBase(spark, logManager, dataManager) {

/**
* Validate that the index has a lineage column and that it is in active state for refreshing.
*/
override def validate(): Unit = {
super.validate()
if (!previousIndexLogEntry.hasLineageColumn(spark)) {
throw HyperspaceException(
"Index refresh to update source content in index metadata is " +
"only supported on an index with lineage.")
}
}

/**
* Compare the list of source data files from the previous IndexLogEntry with the
* current source data files, validate fileInfo for existing files, and identify
* deleted and appended source data files.
*/
protected lazy val sourceFilesDiff: (Seq[String], Seq[String]) = {
val rels = previousIndexLogEntry.relations
val originalFiles = rels.head.data.properties.content.fileInfos
val currentFiles = rels.head.rootPaths
.flatMap { p =>
Content
.fromDirectory(path = new Path(p), throwIfNotExists = true)
.fileInfos
}
.map(f => f.name -> f)
.toMap

var delFiles = Seq[String]()
originalFiles.foreach { f =>
currentFiles.get(f.name) match {
case Some(v) =>
if (!f.equals(v)) {
throw HyperspaceException(
"Index refresh (to handle deleted or appended source data) aborted. " +
s"Existing source data file info is changed (file: ${f.name}).")
}
case None => delFiles :+= f.name
}
}

(delFiles, (currentFiles.keySet diff originalFiles.map(_.name)).toSeq)
}
}
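To make the `sourceFilesDiff` logic easier to follow in isolation, here is a self-contained sketch of the same idea with plain maps; `FileMeta` and the sample file names are illustrative stand-ins, not the library's `FileInfo` type:

```scala
object SourceFilesDiffSketch {
  // Illustrative stand-in for FileInfo metadata: (size, modifiedTime).
  type FileMeta = (Long, Long)

  def diffSources(
      original: Map[String, FileMeta],
      current: Map[String, FileMeta]): (Seq[String], Seq[String]) = {
    // A file that still exists must have unchanged metadata, otherwise abort.
    original.foreach { case (name, meta) =>
      current.get(name).foreach { cur =>
        require(cur == meta, s"Existing source data file info is changed (file: $name)")
      }
    }
    // Deleted: recorded in the previous index version but missing now.
    val deleted = (original.keySet diff current.keySet).toSeq
    // Appended: present now but unknown to the previous index version.
    val appended = (current.keySet diff original.keySet).toSeq
    (deleted, appended)
  }

  def main(args: Array[String]): Unit = {
    val before = Map("f1.parquet" -> (10L, 100L), "f2.parquet" -> (20L, 100L))
    val after = Map("f1.parquet" -> (10L, 100L), "f3.parquet" -> (30L, 200L))
    // Expected: deleted = Seq("f2.parquet"), appended = Seq("f3.parquet").
    println(diffSources(before, after))
  }
}
```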
@@ -0,0 +1,114 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.actions

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshSourceContentActionEvent}

/**
* Refresh an index by updating the lists of deleted and appended source data files and
* the index signature in the index metadata.
* If some original source data file(s) have been deleted or new source data file(s) have
* been appended since the previous index version, this action refreshes the index as follows:
* 1. Deleted and appended source data files are identified.
* 2. A new index fingerprint is computed w.r.t. the latest source data files. This captures
*    both deleted and appended source data files.
* 3. The IndexLogEntry is updated with the modified lists of deleted and appended source data
*    files and the index fingerprint computed in the steps above.
*
* @param spark SparkSession.
* @param logManager Index LogManager for index being refreshed.
* @param dataManager Index DataManager for index being refreshed.
*/
class RefreshSourceContentAction(
spark: SparkSession,
logManager: IndexLogManager,
dataManager: IndexDataManager)
extends RefreshDeleteActionBase(spark, logManager, dataManager)
with Logging {

private lazy val newDeletedFiles: Seq[String] =
sourceFilesDiff._1 diff previousIndexLogEntry.deletedFiles

private lazy val newAppendedFiles: Seq[String] =
sourceFilesDiff._2 diff previousIndexLogEntry.appendedFiles

final override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
RefreshSourceContentActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
}

override def validate(): Unit = {
super.validate()
if (newDeletedFiles.isEmpty && newAppendedFiles.isEmpty) {
throw HyperspaceException(
"Refresh aborted as no new deleted or appended source data file found.")
}
}

final override def op(): Unit = {
logInfo(
"Refresh index is updating index metadata by adding " +
s"${newDeletedFiles.length} new files to the list of deleted source data files and " +
s"${newAppendedFiles.length} new files to the list of appended source data files.")
}

/**
* Compute a new index fingerprint from the latest source data files and create a new
* IndexLogEntry with updated lists of deleted and appended source data files and the
* new fingerprint.
*
* @return Updated IndexLogEntry.
*/
final override def logEntry: LogEntry = {
// Compute the index fingerprint using the current source data files.
val signatureProvider = LogicalPlanSignatureProvider.create()
val newSignature = signatureProvider.signature(df.queryExecution.optimizedPlan) match {
case Some(s) =>
LogicalPlanFingerprint(
LogicalPlanFingerprint.Properties(Seq(Signature(signatureProvider.name, s))))

case None => throw HyperspaceException("Invalid source plan found during index refresh.")
}

// Grab nested structures from previous IndexLogEntry.
val source = previousIndexLogEntry.source
val plan = source.plan
val planProps = plan.properties
val relation = planProps.relations.head
val data = relation.data
val dataProps = data.properties
val deleted = dataProps.deleted
val appended = dataProps.appended

// Create a new IndexLogEntry with the updated deleted/appended file lists and fingerprint.
previousIndexLogEntry.copy(
source = source.copy(
plan = plan.copy(
properties = planProps.copy(
fingerprint = newSignature,
relations = Seq(
relation.copy(
data = data.copy(
properties = dataProps.copy(
deleted = deleted ++ newDeletedFiles,
appended = appended ++ newAppendedFiles))))))))
}
}
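The `logEntry` override relies on the nested-`copy` idiom for updating deeply nested immutable case classes; a compact stand-alone illustration of that idiom follows (the case classes here are simplified, hypothetical stand-ins for the real IndexLogEntry structures):

```scala
object NestedCopySketch {
  // Hypothetical, simplified stand-ins for the nested metadata types.
  case class DataProps(deleted: Seq[String], appended: Seq[String])
  case class Data(properties: DataProps)
  case class Relation(data: Data)
  case class PlanProps(fingerprint: String, relations: Seq[Relation])
  case class Plan(properties: PlanProps)
  case class Source(plan: Plan)
  case class Entry(source: Source)

  // Rebuild the entry from the outside in, changing only the fields that differ.
  def withUpdates(entry: Entry, fp: String, del: Seq[String], app: Seq[String]): Entry = {
    val planProps = entry.source.plan.properties
    val relation = planProps.relations.head
    val dataProps = relation.data.properties
    entry.copy(
      source = entry.source.copy(
        plan = entry.source.plan.copy(
          properties = planProps.copy(
            fingerprint = fp,
            relations = Seq(
              relation.copy(
                data = relation.data.copy(
                  properties = dataProps.copy(
                    deleted = dataProps.deleted ++ del,
                    appended = dataProps.appended ++ app))))))))
  }
}
```

A lens library such as quicklens or Monocle would shorten this, but plain `copy` keeps the dependency footprint unchanged.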
@@ -69,6 +69,8 @@ class IndexCollectionManager(
val dataManager = indexDataManagerFactory.create(indexPath)
if (HyperspaceConf.refreshDeleteEnabled(spark)) {
new RefreshDeleteAction(spark, logManager, dataManager).run()
} else if (HyperspaceConf.refreshSourceContentEnabled(spark)) {
new RefreshSourceContentAction(spark, logManager, dataManager).run()
} else {
new RefreshAction(spark, logManager, dataManager).run()
}
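Note that the dispatch above checks the delete flag before the new source-content flag, so only one path runs per refresh. A hedged sketch of selecting the metadata-only path (same illustrative assumptions as the earlier sketch):

```scala
import org.apache.spark.sql.SparkSession
import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.IndexConstants

// Illustrative only: enable the metadata-only refresh path and refresh an existing index.
val spark = SparkSession.builder().master("local[*]").appName("refresh-source-content-sketch").getOrCreate()
spark.conf.set(IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED, "true")
new Hyperspace(spark).refreshIndex("myIndex") // dispatches to RefreshSourceContentAction
```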
@@ -57,4 +57,7 @@ object IndexConstants {

val REFRESH_DELETE_ENABLED = "spark.hyperspace.index.refresh.delete.enabled"
val REFRESH_DELETE_ENABLED_DEFAULT = "false"

val REFRESH_SOURCE_CONTENT_ENABLED = "spark.hyperspace.index.refresh.source.content.enabled"
val REFRESH_SOURCE_CONTENT_ENABLED_DEFAULT = "false"
}
@@ -325,7 +325,7 @@ case class Hdfs(properties: Hdfs.Properties) {
val kind = "HDFS"
}
object Hdfs {
case class Properties(content: Content)
case class Properties(content: Content, deleted: Seq[String] = Nil, appended: Seq[String] = Nil)
}

// IndexLogEntry-specific Relation that represents the source relation.
@@ -410,6 +410,14 @@ case class IndexLogEntry(
sourcePlanSignatures.head
}

def deletedFiles: Seq[String] = {
relations.head.data.properties.deleted
}

def appendedFiles: Seq[String] = {
relations.head.data.properties.appended
}

def hasLineageColumn(spark: SparkSession): Boolean = {
ResolverUtils
.resolve(spark, IndexConstants.DATA_FILE_NAME_COLUMN, schema.fieldNames)
@@ -116,6 +116,17 @@ case class CancelActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: St
case class RefreshDeleteActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String)
extends HyperspaceIndexCRUDEvent

/**
* Index refresh event for deleted and appended source files. Emitted when refresh is called
* on an index to update its metadata according to the latest source data files.
*
* @param appInfo AppInfo for spark application.
* @param index Related index.
* @param message Message about event.
*/
case class RefreshSourceContentActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String)
extends HyperspaceIndexCRUDEvent

/**
* Index usage event. This event is emitted when an index is picked instead of original data
* source by one of the hyperspace rules.
@@ -38,4 +38,12 @@ object HyperspaceConf {
IndexConstants.REFRESH_DELETE_ENABLED_DEFAULT)
.toBoolean
}

def refreshSourceContentEnabled(spark: SparkSession): Boolean = {
spark.conf
.get(
IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED,
IndexConstants.REFRESH_SOURCE_CONTENT_ENABLED_DEFAULT)
.toBoolean
}
}
4 changes: 2 additions & 2 deletions src/test/scala/com/microsoft/hyperspace/SampleData.scala
@@ -43,9 +43,9 @@ object SampleData {
val df = testData.toDF(columns: _*)
partitionColumns match {
case Some(pcs) =>
df.write.partitionBy(pcs: _*).parquet(path)
df.write.mode("overwrite").partitionBy(pcs: _*).parquet(path)
case None =>
df.write.parquet(path)
df.write.mode("overwrite").parquet(path)
}
}
}
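The switch to `mode("overwrite")` lets tests regenerate the sample data in place. A hedged sketch of how a test could then simulate a deleted and an appended source file on top of such data (the path, schema, and steps here are illustrative, not taken from the repo):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

// Illustrative sketch: regenerate a small parquet source, then simulate a delete and an append.
val spark = SparkSession.builder().master("local[*]").appName("source-change-sketch").getOrCreate()
import spark.implicits._

val dataPath = "/tmp/refresh-sketch" // hypothetical location
Seq(("a", 1), ("b", 2), ("c", 3)).toDF("name", "value")
  .write.mode("overwrite").parquet(dataPath) // same overwrite pattern as SampleData above

// Simulate a deleted source file: drop one of the written parquet parts.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val parts = fs.listStatus(new Path(dataPath)).map(_.getPath).filter(_.getName.endsWith(".parquet"))
fs.delete(parts.head, /* recursive = */ false)

// Simulate appended source data: add new rows alongside the remaining files.
Seq(("d", 4)).toDF("name", "value").write.mode("append").parquet(dataPath)
```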