From a7e22becdf5c0c3047c62c93b1fb0f4281303f84 Mon Sep 17 00:00:00 2001
From: Benjamin Kietzman
Date: Tue, 17 Mar 2020 16:18:21 -0400
Subject: [PATCH 1/2] ARROW-8074: [Dataset][Python] Allow construction of
 FileFragments from in memory buffers

---
 cpp/src/arrow/dataset/dataset.cc             |  76 +++++++++----
 cpp/src/arrow/dataset/dataset.h              |  18 ++--
 cpp/src/arrow/dataset/dataset_internal.h     |  99 +++++++++++++++++
 cpp/src/arrow/dataset/dataset_test.cc        |  23 ++--
 cpp/src/arrow/dataset/discovery_test.cc      |   3 +-
 cpp/src/arrow/dataset/file_base.cc           |   6 ++
 cpp/src/arrow/dataset/file_base.h            |   2 +
 cpp/src/arrow/dataset/scanner.cc             |  58 +---------
 cpp/src/arrow/dataset/scanner.h              |   3 -
 cpp/src/arrow/dataset/scanner_internal.h     |  69 ------------
 cpp/src/arrow/dataset/scanner_test.cc        |   4 +-
 python/pyarrow/_dataset.pyx                  | 107 ++++++++++++++++++-
 python/pyarrow/dataset.py                    |   1 +
 python/pyarrow/includes/libarrow_dataset.pxd |  27 ++++-
 python/pyarrow/lib.pxd                       |   2 +
 python/pyarrow/tests/test_dataset.py         |  39 +++++--
 16 files changed, 353 insertions(+), 184 deletions(-)
 delete mode 100644 cpp/src/arrow/dataset/scanner_internal.h

diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc
index 0e062e37c02..6d687a10a02 100644
--- a/cpp/src/arrow/dataset/dataset.cc
+++ b/cpp/src/arrow/dataset/dataset.cc
@@ -37,15 +37,13 @@ const std::shared_ptr<Schema>& Fragment::schema() const {
   return scan_options_->schema();
 }
 
-InMemoryFragment::InMemoryFragment(
-    std::vector<std::shared_ptr<RecordBatch>> record_batches,
-    std::shared_ptr<ScanOptions> scan_options)
+InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches,
+                                   std::shared_ptr<ScanOptions> scan_options)
     : Fragment(std::move(scan_options)), record_batches_(std::move(record_batches)) {}
 
-InMemoryFragment::InMemoryFragment(
-    std::vector<std::shared_ptr<RecordBatch>> record_batches,
-    std::shared_ptr<ScanOptions> scan_options,
-    std::shared_ptr<Expression> partition_expression)
+InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches,
+                                   std::shared_ptr<ScanOptions> scan_options,
+                                   std::shared_ptr<Expression> partition_expression)
     : Fragment(std::move(scan_options), std::move(partition_expression)),
       record_batches_(std::move(record_batches)) {}
 
@@ -57,7 +55,7 @@ Result<ScanTaskIterator> InMemoryFragment::Scan(std::shared_ptr<ScanContext> con
   // RecordBatch -> ScanTask
   auto scan_options = scan_options_;
   auto fn = [=](std::shared_ptr<RecordBatch> batch) -> std::shared_ptr<ScanTask> {
-    std::vector<std::shared_ptr<RecordBatch>> batches{batch};
+    RecordBatchVector batches{batch};
     return ::arrow::internal::make_unique<InMemoryScanTask>(
         std::move(batches), std::move(scan_options), std::move(context));
   };
@@ -106,20 +104,6 @@ FragmentIterator Dataset::GetFragments(std::shared_ptr<ScanOptions> scan_options
   return GetFragmentsImpl(std::move(simplified_scan_options));
 }
 
-struct VectorRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
-  explicit VectorRecordBatchGenerator(std::vector<std::shared_ptr<RecordBatch>> batches)
-      : batches_(std::move(batches)) {}
-
-  RecordBatchIterator Get() const final { return MakeVectorIterator(batches_); }
-
-  std::vector<std::shared_ptr<RecordBatch>> batches_;
-};
-
-InMemoryDataset::InMemoryDataset(std::shared_ptr<Schema> schema,
-                                 std::vector<std::shared_ptr<RecordBatch>> batches)
-    : Dataset(std::move(schema)),
-      get_batches_(new VectorRecordBatchGenerator(std::move(batches))) {}
-
 struct TableRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
   explicit TableRecordBatchGenerator(std::shared_ptr<Table> table)
       : table_(std::move(table)) {}
@@ -130,6 +114,8 @@ struct TableRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
     return MakeFunctionIterator([reader, table] { return reader->Next(); });
   }
 
+  bool uniform_schema_guaranteed() const final { return true; }
+
   std::shared_ptr<Table> table_;
 };
 
@@ -137,6 +123,50 @@ InMemoryDataset::InMemoryDataset(std::shared_ptr<Table> table)
     : Dataset(table->schema()),
       get_batches_(new TableRecordBatchGenerator(std::move(table))) {}
 
+struct VectorRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
+  explicit VectorRecordBatchGenerator(RecordBatchVector batches)
+      : batches_(std::move(batches)) {}
+
+  RecordBatchIterator Get() const final { return MakeVectorIterator(batches_); }
+
+  bool uniform_schema_guaranteed() const final { return true; }
+
+  RecordBatchVector batches_;
+};
+
+Result<std::shared_ptr<InMemoryDataset>> InMemoryDataset::Make(
+    RecordBatchVector batches) {
+  if (batches.empty()) {
+    return Status::Invalid(
+        "InMemoryDataset::Make requires at least one batch or an explicit schema");
+  }
+  return Make(batches[0]->schema(), std::move(batches));
+}
+
+Result<std::shared_ptr<InMemoryDataset>> InMemoryDataset::Make(
+    std::shared_ptr<Schema> schema, RecordBatchVector batches) {
+  for (const auto& batch : batches) {
+    if (!batch->schema()->Equals(*schema)) {
+      return Status::TypeError("InMemoryDataset::Make requires uniform schemas");
+    }
+  }
+  return std::make_shared<InMemoryDataset>(
+      std::move(schema),
+      internal::make_unique<VectorRecordBatchGenerator>(std::move(batches)));
+}
+
+Result<std::shared_ptr<InMemoryDataset>> InMemoryDataset::Make(
+    std::shared_ptr<Fragment> fragment, std::shared_ptr<ScanContext> context) {
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment->Scan(context));
+
+  TableAggregator aggregator;
+  RETURN_NOT_OK(
+      aggregator.AppendFrom(FilterAndProjectScanTask::Wrap(std::move(scan_task_it)),
+                            fragment->scan_options(), context));
+
+  return Make(fragment->schema(), std::move(aggregator.batches));
+}
+
 FragmentIterator InMemoryDataset::GetFragmentsImpl(
     std::shared_ptr<ScanOptions> scan_options) {
   auto schema = this->schema();
@@ -149,7 +179,7 @@ FragmentIterator InMemoryDataset::GetFragmentsImpl(
             " which did not match InMemorySource's: ", *schema);
       }
 
-      std::vector<std::shared_ptr<RecordBatch>> batches;
+      RecordBatchVector batches;
 
       auto batch_size = scan_options->batch_size;
       auto n_batches = BitUtil::CeilDiv(batch->num_rows(), batch_size);
diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h
index 318a356a06b..d171bc5bc70 100644
--- a/cpp/src/arrow/dataset/dataset.h
+++ b/cpp/src/arrow/dataset/dataset.h
@@ -83,10 +83,10 @@ class ARROW_DS_EXPORT Fragment {
 /// RecordBatch.
 class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
  public:
-  InMemoryFragment(std::vector<std::shared_ptr<RecordBatch>> record_batches,
+  InMemoryFragment(RecordBatchVector record_batches,
                    std::shared_ptr<ScanOptions> scan_options);
 
-  InMemoryFragment(std::vector<std::shared_ptr<RecordBatch>> record_batches,
+  InMemoryFragment(RecordBatchVector record_batches,
                    std::shared_ptr<ScanOptions> scan_options,
                    std::shared_ptr<Expression> partition_expression);
 
@@ -97,7 +97,7 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
   std::string type_name() const override { return "in-memory"; }
 
  protected:
-  std::vector<std::shared_ptr<RecordBatch>> record_batches_;
+  RecordBatchVector record_batches_;
 };
 
 /// \brief A container of zero or more Fragments. A Dataset acts as a discovery mechanism
@@ -152,17 +152,23 @@ class ARROW_DS_EXPORT InMemoryDataset : public Dataset {
    public:
     virtual ~RecordBatchGenerator() = default;
     virtual RecordBatchIterator Get() const = 0;
+    virtual bool uniform_schema_guaranteed() const { return false; }
   };
 
   InMemoryDataset(std::shared_ptr<Schema> schema,
                   std::unique_ptr<RecordBatchGenerator> get_batches)
       : Dataset(std::move(schema)), get_batches_(std::move(get_batches)) {}
 
+  explicit InMemoryDataset(std::shared_ptr<Table> table);
+
   // Convenience constructor taking a fixed list of batches
-  InMemoryDataset(std::shared_ptr<Schema> schema,
-                  std::vector<std::shared_ptr<RecordBatch>> batches);
+  static Result<std::shared_ptr<InMemoryDataset>> Make(RecordBatchVector batches);
+  static Result<std::shared_ptr<InMemoryDataset>> Make(std::shared_ptr<Schema> schema,
+                                                       RecordBatchVector batches);
 
-  explicit InMemoryDataset(std::shared_ptr<Table> table);
+  // Convenience constructor which scans a fragment's batches
+  static Result<std::shared_ptr<InMemoryDataset>> Make(
+      std::shared_ptr<Fragment> fragment, std::shared_ptr<ScanContext> context);
 
   FragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;
 
diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h
index 910695fc5e1..d8a2dbd83a3 100644
--- a/cpp/src/arrow/dataset/dataset_internal.h
+++ b/cpp/src/arrow/dataset/dataset_internal.h
@@ -18,16 +18,21 @@
 #pragma once
 
 #include <memory>
+#include <mutex>
 #include <string>
 #include <utility>
 #include <vector>
 
 #include "arrow/dataset/dataset.h"
+#include "arrow/dataset/filter.h"
+#include "arrow/dataset/scanner.h"
 #include "arrow/dataset/type_fwd.h"
 #include "arrow/record_batch.h"
 #include "arrow/scalar.h"
 #include "arrow/type.h"
 #include "arrow/util/iterator.h"
+#include "arrow/util/task_group.h"
+#include "arrow/util/thread_pool.h"
 
 namespace arrow {
 namespace dataset {
@@ -64,5 +69,99 @@ inline std::shared_ptr<Schema> SchemaFromColumnNames(
   return schema(std::move(columns));
 }
 
+inline std::shared_ptr<internal::TaskGroup> MakeTaskGroup(
+    const std::shared_ptr<ScanOptions>& options,
+    const std::shared_ptr<ScanContext>& context) {
+  return options->use_threads ? internal::TaskGroup::MakeThreaded(context->thread_pool)
+                              : internal::TaskGroup::MakeSerial();
+}
+
+static inline RecordBatchIterator FilterRecordBatch(RecordBatchIterator it,
+                                                    const ExpressionEvaluator& evaluator,
+                                                    const Expression& filter,
+                                                    MemoryPool* pool) {
+  return MakeMaybeMapIterator(
+      [&filter, &evaluator, pool](std::shared_ptr<RecordBatch> in) {
+        return evaluator.Evaluate(filter, *in, pool).Map([&](compute::Datum selection) {
+          return evaluator.Filter(selection, in);
+        });
+      },
+      std::move(it));
+}
+
+static inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it,
+                                                     RecordBatchProjector* projector,
+                                                     MemoryPool* pool) {
+  return MakeMaybeMapIterator(
+      [=](std::shared_ptr<RecordBatch> in) { return projector->Project(*in, pool); },
+      std::move(it));
+}
+
+class FilterAndProjectScanTask : public ScanTask {
+ public:
+  explicit FilterAndProjectScanTask(std::shared_ptr<ScanTask> task)
+      : ScanTask(task->options(), task->context()), task_(std::move(task)) {}
+
+  Result<RecordBatchIterator> Execute() override {
+    ARROW_ASSIGN_OR_RAISE(auto it, task_->Execute());
+    auto filter_it = FilterRecordBatch(std::move(it), *options_->evaluator,
+                                       *options_->filter, context_->pool);
+    return ProjectRecordBatch(std::move(filter_it), &task_->options()->projector,
+                              context_->pool);
+  }
+
+  static ScanTaskIterator Wrap(ScanTaskIterator scan_task_it) {
+    auto wrap_scan_task =
+        [](std::shared_ptr<ScanTask> task) -> std::shared_ptr<ScanTask> {
+      return std::make_shared<FilterAndProjectScanTask>(std::move(task));
+    };
+    return MakeMapIterator(wrap_scan_task, std::move(scan_task_it));
+  }
+
+ private:
+  std::shared_ptr<ScanTask> task_;
+};
+
+struct TableAggregator {
+  void Append(std::shared_ptr<RecordBatch> batch) {
+    std::lock_guard<std::mutex> lock(m);
+    batches.emplace_back(std::move(batch));
+  }
+
+  template <typename... Args>
+  Status AppendFrom(ScanTaskIterator scan_task_it, Args&&... args) {
+    auto task_group = MakeTaskGroup(std::forward<Args>(args)...);
+
+    for (auto maybe_scan_task : scan_task_it) {
+      ARROW_ASSIGN_OR_RAISE(auto scan_task, std::move(maybe_scan_task));
+      AppendFrom(std::move(scan_task), task_group.get());
+    }
+
+    // Wait for all tasks to complete, or the first error.
+    return task_group->Finish();
+  }
+
+  void AppendFrom(std::shared_ptr<ScanTask> scan_task, internal::TaskGroup* task_group) {
+    task_group->Append([this, scan_task] {
+      ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
+      for (auto maybe_batch : batch_it) {
+        ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch));
+        Append(std::move(batch));
+      }
+
+      return Status::OK();
+    });
+  }
+
+  Result<std::shared_ptr<Table>> Finish(const std::shared_ptr<Schema>& schema) {
+    std::shared_ptr<Table> out;
+    RETURN_NOT_OK(Table::FromRecordBatches(schema, batches, &out));
+    return out;
+  }
+
+  std::mutex m;
+  std::vector<std::shared_ptr<RecordBatch>> batches;
+};
+
 }  // namespace dataset
 }  // namespace arrow
diff --git a/cpp/src/arrow/dataset/dataset_test.cc b/cpp/src/arrow/dataset/dataset_test.cc
index 8741d169eab..1de369e7e78 100644
--- a/cpp/src/arrow/dataset/dataset_test.cc
+++ b/cpp/src/arrow/dataset/dataset_test.cc
@@ -62,8 +62,10 @@ TEST_F(TestInMemoryDataset, GetFragments) {
 
   // It is safe to copy fragment multiple time since Scan() does not consume
   // the internal array.
-  auto dataset = std::make_shared<InMemoryDataset>(
-      schema_, RecordBatchVector{static_cast<size_t>(kNumberBatches), batch});
+  ASSERT_OK_AND_ASSIGN(
+      auto dataset,
+      InMemoryDataset::Make(
+          schema_, RecordBatchVector{static_cast<size_t>(kNumberBatches), batch}));
 
   AssertDatasetEquals(reader.get(), dataset.get());
 }
@@ -84,8 +86,10 @@ TEST_F(TestUnionDataset, GetFragments) {
 
   // Creates a complete binary tree of depth kCompleteBinaryTreeDepth where the
   // leaves are InMemoryDataset containing kChildPerNode fragments.
-  auto l1_leaf_dataset = std::make_shared<InMemoryDataset>(
-      schema_, RecordBatchVector{static_cast<size_t>(kChildPerNode), batch});
+  ASSERT_OK_AND_ASSIGN(
+      auto l1_leaf_dataset,
+      InMemoryDataset::Make(
+          schema_, RecordBatchVector{static_cast<size_t>(kChildPerNode), batch}));
 
   ASSERT_OK_AND_ASSIGN(
       auto l2_leaf_tree_dataset,
@@ -117,10 +121,9 @@ TEST_F(TestDataset, TrivialScan) {
   std::vector<std::shared_ptr<RecordBatch>> batches{static_cast<size_t>(kNumberBatches),
                                                     batch};
 
-  DatasetVector children = {
-      std::make_shared<InMemoryDataset>(schema_, batches),
-      std::make_shared<InMemoryDataset>(schema_, batches),
-  };
+  DatasetVector children(2, nullptr);
+  ASSERT_OK_AND_ASSIGN(children[0], InMemoryDataset::Make(schema_, batches));
+  ASSERT_OK_AND_ASSIGN(children[1], InMemoryDataset::Make(schema_, batches));
 
   const int64_t total_batches = children.size() * kNumberBatches;
   auto reader = ConstantArrayGenerator::Repeat(total_batches, batch);
@@ -231,7 +234,7 @@ TEST(TestProjector, NonTrivial) {
 }
 
 class TestEndToEnd : public TestDataset {
-  void SetUp() {
+  void SetUp() override {
     bool nullable = false;
     SetSchema({
         field("region", utf8(), nullable),
@@ -407,7 +410,7 @@ class TestSchemaUnification : public TestDataset {
  public:
   using i32 = util::optional<int32_t>;
 
-  void SetUp() {
+  void SetUp() override {
     using PathAndContent = std::vector<std::pair<std::string, std::string>>;
 
     // The following test creates 2 sources with divergent but compatible
diff --git a/cpp/src/arrow/dataset/discovery_test.cc b/cpp/src/arrow/dataset/discovery_test.cc
index bbb851fe35f..d6d2786b39d 100644
--- a/cpp/src/arrow/dataset/discovery_test.cc
+++ b/cpp/src/arrow/dataset/discovery_test.cc
@@ -67,8 +67,7 @@ class MockDatasetFactory : public DatasetFactory {
 
   Result<std::shared_ptr<Dataset>> Finish(
       const std::shared_ptr<Schema>& schema) override {
-    return std::make_shared<InMemoryDataset>(schema,
-                                             std::vector<std::shared_ptr<RecordBatch>>{});
+    return InMemoryDataset::Make(schema, RecordBatchVector{});
   }
 
  protected:
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 0eccfcdc324..3ced8ca678b 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -39,6 +39,12 @@ Result<std::shared_ptr<arrow::io::RandomAccessFile>> FileSource::Open() const {
   return std::make_shared<::arrow::io::BufferReader>(buffer());
 }
 
+Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(FileSource source) {
+  ARROW_ASSIGN_OR_RAISE(auto schema, Inspect(source));
+  auto options = ScanOptions::Make(std::move(schema));
+  return MakeFragment(std::move(source), std::move(options), scalar(true));
+}
+
 Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
     FileSource source, std::shared_ptr<ScanOptions> options) {
   return MakeFragment(std::move(source), std::move(options),
                       scalar(true));
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index b3f3dabcd7c..422a3139704 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -138,6 +138,8 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileFormat> {
 
   Result<std::shared_ptr<FileFragment>> MakeFragment(FileSource source,
                                                      std::shared_ptr<ScanOptions> options);
+
+  Result<std::shared_ptr<FileFragment>> MakeFragment(FileSource source);
 };
 
 /// \brief A Fragment that is stored in a file with a known format
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index b07344d3bd8..000fc41921b 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -19,16 +19,12 @@
 
 #include <algorithm>
 #include <memory>
-#include <mutex>
 
 #include "arrow/dataset/dataset.h"
 #include "arrow/dataset/dataset_internal.h"
 #include "arrow/dataset/filter.h"
-#include "arrow/dataset/scanner_internal.h"
 #include "arrow/table.h"
 #include "arrow/util/iterator.h"
-#include "arrow/util/task_group.h"
-#include "arrow/util/thread_pool.h"
 
 namespace arrow {
 namespace dataset {
@@ -96,10 +92,7 @@ Result<ScanTaskIterator> Scanner::Scan() {
 
   // Apply the filter and/or projection to incoming RecordBatches by
   // wrapping the ScanTask with a FilterAndProjectScanTask
-  auto wrap_scan_task = [](std::shared_ptr<ScanTask> task) -> std::shared_ptr<ScanTask> {
-    return std::make_shared<FilterAndProjectScanTask>(std::move(task));
-  };
-  return MakeMapIterator(wrap_scan_task, std::move(scan_task_it));
+  return FilterAndProjectScanTask::Wrap(std::move(scan_task_it));
 }
 
 Result<ScanTaskIterator> ScanTaskIteratorFromRecordBatch(
@@ -160,56 +153,11 @@ Result<std::shared_ptr<Scanner>> ScannerBuilder::Finish() const {
   return std::make_shared<Scanner>(dataset_, std::move(options), context_);
 }
 
-using arrow::internal::TaskGroup;
-
-std::shared_ptr<TaskGroup> Scanner::TaskGroup() const {
-  return options_->use_threads ? TaskGroup::MakeThreaded(context_->thread_pool)
-                               : TaskGroup::MakeSerial();
-}
-
-struct TableAggregator {
-  void Append(std::shared_ptr<RecordBatch> batch) {
-    std::lock_guard<std::mutex> lock(m);
-    batches.emplace_back(std::move(batch));
-  }
-
-  Result<std::shared_ptr<Table>> Finish(const std::shared_ptr<Schema>& schema) {
-    std::shared_ptr<Table> out;
-    RETURN_NOT_OK(Table::FromRecordBatches(schema, batches, &out));
-    return out;
-  }
-
-  std::mutex m;
-  std::vector<std::shared_ptr<RecordBatch>> batches;
-};
-
-struct ScanTaskPromise {
-  Status operator()() {
-    ARROW_ASSIGN_OR_RAISE(auto it, task->Execute());
-    for (auto maybe_batch : it) {
-      ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch));
-      aggregator.Append(std::move(batch));
-    }
-
-    return Status::OK();
-  }
-
-  TableAggregator& aggregator;
-  std::shared_ptr<ScanTask> task;
-};
-
 Result<std::shared_ptr<Table>> Scanner::ToTable() {
-  auto task_group = TaskGroup();
+  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
 
   TableAggregator aggregator;
-  ARROW_ASSIGN_OR_RAISE(auto it, Scan());
-  for (auto maybe_scan_task : it) {
-    ARROW_ASSIGN_OR_RAISE(auto scan_task, std::move(maybe_scan_task));
-    task_group->Append(ScanTaskPromise{aggregator, std::move(scan_task)});
-  }
-
-  // Wait for all tasks to complete, or the first error.
-  RETURN_NOT_OK(task_group->Finish());
+  RETURN_NOT_OK(aggregator.AppendFrom(std::move(scan_task_it), options_, context_));
 
   return aggregator.Finish(options_->schema());
 }
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index 54c37c1f2d3..f78482f048f 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -177,9 +177,6 @@ class ARROW_DS_EXPORT Scanner {
   const std::shared_ptr<ScanContext>& context() const { return context_; }
 
  protected:
-  /// \brief Return a TaskGroup according to ScanContext thread rules.
-  std::shared_ptr<internal::TaskGroup> TaskGroup() const;
-
   std::shared_ptr<Dataset> dataset_;
   std::shared_ptr<ScanOptions> options_;
   std::shared_ptr<ScanContext> context_;
diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h
deleted file mode 100644
index 5cae2a79e08..00000000000
--- a/cpp/src/arrow/dataset/scanner_internal.h
+++ /dev/null
@@ -1,69 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <memory>
-#include <utility>
-
-#include "arrow/dataset/dataset_internal.h"
-#include "arrow/dataset/filter.h"
-#include "arrow/dataset/scanner.h"
-
-namespace arrow {
-namespace dataset {
-
-static inline RecordBatchIterator FilterRecordBatch(RecordBatchIterator it,
-                                                    const ExpressionEvaluator& evaluator,
-                                                    const Expression& filter,
-                                                    MemoryPool* pool) {
-  return MakeMaybeMapIterator(
-      [&filter, &evaluator, pool](std::shared_ptr<RecordBatch> in) {
-        return evaluator.Evaluate(filter, *in, pool).Map([&](compute::Datum selection) {
-          return evaluator.Filter(selection, in);
-        });
-      },
-      std::move(it));
-}
-
-static inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it,
-                                                     RecordBatchProjector* projector,
-                                                     MemoryPool* pool) {
-  return MakeMaybeMapIterator(
-      [=](std::shared_ptr<RecordBatch> in) { return projector->Project(*in, pool); },
-      std::move(it));
-}
-
-class FilterAndProjectScanTask : public ScanTask {
- public:
-  explicit FilterAndProjectScanTask(std::shared_ptr<ScanTask> task)
-      : ScanTask(task->options(), task->context()), task_(std::move(task)) {}
-
-  Result<RecordBatchIterator> Execute() override {
-    ARROW_ASSIGN_OR_RAISE(auto it, task_->Execute());
-    auto filter_it = FilterRecordBatch(std::move(it), *options_->evaluator,
-                                       *options_->filter, context_->pool);
-    return ProjectRecordBatch(std::move(filter_it), &task_->options()->projector,
-                              context_->pool);
-  }
-
- private:
-  std::shared_ptr<ScanTask> task_;
-};
-
-}  // namespace dataset
-}  // namespace arrow
diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc
index 8e5c221da5e..4889ef58d74 100644
--- a/cpp/src/arrow/dataset/scanner_test.cc
+++ b/cpp/src/arrow/dataset/scanner_test.cc
@@ -38,8 +38,8 @@ class TestScanner : public DatasetFixtureMixin {
     std::vector<std::shared_ptr<RecordBatch>> batches{static_cast<size_t>(kNumberBatches),
                                                       batch};
 
-    DatasetVector children{static_cast<size_t>(kNumberChildDatasets),
-                           std::make_shared<InMemoryDataset>(batch->schema(), batches)};
+    EXPECT_OK_AND_ASSIGN(auto child, InMemoryDataset::Make(batch->schema(), batches));
+    DatasetVector children{static_cast<size_t>(kNumberChildDatasets), child};
 
     EXPECT_OK_AND_ASSIGN(auto dataset, UnionDataset::Make(batch->schema(), children));
 
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 34791bb4111..c0921598658 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -26,7 +26,8 @@ import pyarrow as pa
 from pyarrow.lib cimport *
 from pyarrow.includes.libarrow_dataset cimport *
 from pyarrow.compat import frombytes, tobytes
-from pyarrow._fs cimport FileSystem, FileInfo, FileSelector
+from pyarrow._fs cimport FileSystem, FileInfo, FileSelector, LocalFileSystem
+#from pyarrow.lib import as_c_buffer
 
 
 def _forbid_instantiation(klass, subclasses_instead=True):
@@ -70,6 +71,8 @@ cdef class Dataset:
             self = UnionDataset.__new__(UnionDataset)
         elif typ == 'filesystem':
            self = FileSystemDataset.__new__(FileSystemDataset)
+        elif typ == 'in-memory':
+            self = InMemoryDataset.__new__(InMemoryDataset)
         else:
             raise TypeError(typ)
 
@@ -259,6 +262,68 @@ cdef class UnionDataset(Dataset):
         self.union_dataset = <CUnionDataset*> sp.get()
 
 
+cdef class InMemoryDataset(Dataset):
+    """A Dataset wrapping in-memory RecordBatches.
+
+    RecordBatches' schemas must be uniform.
+    One of batches, fragment, or table must be provided.
+
+    Parameters
+    ----------
+    batches : list of RecordBatch
+        One or more explicit RecordBatches
+    fragment : Fragment
+        A fragment to be scanned for RecordBatches
+    table : Table
+        A table whose RecordBatches will be wrapped as an InMemoryDataset
+    memory_pool : MemoryPool
+        A MemoryPool from which memory for scanned RecordBatches will be
+        allocated. May only be specified if fragment is provided.
+    """
+
+    cdef:
+        CInMemoryDataset* inmem_dataset
+
+    def __init__(self, batches=None, Fragment fragment=None, Table table=None,
+                MemoryPool memory_pool=None):
+        cdef:
+            shared_ptr[CInMemoryDataset] inmem_dataset
+
+            RecordBatch batch
+            CRecordBatchVector c_batches
+
+            shared_ptr[CScanContext] context
+
+            shared_ptr[CTable] c_table
+
+        if batches is not None:
+            for batch in batches:
+                c_batches.push_back(pyarrow_unwrap_batch(batch))
+
+            inmem_dataset = GetResultValue(CInMemoryDataset.MakeFromBatches(
+                move(c_batches)))
+
+        elif fragment is not None:
+            context = make_shared[CScanContext]()
+            context.get().pool = maybe_unbox_memory_pool(memory_pool)
+            inmem_dataset = GetResultValue(CInMemoryDataset.MakeFromFragment(
+                fragment.wrapped, move(context)))
+
+        elif table is not None:
+            inmem_dataset = make_shared[CInMemoryDataset](
+                pyarrow_unwrap_table(table))
+
+        else:
+            raise ValueError("at least one of batches, fragment, "
+                             "or table must be provided")
+
+        self.init(<shared_ptr[CDataset]> inmem_dataset)
+
+    cdef void init(self, const shared_ptr[CDataset]& sp):
+        Dataset.init(self, sp)
+        self.inmem_dataset = <CInMemoryDataset*> sp.get()
+
+
 cdef class FileSystemDataset(Dataset):
     """A Dataset created from a set of files on a particular filesystem.
 
@@ -377,6 +442,9 @@ cdef class Fragment:
         self.wrapped = sp
         self.fragment = sp.get()
 
+    cdef inline shared_ptr[CFragment] unwrap(self):
+        return self.wrapped
+
     @staticmethod
     cdef wrap(const shared_ptr[CFragment]& sp):
         # there's no discriminant in Fragment, so we can't downcast
@@ -402,11 +470,46 @@ cdef class Fragment:
 
 
 cdef class FileFragment(Fragment):
-    """A Fragment representing a data file."""
+    """A Fragment representing a data file.
+
+    Parameters
+    ----------
+    file_format : FileFormat
+        The format of the created fragment.
+    source : str or buffer
+        The path or buffer from which the file's data will be read.
+    filesystem : FileSystem
+        The filesystem which files are from. Ignored if source is a Buffer.
+    """
 
     cdef:
         CFileFragment* file_fragment
 
+    def __init__(self, FileFormat file_format not None,
+                 source, FileSystem filesystem = None):
+        cdef:
+            CResult[shared_ptr[CFragment]] c_fragment
+            shared_ptr[CBuffer] c_buffer
+            c_string c_path
+            CFileSystem* c_fs
+            CFileFormat* c_format
+
+        c_format = file_format.unwrap().get()
+
+        try:
+            c_buffer = as_c_buffer(source)
+            c_fragment = c_format.MakeFragment(CFileSource(move(c_buffer)))
+
+        except ValueError:
+            if filesystem is None:
+                filesystem = LocalFileSystem()
+
+            c_path = tobytes(source)
+            c_fs = filesystem.unwrap().get()
+            c_fragment = c_format.MakeFragment(CFileSource(move(c_path), c_fs))
+
+        self.init(GetResultValue(move(c_fragment)))
+
     cdef void init(self, const shared_ptr[CFragment]& sp):
         Fragment.init(self, sp)
         self.file_fragment = <CFileFragment*> sp.get()
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 78aca361914..8ff657d3971 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -38,6 +38,7 @@
     Fragment,
     HivePartitioning,
     InExpression,
+    InMemoryDataset,
     IpcFileFormat,
     IsValidExpression,
     NotExpression,
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 9cac4be4b09..5f870b112ec 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -30,6 +30,10 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
             CIterator[shared_ptr[CRecordBatch]]):
         pass
 
+    cdef cppclass CRecordBatchVector "arrow::RecordBatchVector"(
+            vector[shared_ptr[CRecordBatch]]):
+        pass
+
 
 cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
 
@@ -196,16 +200,19 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
             vector[shared_ptr[CDatasetFactory]] factories)
 
     cdef cppclass CFileSource "arrow::dataset::FileSource":
+        CFileSource(c_string path, CFileSystem* filesystem)
+        CFileSource(shared_ptr[CBuffer] buffer)
         const c_string& path()
 
-    cdef cppclass CFileFormat "arrow::dataset::FileFormat":
-        c_string type_name()
-
     cdef cppclass CFileFragment "arrow::dataset::FileFragment"(
             CFragment):
         const CFileSource& source()
         shared_ptr[CFileFormat] format()
 
+    cdef cppclass CFileFormat "arrow::dataset::FileFormat":
+        c_string type_name()
+        CResult[shared_ptr[CFragment]] MakeFragment(CFileSource source)
+
     cdef cppclass CFileSystemDataset \
             "arrow::dataset::FileSystemDataset"(CDataset):
         @staticmethod
@@ -216,10 +223,22 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
             shared_ptr[CFileSystem] filesystem,
             vector[CFileInfo] infos,
             CExpressionVector partitions)
-        c_string type()
         vector[c_string] files()
         const shared_ptr[CFileFormat] format()
 
+    cdef cppclass CInMemoryDataset \
+            "arrow::dataset::InMemoryDataset"(CDataset):
+        CInMemoryDataset(shared_ptr[CTable] table)
+
+        @staticmethod
+        CResult[shared_ptr[CInMemoryDataset]] MakeFromBatches "Make"(
+            CRecordBatchVector batches)
+
+        @staticmethod
+        CResult[shared_ptr[CInMemoryDataset]] MakeFromFragment "Make"(
+            shared_ptr[CFragment] fragment,
+            shared_ptr[CScanContext] scan_context)
+
     cdef cppclass CParquetFileFormatReaderOptions \
             "arrow::dataset::ParquetFileFormat::ReaderOptions":
         c_bool use_buffered_stream
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index bac57c1bc43..13994133ad1 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -628,3 +628,5 @@ cdef public shared_ptr[CSparseCSCMatrix] pyarrow_unwrap_sparse_csc_matrix(
     object sparse_tensor)
 cdef public shared_ptr[CSparseCSFTensor] pyarrow_unwrap_sparse_csf_tensor(
     object sparse_tensor)
+
+cdef public shared_ptr[CBuffer] as_c_buffer(object o) except *
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 5477080d220..995a5e7ca09 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -77,13 +77,7 @@ def _table_from_pandas(df):
     return table.replace_schema_metadata()
 
 
-@pytest.fixture
-def mockfs(request):
-    request.config.pyarrow.requires('parquet')
-    import pyarrow.parquet as pq
-
-    mockfs = fs._MockFileSystem()
-
+def _basic_table():
     data = [
         list(range(5)),
         list(map(float, range(5))),
@@ -95,7 +89,17 @@ def mockfs(request):
         pa.field('str', pa.string())
     ])
     batch = pa.record_batch(data, schema=schema)
-    table = pa.Table.from_batches([batch])
+    return pa.Table.from_batches([batch])
+
+
+@pytest.fixture
+def mockfs(request):
+    request.config.pyarrow.requires('parquet')
+    import pyarrow.parquet as pq
+
+    mockfs = fs._MockFileSystem()
+
+    table = _basic_table()
 
     directories = [
         'subdir/1/xxx',
@@ -111,6 +115,18 @@ def mockfs(request):
     return mockfs
 
 
+@pytest.fixture
+def parquetbuffer(request):
+    request.config.pyarrow.requires('parquet')
+    import pyarrow.parquet as pq
+
+    table = _basic_table()
+
+    imos = pa.BufferOutputStream()
+    pq.write_table(table, imos)
+    return imos.getvalue()
+
+
 @pytest.fixture(scope='module')
 def multisourcefs(request):
     request.config.pyarrow.requires('pandas')
@@ -843,3 +859,10 @@ def test_ipc_format(tempdir):
     dataset = ds.dataset(path, format="ipc")
     result = dataset.to_table()
     assert result.equals(table)
+
+
+def test_parquet_buffer(parquetbuffer):
+    fragment = ds.FileFragment(ds.ParquetFileFormat(), parquetbuffer)
+    dataset = ds.InMemoryDataset(fragment=fragment)
+    assert dataset.to_table().equals(_basic_table())
+

From 453d74d438262e3e86d5a8a505bfc1fbf51f4b08 Mon Sep 17 00:00:00 2001
From: Benjamin Kietzman
Date: Wed, 18 Mar 2020 10:40:21 -0400
Subject: [PATCH 2/2] lint fixes

---
 python/pyarrow/_dataset.pyx          | 5 ++---
 python/pyarrow/tests/test_dataset.py | 1 -
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index c0921598658..fd56d24b468 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -27,7 +27,6 @@ from pyarrow.lib cimport *
 from pyarrow.includes.libarrow_dataset cimport *
 from pyarrow.compat import frombytes, tobytes
 from pyarrow._fs cimport FileSystem, FileInfo, FileSelector, LocalFileSystem
-#from pyarrow.lib import as_c_buffer
 
 
 def _forbid_instantiation(klass, subclasses_instead=True):
@@ -285,7 +284,7 @@ cdef class InMemoryDataset(Dataset):
         CInMemoryDataset* inmem_dataset
 
     def __init__(self, batches=None, Fragment fragment=None, Table table=None,
-                MemoryPool memory_pool=None):
+                 MemoryPool memory_pool=None):
         cdef:
             shared_ptr[CInMemoryDataset] inmem_dataset
 
@@ -486,7 +485,7 @@ cdef class FileFragment(Fragment):
         CFileFragment* file_fragment
 
     def __init__(self, FileFormat file_format not None,
-                 source, FileSystem filesystem = None):
+                 source, FileSystem filesystem=None):
         cdef:
             CResult[shared_ptr[CFragment]] c_fragment
             shared_ptr[CBuffer] c_buffer
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 995a5e7ca09..e5045ea8bb8 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -865,4 +865,3 @@ def test_parquet_buffer(parquetbuffer):
     fragment = ds.FileFragment(ds.ParquetFileFormat(), parquetbuffer)
     dataset = ds.InMemoryDataset(fragment=fragment)
     assert dataset.to_table().equals(_basic_table())
-
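
Usage sketch (not part of the patches above): the new test test_parquet_buffer
exercises the whole path end to end. Assuming a pyarrow build that includes both
patches plus Parquet support, reading a Parquet file held entirely in memory looks
like the following; the table built via pa.table() is illustrative only.

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.parquet as pq

    # Write a small table to an in-memory buffer instead of a file on disk.
    table = pa.table({'i64': list(range(5))})
    sink = pa.BufferOutputStream()
    pq.write_table(table, sink)
    buf = sink.getvalue()

    # New in patch 1: a FileFragment may be constructed directly from a buffer.
    # The filesystem argument is only consulted when source is a path.
    fragment = ds.FileFragment(ds.ParquetFileFormat(), buf)

    # Wrapping the fragment in an InMemoryDataset scans it eagerly
    # (InMemoryDataset::Make(fragment, context) on the C++ side), so the
    # resulting dataset round-trips the original table.
    dataset = ds.InMemoryDataset(fragment=fragment)
    assert dataset.to_table().equals(table)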