From 733a6f3a3a928122eaec38dd390ef10cc9faabf7 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 11 May 2023 18:40:59 -0700 Subject: [PATCH 1/5] feat: sketch out a minimal protocol interface --- .../{dataset.py => dataset/__init__.py} | 0 python/pyarrow/dataset/protocol.py | 77 +++++++++++++++++++ 2 files changed, 77 insertions(+) rename python/pyarrow/{dataset.py => dataset/__init__.py} (100%) create mode 100644 python/pyarrow/dataset/protocol.py diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset/__init__.py similarity index 100% rename from python/pyarrow/dataset.py rename to python/pyarrow/dataset/__init__.py diff --git a/python/pyarrow/dataset/protocol.py b/python/pyarrow/dataset/protocol.py new file mode 100644 index 00000000000..4e4eb84106b --- /dev/null +++ b/python/pyarrow/dataset/protocol.py @@ -0,0 +1,77 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Protocol definitions for pyarrow.dataset + +These provide the abstract interface for a dataset. Other libraries may implement +this interface to expose their data, without having to extend PyArrow's classes. + +Applications and libraries that want to consume datasets should accept datasets +that implement these protocols, rather than requiring the specific +PyArrow classes. +""" +from abc import abstractmethod +from typing import Iterator, List, Optional, Protocol + +from pyarrow.dataset import Expression +from pyarrow import Table, IntegerArray, RecordBatch, RecordBatchReader, Schema + + +class Scanner(Protocol): + @abstractmethod + def count_rows(self) -> int: + ... + + @abstractmethod + def head(self, num_rows: int) -> Table: + ... + + @abstractmethod + def take(self, indices: IntegerArray) -> Table: + ... + + @abstractmethod + def to_table(self) -> Table: + ... + + @abstractmethod + def to_batches(self) -> Iterator[RecordBatch]: + ... + + @abstractmethod + def to_reader(self) -> RecordBatchReader: + ... + + +class Scannable(Protocol): + @abstractmethod + def scanner(self, columns: Optional[List[str]] = None, + filter: Optional[Expression] = None, **kwargs) -> Scanner: + ... + + @abstractmethod + def schema(self) -> Schema: + ... + + +class Fragment(Scannable): + ... + + +class Dataset(Scannable): + @abstractmethod + def get_fragments(self, filter: Optional[Expression] = None) -> Iterator[Fragment]: + ... 
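For illustration, a minimal producer written against the interface sketched in this first patch might look like the sketch below. The class names ``TableScanner`` and ``TableDataset`` are hypothetical, the code relies on duck typing rather than importing the proposed ``pyarrow.dataset.protocol`` module, and it only calls released PyArrow APIs (``Table.select``, ``Table.take``, ``RecordBatchReader.from_batches``).

.. code-block:: python

    from typing import Iterator, List, Optional

    import pyarrow as pa


    class TableScanner:
        """Scanner over an in-memory table, optionally projected to a column subset."""

        def __init__(self, table: pa.Table, columns: Optional[List[str]] = None):
            # Projection pushdown: keep only the requested columns.
            self._table = table.select(columns) if columns is not None else table

        def count_rows(self) -> int:
            return self._table.num_rows

        def head(self, num_rows: int) -> pa.Table:
            return self._table.slice(0, num_rows)

        def take(self, indices) -> pa.Table:
            return self._table.take(indices)

        def to_table(self) -> pa.Table:
            return self._table

        def to_batches(self) -> Iterator[pa.RecordBatch]:
            return iter(self._table.to_batches())

        def to_reader(self) -> pa.RecordBatchReader:
            return pa.RecordBatchReader.from_batches(
                self._table.schema, self._table.to_batches())


    class TableDataset:
        """A dataset whose only fragment is the in-memory table itself."""

        def __init__(self, table: pa.Table):
            self._table = table

        def schema(self) -> pa.Schema:
            return self._table.schema

        def scanner(self, columns: Optional[List[str]] = None,
                    filter=None, **kwargs) -> TableScanner:
            if filter is not None:
                # Filters must be applied fully; this toy producer cannot,
                # so it raises instead of returning unfiltered rows.
                raise NotImplementedError("filter pushdown is not supported here")
            return TableScanner(self._table, columns)

        def get_fragments(self, filter=None) -> Iterator["TableDataset"]:
            yield self  # a single fragment covering the whole table

A consumer that only knows the interface could then call, for example, ``TableDataset(pa.table({"x": [1, 2]})).scanner(columns=["x"]).to_reader()`` without knowing anything about the backing storage.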
From b81ca2dfe85e5ebd12198db54b29e227fcf1b972 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 28 May 2023 13:47:03 -0700 Subject: [PATCH 2/5] wip: start a document --- docs/source/python/integration/dataset.rst | 90 ++++++++++++++++++++++ python/pyarrow/dataset/protocol.py | 11 ++- 2 files changed, 97 insertions(+), 4 deletions(-) create mode 100644 docs/source/python/integration/dataset.rst diff --git a/docs/source/python/integration/dataset.rst b/docs/source/python/integration/dataset.rst new file mode 100644 index 00000000000..7e25144ce78 --- /dev/null +++ b/docs/source/python/integration/dataset.rst @@ -0,0 +1,90 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, +.. software distributed under the License is distributed on an +.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +.. KIND, either express or implied. See the License for the +.. specific language governing permissions and limitations +.. under the License. + +.. currentmodule:: pyarrow.dataset + +Extending PyArrow Datasets +========================== + +PyArrow provides a core protocol for datasets, so third-party libraries can both +produce and consume PyArrow datasets. + +Dataset Producers +----------------- + +If you are a library implementing a new data source, you'll want to be able to +produce a PyArrow-compatible dataset. Your dataset could be backed by the classes +implemented in PyArrow or you could implement your own classes. Either way, you +should implement the protocol below. + +When implementing the dataset, consider the following: + +* To scale to very large dataset, don't eagerly load all the fragments into memory. + Instead, load fragments once a filter is passed. This allows you to skip loading + metadata about fragments that aren't relevant to queries. For example, if you + have a dataset that uses Hive-style paritioning for a column ``date`` and the + user passes a filter for ``date=2023-01-01``, then you can skip listing directory + for HIVE partitions that don't match that date. +* Filters passed down should be fully executed. While other systems have scanners + that are "best-effort", only executing the parts of the filter that it can, PyArrow + datasets should always remove all rows that don't match the filter. + + +Dataset Consumers +----------------- + +If you are a query engine, you'll want to be able to +consume any PyArrow datasets. To make sure your integration is compatible +with any dataset, you should only call methods that are included in the +protocol. Dataset implementations provided by PyArrow implements additional +options and methods beyond those, but they should not be relied upon. + +There are two general patterns for consuming PyArrow datasets: reading a single +stream or reading a stream per fragment. + +If you have a streaming execution model, you can recieve a single stream +of data by calling ``dataset.scanner(filter=..., columns=...).to_reader()``. +This will return a RecordBatchReader, which can be exported over the +:ref:`C Stream Interface `. 
The record batches yield +from the stream can then be passed to worker threads for parallelism. + +If you are using a partition-based or distributed model, you can split the +dataset into fragments and then distribute those fragments into tasks that +create their own scanners and readers. In this case, the code looks more +like: + +.. code-block:: python + + fragments = list(dataset.get_fragments(filter=..., columns=...)) + + def scan_partition(i): + fragment = fragments[i] + scanner = fragment.scanner() + return reader = scanner.to_reader() + +Fragments are pickleable, so they can be passed to remote workers in a +distributed system. + +If your engine supports predicate (filter) and projection (column) pushdown, +you can pass those down to the dataset by passing them to the ``scanner``. + + +The protocol +------------ + +.. literalinclude:: ../../python/pyarrow/dataset/protocol.py + :language: python diff --git a/python/pyarrow/dataset/protocol.py b/python/pyarrow/dataset/protocol.py index 4e4eb84106b..14f783958b2 100644 --- a/python/pyarrow/dataset/protocol.py +++ b/python/pyarrow/dataset/protocol.py @@ -31,6 +31,7 @@ class Scanner(Protocol): + """A scanner implementation for a dataset.""" @abstractmethod def count_rows(self) -> int: ... @@ -43,13 +44,12 @@ def head(self, num_rows: int) -> Table: def take(self, indices: IntegerArray) -> Table: ... - @abstractmethod def to_table(self) -> Table: - ... + self.to_reader().read_all() - @abstractmethod def to_batches(self) -> Iterator[RecordBatch]: - ... + for batch in self.to_reader(): + yield batch @abstractmethod def to_reader(self) -> RecordBatchReader: @@ -68,6 +68,9 @@ def schema(self) -> Schema: class Fragment(Scannable): + """A fragment of a dataset. + + This class should be pickleable so that it can be used in a distrubuted scan.""" ... From 777031122ef3cf32c9eacaa64621681184e5462f Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 11 Jun 2023 18:30:02 -0700 Subject: [PATCH 3/5] add docs, diagram and tests --- docs/source/python/integration.rst | 1 + docs/source/python/integration/dataset.rst | 57 ++++++- .../integration/pyarrow_dataset_protocol.svg | 4 + python/pyarrow/dataset/protocol.py | 144 +++++++++++++++--- python/pyarrow/tests/test_dataset_protocol.py | 29 ++++ 5 files changed, 202 insertions(+), 33 deletions(-) create mode 100644 docs/source/python/integration/pyarrow_dataset_protocol.svg create mode 100644 python/pyarrow/tests/test_dataset_protocol.py diff --git a/docs/source/python/integration.rst b/docs/source/python/integration.rst index 997bc52102f..1c05b9f3e19 100644 --- a/docs/source/python/integration.rst +++ b/docs/source/python/integration.rst @@ -38,3 +38,4 @@ This allows to easily integrate PyArrow with other languages and technologies. integration/python_java integration/extending integration/cuda + integration/dataset diff --git a/docs/source/python/integration/dataset.rst b/docs/source/python/integration/dataset.rst index 7e25144ce78..9d5826f34a1 100644 --- a/docs/source/python/integration/dataset.rst +++ b/docs/source/python/integration/dataset.rst @@ -23,6 +23,39 @@ Extending PyArrow Datasets PyArrow provides a core protocol for datasets, so third-party libraries can both produce and consume PyArrow datasets. +The idea is that any library can have a method that returns their dataset as a +PyArrow dataset. Then, any query engine can consume that dataset and push down +filters and projections. + +.. 
image:: pyarrow_dataset_protocol.svg + :alt: A diagram showing the workflow for using the PyArrow Dataset protocol. + There are two flows shown, one for streams and one for tasks. The stream + case shows a linear flow from a producer class, to a dataset, to a + scanner, and finally to a RecordBatchReader. The tasks case shows a + similar diagram, except the dataset is split into fragments, which are + then distributed to tasks, which each create their own scanner and + RecordBatchReader. + +Producers are responsible for outputting a class that conforms to the protocol. + +Consumers are responsible for calling methods on the protocol to get the data +out of the dataset. The protocol supports getting data as a single stream or +as a series of tasks which may be distributed. + +From the perspective of a user, this looks something like + +.. code-block:: python + + dataset = producer_library.get_dataset(...) + df = consumer_library.read_dataset(dataset) + df.filter("x > 0").select("y") + +Here, the consumer would pass the filter ``x > 0`` and the projection of ``y`` down +to the producer through the dataset protocol. Thus, the user gets to enjoy the +performance benefits of pushing down filters and projections while being able +to specify those in their preferred query engine. + + Dataset Producers ----------------- @@ -33,15 +66,17 @@ should implement the protocol below. When implementing the dataset, consider the following: -* To scale to very large dataset, don't eagerly load all the fragments into memory. - Instead, load fragments once a filter is passed. This allows you to skip loading - metadata about fragments that aren't relevant to queries. For example, if you - have a dataset that uses Hive-style paritioning for a column ``date`` and the - user passes a filter for ``date=2023-01-01``, then you can skip listing directory - for HIVE partitions that don't match that date. * Filters passed down should be fully executed. While other systems have scanners that are "best-effort", only executing the parts of the filter that it can, PyArrow datasets should always remove all rows that don't match the filter. +* The API does not require that a dataset has metadata about all fragments + loaded into memory. Indeed, to scale to very large Datasets, don't eagerly + load all the fragment metadata into memory. Instead, load fragment metadata + once a filter is passed. This allows you to skip loading metadata about + fragments that aren't relevant to queries. For example, if you have a dataset + that uses Hive-style paritioning for a column ``date`` and the user passes a + filter for ``date=2023-01-01``, then you can skip listing directory for HIVE + partitions that don't match that date. Dataset Consumers @@ -51,7 +86,8 @@ If you are a query engine, you'll want to be able to consume any PyArrow datasets. To make sure your integration is compatible with any dataset, you should only call methods that are included in the protocol. Dataset implementations provided by PyArrow implements additional -options and methods beyond those, but they should not be relied upon. +options and methods beyond those, but they should not be relied upon without +checking for specific classes. There are two general patterns for consuming PyArrow datasets: reading a single stream or reading a stream per fragment. @@ -86,5 +122,10 @@ you can pass those down to the dataset by passing them to the ``scanner``. The protocol ------------ -.. 
literalinclude:: ../../python/pyarrow/dataset/protocol.py +This module can be imported starting in PyArrow ``13.0.0`` at +``pyarrow.dataset.protocol``. The protocol is defined with ``typing.Protocol`` +classes. They can be checked at runtime with ``isinstance`` but can also be +checked statically with Python type checkers like ``mypy``. + +.. literalinclude:: ../../../../python/pyarrow/dataset/protocol.py :language: python diff --git a/docs/source/python/integration/pyarrow_dataset_protocol.svg b/docs/source/python/integration/pyarrow_dataset_protocol.svg new file mode 100644 index 00000000000..7b6e464ca69 --- /dev/null +++ b/docs/source/python/integration/pyarrow_dataset_protocol.svg @@ -0,0 +1,4 @@ + + + +
[drawio SVG; markup lost in extraction. Text labels: "PyArrow Dataset Protocol" — Producer and Consumer lanes, a Stream flow (Producer class → Dataset → Scanner → RecordBatchReader) and a Tasks flow (Producer class → Dataset → Fragments, each with its own Scanner and RecordBatchReader).]
\ No newline at end of file diff --git a/python/pyarrow/dataset/protocol.py b/python/pyarrow/dataset/protocol.py index 14f783958b2..7380ff1eda1 100644 --- a/python/pyarrow/dataset/protocol.py +++ b/python/pyarrow/dataset/protocol.py @@ -22,59 +22,153 @@ Applications and libraries that want to consume datasets should accept datasets that implement these protocols, rather than requiring the specific PyArrow classes. + +See Extending PyArrow Datasets for more information: + +https://arrow.apache.org/docs/python/integration/dataset.html """ -from abc import abstractmethod -from typing import Iterator, List, Optional, Protocol +from abc import abstractmethod, abstractproperty +from typing import Iterator, List, Optional, Protocol, runtime_checkable from pyarrow.dataset import Expression -from pyarrow import Table, IntegerArray, RecordBatch, RecordBatchReader, Schema +from pyarrow import Table, RecordBatchReader, Schema +@runtime_checkable class Scanner(Protocol): - """A scanner implementation for a dataset.""" + """ + A scanner implementation for a dataset. + + This may be a scan of a whole dataset, or a scan of a single fragment. + """ @abstractmethod def count_rows(self) -> int: + """ + Count the number of rows in this dataset. + + Implementors may provide optimized code paths that compute this from metadata. + + Returns + ------- + int + The number of rows in the dataset. + """ ... - + @abstractmethod def head(self, num_rows: int) -> Table: - ... + """ + Get the first ``num_rows`` rows of the dataset. - @abstractmethod - def take(self, indices: IntegerArray) -> Table: + Parameters + ---------- + num_rows : int + The number of rows to return. + + Returns + ------- + Table + A table containing the first ``num_rows`` rows of the dataset. + """ ... - - def to_table(self) -> Table: - self.to_reader().read_all() - - def to_batches(self) -> Iterator[RecordBatch]: - for batch in self.to_reader(): - yield batch @abstractmethod def to_reader(self) -> RecordBatchReader: + """ + Create a Record Batch Reader for this scan. + + This is used to read the data in chunks. + + Returns + ------- + RecordBatchReader + """ ... +@runtime_checkable class Scannable(Protocol): @abstractmethod def scanner(self, columns: Optional[List[str]] = None, - filter: Optional[Expression] = None, **kwargs) -> Scanner: - ... - - @abstractmethod - def schema(self) -> Schema: + filter: Optional[Expression] = None, batch_size: Optional[int] = None, + use_threads: bool = True, + **kwargs) -> Scanner: + """Create a scanner for this dataset. + + Parameters + ---------- + columns : List[str], optional + Names of columns to include in the scan. If None, all columns are + included. + filter : Expression, optional + Filter expression to apply to the scan. If None, no filter is applied. + batch_size : int, optional + The number of rows to include in each batch. If None, the default + value is used. The default value is implementation specific. + use_threads : bool, default True + Whether to use multiple threads to read the rows. It is expected + that consumers reading a whole dataset in one scanner will keep this + as True, while consumers reading a single fragment per worker will + typically set this to False. + + Notes + ----- + The filters must be fully satisfied. If the dataset cannot satisfy the + filter, it should raise an error. + + Only the following expressions are allowed in the filter: + - Equality / inequalities (==, !=, <, >, <=, >=) + - Conjunctions (and, or) + - Field references (e.g. "a" or "a.b.c") + - Literals (e.g. 
1, 1.0, "a", True) + - cast + - is_null / not_null + - isin + - between + - negation (not) + + """ ... -class Fragment(Scannable): +@runtime_checkable +class Fragment(Scannable, Protocol): """A fragment of a dataset. - - This class should be pickleable so that it can be used in a distrubuted scan.""" + + This might be a partition, a file, a file chunk, etc. + + This class should be pickleable so that it can be used in a distributed scan.""" ... -class Dataset(Scannable): +@runtime_checkable +class Dataset(Scannable, Protocol): @abstractmethod - def get_fragments(self, filter: Optional[Expression] = None) -> Iterator[Fragment]: + def get_fragments( + self, + filter: Optional[Expression] = None, **kwargs + ) -> Iterator[Fragment]: + """Get the fragments of this dataset. + + Parameters + ---------- + filter : Expression, optional + Filter expression to use to prune which fragments are selected. + See Scannable.scanner for details on allowed filters. The filter is + just used to prune which fragments are selected. It does not need to + save the filter to apply to the scan. That is handled by the scanner. + **kwargs : dict + Additional arguments to pass to underlying implementation. + """ + ... + + @abstractproperty + def schema(self) -> Schema: + """ + Get the schema of this dataset. + + Returns + ------- + Schema + """ ... diff --git a/python/pyarrow/tests/test_dataset_protocol.py b/python/pyarrow/tests/test_dataset_protocol.py new file mode 100644 index 00000000000..f2567415492 --- /dev/null +++ b/python/pyarrow/tests/test_dataset_protocol.py @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Test that PyArrow datasets conform to the protocol.""" +import pyarrow.dataset.protocol as protocol +import pyarrow.dataset as ds + + +def test_dataset_protocol(): + assert isinstance(ds.Dataset, protocol.Dataset) + assert isinstance(ds.Fragment, protocol.Fragment) + + assert isinstance(ds.Dataset, protocol.Scannable) + assert isinstance(ds.Fragment, protocol.Scannable) + + assert isinstance(ds.Scanner, protocol.Scanner) From 0f8a61cf165090244088e95dee762bf0f9953505 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 11 Jun 2023 21:28:13 -0700 Subject: [PATCH 4/5] refinements --- docs/source/python/integration/dataset.rst | 35 ++++++++++++---------- python/pyarrow/dataset/protocol.py | 8 ++++- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/docs/source/python/integration/dataset.rst b/docs/source/python/integration/dataset.rst index 9d5826f34a1..849ae6d9272 100644 --- a/docs/source/python/integration/dataset.rst +++ b/docs/source/python/integration/dataset.rst @@ -15,21 +15,17 @@ .. specific language governing permissions and limitations .. under the License. -.. 
currentmodule:: pyarrow.dataset - Extending PyArrow Datasets ========================== PyArrow provides a core protocol for datasets, so third-party libraries can both -produce and consume PyArrow datasets. - -The idea is that any library can have a method that returns their dataset as a -PyArrow dataset. Then, any query engine can consume that dataset and push down -filters and projections. +produce and consume classes that conform to useful subset of the PyArrow dataset +API. This subset provides enough functionality to provide predicate and filter +pushdown. The subset of the API is contained in ``pyarrow.dataset.protocol``. .. image:: pyarrow_dataset_protocol.svg :alt: A diagram showing the workflow for using the PyArrow Dataset protocol. - There are two flows shown, one for streams and one for tasks. The stream + There are two flows shown, one for stream and one for tasks. The stream case shows a linear flow from a producer class, to a dataset, to a scanner, and finally to a RecordBatchReader. The tasks case shows a similar diagram, except the dataset is split into fragments, which are @@ -42,7 +38,7 @@ Consumers are responsible for calling methods on the protocol to get the data out of the dataset. The protocol supports getting data as a single stream or as a series of tasks which may be distributed. -From the perspective of a user, this looks something like +From the perspective of a user, the code looks like: .. code-block:: python @@ -68,7 +64,10 @@ When implementing the dataset, consider the following: * Filters passed down should be fully executed. While other systems have scanners that are "best-effort", only executing the parts of the filter that it can, PyArrow - datasets should always remove all rows that don't match the filter. + datasets should always remove all rows that don't match the filter. If the + implementation cannot execute the filter, it should raise an exception. A + limited set of expressions are allowed in these filters for the general + protocol. See the docstrings for ``Scannable`` below for details. * The API does not require that a dataset has metadata about all fragments loaded into memory. Indeed, to scale to very large Datasets, don't eagerly load all the fragment metadata into memory. Instead, load fragment metadata @@ -90,18 +89,17 @@ options and methods beyond those, but they should not be relied upon without checking for specific classes. There are two general patterns for consuming PyArrow datasets: reading a single -stream or reading a stream per fragment. +stream or creating a scan task per fragment. -If you have a streaming execution model, you can recieve a single stream +If you have a streaming execution model, you can receive a single stream of data by calling ``dataset.scanner(filter=..., columns=...).to_reader()``. This will return a RecordBatchReader, which can be exported over the :ref:`C Stream Interface `. The record batches yield from the stream can then be passed to worker threads for parallelism. -If you are using a partition-based or distributed model, you can split the -dataset into fragments and then distribute those fragments into tasks that -create their own scanners and readers. In this case, the code looks more -like: +If you are using a task-based model, you can split the dataset into fragments +and then distribute those fragments into tasks that create their own scanners +and readers. In this case, the code looks more like: .. code-block:: python @@ -117,6 +115,11 @@ distributed system. 
If your engine supports predicate (filter) and projection (column) pushdown, you can pass those down to the dataset by passing them to the ``scanner``. +Column pushdown is limited to selecting a subset of columns from the schema. +Some implementations, including PyArrow may also support projecting and +renaming columns, but this is not part of the protocol. Predicate pushdown +is limited to a subset of expressions. See the docstrings for ``Scannable`` +for the allowed expressions. The protocol diff --git a/python/pyarrow/dataset/protocol.py b/python/pyarrow/dataset/protocol.py index 7380ff1eda1..1f6657f916a 100644 --- a/python/pyarrow/dataset/protocol.py +++ b/python/pyarrow/dataset/protocol.py @@ -28,7 +28,13 @@ https://arrow.apache.org/docs/python/integration/dataset.html """ from abc import abstractmethod, abstractproperty -from typing import Iterator, List, Optional, Protocol, runtime_checkable +from typing import Iterator, List, Optional + +# TODO: remove once we drop support for Python 3.7 +if sys.version_info >= (3, 8): + from typing import Protocol, runtime_checkable +else: + from typing_extensions import Protocol, runtime_checkable from pyarrow.dataset import Expression from pyarrow import Table, RecordBatchReader, Schema From be648e2f601a2ea00f6178acfff962860eb008c3 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 3 Jul 2023 15:33:59 -0600 Subject: [PATCH 5/5] remove filters for now --- docs/source/python/integration/dataset.rst | 56 +++++++++------------- python/pyarrow/dataset/protocol.py | 50 ++++++------------- 2 files changed, 37 insertions(+), 69 deletions(-) diff --git a/docs/source/python/integration/dataset.rst b/docs/source/python/integration/dataset.rst index 849ae6d9272..4d12b89896a 100644 --- a/docs/source/python/integration/dataset.rst +++ b/docs/source/python/integration/dataset.rst @@ -18,9 +18,13 @@ Extending PyArrow Datasets ========================== +.. warn:: + + This protocol is currently experimental. + PyArrow provides a core protocol for datasets, so third-party libraries can both produce and consume classes that conform to useful subset of the PyArrow dataset -API. This subset provides enough functionality to provide predicate and filter +API. This subset provides enough functionality to provide projection pushdown. The subset of the API is contained in ``pyarrow.dataset.protocol``. .. image:: pyarrow_dataset_protocol.svg @@ -38,18 +42,24 @@ Consumers are responsible for calling methods on the protocol to get the data out of the dataset. The protocol supports getting data as a single stream or as a series of tasks which may be distributed. -From the perspective of a user, the code looks like: +As an example, from the perspective of the user this is what the code looks like +to retrieve a Delta Lake table as a dataset and use it in DuckDB: .. code-block:: python + :emphasize-lines: 2,6 + + from deltalake import DeltaTable + table = DeltaTable("path/to/table") + dataset = table.to_pyarrow_dataset() - dataset = producer_library.get_dataset(...) - df = consumer_library.read_dataset(dataset) - df.filter("x > 0").select("y") + import duckdb + df = duckdb.arrow(dataset) + df.project("y") -Here, the consumer would pass the filter ``x > 0`` and the projection of ``y`` down -to the producer through the dataset protocol. Thus, the user gets to enjoy the -performance benefits of pushing down filters and projections while being able -to specify those in their preferred query engine. 
+Here, the DuckDB would pass the the projection of ``y`` down to the producer +through the dataset protocol. The deltalake scanner would then only read the +column ``y``. Thus, the user gets to enjoy the performance benefits of pushing +down projections while being able to specify those in their preferred query engine. Dataset Producers @@ -60,24 +70,6 @@ produce a PyArrow-compatible dataset. Your dataset could be backed by the classe implemented in PyArrow or you could implement your own classes. Either way, you should implement the protocol below. -When implementing the dataset, consider the following: - -* Filters passed down should be fully executed. While other systems have scanners - that are "best-effort", only executing the parts of the filter that it can, PyArrow - datasets should always remove all rows that don't match the filter. If the - implementation cannot execute the filter, it should raise an exception. A - limited set of expressions are allowed in these filters for the general - protocol. See the docstrings for ``Scannable`` below for details. -* The API does not require that a dataset has metadata about all fragments - loaded into memory. Indeed, to scale to very large Datasets, don't eagerly - load all the fragment metadata into memory. Instead, load fragment metadata - once a filter is passed. This allows you to skip loading metadata about - fragments that aren't relevant to queries. For example, if you have a dataset - that uses Hive-style paritioning for a column ``date`` and the user passes a - filter for ``date=2023-01-01``, then you can skip listing directory for HIVE - partitions that don't match that date. - - Dataset Consumers ----------------- @@ -92,7 +84,7 @@ There are two general patterns for consuming PyArrow datasets: reading a single stream or creating a scan task per fragment. If you have a streaming execution model, you can receive a single stream -of data by calling ``dataset.scanner(filter=..., columns=...).to_reader()``. +of data by calling ``dataset.scanner(columns=...).to_reader()``. This will return a RecordBatchReader, which can be exported over the :ref:`C Stream Interface `. The record batches yield from the stream can then be passed to worker threads for parallelism. @@ -103,7 +95,7 @@ and readers. In this case, the code looks more like: .. code-block:: python - fragments = list(dataset.get_fragments(filter=..., columns=...)) + fragments = list(dataset.get_fragments(columns=...)) def scan_partition(i): fragment = fragments[i] @@ -113,13 +105,11 @@ and readers. In this case, the code looks more like: Fragments are pickleable, so they can be passed to remote workers in a distributed system. -If your engine supports predicate (filter) and projection (column) pushdown, +If your engine supports projection (column) pushdown, you can pass those down to the dataset by passing them to the ``scanner``. Column pushdown is limited to selecting a subset of columns from the schema. Some implementations, including PyArrow may also support projecting and -renaming columns, but this is not part of the protocol. Predicate pushdown -is limited to a subset of expressions. See the docstrings for ``Scannable`` -for the allowed expressions. +renaming columns, but this is not part of the protocol. 
The protocol diff --git a/python/pyarrow/dataset/protocol.py b/python/pyarrow/dataset/protocol.py index 1f6657f916a..d8696507ba0 100644 --- a/python/pyarrow/dataset/protocol.py +++ b/python/pyarrow/dataset/protocol.py @@ -23,11 +23,13 @@ that implement these protocols, rather than requiring the specific PyArrow classes. +The pyarrow.dataset.Dataset class itself implements this protocol. + See Extending PyArrow Datasets for more information: https://arrow.apache.org/docs/python/integration/dataset.html """ -from abc import abstractmethod, abstractproperty +from abc import abstractmethod from typing import Iterator, List, Optional # TODO: remove once we drop support for Python 3.7 @@ -50,21 +52,21 @@ class Scanner(Protocol): @abstractmethod def count_rows(self) -> int: """ - Count the number of rows in this dataset. + Count the number of rows in this dataset or fragment. Implementors may provide optimized code paths that compute this from metadata. Returns ------- int - The number of rows in the dataset. + The number of rows in the dataset or fragment. """ ... @abstractmethod def head(self, num_rows: int) -> Table: """ - Get the first ``num_rows`` rows of the dataset. + Get the first ``num_rows`` rows of the dataset or fragment. Parameters ---------- @@ -74,7 +76,7 @@ def head(self, num_rows: int) -> Table: Returns ------- Table - A table containing the first ``num_rows`` rows of the dataset. + A table containing the first ``num_rows`` rows of the dataset or fragment. """ ... @@ -96,7 +98,7 @@ def to_reader(self) -> RecordBatchReader: class Scannable(Protocol): @abstractmethod def scanner(self, columns: Optional[List[str]] = None, - filter: Optional[Expression] = None, batch_size: Optional[int] = None, + batch_size: Optional[int] = None, use_threads: bool = True, **kwargs) -> Scanner: """Create a scanner for this dataset. @@ -106,33 +108,14 @@ def scanner(self, columns: Optional[List[str]] = None, columns : List[str], optional Names of columns to include in the scan. If None, all columns are included. - filter : Expression, optional - Filter expression to apply to the scan. If None, no filter is applied. batch_size : int, optional The number of rows to include in each batch. If None, the default value is used. The default value is implementation specific. use_threads : bool, default True - Whether to use multiple threads to read the rows. It is expected - that consumers reading a whole dataset in one scanner will keep this + Whether to use multiple threads to read the rows. Often consumers + reading a whole dataset in one scanner will keep this as True, while consumers reading a single fragment per worker will - typically set this to False. - - Notes - ----- - The filters must be fully satisfied. If the dataset cannot satisfy the - filter, it should raise an error. - - Only the following expressions are allowed in the filter: - - Equality / inequalities (==, !=, <, >, <=, >=) - - Conjunctions (and, or) - - Field references (e.g. "a" or "a.b.c") - - Literals (e.g. 1, 1.0, "a", True) - - cast - - is_null / not_null - - isin - - between - - negation (not) - + set this to False. """ ... @@ -151,24 +134,19 @@ class Fragment(Scannable, Protocol): class Dataset(Scannable, Protocol): @abstractmethod def get_fragments( - self, - filter: Optional[Expression] = None, **kwargs + self, **kwargs ) -> Iterator[Fragment]: """Get the fragments of this dataset. Parameters ---------- - filter : Expression, optional - Filter expression to use to prune which fragments are selected. 
- See Scannable.scanner for details on allowed filters. The filter is - just used to prune which fragments are selected. It does not need to - save the filter to apply to the scan. That is handled by the scanner. **kwargs : dict Additional arguments to pass to underlying implementation. """ ... - @abstractproperty + @property + @abstractmethod def schema(self) -> Schema: """ Get the schema of this dataset.
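For reference, a consumer written purely against the final version of the protocol (after filters were removed in patch 5) might look like the following sketch. The helper names ``process_batch``, ``read_single_stream``, ``read_fragment`` and ``read_per_fragment`` are illustrative, not PyArrow API; the only calls made on the producer's objects are the protocol methods ``scanner(columns=..., use_threads=...)``, ``to_reader()`` and ``get_fragments()``.

.. code-block:: python

    from typing import List

    import pyarrow as pa


    def process_batch(batch: pa.RecordBatch) -> None:
        """Stand-in for whatever the engine actually does with a batch."""
        print(batch.num_rows)


    def read_single_stream(dataset, columns: List[str]) -> None:
        """Streaming model: one reader for the whole dataset."""
        reader = dataset.scanner(columns=columns).to_reader()
        for batch in reader:
            process_batch(batch)


    def read_fragment(fragment, columns: List[str]) -> pa.RecordBatchReader:
        """Task model: each (pickleable) fragment is scanned by its own worker.

        use_threads=False because parallelism comes from distributing fragments.
        """
        return fragment.scanner(columns=columns, use_threads=False).to_reader()


    def read_per_fragment(dataset, columns: List[str]) -> None:
        for fragment in dataset.get_fragments():
            for batch in read_fragment(fragment, columns):
                process_batch(batch)

Note that the column projection is passed to ``scanner()``; in the final protocol ``get_fragments()`` only accepts ``**kwargs``. Because ``pyarrow.dataset.Dataset`` itself is meant to satisfy the protocol, these helpers should work unchanged on a dataset such as ``pyarrow.dataset.dataset("path/to/data")``.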