[PROPOSAL]: Support for Scan Pattern for Scalable Refresh Index for Large Data Sources #276
Description
Describe the problem
Goals
Original Issue #223, Dependencies: #269
- Support incremental refresh calls with a scan pattern
- Prevent a full file listing during DataFrame creation when incremental refresh is called with a scan pattern
Independent Scenarios to support
- Globbing data sources
  - E.g. original source data path = data/*/*/*/* - create index supports this <PR # handles this>
  - Incremental refresh on scanPattern = data/*/a/*
  - Incremental refresh on scanPattern = data/*/a
  - hs.refreshIndex("indexname", scanPattern = "data/*/m=11/*")
- Hive-partitioned data sources
  - E.g. original source data path = data - create index supports this today
  - Incremental refresh on scanPattern = data/y=2020/m=11/
  - Incremental refresh on scanPattern = data/y=2020/*
  - Incremental refresh on scanPattern = data/*/m=11
  - hs.refreshIndex("indexname", scanPattern = "data/*/b/*")
Describe your proposed solution
General Ideas:
- Setting the "basePath" option on a DataFrame is sufficient to inform Spark of the hive-partition schema, if one exists.
- For hive-partitioned data sources: setting rootPath + scanPattern should suffice to list the right set of files for refresh.
  - E.g. if the actual files look like data/y=2020/m=11/d=1/f1.parquet and the scanPattern is data/y=2020/*, then dfUsedForRefresh = spark.read.option("basePath", "data").load("data/y=2020") can be used to refresh the index. This ensures the partition columns are picked up and only the files under "data/y=2020" are listed.
- For globbing data sources: setting the "path" option to a globbing pattern (starting from the data root, down to the leaf directory) tells Spark which files to list for the refresh.
  - E.g. if the actual files look like data/a/b/c/d/f1.parquet, so the globbing pattern = data/*/*/*/* and the scanPattern is data/*/b/*, then dfUsedForRefresh = spark.read.load("data/*/b/*/*") can be used to refresh the index. This ensures only the files matching "data/*/b/*/*" are picked (see the sketch after this list).
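The two read strategies can be sketched as follows (a minimal Spark/Scala sketch; the paths and the dfUsedForRefresh-style names are illustrative only, not the final implementation):

```scala
// Hive-partitioned source: "basePath" tells Spark where partition discovery starts,
// so y/m/d are still inferred as partition columns even though only a subtree is loaded.
val hiveRefreshDf = spark.read
  .format("parquet")
  .option("basePath", "data")
  .load("data/y=2020")      // only files under data/y=2020 are listed

// Globbing source: the glob path itself restricts which leaf directories Spark lists;
// no "basePath" option is needed.
val globRefreshDf = spark.read
  .format("parquet")
  .load("data/*/b/*/*")     // only files matching the merged pattern are listed
```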
Algo:
- Globbing Data Sources
  CreateIndex:
  - Set rootPath (in metadata) = data/*/*/*/* (same as the globbing pattern)
  Refresh Incremental:
  - scanPattern = data/*/b/*
  - Note: "basePath" is not required for globbing patterns, but it does no harm even if present
  - mergedPath = merge(globPath, scanPattern) => e.g. merge(data/*/*/*/*, data/*/b/*) = data/*/b/*/* (see the merge sketch after this block)
  - dfUsedForRefresh = spark.read.options(...).format(...).load(mergedPath)
  - Metadata changes => new metadata = old metadata + new index files in index content + new data files (from scan pattern) in data content
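One possible shape of the merge step, written as a hypothetical Scala helper (mergePath is not an existing Hyperspace API; it simply pads the scan pattern with "*" until it is as deep as the original glob/root pattern):

```scala
// Hypothetical helper: extend the scan pattern with the remaining levels of the
// original glob/root pattern so the merged path reaches the leaf directories.
def mergePath(globPath: String, scanPattern: String): String = {
  val globDepth = globPath.split("/").length
  val scanParts = scanPattern.split("/")
  val padding   = Seq.fill(math.max(0, globDepth - scanParts.length))("*")
  (scanParts ++ padding).mkString("/")
}

// mergePath("data/*/*/*/*", "data/*/b/*")    == "data/*/b/*/*"   (globbing case)
// mergePath("data", "data/*/m=11/*")         == "data/*/m=11/*"  (hive-partitioned case)
```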
- Hive-Partitioned Data Sources
  CreateIndex:
  - Set rootPath (in metadata) = "data" (this is done currently)
  - Set basePath (in metadata) = "data" (this is new)
  Refresh Incremental:
  - scanPattern = data/*/m=11/*
  - Note: "basePath" is required to correctly identify partition columns.
  - Set "basePath" = basePath from metadata
  - mergedPath = merge(globPath, scanPattern) => e.g. merge(data, data/*/m=11/*) = data/*/m=11/*
  - dfUsedForRefresh = spark.read.options(...).format(...).option("basePath", basePath).load(mergedPath) => note the additional "basePath" option (see the sketch after this block)
  - Metadata changes => new metadata = old metadata + new index files in index content + new data files (from scan pattern) in data content
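A minimal sketch of the hive-partitioned refresh read, assuming the hypothetical mergePath helper from above and parquet as the format (both are illustrative, not the final API):

```scala
val rootPath    = "data"            // rootPath stored in index metadata
val basePath    = "data"            // basePath stored in index metadata (new field)
val scanPattern = "data/*/m=11/*"   // supplied by the user on refresh

val mergedPath = mergePath(rootPath, scanPattern)   // => "data/*/m=11/*"

// "basePath" keeps partition-column inference intact even though only a subset
// of the partition directories is listed.
val dfUsedForRefresh = spark.read
  .format("parquet")
  .option("basePath", basePath)
  .load(mergedPath)
```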
Additional context
Constraints:
How to handle deleted data? Should we consider deleted data across ALL directories or just those which satisfy the scan pattern?
- If we stick with ALL directories, we fail on goal 2 (avoiding a full file listing).
- If we stick with only the scan-pattern directories, we end up in an inconsistent situation where the metadata doesn't reflect the true list of deleted files. (Note: this can be supported by assuming the other deleted files were deleted after the incremental refresh.)
- This design only considers append-only data. We will revisit the design to support deleted-file scenarios.
- Data will either be hive-partitioned or follow a globbing pattern, but not a combination of both. This should support most user scenarios.