From 3c621c14bdb0fd9d9a361ac3ac2465e96b7caf20 Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 25 Mar 2021 11:31:07 -0400
Subject: [PATCH 1/9] ARROW-10882: [C++] Create InMemoryDataset from RecordBatchReader

---
 cpp/src/arrow/dataset/dataset.cc      | 16 ++++++++++++++++
 cpp/src/arrow/dataset/dataset.h       |  2 ++
 cpp/src/arrow/dataset/dataset_test.cc | 14 ++++++++++++++
 3 files changed, 32 insertions(+)

diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc
index df155784924..9bc24b04920 100644
--- a/cpp/src/arrow/dataset/dataset.cc
+++ b/cpp/src/arrow/dataset/dataset.cc
@@ -148,6 +148,22 @@ InMemoryDataset::InMemoryDataset(std::shared_ptr<Table> table)
     : Dataset(table->schema()),
       get_batches_(new TableRecordBatchGenerator(std::move(table))) {}

+struct ReaderRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
+  explicit ReaderRecordBatchGenerator(std::shared_ptr<RecordBatchReader> reader)
+      : reader_(std::move(reader)) {}
+
+  RecordBatchIterator Get() const final {
+    auto reader = reader_;
+    return MakeFunctionIterator([reader] { return reader->Next(); });
+  }
+
+  std::shared_ptr<RecordBatchReader> reader_;
+};
+
+InMemoryDataset::InMemoryDataset(std::shared_ptr<RecordBatchReader> reader)
+    : Dataset(reader->schema()),
+      get_batches_(new ReaderRecordBatchGenerator(std::move(reader))) {}
+
 Result<std::shared_ptr<Dataset>> InMemoryDataset::ReplaceSchema(
     std::shared_ptr<Schema> schema) const {
   RETURN_NOT_OK(CheckProjectable(*schema_, *schema));
diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h
index a28b79840d6..169959633c4 100644
--- a/cpp/src/arrow/dataset/dataset.h
+++ b/cpp/src/arrow/dataset/dataset.h
@@ -32,6 +32,7 @@
 #include "arrow/util/mutex.h"

 namespace arrow {
+
 namespace dataset {

 /// \brief A granular piece of a Dataset, such as an individual file.
@@ -184,6 +185,7 @@ class ARROW_DS_EXPORT InMemoryDataset : public Dataset {
   InMemoryDataset(std::shared_ptr<Schema> schema, RecordBatchVector batches);

   explicit InMemoryDataset(std::shared_ptr<Table> table);
+  explicit InMemoryDataset(std::shared_ptr<RecordBatchReader> reader);

   std::string type_name() const override { return "in-memory"; }

diff --git a/cpp/src/arrow/dataset/dataset_test.cc b/cpp/src/arrow/dataset/dataset_test.cc
index a3603558924..70eaf5cfa73 100644
--- a/cpp/src/arrow/dataset/dataset_test.cc
+++ b/cpp/src/arrow/dataset/dataset_test.cc
@@ -79,6 +79,20 @@ TEST_F(TestInMemoryDataset, ReplaceSchema) {
                   .status());
 }

+TEST_F(TestInMemoryDataset, FromReader) {
+  constexpr int64_t kBatchSize = 1024;
+  constexpr int64_t kNumberBatches = 16;
+
+  SetSchema({field("i32", int32()), field("f64", float64())});
+  auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
+  auto source_reader = ConstantArrayGenerator::Repeat(kNumberBatches, batch);
+  auto target_reader = ConstantArrayGenerator::Repeat(kNumberBatches, batch);
+
+  auto dataset = std::make_shared<InMemoryDataset>(source_reader);
+
+  AssertDatasetEquals(target_reader.get(), dataset.get());
+}
+
 TEST_F(TestInMemoryDataset, GetFragments) {
   constexpr int64_t kBatchSize = 1024;
   constexpr int64_t kNumberBatches = 16;

From 0fa931511c5d1090b3be0aa3b1d342be3c8ea874 Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 25 Mar 2021 13:28:47 -0400
Subject: [PATCH 2/9] ARROW-10882: [Python] Expose InMemoryDataset

---
 python/pyarrow/_dataset.pyx                  | 71 +++++++++++++++++++-
 python/pyarrow/dataset.py                    | 32 +++++++--
 python/pyarrow/includes/libarrow_dataset.pxd |  5 ++
 python/pyarrow/tests/test_dataset.py         | 54 ++++++++++++++-
 python/pyarrow/util.py                       |  8 +++
 5 files changed, 161 insertions(+), 9 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 12ddcee5343..de317533a21 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -26,11 +26,11 @@ import os
 import pyarrow as pa
 from pyarrow.lib cimport *
-from pyarrow.lib import frombytes, tobytes
+from pyarrow.lib import ArrowTypeError, frombytes, tobytes
 from pyarrow.includes.libarrow_dataset cimport *
 from pyarrow._fs cimport FileSystem, FileInfo, FileSelector
 from pyarrow._csv cimport ConvertOptions, ParseOptions, ReadOptions
-from pyarrow.util import _is_path_like, _stringify_path
+from pyarrow.util import _is_iterable, _is_path_like, _stringify_path
 from pyarrow._parquet cimport (
     _create_writer_properties, _create_arrow_writer_properties,
@@ -441,6 +441,73 @@ cdef class Dataset(_Weakrefable):
         return pyarrow_wrap_schema(self.dataset.schema())

+cdef class InMemoryDataset(Dataset):
+    """A Dataset wrapping in-memory data.
+
+    Parameters
+    ----------
+    source
+        The data for this dataset. Can be a RecordBatch, Table, list of
+        RecordBatch/Table, iterable of RecordBatch, or a RecordBatchReader.
+        If an iterable is provided, the schema must also be provided.
+    schema : Schema, optional
+        Only required if passing an iterable as the source.
+    """
+
+    cdef:
+        CInMemoryDataset* in_memory_dataset
+
+    def __init__(self, source, Schema schema=None):
+        cdef:
+            RecordBatchReader reader
+            shared_ptr[CInMemoryDataset] in_memory_dataset
+
+        if isinstance(source, (pa.RecordBatch, pa.Table)):
+            source = [source]
+
+        if isinstance(source, (list, tuple)):
+            batches = []
+            for item in source:
+                if isinstance(item, pa.RecordBatch):
+                    batches.append(item)
+                elif isinstance(item, pa.Table):
+                    batches.extend(item.to_batches())
+                else:
+                    raise TypeError(
+                        'Expected a list of tables or batches. 
The given list ' + 'contains a ' + type(item).__name__) + if schema is None: + schema = item.schema + elif not schema.equals(item.schema): + raise ArrowTypeError( + f'Item has schema {item.schema} which which does not ' + f'match expected schema {schema}') + if not batches and schema is None: + raise ValueError('Must provide schema to construct in-memory ' + 'dataset from an empty list') + reader = pa.ipc.RecordBatchReader.from_batches(schema, batches) + elif isinstance(source, pa.ipc.RecordBatchReader): + reader = source + elif _is_iterable(source): + if schema is None: + raise ValueError('Must provide schema to construct in-memory ' + 'dataset from an iterable') + reader = pa.ipc.RecordBatchReader.from_batches(schema, source) + else: + raise TypeError( + 'Expected a table, batch, iterable of tables/batches, or a ' + 'record batch reader instead of the given type: ' + + type(source).__name__ + ) + + in_memory_dataset = make_shared[CInMemoryDataset](reader.reader) + self.init( in_memory_dataset) + + cdef void init(self, const shared_ptr[CDataset]& sp): + Dataset.init(self, sp) + self.in_memory_dataset = sp.get() + + cdef class UnionDataset(Dataset): """A Dataset wrapping child datasets. diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 195d414b047..cf14dcb9f55 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -18,7 +18,7 @@ """Dataset is currently unstable. APIs subject to change without notice.""" import pyarrow as pa -from pyarrow.util import _stringify_path, _is_path_like +from pyarrow.util import _is_iterable, _stringify_path, _is_path_like from pyarrow._dataset import ( # noqa CsvFileFormat, @@ -37,6 +37,7 @@ HivePartitioning, IpcFileFormat, IpcFileWriteOptions, + InMemoryDataset, ParquetDatasetFactory, ParquetFactoryOptions, ParquetFileFormat, @@ -408,6 +409,13 @@ def _filesystem_dataset(source, schema=None, filesystem=None, return factory.finish(schema) +def _in_memory_dataset(source, schema=None, **kwargs): + if any(v is not None for v in kwargs.values()): + raise ValueError( + "For in-memory datasets, you cannot pass any additional arguments") + return InMemoryDataset(source, schema) + + def _union_dataset(children, schema=None, **kwargs): if any(v is not None for v in kwargs.values()): raise ValueError( @@ -508,7 +516,8 @@ def dataset(source, schema=None, format=None, filesystem=None, Parameters ---------- - source : path, list of paths, dataset, list of datasets or URI + source : path, list of paths, dataset, list of datasets, (list or + iterable of) batches or tables, RecordBatchReader, or URI Path pointing to a single file: Open a FileSystemDataset from a single file. Path pointing to a directory: @@ -524,6 +533,11 @@ def dataset(source, schema=None, format=None, filesystem=None, A nested UnionDataset gets constructed, it allows arbitrary composition of other datasets. Note that additional keyword arguments are not allowed. + (List or iterable of) batches or tables: + Create an InMemoryDataset. If an iterable or empty list is given, + a schema must also be given. + RecordBatchReader: + Create an InMemoryDataset. schema : Schema, optional Optionally provide the Schema for the Dataset, in which case it will not be inferred from the source. 
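
As an aside, a minimal usage sketch of the in-memory paths documented above (an illustration, not taken from the patch itself, assuming only the public pyarrow API):

import pyarrow as pa
import pyarrow.dataset as ds

batch = pa.RecordBatch.from_arrays([pa.array(range(10))], names=["a"])
table = pa.Table.from_batches([batch])

# Batches, tables, and lists of them carry their own schema.
dataset = ds.dataset([batch, table])
assert dataset.to_table().num_rows == 20

# A plain iterable has no schema attribute, so one must be passed explicitly.
dataset = ds.dataset((batch for _ in range(3)), schema=batch.schema)
assert dataset.to_table().num_rows == 30
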
@@ -636,7 +650,6 @@ def dataset(source, schema=None, format=None, filesystem=None, selector_ignore_prefixes=ignore_prefixes ) - # TODO(kszucs): support InMemoryDataset for a table input if _is_path_like(source): return _filesystem_dataset(source, **kwargs) elif isinstance(source, (tuple, list)): @@ -644,13 +657,22 @@ def dataset(source, schema=None, format=None, filesystem=None, return _filesystem_dataset(source, **kwargs) elif all(isinstance(elem, Dataset) for elem in source): return _union_dataset(source, **kwargs) + elif all(isinstance(elem, (pa.RecordBatch, pa.Table)) + for elem in source): + return _in_memory_dataset(source, **kwargs) else: unique_types = set(type(elem).__name__ for elem in source) type_names = ', '.join('{}'.format(t) for t in unique_types) raise TypeError( - 'Expected a list of path-like or dataset objects. The given ' - 'list contains the following types: {}'.format(type_names) + 'Expected a list of path-like or dataset objects, or a list ' + 'of batches or tables. The given list contains the following ' + 'types: {}'.format(type_names) ) + elif isinstance(source, (pa.RecordBatch, pa.ipc.RecordBatchReader, + pa.Table)): + return _in_memory_dataset(source, **kwargs) + elif _is_iterable(source): + return _in_memory_dataset(source, **kwargs) else: raise TypeError( 'Expected a path-like, list of path-likes or a list of Datasets ' diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index f7f2a142001..db2e73acdff 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -124,6 +124,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CResult[shared_ptr[CScannerBuilder]] NewScan() + cdef cppclass CInMemoryDataset "arrow::dataset::InMemoryDataset"( + CDataset): + CInMemoryDataset(shared_ptr[CRecordBatchReader]) + CInMemoryDataset(shared_ptr[CTable]) + cdef cppclass CUnionDataset "arrow::dataset::UnionDataset"( CDataset): @staticmethod diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 36cff9958f9..ae16635b0f8 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -1622,13 +1622,16 @@ def test_construct_from_invalid_sources_raise(multisourcefs): fs.FileSelector('/schema'), format=ds.ParquetFileFormat() ) + batch1 = pa.RecordBatch.from_arrays([pa.array(range(10))], names=["a"]) + batch2 = pa.RecordBatch.from_arrays([pa.array(range(10))], names=["b"]) with pytest.raises(TypeError, match='Expected.*FileSystemDatasetFactory'): ds.dataset([child1, child2]) expected = ( - "Expected a list of path-like or dataset objects. The given list " - "contains the following types: int" + "Expected a list of path-like or dataset objects, or a list " + "of batches or tables. 
The given list contains the following " + "types: int" ) with pytest.raises(TypeError, match=expected): ds.dataset([1, 2, 3]) @@ -1640,6 +1643,53 @@ def test_construct_from_invalid_sources_raise(multisourcefs): with pytest.raises(TypeError, match=expected): ds.dataset(None) + expected = ( + "Must provide schema to construct in-memory dataset from an iterable" + ) + with pytest.raises(ValueError, match=expected): + ds.dataset((batch1 for _ in range(3))) + + expected = ( + "Must provide schema to construct in-memory dataset from an empty list" + ) + with pytest.raises(ValueError, match=expected): + ds.InMemoryDataset([]) + + expected = ( + "Item has schema b: int64 which which does not match expected schema " + "a: int64" + ) + with pytest.raises(TypeError, match=expected): + ds.dataset([batch1, batch2]) + + expected = ( + "Expected a list of path-like or dataset objects, or a list of " + "batches or tables. The given list contains the following types:" + ) + with pytest.raises(TypeError, match=expected): + ds.dataset([batch1, 0]) + + expected = ( + "Expected a list of tables or batches. The given list contains a int" + ) + with pytest.raises(TypeError, match=expected): + ds.InMemoryDataset([batch1, 0]) + + +def test_construct_in_memory(): + batch = pa.RecordBatch.from_arrays([pa.array(range(10))], names=["a"]) + table = pa.Table.from_batches([batch]) + reader = pa.ipc.RecordBatchReader.from_batches( + batch.schema, (batch for _ in range(1))) + iterable = (batch for _ in range(1)) + + for source in (batch, table, reader, [batch], [table]): + dataset = ds.dataset(source) + assert dataset.to_table().equals(table) + + assert ds.dataset(iterable, schema=batch.schema).to_table().equals(table) + assert ds.dataset([], schema=pa.schema([])).to_table() == pa.table([]) + @pytest.mark.parquet def test_open_dataset_partitioned_directory(tempdir): diff --git a/python/pyarrow/util.py b/python/pyarrow/util.py index e91294a3a1b..446e6733351 100644 --- a/python/pyarrow/util.py +++ b/python/pyarrow/util.py @@ -62,6 +62,14 @@ def __instancecheck__(self, other): return _DeprecatedMeta(old_name, (new_class,), {}) +def _is_iterable(obj): + try: + iter(obj) + return True + except TypeError: + return False + + def _is_path_like(path): # PEP519 filesystem path protocol is available from python 3.6, so pathlib # doesn't implement __fspath__ for earlier versions From 0900fb42bd18a31567f7f091e966cbd5fb9cab91 Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 25 Mar 2021 13:46:37 -0400 Subject: [PATCH 3/9] ARROW-10882: [Python] Allow writing an iterable of batches --- python/pyarrow/dataset.py | 15 ++++++++++----- python/pyarrow/tests/test_dataset.py | 22 ++++++++++++++++++++++ 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index cf14dcb9f55..c38b529f085 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -698,9 +698,11 @@ def write_dataset(data, base_dir, basename_template=None, format=None, Parameters ---------- - data : Dataset, Table/RecordBatch, or list of Table/RecordBatch + data : Dataset, Table/RecordBatch, RecordBatchReader, or list or + iterable of Table/RecordBatch The data to write. This can be a Dataset instance or - in-memory Arrow data. + in-memory Arrow data. If an iterable is given, the schema must + also be given. base_dir : str The root directory where to write the dataset. 
basename_template : str, optional @@ -735,12 +737,15 @@ def write_dataset(data, base_dir, basename_template=None, format=None, elif isinstance(data, (pa.Table, pa.RecordBatch)): schema = schema or data.schema data = [data] - elif isinstance(data, list): + elif isinstance(data, (list, tuple)): schema = schema or data[0].schema + elif isinstance(data, pa.ipc.RecordBatchReader) or _is_iterable(data): + data = InMemoryDataset(data, schema=schema) + schema = schema or data.schema else: raise ValueError( - "Only Dataset, Table/RecordBatch or a list of Table/RecordBatch " - "objects are supported." + "Only Dataset, Table/RecordBatch, RecordBatchReader, or a list " + "or iterable of Table/RecordBatch objects are supported." ) if format is None and isinstance(data, FileSystemDataset): diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index ae16635b0f8..3b4b5e18eb1 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -2906,6 +2906,28 @@ def test_write_table_multiple_fragments(tempdir): ) +def test_write_iterable(tempdir): + table = pa.table([ + pa.array(range(20)), pa.array(np.random.randn(20)), + pa.array(np.repeat(['a', 'b'], 10)) + ], names=["f1", "f2", "part"]) + + base_dir = tempdir / 'inmemory_iterable' + ds.write_dataset((batch for batch in table.to_batches()), base_dir, + schema=table.schema, + basename_template='dat_{i}.arrow', format="feather") + result = ds.dataset(base_dir, format="ipc").to_table() + assert result.equals(table) + + base_dir = tempdir / 'inmemory_reader' + reader = pa.ipc.RecordBatchReader.from_batches(table.schema, + table.to_batches()) + ds.write_dataset(reader, base_dir, + basename_template='dat_{i}.arrow', format="feather") + result = ds.dataset(base_dir, format="ipc").to_table() + assert result.equals(table) + + def test_write_table_partitioned_dict(tempdir): # ensure writing table partitioned on a dictionary column works without # specifying the dictionary values explicitly From 878d6dd9a2d75893db087da02fae1d7469384e8e Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 25 Mar 2021 13:53:13 -0400 Subject: [PATCH 4/9] ARROW-10882: [Python] Clarify that iterables of tables are not accepted --- python/pyarrow/dataset.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index c38b529f085..c343f76db63 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -516,8 +516,8 @@ def dataset(source, schema=None, format=None, filesystem=None, Parameters ---------- - source : path, list of paths, dataset, list of datasets, (list or - iterable of) batches or tables, RecordBatchReader, or URI + source : path, list of paths, dataset, list of datasets, (list of) batches + or tables, iterable of batches, RecordBatchReader, or URI Path pointing to a single file: Open a FileSystemDataset from a single file. Path pointing to a directory: @@ -533,11 +533,9 @@ def dataset(source, schema=None, format=None, filesystem=None, A nested UnionDataset gets constructed, it allows arbitrary composition of other datasets. Note that additional keyword arguments are not allowed. - (List or iterable of) batches or tables: + (List of) batches or tables, iterable of batches, or RecordBatchReader: Create an InMemoryDataset. If an iterable or empty list is given, a schema must also be given. - RecordBatchReader: - Create an InMemoryDataset. 
schema : Schema, optional Optionally provide the Schema for the Dataset, in which case it will not be inferred from the source. @@ -698,8 +696,8 @@ def write_dataset(data, base_dir, basename_template=None, format=None, Parameters ---------- - data : Dataset, Table/RecordBatch, RecordBatchReader, or list or - iterable of Table/RecordBatch + data : Dataset, Table/RecordBatch, RecordBatchReader, list of + Table/RecordBatch, or iterable of RecordBatch The data to write. This can be a Dataset instance or in-memory Arrow data. If an iterable is given, the schema must also be given. @@ -744,8 +742,8 @@ def write_dataset(data, base_dir, basename_template=None, format=None, schema = schema or data.schema else: raise ValueError( - "Only Dataset, Table/RecordBatch, RecordBatchReader, or a list " - "or iterable of Table/RecordBatch objects are supported." + "Only Dataset, Table/RecordBatch, RecordBatchReader, a list " + "of Tables/RecordBatches, or iterable of batches are supported." ) if format is None and isinstance(data, FileSystemDataset): From 906d4934b6d7dde794eea074a98535c0512fd5e5 Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 25 Mar 2021 13:54:19 -0400 Subject: [PATCH 5/9] Undo unnecessary change --- cpp/src/arrow/dataset/dataset.h | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index 169959633c4..6be83059fc1 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -32,7 +32,6 @@ #include "arrow/util/mutex.h" namespace arrow { - namespace dataset { /// \brief A granular piece of a Dataset, such as an individual file. From 87c5943d9cb194f0b46df32068780769a8f907f6 Mon Sep 17 00:00:00 2001 From: David Li Date: Mon, 29 Mar 2021 10:17:13 -0400 Subject: [PATCH 6/9] Raise error when re-scanning one-shot InMemoryDataset --- cpp/src/arrow/dataset/dataset.cc | 8 ++++++- cpp/src/arrow/dataset/dataset_test.cc | 3 +++ python/pyarrow/_dataset.pyx | 11 +++++---- python/pyarrow/dataset.py | 4 +++- python/pyarrow/tests/test_dataset.py | 33 +++++++++++++++++++++++++-- 5 files changed, 51 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 9bc24b04920..6d4eeb5603f 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -150,14 +150,20 @@ InMemoryDataset::InMemoryDataset(std::shared_ptr
table) struct ReaderRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator { explicit ReaderRecordBatchGenerator(std::shared_ptr reader) - : reader_(std::move(reader)) {} + : reader_(std::move(reader)), consumed_(false) {} RecordBatchIterator Get() const final { + if (consumed_) { + return MakeErrorIterator>(Status::Invalid( + "RecordBatchReader-backed InMemoryDataset was already consumed")); + } + consumed_ = true; auto reader = reader_; return MakeFunctionIterator([reader] { return reader->Next(); }); } std::shared_ptr reader_; + mutable bool consumed_; }; InMemoryDataset::InMemoryDataset(std::shared_ptr reader) diff --git a/cpp/src/arrow/dataset/dataset_test.cc b/cpp/src/arrow/dataset/dataset_test.cc index 70eaf5cfa73..e7b9954fec4 100644 --- a/cpp/src/arrow/dataset/dataset_test.cc +++ b/cpp/src/arrow/dataset/dataset_test.cc @@ -91,6 +91,9 @@ TEST_F(TestInMemoryDataset, FromReader) { auto dataset = std::make_shared(source_reader); AssertDatasetEquals(target_reader.get(), dataset.get()); + // Such datasets can only be scanned once + ASSERT_OK_AND_ASSIGN(auto fragments, dataset->GetFragments()); + ASSERT_RAISES(Invalid, fragments.Next()); } TEST_F(TestInMemoryDataset, GetFragments) { diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index de317533a21..a86e5521bc3 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -480,19 +480,23 @@ cdef class InMemoryDataset(Dataset): schema = item.schema elif not schema.equals(item.schema): raise ArrowTypeError( - f'Item has schema {item.schema} which which does not ' - f'match expected schema {schema}') + f'Item has schema\n{item.schema}\nwhich does not ' + f'match expected schema\n{schema}') if not batches and schema is None: raise ValueError('Must provide schema to construct in-memory ' 'dataset from an empty list') - reader = pa.ipc.RecordBatchReader.from_batches(schema, batches) + table = pa.Table.from_batches(batches, schema=schema) + in_memory_dataset = make_shared[CInMemoryDataset]( + pyarrow_unwrap_table(table)) elif isinstance(source, pa.ipc.RecordBatchReader): reader = source + in_memory_dataset = make_shared[CInMemoryDataset](reader.reader) elif _is_iterable(source): if schema is None: raise ValueError('Must provide schema to construct in-memory ' 'dataset from an iterable') reader = pa.ipc.RecordBatchReader.from_batches(schema, source) + in_memory_dataset = make_shared[CInMemoryDataset](reader.reader) else: raise TypeError( 'Expected a table, batch, iterable of tables/batches, or a ' @@ -500,7 +504,6 @@ cdef class InMemoryDataset(Dataset): type(source).__name__ ) - in_memory_dataset = make_shared[CInMemoryDataset](reader.reader) self.init( in_memory_dataset) cdef void init(self, const shared_ptr[CDataset]& sp): diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index c343f76db63..68afb5321ba 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -535,7 +535,9 @@ def dataset(source, schema=None, format=None, filesystem=None, Note that additional keyword arguments are not allowed. (List of) batches or tables, iterable of batches, or RecordBatchReader: Create an InMemoryDataset. If an iterable or empty list is given, - a schema must also be given. + a schema must also be given. If an iterable or RecordBatchReader + is given, the resulting dataset can only be scanned once; further + attempts will raise an error. schema : Schema, optional Optionally provide the Schema for the Dataset, in which case it will not be inferred from the source. 
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 3b4b5e18eb1..a3e56c7e6a0 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -1679,8 +1679,7 @@ def test_construct_from_invalid_sources_raise(multisourcefs): def test_construct_in_memory(): batch = pa.RecordBatch.from_arrays([pa.array(range(10))], names=["a"]) table = pa.Table.from_batches([batch]) - reader = pa.ipc.RecordBatchReader.from_batches( - batch.schema, (batch for _ in range(1))) + reader = pa.ipc.RecordBatchReader.from_batches(batch.schema, [batch]) iterable = (batch for _ in range(1)) for source in (batch, table, reader, [batch], [table]): @@ -1690,6 +1689,36 @@ def test_construct_in_memory(): assert ds.dataset(iterable, schema=batch.schema).to_table().equals(table) assert ds.dataset([], schema=pa.schema([])).to_table() == pa.table([]) + # When constructed from batches/tables, should be reusable + for source in (batch, table, [batch], [table]): + dataset = ds.dataset(source) + assert len(list(dataset.get_fragments())) == 1 + assert len(list(dataset.get_fragments())) == 1 + assert dataset.to_table().equals(table) + assert dataset.to_table().equals(table) + + # When constructed from readers/iterators, should be one-shot + match = "InMemoryDataset was already consumed" + for factory in ( + lambda: pa.ipc.RecordBatchReader.from_batches( + batch.schema, [batch]), + lambda: (batch for _ in range(1)), + ): + dataset = ds.dataset(factory(), schema=batch.schema) + # Getting fragments consumes the underlying iterator + assert len(list(dataset.get_fragments())) == 1 + with pytest.raises(pa.ArrowInvalid, match=match): + list(dataset.get_fragments()) + with pytest.raises(pa.ArrowInvalid, match=match): + dataset.to_table() + # Materializing consumes the underlying iterator + dataset = ds.dataset(factory(), schema=batch.schema) + dataset.to_table() + with pytest.raises(pa.ArrowInvalid, match=match): + list(dataset.get_fragments()) + with pytest.raises(pa.ArrowInvalid, match=match): + dataset.to_table() + @pytest.mark.parquet def test_open_dataset_partitioned_directory(tempdir): From 7508c01ef932ae3c6d5eed77496e4f73e334674e Mon Sep 17 00:00:00 2001 From: David Li Date: Mon, 29 Mar 2021 10:52:29 -0400 Subject: [PATCH 7/9] Fix InMemoryFragment being constructed with empty schema --- cpp/src/arrow/dataset/dataset.cc | 7 +++++-- cpp/src/arrow/dataset/dataset_test.cc | 15 +++++++++++++++ python/pyarrow/tests/test_dataset.py | 13 ++++++++----- 3 files changed, 28 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 6d4eeb5603f..2df34145cd9 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -66,8 +66,11 @@ InMemoryFragment::InMemoryFragment(std::shared_ptr schema, InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches, Expression partition_expression) - : InMemoryFragment(record_batches.empty() ? schema({}) : record_batches[0]->schema(), - std::move(record_batches), std::move(partition_expression)) {} + : Fragment(std::move(partition_expression), /*schema=*/nullptr), + record_batches_(std::move(record_batches)) { + // Order of argument evaluation is undefined, so compute physical_schema here + physical_schema_ = record_batches_.empty() ? 
schema({}) : record_batches_[0]->schema(); +} Result InMemoryFragment::Scan(std::shared_ptr options) { // Make an explicit copy of record_batches_ to ensure Scan can be called diff --git a/cpp/src/arrow/dataset/dataset_test.cc b/cpp/src/arrow/dataset/dataset_test.cc index e7b9954fec4..b2b82561359 100644 --- a/cpp/src/arrow/dataset/dataset_test.cc +++ b/cpp/src/arrow/dataset/dataset_test.cc @@ -110,6 +110,21 @@ TEST_F(TestInMemoryDataset, GetFragments) { AssertDatasetEquals(reader.get(), dataset.get()); } +TEST_F(TestInMemoryDataset, InMemoryFragment) { + constexpr int64_t kBatchSize = 1024; + constexpr int64_t kNumberBatches = 16; + + SetSchema({field("i32", int32()), field("f64", float64())}); + auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_); + RecordBatchVector batches{batch}; + + // Regression test: previously this constructor relied on undefined behavior (order of + // evaluation of arguments) leading to fragments being constructed with empty schemas + auto fragment = std::make_shared(batches); + ASSERT_OK_AND_ASSIGN(auto schema, fragment->ReadPhysicalSchema()); + AssertSchemaEqual(batch->schema(), schema); +} + class TestUnionDataset : public DatasetFixtureMixin {}; TEST_F(TestUnionDataset, ReplaceSchema) { diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index a3e56c7e6a0..ebd99922f9e 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -1684,7 +1684,7 @@ def test_construct_in_memory(): for source in (batch, table, reader, [batch], [table]): dataset = ds.dataset(source) - assert dataset.to_table().equals(table) + assert dataset.to_table() == table assert ds.dataset(iterable, schema=batch.schema).to_table().equals(table) assert ds.dataset([], schema=pa.schema([])).to_table() == pa.table([]) @@ -1694,8 +1694,9 @@ def test_construct_in_memory(): dataset = ds.dataset(source) assert len(list(dataset.get_fragments())) == 1 assert len(list(dataset.get_fragments())) == 1 - assert dataset.to_table().equals(table) - assert dataset.to_table().equals(table) + assert dataset.to_table() == table + assert dataset.to_table() == table + assert next(dataset.get_fragments()).to_table() == table # When constructed from readers/iterators, should be one-shot match = "InMemoryDataset was already consumed" @@ -1706,14 +1707,16 @@ def test_construct_in_memory(): ): dataset = ds.dataset(factory(), schema=batch.schema) # Getting fragments consumes the underlying iterator - assert len(list(dataset.get_fragments())) == 1 + fragments = list(dataset.get_fragments()) + assert len(fragments) == 1 + assert fragments[0].to_table() == table with pytest.raises(pa.ArrowInvalid, match=match): list(dataset.get_fragments()) with pytest.raises(pa.ArrowInvalid, match=match): dataset.to_table() # Materializing consumes the underlying iterator dataset = ds.dataset(factory(), schema=batch.schema) - dataset.to_table() + assert dataset.to_table() == table with pytest.raises(pa.ArrowInvalid, match=match): list(dataset.get_fragments()) with pytest.raises(pa.ArrowInvalid, match=match): From 14330416b6a92aa84848116b97ec06a64d24f3f0 Mon Sep 17 00:00:00 2001 From: David Li Date: Mon, 29 Mar 2021 10:59:28 -0400 Subject: [PATCH 8/9] Use InMemoryDataset for writing --- python/pyarrow/_dataset.pyx | 26 ++++++++------------------ python/pyarrow/dataset.py | 7 +++---- python/pyarrow/tests/test_dataset.py | 2 +- 3 files changed, 12 insertions(+), 23 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 
a86e5521bc3..387471185a1 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -2670,10 +2670,14 @@ def _get_partition_keys(Expression partition_expression): def _filesystemdataset_write( - data not None, object base_dir not None, str basename_template not None, - Schema schema not None, FileSystem filesystem not None, + Dataset data not None, + object base_dir not None, + str basename_template not None, + Schema schema not None, + FileSystem filesystem not None, Partitioning partitioning not None, - FileWriteOptions file_options not None, bint use_threads, + FileWriteOptions file_options not None, + bint use_threads, int max_partitions, ): """ @@ -2691,21 +2695,7 @@ def _filesystemdataset_write( c_options.max_partitions = max_partitions c_options.basename_template = tobytes(basename_template) - if isinstance(data, Dataset): - scanner = data._scanner(use_threads=use_threads) - else: - # data is list of batches/tables - for table in data: - if isinstance(table, Table): - for batch in table.to_batches(): - c_batches.push_back(( batch).sp_batch) - else: - c_batches.push_back(( table).sp_batch) - - data = Fragment.wrap(shared_ptr[CFragment]( - new CInMemoryFragment(move(c_batches), _true.unwrap()))) - - scanner = Scanner.from_fragment(data, schema, use_threads=use_threads) + scanner = data._scanner(use_threads=use_threads) c_scanner = ( scanner).unwrap() with nogil: diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 68afb5321ba..0c65070d872 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -734,12 +734,11 @@ def write_dataset(data, base_dir, basename_template=None, format=None, if isinstance(data, Dataset): schema = schema or data.schema - elif isinstance(data, (pa.Table, pa.RecordBatch)): - schema = schema or data.schema - data = [data] elif isinstance(data, (list, tuple)): schema = schema or data[0].schema - elif isinstance(data, pa.ipc.RecordBatchReader) or _is_iterable(data): + data = InMemoryDataset(data, schema=schema) + elif isinstance(data, (pa.RecordBatch, pa.ipc.RecordBatchReader, + pa.Table)) or _is_iterable(data): data = InMemoryDataset(data, schema=schema) schema = schema or data.schema else: diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index ebd99922f9e..a7dd1520168 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -1656,7 +1656,7 @@ def test_construct_from_invalid_sources_raise(multisourcefs): ds.InMemoryDataset([]) expected = ( - "Item has schema b: int64 which which does not match expected schema " + "Item has schema\nb: int64\nwhich does not match expected schema\n" "a: int64" ) with pytest.raises(TypeError, match=expected): From c63e2466b0d6b1250a39ad18eaae2a4e653da530 Mon Sep 17 00:00:00 2001 From: David Li Date: Mon, 29 Mar 2021 16:42:21 -0400 Subject: [PATCH 9/9] Remove unused variable --- cpp/src/arrow/dataset/dataset_test.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/dataset/dataset_test.cc b/cpp/src/arrow/dataset/dataset_test.cc index b2b82561359..1db96b8b5c3 100644 --- a/cpp/src/arrow/dataset/dataset_test.cc +++ b/cpp/src/arrow/dataset/dataset_test.cc @@ -112,7 +112,6 @@ TEST_F(TestInMemoryDataset, GetFragments) { TEST_F(TestInMemoryDataset, InMemoryFragment) { constexpr int64_t kBatchSize = 1024; - constexpr int64_t kNumberBatches = 16; SetSchema({field("i32", int32()), field("f64", float64())}); auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
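
Taken together, the series lets batches stream through write_dataset without first materializing a Table. A minimal end-to-end sketch (an illustration, not taken from the patches; the output directory name is a placeholder):

import pyarrow as pa
import pyarrow.dataset as ds

schema = pa.schema([("a", pa.int64())])

def generate_batches():
    # Yield four batches of ten rows each without holding them all in memory.
    for i in range(4):
        yield pa.RecordBatch.from_arrays(
            [pa.array(range(i * 10, (i + 1) * 10), type=pa.int64())],
            names=["a"])

# An iterable carries no schema of its own, so it must be given explicitly.
ds.write_dataset(generate_batches(), "streamed_dataset", schema=schema,
                 format="feather")

result = ds.dataset("streamed_dataset", format="ipc").to_table()
assert result.num_rows == 40
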