This repository was archived by the owner on Jun 14, 2024. It is now read-only.
6 changes: 5 additions & 1 deletion src/main/scala/com/microsoft/hyperspace/Hyperspace.scala
@@ -87,7 +87,11 @@ class Hyperspace(spark: SparkSession) {
* @param mode Refresh mode. Currently supported modes are `incremental` and `full`.
*/
def refreshIndex(indexName: String, mode: String): Unit = {
indexManager.refresh(indexName, mode)
indexManager.refresh(indexName, mode, None)
}

def refreshIndex(indexName: String, mode: String, scanPattern: Option[String]): Unit = {
indexManager.refresh(indexName, mode, scanPattern)
}

/**
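For orientation, a minimal caller-side sketch of the new overload; the index name and scan pattern below are made up, and the restriction of scan patterns to the incremental mode comes from the IndexCollectionManager change later in this diff:

```scala
import org.apache.spark.sql.SparkSession
import com.microsoft.hyperspace.Hyperspace

object RefreshScanUsageSketch extends App {
  val spark = SparkSession.builder().appName("refresh-scan-example").master("local[*]").getOrCreate()
  val hs = new Hyperspace(spark)

  // Existing behavior: refresh the whole index incrementally.
  hs.refreshIndex("salesIndex", "incremental")

  // New overload: only scan source paths matching the (hypothetical) pattern.
  hs.refreshIndex("salesIndex", "incremental", Some("year=2021/month=*"))

  // Combining a scan pattern with any other mode throws a HyperspaceException:
  // hs.refreshIndex("salesIndex", "full", Some("year=2021/month=*"))
}
```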
169 changes: 169 additions & 0 deletions src/main/scala/com/microsoft/hyperspace/actions/RefreshScan.scala
@@ -0,0 +1,169 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.actions

import scala.util.{Failure, Success, Try}

import org.apache.commons.io.FilenameUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
import org.apache.spark.sql.types.{DataType, StructType}

import com.microsoft.hyperspace.index._

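/**
 * Refreshes an index incrementally while restricting the scan to source paths that match the
 * given scan pattern. Appended and deleted files are only picked up under paths resolved against
 * `scanPattern`; source files outside the pattern keep their previously indexed data.
 */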
class RefreshScan(
spark: SparkSession,
logManager: IndexLogManager,
dataManager: IndexDataManager,
scanPattern: String)
extends RefreshIncrementalAction(spark, logManager, dataManager) {

/** df representing the complete set of data files which will be indexed once this refresh action
* finishes. */
override protected lazy val df = {
Contributor Author:
Note: this df represents all the data that will be indexed after this Action finishes.
E.g. if the previously indexed files are f1, f2, f3,
the newly added files are f4, f5, f6,
and the scan pattern matches only f4,
then this df will represent f1, f2, f3, f4.

df is used in the creation of metadata to represent the current snapshot of indexed data files (as before).
It is also required for correct signature calculation of the indexed data files.

val relation = previousIndexLogEntry.relations.head
Collaborator:
Need to call val latestRelation = Hyperspace.getContext(spark).sourceProviderManager.refreshRelation(relations.head).

val previouslyIndexedData = relation.data.properties.content
val newlyIndexedData = previouslyIndexedData.fileInfos -- deletedFiles ++ appendedFiles
val newlyIndexedDataFiles: Seq[String] = newlyIndexedData.map(_.name).toSeq

val dataSchema = DataType.fromJson(relation.dataSchemaJson).asInstanceOf[StructType]
spark.read
.schema(dataSchema)
.format(relation.fileFormat)
.options(relation.options)
.load(newlyIndexedDataFiles: _*)
}

private def isMatch(path: String, scanPattern: String): Boolean = {
val scanSplits = scanPattern.split(Path.SEPARATOR)
scanSplits.nonEmpty && path.split(Path.SEPARATOR).contains(scanSplits.head)
}

private def resolve(path: String, scanPattern: String): String = {
val scanSplits: Array[String] = scanPattern.split(Path.SEPARATOR)
val pathSplits: Array[String] = path.split(Path.SEPARATOR)
val splitPoint: Int = pathSplits.lastIndexOf(scanSplits.head)
var (prefix, suffix) = pathSplits.splitAt(splitPoint)

for (j <- 0 until math.max(scanSplits.length, suffix.length)) {
val resolvedPart = (Try(suffix(j)), Try(scanSplits(j))) match {
case (Success(path), Success(scan)) if FilenameUtils.wildcardMatch(path, scan) => path
case (Success(path), Success(scan)) if FilenameUtils.wildcardMatch(scan, path) => scan
case (Success(_), Success(_)) => throw new Exception("Incompatible scan pattern")
case (Success(path), Failure(_)) => path
case (Failure(_), Success(scan)) => scan
case _ => throw new Exception("Unexpected Exception")
}

prefix :+= resolvedPart
}
prefix.mkString(Path.SEPARATOR)
}
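As a rough, self-contained illustration of the segment-by-segment merge that resolve performs; the root path and scan pattern below are hypothetical, and this sketch is not code from the PR:

```scala
import org.apache.commons.io.FilenameUtils

object ResolveSketch extends App {
  // Hypothetical inputs, for illustration only.
  val rootPath = "/data/events/year=2021"
  val scanPattern = "year=2021/month=01/*"

  val pathSplits = rootPath.split("/").toSeq
  val scanSplits = scanPattern.split("/").toSeq

  // Split the root path at the last segment equal to the head of the scan pattern.
  val splitPoint = pathSplits.lastIndexOf(scanSplits.head)
  val (prefix, suffix) = pathSplits.splitAt(splitPoint)

  // At each level, keep whichever side is more selective (the side that satisfies the other
  // side's wildcard); fail if the segments are incompatible.
  val merged = (0 until math.max(scanSplits.length, suffix.length)).map { i =>
    (suffix.lift(i), scanSplits.lift(i)) match {
      case (Some(p), Some(s)) if FilenameUtils.wildcardMatch(p, s) => p
      case (Some(p), Some(s)) if FilenameUtils.wildcardMatch(s, p) => s
      case (Some(p), None) => p
      case (None, Some(s)) => s
      case _ => sys.error("Incompatible scan pattern")
    }
  }

  // Prints: /data/events/year=2021/month=01/*
  println((prefix ++ merged).mkString("/"))
}
```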

/** Paths resolved with the scan pattern.
* Root paths are merged with the scan pattern so that the more selective option is chosen,
* e.g. if rootPath
*/
Collaborator:
Could you update this?
private lazy val resolvedPaths = {
val relation = previousIndexLogEntry.relations.head
relation.rootPaths.collect {
case path if isMatch(path, scanPattern) => resolve(path, scanPattern)
}
}

private def checkPathExists(path: String): Boolean = {
val hadoopConf = spark.sessionState.newHadoopConf()
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
globPath.nonEmpty && globPath.forall(fs.exists)
}

override def logEntry: LogEntry = {
// TODO: Deduplicate from super.logEntry()
val entry = getIndexLogEntry(spark, df, indexConfig, indexDataPath)

// If there are no deleted files, this version contains index data files only for the appended
// data, so we need to add the index data files of the previous index version.
// Otherwise, as the previous index data is rewritten in this version while excluding
// indexed rows from deleted files, all necessary index data files already exist in this version.
val updatedEntry = if (deletedFiles.isEmpty) {
// Merge new index files with old index files.
val mergedContent = Content(previousIndexLogEntry.content.root.merge(entry.content.root))
entry.copy(content = mergedContent)
} else {
// New entry.
entry
}
Collaborator:
Could you try this?
val updatedEntry = super[RefreshIncrementalAction].logEntry()
val relation = entry.source.plan.properties.relations.head
Collaborator:
Suggested change:
- val relation = entry.source.plan.properties.relations.head
+ val relation = updatedEntry.source.plan.properties.relations.head
val updatedRelation =
relation.copy(rootPaths = previousIndexLogEntry.relations.head.rootPaths)
updatedEntry.copy(
Collaborator:
This seems unclear. Instead of this, how about passing an ORIGINAL_ROOT_PATHS option via df.options? We could add it in val df and exclude it in createRelation.
source = updatedEntry.source.copy(plan = updatedEntry.source.plan.copy(
properties = updatedEntry.source.plan.properties.copy(relations = Seq(updatedRelation)))))
}

/** Deleted files which match resolved paths */
override protected lazy val deletedFiles: Seq[FileInfo] = {
// Helper function to check if a file belongs to one of the resolved paths.
def fromResolvedPaths(file: FileInfo): Boolean = {
resolvedPaths.exists(p => FilenameUtils.wildcardMatch(file.name, p))
}

val originalFiles = previousIndexLogEntry.relations.head.data.properties.content.fileInfos
.filter(fromResolvedPaths)

(originalFiles -- currentFiles).toSeq
}
Contributor Author (on lines +125 to +135):
Deleted files which match the scan pattern. We do not handle deleted files outside of the scan pattern, to avoid a full file listing.


override protected lazy val currentFiles: Set[FileInfo] = {
resolvedPaths.filter(checkPathExists) match {
case Nil => Set.empty
case _ =>
val relation = previousIndexLogEntry.relations.head
val dataSchema = DataType.fromJson(relation.dataSchemaJson).asInstanceOf[StructType]
val changedDf = spark.read
.schema(dataSchema)
.format(relation.fileFormat)
.options(relation.options)
.load(resolvedPaths: _*)
changedDf.queryExecution.optimizedPlan
.collect {
case LogicalRelation(
HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _),
Collaborator:
Need to use the SourceProvider API. Also, if we don't support Delta Lake or any other sources, we need to check the source type and throw a proper exception.
Please create a function like "isScanPatternRefreshSupported" and throw an exception (a rough sketch of such a guard appears at the end of this file's diff).

_,
_,
_) =>
location
.allFiles()
.map { f =>
// For each file, if it already has a file id, add that id to its corresponding
// FileInfo. Note that if content of an existing file is changed, it is treated
// as a new file (i.e. its current file id is no longer valid).
val id = fileIdTracker.addFile(f)
FileInfo(f, id, asFullPath = true)
}
}
.flatten
.toSet
}
}
}
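Regarding the reviewer's request above for a source-type check, one possible shape for such a guard, sketched under explicit assumptions: the method name isScanPatternRefreshSupported comes from the review comment, and the list of supported formats is an assumption rather than anything this PR defines.

```scala
// Hypothetical sketch of the reviewer-suggested guard; not part of this PR.
// These members would live inside RefreshScan and be invoked before listing files.
import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.index.Relation

private def isScanPatternRefreshSupported(relation: Relation): Boolean =
  // Assumption: only plain file-based formats are known to work with scan-pattern refresh.
  Seq("parquet", "csv", "json").exists(relation.fileFormat.equalsIgnoreCase)

private def validateScanPatternSupport(relation: Relation): Unit =
  if (!isScanPatternRefreshSupported(relation)) {
    throw HyperspaceException(
      s"Refresh with a scan pattern is not supported for source format '${relation.fileFormat}'.")
  }
```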
@@ -92,9 +92,9 @@ class CachingIndexCollectionManager(
super.vacuum(indexName)
}

override def refresh(indexName: String, mode: String): Unit = {
override def refresh(indexName: String, mode: String, scanPattern: Option[String]): Unit = {
clearCache()
super.refresh(indexName, mode)
super.refresh(indexName, mode, scanPattern)
}

override def optimize(indexName: String, mode: String): Unit = {
@@ -64,12 +64,25 @@ class IndexCollectionManager(
}
}

override def refresh(indexName: String, mode: String): Unit = {
override def refresh(
indexName: String,
mode: String,
scanPattern: Option[String] = None): Unit = {
withLogManager(indexName) { logManager =>
val indexPath = PathResolver(spark.sessionState.conf).getIndexPath(indexName)
val dataManager = indexDataManagerFactory.create(indexPath)

if (scanPattern.isDefined && !mode.equalsIgnoreCase(REFRESH_MODE_INCREMENTAL)) {
throw HyperspaceException(
"Scan Patterns are currently only supported for " +
s"$REFRESH_MODE_INCREMENTAL.")
}

if (mode.equalsIgnoreCase(REFRESH_MODE_INCREMENTAL)) {
new RefreshIncrementalAction(spark, logManager, dataManager).run()
scanPattern match {
case Some(pattern) => new RefreshScan(spark, logManager, dataManager, pattern).run()
case _ => new RefreshIncrementalAction(spark, logManager, dataManager).run()
}
} else if (mode.equalsIgnoreCase(REFRESH_MODE_FULL)) {
new RefreshAction(spark, logManager, dataManager).run()
} else if (mode.equalsIgnoreCase(REFRESH_MODE_QUICK)) {
@@ -65,7 +65,7 @@ trait IndexManager {
*
* @param indexName Name of the index to refresh.
*/
def refresh(indexName: String, mode: String): Unit
def refresh(indexName: String, mode: String, scanPattern: Option[String]): Unit

/**
* Optimize index by changing the underlying index data layout (e.g., compaction).
@@ -18,6 +18,7 @@ package com.microsoft.hyperspace.index.sources.default

import java.util.Locale

import org.apache.commons.io.FilenameUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -106,7 +107,14 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS
.toMap

val globPathValues = globPaths.values.flatten.toSet
if (!location.rootPaths.forall(globPathValues.contains)) {

// Root paths could be directories or leaf files. Make sure that all root paths either
// match the glob paths, in case of directories, or belong to glob paths, in case of
// files.
if (!location.rootPaths.forall(p =>
Collaborator:
Is this relevant to this PR, or a missing part of the previous PR?

globPathValues.exists(g =>
FilenameUtils.equalsNormalized(p.toString, g.toString) ||
FilenameUtils.directoryContains(g.toString, p.toString)))) {
throw HyperspaceException(
"Some glob patterns do not match with available root " +
s"paths of the source data. Please check if $pattern matches all of " +
@@ -281,7 +289,7 @@ class DefaultFileBasedSource(private val spark: SparkSession) extends FileBasedS
// Keep the `asInstanceOf` to force casting or fallback because Databrick's
// `InMemoryFileIndex` implementation returns `SerializableFileStatus` instead of the
// standard API's `FileStatus`.
index.allFiles.map(_.asInstanceOf[FileStatus])
index.allFiles
Collaborator:
Please keep this implementation (see above comments).

} catch {
case e: ClassCastException if e.getMessage.contains("SerializableFileStatus") =>
val dbClassName = "org.apache.spark.sql.execution.datasources.SerializableFileStatus"
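For the directory-versus-leaf-file root path check added to DefaultFileBasedSource above, a small self-contained illustration of how the two FilenameUtils calls behave; the paths are made up:

```scala
import org.apache.commons.io.FilenameUtils

object GlobCheckSketch extends App {
  // Hypothetical resolved glob path (a directory) and two root paths.
  val globPath = "/data/events/year=2021"
  val rootDirectory = "/data/events/./year=2021"              // a directory root path
  val rootLeafFile = "/data/events/year=2021/part-0.parquet"  // a leaf-file root path

  // A directory root path matches a glob path after normalization.
  println(FilenameUtils.equalsNormalized(rootDirectory, globPath)) // true

  // A leaf-file root path is accepted when it lives under a glob path.
  println(FilenameUtils.directoryContains(globPath, rootLeafFile)) // true
}
```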
22 changes: 22 additions & 0 deletions src/main/scala/com/microsoft/hyperspace/util/PathUtils.scala
@@ -18,6 +18,7 @@ package com.microsoft.hyperspace.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.spark.sql.execution.datasources.PartitionSpec

object PathUtils {
def makeAbsolute(path: String): Path = makeAbsolute(new Path(path))
@@ -27,6 +28,27 @@ object PathUtils {
fs.makeQualified(path)
}

/**
* Extracts the base data source path from a given partition spec.
* @param partitionSpec PartitionSpec.
* @return The base path if the partition spec is non-empty; None otherwise.
*/
def extractBasePath(partitionSpec: PartitionSpec): Option[String] = {
if (partitionSpec == PartitionSpec.emptySpec) {
None
} else {
// For example, a PartitionSpec could contain:
// - partition columns: "col1", "col2"
// - partitions: "/path/col1=1/col2=1", "/path/col1=1/col2=2", etc.
// Going up as many directory levels as there are partition columns yields the base path.
// Note that PartitionSpec.partitions always contains all the partitions under the path,
// so "partitions.head" is taken as the initial value.
val basePath = partitionSpec.partitionColumns
.foldLeft(partitionSpec.partitions.head.path)((path, _) => path.getParent)
Some(basePath.toString)
}
}
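A minimal, self-contained sketch of the directory-walking idea behind extractBasePath, with made-up values (no real PartitionSpec is constructed here):

```scala
import org.apache.hadoop.fs.Path

object BasePathSketch extends App {
  // Hypothetical partitioning: two partition columns and the first partition's path.
  val partitionColumns = Seq("col1", "col2")
  val firstPartitionPath = new Path("/path/col1=1/col2=1")

  // Walk up one directory level per partition column to reach the base path.
  val basePath = partitionColumns.foldLeft(firstPartitionPath)((path, _) => path.getParent)

  println(basePath) // /path
}
```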

Contributor Author (on lines +31 to +51):
From dependency PR #281.

Collaborator:
Could you clean up the extractBasePath implementation?

/* Definition taken from org.apache.spark.sql.execution.datasources.PartitionAwareFileIndex. */
// SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
// counted as data files, so that they shouldn't participate partition discovery.