Incremental Refresh support with scan patterns #298
base: master
Conversation
Note to reviewers: Tests for delete scenarios have not been added yet. Please have a look at the general idea and leave comments. Nit comments are OK but not necessary at this time. Thank you.
/** df representing the complete set of data files which will be indexed once this refresh action
 * finishes. */
override protected lazy val df = {
Note: This df represents all the data that will be indexed after this Action finishes.
E.g. if previously indexed files are f1, f2, f3,
newly added files are f4, f5, f6,
but the scan pattern matches only f4,
then this df will represent f1, f2, f3, f4.
df is used in the creation of metadata to represent the current snapshot of indexed data files (as before).
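In set terms, a brief sketch reusing the hypothetical file names from the comment above (not actual Hyperspace code):

val previouslyIndexed = Set("f1", "f2", "f3")
val newlyAdded = Set("f4", "f5", "f6")
val matchedByScan = Set("f4") // only f4 matches the scan pattern
val coveredByDf = previouslyIndexed ++ matchedByScan // Set(f1, f2, f3, f4)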
this is also required for correct signature calculation of indexed data files
def isMatch(path: String, scanPattern: String): Boolean = {
  val scanSplits = scanPattern.split(Path.SEPARATOR)
  scanSplits.nonEmpty && path.split(Path.SEPARATOR).contains(scanSplits.head)
}
Returns true if a path matches the scan pattern.
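For illustration, a small standalone sketch of isMatch with made-up paths; note that only the first segment of the scan pattern is checked against the path's segments here:

import org.apache.hadoop.fs.Path

// Standalone copy of isMatch, for illustration only.
def isMatch(path: String, scanPattern: String): Boolean = {
  val scanSplits = scanPattern.split(Path.SEPARATOR)
  scanSplits.nonEmpty && path.split(Path.SEPARATOR).contains(scanSplits.head)
}

isMatch("c:/root/data/y=2020/part-0.parquet", "data/y=20*") // true: "data" occurs as a path segment
isMatch("c:/root/logs/y=2020/part-0.parquet", "data/y=20*") // false: "data" does not occur in the path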
def resolve(path: String, scanPattern: String): String = {
  val scanSplits: Array[String] = scanPattern.split(Path.SEPARATOR)
  val pathSplits: Array[String] = path.split(Path.SEPARATOR)
  val splitPoint: Int = pathSplits.lastIndexOf(scanSplits.head)
  var (prefix, suffix) = pathSplits.splitAt(splitPoint)

  for (j <- 0 until math.max(scanSplits.length, suffix.length)) {
    val resolvedPart = (Try(suffix(j)), Try(scanSplits(j))) match {
      case (Success(path), Success(scan)) if FilenameUtils.wildcardMatch(path, scan) => path
      case (Success(path), Success(scan)) if FilenameUtils.wildcardMatch(scan, path) => scan
      case (Success(_), Success(_)) => throw new Exception("Incompatible scan pattern")
      case (Success(path), Failure(_)) => path
      case (Failure(_), Success(scan)) => scan
      case _ => throw new Exception("Unexpected Exception")
    }

    prefix :+= resolvedPart
  }
  prefix.mkString(Path.SEPARATOR)
}
Resolves a path with a scan pattern. E.g.
path = "c:/data/"
scanPattern = "data/y=20*"
resolvedPath = "c:/data/y=20*"
E.g.
path = "c:/root/data/*/*/*"
scanPattern = "data/a*"
resolvedPath = "c:/root/data/a*/*/*" // for every level, we pick the more restrictive part, e.g. from * and a* we choose a* because it's more restrictive.
override protected lazy val currentFiles: Set[FileInfo] = {
  val relation = previousIndexLogEntry.relations.head
  val dataSchema = DataType.fromJson(relation.dataSchemaJson).asInstanceOf[StructType]
  val changedDf = spark.read
    .schema(dataSchema)
    .format(relation.fileFormat)
    .options(relation.options)
    .load(resolvedPaths: _*)
  changedDf.queryExecution.optimizedPlan
    .collect {
      case LogicalRelation(
          HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _),
          _,
          _,
          _) =>
        location
          .allFiles()
          .map { f =>
            // For each file, if it already has a file id, add that id to its corresponding
            // FileInfo. Note that if content of an existing file is changed, it is treated
            // as a new file (i.e. its current file id is no longer valid).
            val id = fileIdTracker.addFile(f)
            FileInfo(f, id, asFullPath = true)
          }
    }
    .flatten
    .toSet
}
currentFiles represents ALL files which match the scan pattern. Some of these files may already be indexed.
override protected lazy val deletedFiles: Seq[FileInfo] = {
  // Helper function to check if a file belongs to one of the resolved paths.
  def fromResolvedPaths(file: FileInfo): Boolean = {
    resolvedPaths.exists(p => FilenameUtils.wildcardMatch(file.name, p))
  }

  val originalFiles = previousIndexLogEntry.relations.head.data.properties.content.fileInfos
    .filter(fromResolvedPaths)

  (originalFiles -- currentFiles).toSeq
}
Deleted files which match the scan pattern. We do not handle deleted files outside of the scan pattern to avoid a full file listing.
// Store basePath of hive-partitioned data sources, if applicable.
val basePath = options.get("basePath") match {
  case None => PathUtils.extractBasePath(location.partitionSpec())
  case p => p
}

// "path" key in options can incur multiple data read unexpectedly.
// Since "options" is a case-insensitive map, it will change any previous entries of
// "basePath" to lowercase "basepath", making it unusable.
// Remove lowercase "basepath" and add proper cased "basePath".
val opts = basePath match {
  case Some(path) => Map("basePath" -> path) ++ options - "path" - "basepath"
  case _ => options - "path" - "basepath"
}
this is from another dependency PR #281
/**
 * Extract base data source path from a given partition spec.
 * @param partitionSpec PartitionSpec.
 * @return Optional base path if partition spec is non empty. Else, None.
 */
def extractBasePath(partitionSpec: PartitionSpec): Option[String] = {
  if (partitionSpec == PartitionSpec.emptySpec) {
    None
  } else {
    // For example, we could have the following in PartitionSpec:
    // - partition columns = "col1", "col2"
    // - partitions: "/path/col1=1/col2=1", "/path/col1=1/col2=2", etc.
    // , and going up the same number of directory levels as the number of partition columns
    // will compute the base path. Note that PartitionSpec.partitions will always contain
    // all the partitions in the path, so "partitions.head" is taken as an initial value.
    val basePath = partitionSpec.partitionColumns
      .foldLeft(partitionSpec.partitions.head.path)((path, _) => path.getParent)
    Some(basePath.toString)
  }
}
from dependency PR #281
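To illustrate the "go up one directory level per partition column" step with the values from the comment above, a minimal sketch using hadoop's Path directly (not a real PartitionSpec):

import org.apache.hadoop.fs.Path

val partitionColumns = Seq("col1", "col2")
val firstPartitionPath = new Path("/path/col1=1/col2=1")
// Going up via getParent once per partition column yields the base path.
val basePath = partitionColumns.foldLeft(firstPartitionPath)((p, _) => p.getParent)
// basePath.toString == "/path"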
Could you clean up extractBasePath impl?
Could you simply measure and share the performance described in "Success criteria" in #233 before moving forward?
I think the only gain is not performing listFiles for all source files (list files => toSet => calculate appended / deleted files), but a user would have to specify the exact scan patterns for every update (if not, the index won't be applied).
If a user wants to run a query only on that portion of the data, then that makes sense, though it makes it hard to maintain the index.
/** df representing the complete set of data files which will be indexed once this refresh action
 * finishes. */
override protected lazy val df = {
  val relation = previousIndexLogEntry.relations.head
Need to call val latestRelation = Hyperspace.getContext(spark).sourceProviderManager.refreshRelation(relations.head)
changedDf.queryExecution.optimizedPlan
  .collect {
    case LogicalRelation(
        HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _),
Need to use the SourceProvider API. Also, if we don't support Delta Lake or any other sources, we need to check the source type and throw a proper exception.
Please create a function like "isScanPatternRefreshSupported" and throw an exception.
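A hedged sketch of what such a guard could look like; the function name comes from the comment above, but the check itself and the exception type are assumptions, not existing Hyperspace API:

// Hypothetical guard; "delta" is used here only as an example of an unsupported source format.
def isScanPatternRefreshSupported(fileFormat: String): Boolean =
  fileFormat != "delta"

if (!isScanPatternRefreshSupported(relation.fileFormat)) {
  throw new UnsupportedOperationException(
    s"Refresh with scan pattern is not supported for source format: ${relation.fileFormat}")
}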
Yes, that's the only goal. If a user appends a small bunch of files, we can avoid listing all the files. I am working on the perf part, but other than that, could you please review the code for functionality? (Thanks for the other comments as well.) (We are not aiming for the specialized scenario where the user has partial indexes and also wants to query the partial data set on which the index is created.) cc @rapoth, @imback82
# Conflicts:
#	src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala
#	src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedSource.scala
We should be able to figure out whether the partition in question is up to date (instead of enabling Hybrid Scan), no?
The specific ask has been only for the case where refresh is needed on a couple of partitions. How can we solve this:
Actually, I doubt the usability of this feature. What if a user tries to refresh the index with
To apply the index, a user should set the exact file lists when they are writing the query, without Hybrid Scan. In other words, the relation in their query should contain only the original source file list + the refreshed source file list (with scan pattern), historically.
I assume that if the user is using this feature, they would select the right partition in their query. I think we need to look at the partition columns in the user query and only consider source files under that partition when applying the index & Hybrid Scan. (This should improve the Hybrid Scan scenario quite a bit.)
/** paths resolved with scan pattern.
 * Paths merged with scan pattern to choose the more selective option
 * e.g. if rootPath
Could you update this?
} else {
  // New entry.
  entry
}
Could you try this?
val updatedEntry = super[RefreshIncrementalAction].logEntry()
  entry
}

val relation = entry.source.plan.properties.relations.head
Suggested change:
- val relation = entry.source.plan.properties.relations.head
+ val relation = updatedEntry.source.plan.properties.relations.head
val relation = entry.source.plan.properties.relations.head
val updatedRelation =
  relation.copy(rootPaths = previousIndexLogEntry.relations.head.rootPaths)
updatedEntry.copy(
This seems unclear. Instead of this, how about passing an ORIGINAL_ROOT_PATHS option via df.options? We could add it in val df = ... and exclude it in createRelation.
// Root paths could be directories or leaf files. Make sure that all root paths either
// match the glob paths, in case of directories, or belong to glob paths, in case of
// files.
if (!location.rootPaths.forall(p =>
Is this relevant to this PR or a missing part in the previous PR?
  // `InMemoryFileIndex` implementation returns `SerializableFileStatus` instead of the
  // standard API's `FileStatus`.
- index.allFiles.map(_.asInstanceOf[FileStatus])
+ index.allFiles
Please keep this implementation (see above comments)
import com.microsoft.hyperspace.index.IndexConstants.GLOBBING_PATTERN_KEY
import com.microsoft.hyperspace.util.PathUtils

class RefreshScanTestsGlobData extends QueryTest with HyperspaceSuite {
Could you add tests for a Delta Lake source relation? Since it may require additional code changes, we need to add the tests in this change. Otherwise, we could open a new PR for Delta Lake tests based on this PR to check the behavior.
What is the context for this pull request?
Note to reviewers: Please follow the tracking issue for the general algorithm used.
What changes were proposed in this pull request?
This PR supports incremental index refresh when scan patterns are provided:

hyperspace.refreshIndex("indexName", scanPattern="<scan-pattern>")

Therefore, the user would use this API in one of the following ways:

Data change in a particular partition:

refreshIndex("idx1", "incremental", scanPattern=Some("data/year=2020/month=10/day=29/hour=03/*"))

Data change in a particular hierarchy:

refreshIndex("idx1", "incremental", scanPattern=Some("data/year=2020/month=07/*"))

Supported scenarios:
Does this PR introduce any user-facing change?
Yes, we introduce new refresh APIs which can take a scan pattern.
Constraints:
The scan pattern MUST start with the data folder which was picked for indexing. In the above examples, the scan pattern starts with data, which is the base path for indexing.

How was this patch tested?
Unit tests added.