From d3946753bfc020780adf3fe84ccec5df38e774c5 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Tue, 13 Jul 2021 11:38:52 +0200 Subject: [PATCH 1/8] Example of efficiently writing/loading arrays and tables --- docs/source/python/ipc.rst | 91 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/docs/source/python/ipc.rst b/docs/source/python/ipc.rst index 6d90179b39e..213a8de2f06 100644 --- a/docs/source/python/ipc.rst +++ b/docs/source/python/ipc.rst @@ -154,6 +154,97 @@ DataFrame output: df = pa.ipc.open_file(buf).read_pandas() df[:5] +Efficiently Writing and Reading Arrow Arrays +-------------------------------------------- + +Being optimized for zero copy and memory mapped data, Arrow allows to easily +read and write arrays consuming the minimum amount of resident memory. + +When writing and reading raw Arrow data, we can use the Arrow File Format +or the Arrow Streaming Format. + +To dump an array to file, you can use the :meth:`~pyarrow.ipc.new_file` +which will provide a new :class:`~pyarrow.ipc.RecordBatchFileWriter` instance +that can be used to write batches of data to that file. + +For example to write an array of 100M integers, we could write it in 1000 chunks +of 100000 entries: + +.. ipython:: python + + BATCH_SIZE = 100000 + NUM_BATCHES = 1000 + + schema = pa.schema([pa.field('nums', pa.int32())]) + + with pa.OSFile('bigfile.arrow', 'wb') as sink: + with pa.ipc.new_file(sink, schema) as writer: + for row in range(NUM_BATCHES): + batch = pa.record_batch([pa.array(range(BATCH_SIZE), type=pa.int32())], schema) + writer.write(batch) + +record batches support multiple columns, so in practice we always write the +equivalent of a :class:`~pyarrow.Table`. + +Writing in batches is effective because we in theory need to keep in memory only +the current batch we are writing. But when reading back, we can be even more effective +by directly mapping the data from disk and avoid allocating any new memory on read. + +Under normal conditions, reading back our file will consume a few hundred megabytes +of memory: + +.. ipython:: python + + with pa.OSFile('bigfile.arrow', 'rb') as source: + loaded_array = pa.ipc.open_file(source).read_all() + + print("LEN:", len(loaded_array)) + print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20)) + +To more efficiently read big data from disk, we can memory map the file, so that +the arrow can directly reference the data mapped from disk and avoid having to +allocate its own memory. +In such case the operating system will be able to page in the mapped memory +lazily and page it out without any write back cost when under pressure, +allowing to more easily read arrays bigger than the total memory. + +.. ipython:: python + + with pa.memory_map('bigfile.arrow', 'r') as source: + loaded_array = pa.ipc.open_file(source).read_all() + print("LEN:", len(loaded_array)) + print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20)) + +Equally we can write back to disk the loaded array without consuming any +extra memory thanks to the fact that iterating over the array will just +scan through the data without the need to make copies of it + +.. 
ipython:: python + + with pa.OSFile('bigfile2.arrow', 'wb') as sink: + with pa.ipc.new_file(sink, schema) as writer: + for chunk in loaded_array[0].iterchunks(): + batch = pa.record_batch([chunk], schema) + writer.write(batch) + print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20)) + +Most high level APIs like :meth:`~pyarrow.parquet.read_table` also provide a +``memory_map`` option. But in those cases, the memory mapping can't help with +reducing resident memory consumption. Because Parquet data needs to be decoded +from the parquet format and compression, it can't be directly mapped from disk, +thus the ``memory_map`` option might perform better on some systems but won't +help much with resident memory consumption. + +.. code-block:: python + + >>> pq_array = pa.parquet.read_table("area1.parquet", memory_map=True) + >>> print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20)) + RSS: 4299MB + + >>> pq_array = pa.parquet.read_table("area1.parquet", memory_map=False) + >>> print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20)) + RSS: 4299MB + Arbitrary Object Serialization ------------------------------ From 8344c3e8048f59f5ae58ae29aa13f5e02ad7ab2c Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Tue, 13 Jul 2021 11:49:30 +0200 Subject: [PATCH 2/8] Reduce example size --- docs/source/python/ipc.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/source/python/ipc.rst b/docs/source/python/ipc.rst index 213a8de2f06..8b9d2496a59 100644 --- a/docs/source/python/ipc.rst +++ b/docs/source/python/ipc.rst @@ -167,12 +167,12 @@ To dump an array to file, you can use the :meth:`~pyarrow.ipc.new_file` which will provide a new :class:`~pyarrow.ipc.RecordBatchFileWriter` instance that can be used to write batches of data to that file. -For example to write an array of 100M integers, we could write it in 1000 chunks -of 100000 entries: +For example to write an array of 10M integers, we could write it in 1000 chunks +of 10000 entries: .. ipython:: python - BATCH_SIZE = 100000 + BATCH_SIZE = 10000 NUM_BATCHES = 1000 schema = pa.schema([pa.field('nums', pa.int32())]) From e95dfd6f57389ffef9043bfed344768876d8c06e Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Wed, 21 Jul 2021 15:49:10 +0200 Subject: [PATCH 3/8] Incorporate further feedbacks --- docs/source/python/ipc.rst | 28 ++-------------------------- docs/source/python/parquet.rst | 23 +++++++++++++++++++++++ 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/docs/source/python/ipc.rst b/docs/source/python/ipc.rst index 8b9d2496a59..a67a7a57b7d 100644 --- a/docs/source/python/ipc.rst +++ b/docs/source/python/ipc.rst @@ -215,35 +215,11 @@ allowing to more easily read arrays bigger than the total memory. print("LEN:", len(loaded_array)) print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20)) -Equally we can write back to disk the loaded array without consuming any -extra memory thanks to the fact that iterating over the array will just -scan through the data without the need to make copies of it - -.. ipython:: python - - with pa.OSFile('bigfile2.arrow', 'wb') as sink: - with pa.ipc.new_file(sink, schema) as writer: - for chunk in loaded_array[0].iterchunks(): - batch = pa.record_batch([chunk], schema) - writer.write(batch) - print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20)) +.. note:: Most high level APIs like :meth:`~pyarrow.parquet.read_table` also provide a ``memory_map`` option. 
But in those cases, the memory mapping can't help with -reducing resident memory consumption. Because Parquet data needs to be decoded -from the parquet format and compression, it can't be directly mapped from disk, -thus the ``memory_map`` option might perform better on some systems but won't -help much with resident memory consumption. - -.. code-block:: python - - >>> pq_array = pa.parquet.read_table("area1.parquet", memory_map=True) - >>> print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20)) - RSS: 4299MB - - >>> pq_array = pa.parquet.read_table("area1.parquet", memory_map=False) - >>> print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20)) - RSS: 4299MB +reducing resident memory consumption. See :ref:`parquet_mmap` for details. Arbitrary Object Serialization ------------------------------ diff --git a/docs/source/python/parquet.rst b/docs/source/python/parquet.rst index 0db0df1bc4c..abd28432b98 100644 --- a/docs/source/python/parquet.rst +++ b/docs/source/python/parquet.rst @@ -112,6 +112,29 @@ In general, a Python file object will have the worst read performance, while a string file path or an instance of :class:`~.NativeFile` (especially memory maps) will perform the best. +.. _parquet_mmap: + +Reading Parquet and Memory Mapping +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Because Parquet data needs to be decoded from the parquet format +and compression, it can't be directly mapped from disk. +Thus the ``memory_map`` option might perform better on some systems +but won't help much with resident memory consumption. + +.. code-block:: python + + >>> pq_array = pa.parquet.read_table("area1.parquet", memory_map=True) + >>> print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20)) + RSS: 4299MB + + >>> pq_array = pa.parquet.read_table("area1.parquet", memory_map=False) + >>> print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20)) + RSS: 4299MB + +If you need to deal with parquet data bigger than memory, +the :ref:`dataset` and partitioning is probably what you are looking for. + Parquet file writing options ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ From 939ef1614b2b680b1bea1b1263df27abf56f30aa Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Wed, 21 Jul 2021 15:52:43 +0200 Subject: [PATCH 4/8] oops, indent note --- docs/source/python/ipc.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/source/python/ipc.rst b/docs/source/python/ipc.rst index a67a7a57b7d..da03de8ccb2 100644 --- a/docs/source/python/ipc.rst +++ b/docs/source/python/ipc.rst @@ -217,9 +217,9 @@ allowing to more easily read arrays bigger than the total memory. .. note:: -Most high level APIs like :meth:`~pyarrow.parquet.read_table` also provide a -``memory_map`` option. But in those cases, the memory mapping can't help with -reducing resident memory consumption. See :ref:`parquet_mmap` for details. + Most high level APIs like :meth:`~pyarrow.parquet.read_table` also provide a + ``memory_map`` option. But in those cases, the memory mapping can't help with + reducing resident memory consumption. See :ref:`parquet_mmap` for details. 
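
As a complementary, hedged sketch of keeping resident memory low with the Arrow file
format (assuming the ``bigfile.arrow`` file written earlier in the section and the
standard :class:`~pyarrow.ipc.RecordBatchFileReader` API), individual record batches
can also be read by index instead of loading the whole table at once:

.. code-block:: python

    import pyarrow as pa

    # Open the file written earlier; with a memory map no data is copied yet.
    with pa.memory_map('bigfile.arrow', 'rb') as source:
        reader = pa.ipc.open_file(source)
        # The file format records how many batches were written,
        # and each one can be fetched individually by index.
        print("batches:", reader.num_record_batches)
        first = reader.get_batch(0)
        print("rows in first batch:", first.num_rows)

Because only the requested batch is materialized, this keeps memory usage bounded
even when memory mapping is not available.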
Arbitrary Object Serialization ------------------------------ From f56d5ce65232adbada2ec24fc9dbf447a4727a45 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Wed, 21 Jul 2021 15:58:50 +0200 Subject: [PATCH 5/8] typo --- docs/source/python/ipc.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/python/ipc.rst b/docs/source/python/ipc.rst index da03de8ccb2..6132eb96e23 100644 --- a/docs/source/python/ipc.rst +++ b/docs/source/python/ipc.rst @@ -202,7 +202,7 @@ of memory: print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20)) To more efficiently read big data from disk, we can memory map the file, so that -the arrow can directly reference the data mapped from disk and avoid having to +Arrow can directly reference the data mapped from disk and avoid having to allocate its own memory. In such case the operating system will be able to page in the mapped memory lazily and page it out without any write back cost when under pressure, From a74bae26fab885175da50ceb2d7fb8b63aaf100e Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Thu, 22 Jul 2021 16:02:05 +0200 Subject: [PATCH 6/8] Address more comments --- docs/source/python/ipc.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/source/python/ipc.rst b/docs/source/python/ipc.rst index 6132eb96e23..4b8cf0a89bf 100644 --- a/docs/source/python/ipc.rst +++ b/docs/source/python/ipc.rst @@ -210,14 +210,14 @@ allowing to more easily read arrays bigger than the total memory. .. ipython:: python - with pa.memory_map('bigfile.arrow', 'r') as source: + with pa.memory_map('bigfile.arrow', 'rb') as source: loaded_array = pa.ipc.open_file(source).read_all() print("LEN:", len(loaded_array)) print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20)) .. note:: - Most high level APIs like :meth:`~pyarrow.parquet.read_table` also provide a + Other high level APIs like :meth:`~pyarrow.parquet.read_table` also provide a ``memory_map`` option. But in those cases, the memory mapping can't help with reducing resident memory consumption. See :ref:`parquet_mmap` for details. From 898da3d270382894fc3f5e05da22e7ba75983d45 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Thu, 22 Jul 2021 16:03:00 +0200 Subject: [PATCH 7/8] titlecasing --- docs/source/python/parquet.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/source/python/parquet.rst b/docs/source/python/parquet.rst index abd28432b98..cab385b8b5d 100644 --- a/docs/source/python/parquet.rst +++ b/docs/source/python/parquet.rst @@ -117,7 +117,7 @@ maps) will perform the best. Reading Parquet and Memory Mapping ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Because Parquet data needs to be decoded from the parquet format +Because Parquet data needs to be decoded from the Parquet format and compression, it can't be directly mapped from disk. Thus the ``memory_map`` option might perform better on some systems but won't help much with resident memory consumption. @@ -132,7 +132,7 @@ but won't help much with resident memory consumption. >>> print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20)) RSS: 4299MB -If you need to deal with parquet data bigger than memory, +If you need to deal with Parquet data bigger than memory, the :ref:`dataset` and partitioning is probably what you are looking for. 
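
That dataset-based approach can be sketched roughly as follows (a minimal example,
assuming :mod:`pyarrow.dataset` is available and reusing the ``area1.parquet``
placeholder from the snippet above; only one record batch is materialized at a time):

.. code-block:: python

    import pyarrow.dataset as ds

    # Scan the Parquet file as a stream of record batches instead of
    # materializing the whole table; only one batch is resident at a time.
    dataset = ds.dataset("area1.parquet", format="parquet")
    total_rows = 0
    for batch in dataset.to_batches():
        total_rows += batch.num_rows
    print("rows:", total_rows)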
Parquet file writing options From 6aa5956f3c12d59fb46fcccfbbc49d3130804a9d Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Fri, 23 Jul 2021 11:29:33 +0200 Subject: [PATCH 8/8] retitle --- docs/source/python/ipc.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/source/python/ipc.rst b/docs/source/python/ipc.rst index 4b8cf0a89bf..249780a8dcd 100644 --- a/docs/source/python/ipc.rst +++ b/docs/source/python/ipc.rst @@ -154,8 +154,8 @@ DataFrame output: df = pa.ipc.open_file(buf).read_pandas() df[:5] -Efficiently Writing and Reading Arrow Arrays --------------------------------------------- +Efficiently Writing and Reading Arrow Data +------------------------------------------ Being optimized for zero copy and memory mapped data, Arrow allows to easily read and write arrays consuming the minimum amount of resident memory.
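
For completeness, the Arrow streaming format mentioned at the top of the new section
can be used in the same way as the file format; below is a minimal sketch (assuming the
same ``schema`` as in the section and a hypothetical ``bigfile.arrows`` path), writing
and then consuming the stream one batch at a time:

.. code-block:: python

    import pyarrow as pa

    schema = pa.schema([pa.field('nums', pa.int32())])

    # Write a handful of batches with the streaming format.
    with pa.OSFile('bigfile.arrows', 'wb') as sink:
        with pa.ipc.new_stream(sink, schema) as writer:
            for _ in range(10):
                batch = pa.record_batch([pa.array(range(10000), type=pa.int32())], schema)
                writer.write(batch)

    # Consume the stream batch by batch; only one batch needs to be
    # resident in memory at any point while reading.
    with pa.OSFile('bigfile.arrows', 'rb') as source:
        reader = pa.ipc.open_stream(source)
        total_rows = sum(batch.num_rows for batch in reader)
    print("rows:", total_rows)

Unlike the file format, the streaming format does not support random access to
batches, but it is convenient when data is produced and consumed incrementally.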