From 3d6daf049a4244a4ed3307baa20db542b1e24011 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 31 Mar 2020 08:14:17 +0200 Subject: [PATCH 01/12] ARROW-8063: [Python][Dataset] Start user guide for pyarrow.dataset --- docs/source/python/dataset.rst | 143 +++++++++++++++++++++++++++++++++ docs/source/python/index.rst | 1 + 2 files changed, 144 insertions(+) create mode 100644 docs/source/python/dataset.rst diff --git a/docs/source/python/dataset.rst b/docs/source/python/dataset.rst new file mode 100644 index 00000000000..89326b70779 --- /dev/null +++ b/docs/source/python/dataset.rst @@ -0,0 +1,143 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, +.. software distributed under the License is distributed on an +.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +.. KIND, either express or implied. See the License for the +.. specific language governing permissions and limitations +.. under the License. + +.. currentmodule:: pyarrow.dataset + +.. _dataset: + +Tabular Datasets +================ + +.. warning:: + + The ``pyarrow.dataset`` module is experimental (specifically the classes), + and a stable API is not yet guaranteed. + +The ``pyarrow.dataset`` module provides functionality to efficiently work with +tabular, potentially larger than memory and multi-file, datasets: + +* A unified interface for different sources: supporting different sources + (files, database connection, ..), different file systems (local, cloud) and + different file formats (Parquet, CSV, JSON, Feather, ..) +* Discovery of sources (crawling directories, handle directory-based partitioned + datasets, basic schema normalization, ..) +* Optimized reading with pedicate pushdown (filtering rows), projection + (selecting columns), parallel reading or fine-grained managing of tasks. + + +For those familiar with the existing :class:`pyarrow.parquet.ParquetDataset` for +reading Parquet datasets: the goal of `pyarrow.dataset` is to provide similar +functionality, but not specific to the Parquet format, not tied to Python (for +example the R bindings of Arrow also have an interface for the Dateset API) and +with improved functionality (e.g. filtering on the file level and not only +partition keys). + + + +Reading Datasets +---------------- + + +Full blown example with NYC taxi data to show off, afterwards explain all parts: + +... 
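A minimal sketch of what such an end-to-end example could look like (the public
``s3://ursa-labs-taxi-data`` bucket is the one used later in this guide; the
``passenger_count`` and ``total_amount`` column names are assumptions for
illustration only):

.. code-block:: python

    import pyarrow.dataset as ds

    # discover a partitioned Parquet dataset on S3 ...
    dataset = ds.dataset("s3://ursa-labs-taxi-data/", format="parquet",
                         partitioning=["year", "month"])

    # ... then read only two columns, restricted to rows matching a predicate
    # (column names here are illustrative, not guaranteed by this guide)
    table = dataset.to_table(
        columns=["passenger_count", "total_amount"],
        filter=ds.field("total_amount") > 100)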
+ + + +Dataset discovery +~~~~~~~~~~~~~~~~~ + +- creating dataset +- get schema / files + + +Filtering data +~~~~~~~~~~~~~~ + +- columns / filter expressions + + + +Reading different file formats +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +- show feather + + + +Reading partitioned data +------------------------ + +- hive vs directory partitioning supported right now +- how to specify the schema with ds.partitioning() + + +Reading from cloud storage +-------------------------- + +- example with s3 filesystem / hdfs filesystem + + + +Manual specification of the Dataset +----------------------------------- + +The :func:`dataset` function allows to easily create a Dataset from a directory, +crawling all subdirectories for files and partitioning information. However, +when the dataset files and partitions are already known (for example, when this +information is stored in metadata), it is also possible to create a Dataset +explicitly using the lower level API without any automatic discovery or +inference. + +For the example here, we are going to use a dataset where the file names contain +additional partitioning information: + +.. ipython:: python + + # creating a dummy dataset: directory with two files + table = pa.table({'col1': range(3), 'col2': np.random.randn(3)}) + pq.write_table(table, "parquet_dataset_manual/data_file1.parquet") + pq.write_table(table, "parquet_dataset_manual/data_file2.parquet") + +To create a Dataset from a list of files, we need to specify the schema, format, +filesystem, and paths manually: + +.. ipython:: python + + import pyarrow.fs + + schema = pa.schema([("file", pa.int64()), ("col1", pa.int64()), ("col2", pa.float64())]) + + dataset = ds.FileSystemDataset( + schema, None, ds.ParquetFileFormat(), pa.fs.LocalFileSystem(), + ["parquet_dataset_manual/data_file1.parquet", "parquet_dataset_manual/data_file2.parquet"], + [ds.field('file') == 1, ds.field('file') == 2]) + +We also specified the "partition expressions" for our files, so this information +is included when reading the data and can be used for filtering: + +.. ipython:: python + + dataset.to_table().to_pandas() + dataset.to_table(filter=ds.field('file') == 1).to_pandas() + + +Manual scheduling +----------------- + +- fragments (get_fragments) +- scan / scan tasks / iterators of record batches diff --git a/docs/source/python/index.rst b/docs/source/python/index.rst index 2c7ec3d8f93..d4daf4029ac 100644 --- a/docs/source/python/index.rst +++ b/docs/source/python/index.rst @@ -47,6 +47,7 @@ files into Arrow structures. 
feather json parquet + dataset cuda extending_types extending From 1058d3076f1799136da3b27c82a92e35ed630eed Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Mon, 6 Apr 2020 22:45:41 +0200 Subject: [PATCH 02/12] add content --- docs/source/python/dataset.rst | 147 +++++++++++++++++++++++++++++++-- 1 file changed, 142 insertions(+), 5 deletions(-) diff --git a/docs/source/python/dataset.rst b/docs/source/python/dataset.rst index 89326b70779..14fbea82b19 100644 --- a/docs/source/python/dataset.rst +++ b/docs/source/python/dataset.rst @@ -40,7 +40,7 @@ tabular, potentially larger than memory and multi-file, datasets: For those familiar with the existing :class:`pyarrow.parquet.ParquetDataset` for -reading Parquet datasets: the goal of `pyarrow.dataset` is to provide similar +reading Parquet datasets: the goal of ``pyarrow.dataset`` is to provide similar functionality, but not specific to the Parquet format, not tied to Python (for example the R bindings of Arrow also have an interface for the Dateset API) and with improved functionality (e.g. filtering on the file level and not only @@ -54,34 +54,171 @@ Reading Datasets Full blown example with NYC taxi data to show off, afterwards explain all parts: +.. ipython:: python + + import pyarrow as pa + import pyarrow.dataset as ds + ... +For the next examples, we are first going to create a small dataset consisting +of a directory with two parquet files: + +.. ipython:: python + + import tempfile + import pathlib + + base = pathlib.Path(tempfile.gettempdir()) + (base / "parquet_dataset").mkdir(exist_ok=True) + + # creating an Arrow Table + table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5}) + + # writing it into two parquet files + import pyarrow.parquet as pq + pq.write_table(table.slice(0, 5), base / "parquet_dataset/data1.parquet") + pq.write_table(table.slice(5, 10), base / "parquet_dataset/data2.parquet") Dataset discovery ~~~~~~~~~~~~~~~~~ -- creating dataset -- get schema / files +A :class:`Dataset` object can be created with the :func:`dataset` function. We +can pass it the path to the directory containing the data files: + +.. ipython:: python + + dataset = ds.dataset(base / "parquet_dataset/", format="parquet") + dataset + +Alternatively, it also accepts a path to a single file, or a list of file +paths. +Creating this :class:`Dataset` object did yet materialize the full dataset, but +it crawled the directory to find all the files: + +.. ipython:: python + + dataset.files + +and it did infer the schema (by default from the first file): + +.. ipython:: python + + print(dataset.schema.to_string(show_field_metadata=False)) + +With the :meth:`Dataset.to_table` method, we can read the dataset into a +pyarrow Table (note this will read everything at once, which can require a lot +of memory, see below on filtering / iterative loading): + + +.. ipython:: python + + dataset.to_table() + # converting to pandas to see the contents of the scanned table + dataset.to_table().to_pandas() Filtering data ~~~~~~~~~~~~~~ -- columns / filter expressions +To avoid reading all data when only needing a subset, the ``columns`` and +``filter`` keywords can be used. The ``columns`` keyword accepts a list of +column names: +.. ipython:: python + + dataset.to_table(columns=['a', 'b']).to_pandas() + +The ``filter`` keyword expects a boolean expression involving one of the +columns, and those expressions can be created using the :func:`field` helper +function: + +.. 
ipython:: python + + dataset.to_table(filter=ds.field('a') >= 7).to_pandas() + dataset.to_table(filter=ds.field('c') == 2).to_pandas() + +Any column can be referenced using the :func:`field` function (which creates a +:class:`FieldExpression`), and many different expressions can be created, +including the standard boolean comparison operators (equal, larger/less than, +etc), but also set membership testing: + +.. ipython:: python + + ds.field('a') != 3 + ds.field('a').isin([1, 2, 3]) Reading different file formats ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -- show feather +The above examples use Parquet files as dataset source, but the Dataset API +aims to provide a consistent interface for multiple file formats and sources. +Currently, Parquet and Feather / Arrow IPC file format are supported; more +formats are planned in the future. + +If we save the table as a Feather file instead of Parquet files: + +.. ipython:: python + + import pyarrow.feather as feather + + feather.write_feather(table, base / "data.feather") + +then we can read the Feather file using the same functions, but with specifying +``format="ipc"``: + +.. ipython:: python + + dataset = ds.dataset(base / "data.feather", format="ipc") + dataset.to_table().to_pandas().head() + +The format name as a string, like: +.. code-block:: + + ds.dataset(..., format="parquet") + +is a short-hand for + +.. code-block:: + + ds.dataset(..., format=ds.ParquetFileFormat()) + +Those file format objects can take keywords to customize reading (see for +example :class:`ParquetFileFormat`). Reading partitioned data ------------------------ +A dataset consisting of a flat directory with files was already shown above. +But the dataset can also contain nested directories defining a partitioned +dataset, where the sub-directory names hold information about which subset +of the data is stored in that directory. + +For example, a dataset partitioned by year and month may look like on disk: + +.. code-block:: text + + dataset_name/ + year=2007/ + month=01/ + data0.parquet + data1.parquet + ... + month=02/ + data0.parquet + data1.parquet + ... + month=03/ + ... + year=2008/ + month=01/ + ... + ... + - hive vs directory partitioning supported right now - how to specify the schema with ds.partitioning() From 2302712348d5cd2be853c3db479c97d10b0d38ff Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Thu, 9 Apr 2020 17:37:49 +0200 Subject: [PATCH 03/12] add section on partitioned datasets --- docs/source/python/dataset.rst | 142 +++++++++++++++++++++++++-------- 1 file changed, 108 insertions(+), 34 deletions(-) diff --git a/docs/source/python/dataset.rst b/docs/source/python/dataset.rst index 14fbea82b19..309ac47466d 100644 --- a/docs/source/python/dataset.rst +++ b/docs/source/python/dataset.rst @@ -89,7 +89,7 @@ can pass it the path to the directory containing the data files: .. ipython:: python - dataset = ds.dataset(base / "parquet_dataset/", format="parquet") + dataset = ds.dataset(base / "parquet_dataset", format="parquet") dataset Alternatively, it also accepts a path to a single file, or a list of file @@ -119,37 +119,6 @@ of memory, see below on filtering / iterative loading): # converting to pandas to see the contents of the scanned table dataset.to_table().to_pandas() -Filtering data -~~~~~~~~~~~~~~ - -To avoid reading all data when only needing a subset, the ``columns`` and -``filter`` keywords can be used. The ``columns`` keyword accepts a list of -column names: - -.. 
ipython:: python - - dataset.to_table(columns=['a', 'b']).to_pandas() - -The ``filter`` keyword expects a boolean expression involving one of the -columns, and those expressions can be created using the :func:`field` helper -function: - -.. ipython:: python - - dataset.to_table(filter=ds.field('a') >= 7).to_pandas() - dataset.to_table(filter=ds.field('c') == 2).to_pandas() - -Any column can be referenced using the :func:`field` function (which creates a -:class:`FieldExpression`), and many different expressions can be created, -including the standard boolean comparison operators (equal, larger/less than, -etc), but also set membership testing: - -.. ipython:: python - - ds.field('a') != 3 - ds.field('a').isin([1, 2, 3]) - - Reading different file formats ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -190,6 +159,41 @@ Those file format objects can take keywords to customize reading (see for example :class:`ParquetFileFormat`). +Filtering data +-------------- + +To avoid reading all data when only needing a subset, the ``columns`` and +``filter`` keywords can be used. + +The ``columns`` keyword can be used to only read a selection of the columns, +and accepts a list of column names: + +.. ipython:: python + + dataset = ds.dataset(base / "parquet_dataset", format="parquet") + dataset.to_table(columns=['a', 'b']).to_pandas() + +With the ``filter`` keyword, rows which do not match the filter predicate will +not be included in the returned table. The keyword expects a boolean expression +involving one of the columns, and those expressions can be created using the +:func:`field` helper function: + +.. ipython:: python + + dataset.to_table(filter=ds.field('a') >= 7).to_pandas() + dataset.to_table(filter=ds.field('c') == 2).to_pandas() + +Any column can be referenced using the :func:`field` function (which creates a +:class:`FieldExpression`), and many different expressions can be created, +including the standard boolean comparison operators (equal, larger/less than, +etc), but also set membership testing: + +.. ipython:: python + + ds.field('a') != 3 + ds.field('a').isin([1, 2, 3]) + + Reading partitioned data ------------------------ @@ -219,8 +223,78 @@ For example, a dataset partitioned by year and month may look like on disk: ... ... -- hive vs directory partitioning supported right now -- how to specify the schema with ds.partitioning() +The above partitioning scheme is using "/key=value/" directory names, as found +in Apache Hive. + +Let's create a small partitioned dataset. The:func:`~pyarrow.parquet.write_to_dataset` +function can write such hive-like partitioned datasets. + +.. ipython:: python + + table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5, + 'part': ['a'] * 5 + ['b'] * 5}) + pq.write_to_dataset(table, str(base / "parquet_dataset_partitioned"), + partition_cols=['part']) + +The above created a directory with two subdirectories ("part=a" and "part=b"), +and the Parquet files written in those directories no longer include the "part" +column. + +Reading this dataset with :func:`dataset`, we now specify that the dataset +uses a hive-like partitioning scheme with the `partitioning` keyword: + +.. ipython:: python + + dataset = ds.dataset(str(base / "parquet_dataset_partitioned2"), format="parquet", + partitioning="hive") + dataset.files + +Although the partition fields are not included in the actual Parquet files, +they will be added back to the resulting table when scanning this dataset: + +.. 
ipython:: python + + dataset.to_table().to_pandas().head(3) + +We can now filter on the partition keys, which avoids loading certain files +at all if they do not match the predicate: + +.. ipython:: python + + dataset.to_table(filter=ds.field("part") == "b").to_pandas() + + +Different partitioning schemes +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The above example uses a hive-like directory scheme, such as "/year=2009/month=11/day=15". +We specified this passing the ``partitioning="hive"`` keyword. In this case, +the types of the partition keys are inferred from the file paths. + +It is also possible to explicitly define the schema of the partition keys +using the :func:`partitioning` function. For example: + +.. code-block:: + + part = ds.partitioning( + pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int32())]), + flavor="hive" + ) + dataset = ds.dataset(..., partitioning=part) + +In addition to the hive-like directory scheme, also a "directory partitioning" +scheme is supported, where the segments in the file path are the values of +the partition keys without including the name. The equivalent (year, month, day) +example would be "/2019/11/15/". + +Since the names are not included in the file paths, those need to be specified: + +.. code-block:: + + part = ds.partitioning(field_names=["year", "month", "day"]) + +Also here, a full schema can be provided to not let the types be inferred +from the file paths. Reading from cloud storage From 7150c5fcacfddb76d27e29ebb3fcf35eda8c21fe Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Fri, 10 Apr 2020 11:43:51 +0200 Subject: [PATCH 04/12] Apply suggestions from code review Co-Authored-By: Benjamin Kietzman --- docs/source/python/dataset.rst | 90 +++++++++++++++++----------------- 1 file changed, 46 insertions(+), 44 deletions(-) diff --git a/docs/source/python/dataset.rst b/docs/source/python/dataset.rst index 309ac47466d..59e00e0afb6 100644 --- a/docs/source/python/dataset.rst +++ b/docs/source/python/dataset.rst @@ -40,10 +40,10 @@ tabular, potentially larger than memory and multi-file, datasets: For those familiar with the existing :class:`pyarrow.parquet.ParquetDataset` for -reading Parquet datasets: the goal of ``pyarrow.dataset`` is to provide similar -functionality, but not specific to the Parquet format, not tied to Python (for -example the R bindings of Arrow also have an interface for the Dateset API) and -with improved functionality (e.g. filtering on the file level and not only +reading Parquet datasets: ``pyarrow.dataset``'s goal is similar but not specific +to the Parquet format and not tied to Python: the same datasets API is exposed +in the R bindings or Arrow. In addition ``pyarrow.dataset`` boasts improved +perfomance and new features (e.g. filtering within files rather than only on partition keys). @@ -52,7 +52,7 @@ Reading Datasets ---------------- -Full blown example with NYC taxi data to show off, afterwards explain all parts: +.. TODO Full blown example with NYC taxi data to show off, afterwards explain all parts: .. ipython:: python @@ -92,24 +92,24 @@ can pass it the path to the directory containing the data files: dataset = ds.dataset(base / "parquet_dataset", format="parquet") dataset -Alternatively, it also accepts a path to a single file, or a list of file -paths. +In addition to a base directory path, :func:`dataset` accepts +a path to a single file or a list of file paths. 
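For example, the same dataset can be constructed from an explicit list of files
(a small sketch reusing the two files written above; only the call signature is
the point here):

.. code-block:: python

    # passing a list of file paths instead of a base directory
    dataset_from_files = ds.dataset(
        [str(base / "parquet_dataset" / "data1.parquet"),
         str(base / "parquet_dataset" / "data2.parquet")],
        format="parquet")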
-Creating this :class:`Dataset` object did yet materialize the full dataset, but -it crawled the directory to find all the files: +Creating a :class:`Dataset` object loads nothing into memory, it only +crawls the directory to find all the files: .. ipython:: python dataset.files -and it did infer the schema (by default from the first file): +... and infers the dataset's schema (by default from the first file): .. ipython:: python print(dataset.schema.to_string(show_field_metadata=False)) -With the :meth:`Dataset.to_table` method, we can read the dataset into a -pyarrow Table (note this will read everything at once, which can require a lot +Using the :meth:`Dataset.to_table` method we can read the dataset (or a portion of it) into a +pyarrow Table (note that depending on the size of your dataset this can require a lot of memory, see below on filtering / iterative loading): @@ -122,8 +122,8 @@ of memory, see below on filtering / iterative loading): Reading different file formats ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The above examples use Parquet files as dataset source, but the Dataset API -aims to provide a consistent interface for multiple file formats and sources. +The above examples use Parquet files as dataset source but the Dataset API +provides a consistent interface across multiple file formats and sources. Currently, Parquet and Feather / Arrow IPC file format are supported; more formats are planned in the future. @@ -143,20 +143,20 @@ then we can read the Feather file using the same functions, but with specifying dataset = ds.dataset(base / "data.feather", format="ipc") dataset.to_table().to_pandas().head() -The format name as a string, like: +Customizing file formats +~~~~~~~~~~~~~~~~~~~~~~~~ -.. code-block:: +class:`FileFormat`s can be customized using keywords. For example:: - ds.dataset(..., format="parquet") + format = ds.ParquetFileFormat(read_options={'dictionary_columns': ['a']}) + ds.dataset(..., format=format) -is a short-hand for +Will configure column ``a`` to be dictionary encoded on scan. +The format name as a string, like:: -.. code-block:: - - ds.dataset(..., format=ds.ParquetFileFormat()) + ds.dataset(..., format="parquet") -Those file format objects can take keywords to customize reading (see for -example :class:`ParquetFileFormat`). +is short hand for a default constructed class:`ParquetFileFormat`. Filtering data @@ -165,8 +165,7 @@ Filtering data To avoid reading all data when only needing a subset, the ``columns`` and ``filter`` keywords can be used. -The ``columns`` keyword can be used to only read a selection of the columns, -and accepts a list of column names: +The ``columns`` keyword can be used to only read the named columns: .. ipython:: python @@ -183,22 +182,24 @@ involving one of the columns, and those expressions can be created using the dataset.to_table(filter=ds.field('a') >= 7).to_pandas() dataset.to_table(filter=ds.field('c') == 2).to_pandas() -Any column can be referenced using the :func:`field` function (which creates a -:class:`FieldExpression`), and many different expressions can be created, -including the standard boolean comparison operators (equal, larger/less than, -etc), but also set membership testing: +Any column - not just partition columns - can be referenced using the +:func:`field` function (which creates a :class:`FieldExpression`). Operator +overloads are provided to compose filters including the comparisons +(equal, larger/less than, etc), set membership testing, and boolean +combinations (and, or, not): .. 
ipython:: python ds.field('a') != 3 ds.field('a').isin([1, 2, 3]) + ds.field('a') > ds.field('b') & ds.field('b') > 1 Reading partitioned data ------------------------ -A dataset consisting of a flat directory with files was already shown above. -But the dataset can also contain nested directories defining a partitioned +Above, a dataset consisting of a flat directory with files was shown. +However a dataset can exploit a nested directory structure defining a partitioned dataset, where the sub-directory names hold information about which subset of the data is stored in that directory. @@ -256,8 +257,8 @@ they will be added back to the resulting table when scanning this dataset: dataset.to_table().to_pandas().head(3) -We can now filter on the partition keys, which avoids loading certain files -at all if they do not match the predicate: +We can now filter on the partition keys, which avoids loading files +altogether if they do not match the predicate: .. ipython:: python @@ -287,14 +288,15 @@ scheme is supported, where the segments in the file path are the values of the partition keys without including the name. The equivalent (year, month, day) example would be "/2019/11/15/". -Since the names are not included in the file paths, those need to be specified: +Since the names are not included in the file paths, these must be specified +when constructing a directory partitioning: .. code-block:: part = ds.partitioning(field_names=["year", "month", "day"]) -Also here, a full schema can be provided to not let the types be inferred -from the file paths. +Directory directory partitioning also supports providing a full schema rather than inferring +types from file paths. Reading from cloud storage @@ -307,12 +309,12 @@ Reading from cloud storage Manual specification of the Dataset ----------------------------------- -The :func:`dataset` function allows to easily create a Dataset from a directory, -crawling all subdirectories for files and partitioning information. However, -when the dataset files and partitions are already known (for example, when this -information is stored in metadata), it is also possible to create a Dataset -explicitly using the lower level API without any automatic discovery or -inference. +The :func:`dataset` function allows easy creation of a Dataset viewing a directory, +crawling all subdirectories for files and partitioning information. However +sometimes discovery is not required and the dataset's files and partitions +are already known (for example, when this information is stored in metadata). +In this case it is possible to create a Dataset explicitly without any +automatic discovery or inference. For the example here, we are going to use a dataset where the file names contain additional partitioning information: @@ -338,8 +340,8 @@ filesystem, and paths manually: ["parquet_dataset_manual/data_file1.parquet", "parquet_dataset_manual/data_file2.parquet"], [ds.field('file') == 1, ds.field('file') == 2]) -We also specified the "partition expressions" for our files, so this information -is included when reading the data and can be used for filtering: +Since we specified the "partition expressions" for our files, this information +is materialized as columns when reading the data and can be used for filtering: .. 
ipython:: python From 713900707db85a64492a7429b028762f27a2c58e Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Fri, 10 Apr 2020 11:52:33 +0200 Subject: [PATCH 05/12] small fixups --- docs/source/python/dataset.rst | 48 ++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/docs/source/python/dataset.rst b/docs/source/python/dataset.rst index 59e00e0afb6..3073df97126 100644 --- a/docs/source/python/dataset.rst +++ b/docs/source/python/dataset.rst @@ -92,11 +92,11 @@ can pass it the path to the directory containing the data files: dataset = ds.dataset(base / "parquet_dataset", format="parquet") dataset -In addition to a base directory path, :func:`dataset` accepts -a path to a single file or a list of file paths. +In addition to a base directory path, :func:`dataset` accepts a path to a single +file or a list of file paths. -Creating a :class:`Dataset` object loads nothing into memory, it only -crawls the directory to find all the files: +Creating a :class:`Dataset` object loads nothing into memory, it only crawls the +directory to find all the files: .. ipython:: python @@ -108,9 +108,9 @@ crawls the directory to find all the files: print(dataset.schema.to_string(show_field_metadata=False)) -Using the :meth:`Dataset.to_table` method we can read the dataset (or a portion of it) into a -pyarrow Table (note that depending on the size of your dataset this can require a lot -of memory, see below on filtering / iterative loading): +Using the :meth:`Dataset.to_table` method we can read the dataset (or a portion +of it) into a pyarrow Table (note that depending on the size of your dataset +this can require a lot of memory, see below on filtering / iterative loading): .. ipython:: python @@ -146,18 +146,20 @@ then we can read the Feather file using the same functions, but with specifying Customizing file formats ~~~~~~~~~~~~~~~~~~~~~~~~ -class:`FileFormat`s can be customized using keywords. For example:: - - format = ds.ParquetFileFormat(read_options={'dictionary_columns': ['a']}) - ds.dataset(..., format=format) - -Will configure column ``a`` to be dictionary encoded on scan. The format name as a string, like:: ds.dataset(..., format="parquet") is short hand for a default constructed class:`ParquetFileFormat`. + ds.dataset(..., format=ds.ParquetFileForma()) + +The class:`FileFormat` objects can be customized using keywords. For example:: + + format = ds.ParquetFileFormat(read_options={'dictionary_columns': ['a']}) + ds.dataset(..., format=format) + +Will configure column ``a`` to be dictionary encoded on scan. Filtering data -------------- @@ -165,7 +167,7 @@ Filtering data To avoid reading all data when only needing a subset, the ``columns`` and ``filter`` keywords can be used. -The ``columns`` keyword can be used to only read the named columns: +The ``columns`` keyword can be used to only read the specified columns: .. ipython:: python @@ -173,9 +175,9 @@ The ``columns`` keyword can be used to only read the named columns: dataset.to_table(columns=['a', 'b']).to_pandas() With the ``filter`` keyword, rows which do not match the filter predicate will -not be included in the returned table. The keyword expects a boolean expression -involving one of the columns, and those expressions can be created using the -:func:`field` helper function: +not be included in the returned table. The keyword expects a boolean +:class:`Expression` involving one of the columns, and those expressions can be +created using the :func:`field` helper function: .. 
ipython:: python @@ -192,16 +194,16 @@ combinations (and, or, not): ds.field('a') != 3 ds.field('a').isin([1, 2, 3]) - ds.field('a') > ds.field('b') & ds.field('b') > 1 + (ds.field('a') > ds.field('b')) & (ds.field('b') > 1) Reading partitioned data ------------------------ -Above, a dataset consisting of a flat directory with files was shown. -However a dataset can exploit a nested directory structure defining a partitioned -dataset, where the sub-directory names hold information about which subset -of the data is stored in that directory. +Above, a dataset consisting of a flat directory with files was shown. However, a +dataset can exploit a nested directory structure defining a partitioned dataset, +where the sub-directory names hold information about which subset of the data is +stored in that directory. For example, a dataset partitioned by year and month may look like on disk: @@ -295,7 +297,7 @@ when constructing a directory partitioning: part = ds.partitioning(field_names=["year", "month", "day"]) -Directory directory partitioning also supports providing a full schema rather than inferring +Directory partitioning also supports providing a full schema rather than inferring types from file paths. From 0bcdcecb60b2d4d018e389b3b438fed3698832c0 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Fri, 10 Apr 2020 14:00:33 +0200 Subject: [PATCH 06/12] update expression wording + manual example --- docs/source/python/dataset.rst | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/docs/source/python/dataset.rst b/docs/source/python/dataset.rst index 3073df97126..6fa7c0074fb 100644 --- a/docs/source/python/dataset.rst +++ b/docs/source/python/dataset.rst @@ -176,19 +176,19 @@ The ``columns`` keyword can be used to only read the specified columns: With the ``filter`` keyword, rows which do not match the filter predicate will not be included in the returned table. The keyword expects a boolean -:class:`Expression` involving one of the columns, and those expressions can be -created using the :func:`field` helper function: +:class:`Expression` referencing at least one of the columns: .. ipython:: python dataset.to_table(filter=ds.field('a') >= 7).to_pandas() dataset.to_table(filter=ds.field('c') == 2).to_pandas() -Any column - not just partition columns - can be referenced using the -:func:`field` function (which creates a :class:`FieldExpression`). Operator -overloads are provided to compose filters including the comparisons -(equal, larger/less than, etc), set membership testing, and boolean -combinations (and, or, not): +The easiest way to construct those :class:`Expression` objects is by using the +:func:`field` helper function. Any column - not just partition columns - can be +referenced using the :func:`field` function (which creates a +:class:`FieldExpression`). Operator overloads are provided to compose filters +including the comparisons (equal, larger/less than, etc), set membership +testing, and boolean combinations (and, or, not): .. 
ipython:: python @@ -325,8 +325,8 @@ additional partitioning information: # creating a dummy dataset: directory with two files table = pa.table({'col1': range(3), 'col2': np.random.randn(3)}) - pq.write_table(table, "parquet_dataset_manual/data_file1.parquet") - pq.write_table(table, "parquet_dataset_manual/data_file2.parquet") + pq.write_table(table, "parquet_dataset_manual/data_2018.parquet") + pq.write_table(table, "parquet_dataset_manual/data_2019.parquet") To create a Dataset from a list of files, we need to specify the schema, format, filesystem, and paths manually: @@ -335,12 +335,12 @@ filesystem, and paths manually: import pyarrow.fs - schema = pa.schema([("file", pa.int64()), ("col1", pa.int64()), ("col2", pa.float64())]) + schema = pa.schema([("year", pa.int32()), ("col1", pa.int64()), ("col2", pa.float64())]) dataset = ds.FileSystemDataset( schema, None, ds.ParquetFileFormat(), pa.fs.LocalFileSystem(), - ["parquet_dataset_manual/data_file1.parquet", "parquet_dataset_manual/data_file2.parquet"], - [ds.field('file') == 1, ds.field('file') == 2]) + ["parquet_dataset_manual/data_2018.parquet", "parquet_dataset_manual/data_2019.parquet"], + [ds.field('year') == 2018, ds.field('year') == 2019]) Since we specified the "partition expressions" for our files, this information is materialized as columns when reading the data and can be used for filtering: @@ -348,7 +348,7 @@ is materialized as columns when reading the data and can be used for filtering: .. ipython:: python dataset.to_table().to_pandas() - dataset.to_table(filter=ds.field('file') == 1).to_pandas() + dataset.to_table(filter=ds.field('year') == 2019).to_pandas() Manual scheduling From 82ef2777def69950b2ec4c839617306b62506bbd Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 14 Apr 2020 12:14:54 +0200 Subject: [PATCH 07/12] add section on cloud storage --- docs/source/python/dataset.rst | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/docs/source/python/dataset.rst b/docs/source/python/dataset.rst index 6fa7c0074fb..2439f150026 100644 --- a/docs/source/python/dataset.rst +++ b/docs/source/python/dataset.rst @@ -52,6 +52,7 @@ Reading Datasets ---------------- + .. TODO Full blown example with NYC taxi data to show off, afterwards explain all parts: .. ipython:: python @@ -277,7 +278,7 @@ the types of the partition keys are inferred from the file paths. It is also possible to explicitly define the schema of the partition keys using the :func:`partitioning` function. For example: -.. code-block:: +.. code-block:: python part = ds.partitioning( pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int32())]), @@ -293,7 +294,7 @@ example would be "/2019/11/15/". Since the names are not included in the file paths, these must be specified when constructing a directory partitioning: -.. code-block:: +.. code-block:: python part = ds.partitioning(field_names=["year", "month", "day"]) @@ -304,8 +305,31 @@ types from file paths. Reading from cloud storage -------------------------- -- example with s3 filesystem / hdfs filesystem +In addition to local files, pyarrow also supports reading from cloud storage. +Currently, :class:`HDFS ` and +:class:`Amazon S3-compatible storage ` are supported. + +When passing a file URI, the file system will be inferred. For example, +specifying a S3 path: + +.. 
code-block:: python + + dataset = ds.dataset("s3://ursa-labs-taxi-data/", partitioning=["year", "month"]) + +Typically, you will want to customize the connection parameters, and then +a file system object can be created and passed to the ``filesystem`` keyword: + +.. code-block:: python + + from pyarrow import fs + + s3 = fs.S3FileSystem(region="us-east-2") + dataset = ds.dataset("ursa-labs-taxi-data/", filesystem=s3, + partitioning=["year", "month"]) +The currently available classes are :class:`~pyarrow.fs.S3FileSystem` and +:class:`~pyarrow.fs.HadoopFileSystem`. See the :ref:`filesystem` docs for more +details. Manual specification of the Dataset @@ -355,4 +379,5 @@ Manual scheduling ----------------- - fragments (get_fragments) -- scan / scan tasks / iterators of record batches +- scan / scan tasks / iterators of record batches -> show dummy code to iterate + through all record batches and do something with them From d1e771cd87522f7fabf66150d8d7b9d7974601a1 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 14 Apr 2020 13:37:00 +0200 Subject: [PATCH 08/12] update manual dataset API + sphinx fixes --- docs/source/python/dataset.rst | 41 +++++++++++++++++----------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/docs/source/python/dataset.rst b/docs/source/python/dataset.rst index 2439f150026..a888ac32a8c 100644 --- a/docs/source/python/dataset.rst +++ b/docs/source/python/dataset.rst @@ -137,11 +137,11 @@ If we save the table as a Feather file instead of Parquet files: feather.write_feather(table, base / "data.feather") then we can read the Feather file using the same functions, but with specifying -``format="ipc"``: +``format="feather"``: .. ipython:: python - dataset = ds.dataset(base / "data.feather", format="ipc") + dataset = ds.dataset(base / "data.feather", format="feather") dataset.to_table().to_pandas().head() Customizing file formats @@ -151,16 +151,16 @@ The format name as a string, like:: ds.dataset(..., format="parquet") -is short hand for a default constructed class:`ParquetFileFormat`. +is short hand for a default constructed class:`ParquetFileFormat`:: ds.dataset(..., format=ds.ParquetFileForma()) -The class:`FileFormat` objects can be customized using keywords. For example:: +The :class:`FileFormat` objects can be customized using keywords. For example:: - format = ds.ParquetFileFormat(read_options={'dictionary_columns': ['a']}) - ds.dataset(..., format=format) + parquet_format = ds.ParquetFileFormat(read_options={'dictionary_columns': ['a']}) + ds.dataset(..., format=parquet_format) -Will configure column ``a`` to be dictionary encoded on scan. +Will configure column ``"a"`` to be dictionary encoded on scan. Filtering data -------------- @@ -230,7 +230,7 @@ For example, a dataset partitioned by year and month may look like on disk: The above partitioning scheme is using "/key=value/" directory names, as found in Apache Hive. -Let's create a small partitioned dataset. The:func:`~pyarrow.parquet.write_to_dataset` +Let's create a small partitioned dataset. The :func:`~pyarrow.parquet.write_to_dataset` function can write such hive-like partitioned datasets. .. ipython:: python @@ -249,7 +249,7 @@ uses a hive-like partitioning scheme with the `partitioning` keyword: .. 
ipython:: python - dataset = ds.dataset(str(base / "parquet_dataset_partitioned2"), format="parquet", + dataset = ds.dataset(str(base / "parquet_dataset_partitioned"), format="parquet", partitioning="hive") dataset.files @@ -306,8 +306,8 @@ Reading from cloud storage -------------------------- In addition to local files, pyarrow also supports reading from cloud storage. -Currently, :class:`HDFS ` and -:class:`Amazon S3-compatible storage ` are supported. +Currently, :class:`HDFS ` and +:class:`Amazon S3-compatible storage ` are supported. When passing a file URI, the file system will be inferred. For example, specifying a S3 path: @@ -349,22 +349,23 @@ additional partitioning information: # creating a dummy dataset: directory with two files table = pa.table({'col1': range(3), 'col2': np.random.randn(3)}) - pq.write_table(table, "parquet_dataset_manual/data_2018.parquet") - pq.write_table(table, "parquet_dataset_manual/data_2019.parquet") + (base / "parquet_dataset_manual").mkdir(exist_ok=True) + pq.write_table(table, base / "parquet_dataset_manual" / "data_2018.parquet") + pq.write_table(table, base / "parquet_dataset_manual" / "data_2019.parquet") -To create a Dataset from a list of files, we need to specify the schema, format, -filesystem, and paths manually: +To create a Dataset from a list of files, we need to specify the paths, schema, +format, filesystem, and partition expressions manually: .. ipython:: python - import pyarrow.fs + from pyarrow import fs - schema = pa.schema([("year", pa.int32()), ("col1", pa.int64()), ("col2", pa.float64())]) + schema = pa.schema([("year", pa.int64()), ("col1", pa.int64()), ("col2", pa.float64())]) dataset = ds.FileSystemDataset( - schema, None, ds.ParquetFileFormat(), pa.fs.LocalFileSystem(), - ["parquet_dataset_manual/data_2018.parquet", "parquet_dataset_manual/data_2019.parquet"], - [ds.field('year') == 2018, ds.field('year') == 2019]) + ["data_2018.parquet", "data_2019.parquet"], schema=schema, format=ds.ParquetFileFormat(), + filesystem=fs.SubTreeFileSystem(str(base / "parquet_dataset_manual"), fs.LocalFileSystem()), + partitions=[ds.field('year') == 2018, ds.field('year') == 2019]) Since we specified the "partition expressions" for our files, this information is materialized as columns when reading the data and can be used for filtering: From 6da02e420ea26808c39d4e9c405fa8efc79ebac5 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 14 Apr 2020 13:59:04 +0200 Subject: [PATCH 09/12] add small example for scan() iterating through record batches --- docs/source/python/api/dataset.rst | 3 ++- docs/source/python/dataset.rst | 17 +++++++++++++---- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/docs/source/python/api/dataset.rst b/docs/source/python/api/dataset.rst index 4cc59ac5fb1..b0cfd7572a2 100644 --- a/docs/source/python/api/dataset.rst +++ b/docs/source/python/api/dataset.rst @@ -50,9 +50,10 @@ Classes PartitioningFactory DirectoryPartitioning HivePartitioning + Dataset FileSystemDataset FileSystemFactoryOptions FileSystemDatasetFactory - Dataset + UnionDataset Scanner Expression diff --git a/docs/source/python/dataset.rst b/docs/source/python/dataset.rst index a888ac32a8c..0d122ea6bee 100644 --- a/docs/source/python/dataset.rst +++ b/docs/source/python/dataset.rst @@ -113,7 +113,6 @@ Using the :meth:`Dataset.to_table` method we can read the dataset (or a portion of it) into a pyarrow Table (note that depending on the size of your dataset this can require a lot of memory, see below on filtering / iterative 
loading): - .. ipython:: python dataset.to_table() @@ -379,6 +378,16 @@ is materialized as columns when reading the data and can be used for filtering: Manual scheduling ----------------- -- fragments (get_fragments) -- scan / scan tasks / iterators of record batches -> show dummy code to iterate - through all record batches and do something with them +.. + Possible content: + - fragments (get_fragments) + - scan / scan tasks / iterators of record batches + +The :func:`~Dataset.to_table` method loads all selected data into memory +at once resulting in a pyarrow Table. Alternatively, a datasetscan also be +scanned one RecordBatch at a time in an iterative manner using the +:func:`~Dataset.scan` method:: + + for scan_task in dataset.scan(columns=[...], filter=...): + for record_batch in scan_task.execute(): + # process the record batch From 7d7ba3fa04289f20618e557618fe59d174b10a13 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 14 Apr 2020 15:20:46 +0200 Subject: [PATCH 10/12] update intro --- docs/source/python/dataset.rst | 40 +++++++++++++++------------------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/docs/source/python/dataset.rst b/docs/source/python/dataset.rst index 0d122ea6bee..7215f2d7136 100644 --- a/docs/source/python/dataset.rst +++ b/docs/source/python/dataset.rst @@ -28,16 +28,19 @@ Tabular Datasets and a stable API is not yet guaranteed. The ``pyarrow.dataset`` module provides functionality to efficiently work with -tabular, potentially larger than memory and multi-file, datasets: +tabular, potentially larger than memory and multi-file datasets: -* A unified interface for different sources: supporting different sources - (files, database connection, ..), different file systems (local, cloud) and - different file formats (Parquet, CSV, JSON, Feather, ..) +* A unified interface for different sources: supporting different sources and + file formats (Parquet, Feather files) and different file systems (local, + cloud). * Discovery of sources (crawling directories, handle directory-based partitioned datasets, basic schema normalization, ..) -* Optimized reading with pedicate pushdown (filtering rows), projection +* Optimized reading with predicate pushdown (filtering rows), projection (selecting columns), parallel reading or fine-grained managing of tasks. +Currently, only Parquet and Feather / Arrow IPC files are supported. The goal +is to expand this in the future to other file formats and data sources (e.g. +database connections). For those familiar with the existing :class:`pyarrow.parquet.ParquetDataset` for reading Parquet datasets: ``pyarrow.dataset``'s goal is similar but not specific @@ -47,29 +50,20 @@ perfomance and new features (e.g. filtering within files rather than only on partition keys). - Reading Datasets ---------------- - - .. TODO Full blown example with NYC taxi data to show off, afterwards explain all parts: -.. ipython:: python - - import pyarrow as pa - import pyarrow.dataset as ds - -... - - -For the next examples, we are first going to create a small dataset consisting +For the examples below, let's create a small dataset consisting of a directory with two parquet files: .. 
ipython:: python import tempfile import pathlib + import pyarrow as pa + import pyarrow.parquet as pq base = pathlib.Path(tempfile.gettempdir()) (base / "parquet_dataset").mkdir(exist_ok=True) @@ -78,7 +72,6 @@ of a directory with two parquet files: table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5}) # writing it into two parquet files - import pyarrow.parquet as pq pq.write_table(table.slice(0, 5), base / "parquet_dataset/data1.parquet") pq.write_table(table.slice(5, 10), base / "parquet_dataset/data2.parquet") @@ -90,6 +83,7 @@ can pass it the path to the directory containing the data files: .. ipython:: python + import pyarrow.dataset as ds dataset = ds.dataset(base / "parquet_dataset", format="parquet") dataset @@ -285,10 +279,10 @@ using the :func:`partitioning` function. For example: ) dataset = ds.dataset(..., partitioning=part) -In addition to the hive-like directory scheme, also a "directory partitioning" -scheme is supported, where the segments in the file path are the values of -the partition keys without including the name. The equivalent (year, month, day) -example would be "/2019/11/15/". +"Directory partitioning" is also supported, where the segments in the file path +represent the values of the partition keys without including the name (the +field name are implicit in the segment's index). For example, given field names +"year", "month", and "day", one path might be "/2019/11/15". Since the names are not included in the file paths, these must be specified when constructing a directory partitioning: @@ -384,7 +378,7 @@ Manual scheduling - scan / scan tasks / iterators of record batches The :func:`~Dataset.to_table` method loads all selected data into memory -at once resulting in a pyarrow Table. Alternatively, a datasetscan also be +at once resulting in a pyarrow Table. Alternatively, a dataset can also be scanned one RecordBatch at a time in an iterative manner using the :func:`~Dataset.scan` method:: From 24f72168dc52967e62451febd661ac1b5472dbb7 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 14 Apr 2020 16:03:46 +0200 Subject: [PATCH 11/12] add note in the parquet user guide about use_legacy_dataset=False --- docs/source/python/parquet.rst | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/docs/source/python/parquet.rst b/docs/source/python/parquet.rst index 173919d9a26..039ae965c7c 100644 --- a/docs/source/python/parquet.rst +++ b/docs/source/python/parquet.rst @@ -377,6 +377,30 @@ from a remote filesystem into a pandas dataframe you may need to run ``sort_index`` to maintain row ordering (as long as the ``preserve_index`` option was enabled on write). +.. note:: + + The ParquetDataset is being reimplemented based on the new generic Dataset + API (see the :ref:`dataset` docs for an overview). This is not yet the + default, but can already be enabled by passing the ``use_legacy_dataset=False`` + keyword to :class:`ParquetDataset` or :func:`read_table`:: + + pq.ParquetDataset('dataset_name/', use_legacy_dataset=False) + + Enabling this gives the following new features: + + - Filtering on all columns (using row group statistics) instead of only on + the partition keys. + - More fine-grained partitioning: support for a directory partitioning scheme + in addition to the Hive-like partitioning (e.g. "/2019/11/15/" instead of + "/year=2019/month=11/day=15/"), and the ability to specify a schema for + the partition keys. + - General performance improvement and bug fixes. 
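    For example, filtering on an ordinary (non-partition) column while reading a
    partitioned dataset could look like this (a sketch: the ``'value'`` column is
    hypothetical)::

        dataset = pq.ParquetDataset('dataset_name/', use_legacy_dataset=False,
                                    filters=[('value', '>=', 100)])
        table = dataset.read()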
+ + In the future, this will be turned on by default. The new implementation + does not yet cover all existing ParquetDataset features (e.g. specifying + the ``metadata``, or the ``pieces`` property API). Feedback is very welcome. + + Using with Spark ---------------- From 45a956d67e74dbd9f2300a909f8a973ebd047355 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 14 Apr 2020 16:34:45 +0200 Subject: [PATCH 12/12] fix titles in index.rst --- docs/source/index.rst | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/source/index.rst b/docs/source/index.rst index 14bc5a7f857..a3f3302cff0 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -53,10 +53,10 @@ such topics as: cpp/index python/index - `Java `_ - `C GLib `_ - `JavaScript `_ - `R `_ + Java + C GLib + JavaScript + R .. _toc.development: