Description
Describe the enhancement requested
Based on discussion in the 2023-08-30 Arrow community meeting. This is a continuation of #35568 and #33986.
We'd like to have a protocol for sharing unmaterialized datasets that:
- Can be consumed as one or more streams of Arrow data
- Can have projections and filters pushed down to the scanner
This would provide an extensible connection between scanners and query engines. Data formats might include Iceberg, Delta Lake, Lance, and PyArrow datasets (Parquet, JSON, CSV). Query engines could include DuckDB, DataFusion, Polars, PyVelox, PySpark, Ray, and Dask. Such a connection would let end users employ their preferred query engine to load any supported dataset. From their perspective, usage might look like:
```python
from deltalake import DeltaTable
table = DeltaTable("path/to/table")

import duckdb
duckdb.sql("SELECT y FROM table WHERE x > 3")
```

The protocol is largely invisible to the user. Behind the scenes, DuckDB would call `__arrow_scanner__()` on `table` to get a scannable object. It would then push the column selection `['y']` and the filter `x > 3` down to the scanner, and use the resulting data stream as input to the query.
Shape of the protocol
The overall shape would look roughly like:
```python
from abc import ABC, abstractmethod
from typing import List

# `capsule[T]` here denotes a PyCapsule wrapping the named C Data Interface struct.

class AbstractArrowScannable(ABC):
    @abstractmethod
    def __arrow_scanner__(self) -> "AbstractArrowScanner":
        ...

class AbstractArrowScanner(ABC):
    @abstractmethod
    def get_schema(self) -> capsule[ArrowSchema]:
        ...

    @abstractmethod
    def get_stream(
        self,
        columns: List[str],
        filter: SubstraitExpression,
    ) -> capsule[ArrowArrayStream]:
        ...

    @abstractmethod
    def get_partitions(self, filter: SubstraitExpression) -> list["AbstractArrowScanner"]:
        ...
```

Data and schema are returned as C Data Interface objects (see: #35531). Predicates are passed as Substrait extended expressions.
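As a rough producer-side illustration, here is a minimal sketch adapting a PyArrow dataset to this shape. It assumes a recent PyArrow in which `Schema` and `RecordBatchReader` export C Data Interface PyCapsules, and `substrait_to_pyarrow_expr` is a hypothetical helper for decoding the Substrait extended expression into a `pyarrow.compute.Expression`:

```python
import pyarrow.dataset as ds


class PyArrowDatasetScanner:
    """Sketch: exposes a pyarrow.dataset.Dataset through the proposed protocol."""

    def __init__(self, dataset: ds.Dataset):
        self._dataset = dataset

    def __arrow_scanner__(self) -> "PyArrowDatasetScanner":
        return self

    def get_schema(self):
        # Schema.__arrow_c_schema__() returns a PyCapsule wrapping ArrowSchema.
        return self._dataset.schema.__arrow_c_schema__()

    def get_stream(self, columns, filter):
        scanner = self._dataset.scanner(
            columns=columns,
            # Hypothetical helper: translate the Substrait expression into a
            # pyarrow.compute.Expression that the dataset scanner understands.
            filter=substrait_to_pyarrow_expr(filter),
        )
        # RecordBatchReader.__arrow_c_stream__() returns a PyCapsule wrapping
        # an ArrowArrayStream, which the consumer can import zero-copy.
        return scanner.to_reader().__arrow_c_stream__()
```

`get_partitions` is omitted here; an adapter might, for example, return one scanner per dataset fragment.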
Component(s)
Python