Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/scripts/r_windows_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# specific language governing permissions and limitations
# under the License.

set -x
set -ex

: ${ARROW_HOME:=$(pwd)}
# Make sure it is absolute and exported
Expand Down
8 changes: 7 additions & 1 deletion cpp/cmake_modules/DefineOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,15 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
define_option(ARROW_BUILD_BENCHMARKS_REFERENCE
"Build the Arrow micro reference benchmarks" OFF)

if(ARROW_BUILD_SHARED)
set(ARROW_TEST_LINKAGE_DEFAULT "shared")
else()
set(ARROW_TEST_LINKAGE_DEFAULT "static")
endif()

define_option_string(ARROW_TEST_LINKAGE
"Linkage of Arrow libraries with unit tests executables."
"shared"
"${ARROW_TEST_LINKAGE_DEFAULT}"
"shared"
"static")

Expand Down
17 changes: 17 additions & 0 deletions cpp/src/arrow/dataset/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ InMemoryDataset::InMemoryDataset(std::shared_ptr<Table> table)
: Dataset(table->schema()),
get_batches_(new TableRecordBatchGenerator(std::move(table))) {}

Result<std::shared_ptr<Dataset>> InMemoryDataset::ReplaceSchema(
    std::shared_ptr<Schema> schema) const {
  // The replacement schema must be reachable from the current schema by
  // projection; otherwise surface the projection error to the caller.
  RETURN_NOT_OK(CheckProjectable(*schema_, *schema));

  // The batch generator is shared with the original dataset; only the
  // schema differs in the copy.
  std::shared_ptr<Dataset> replaced =
      std::make_shared<InMemoryDataset>(std::move(schema), get_batches_);
  return replaced;
}

FragmentIterator InMemoryDataset::GetFragmentsImpl(
std::shared_ptr<ScanOptions> scan_options) {
auto schema = this->schema();
Expand Down Expand Up @@ -175,6 +181,17 @@ Result<std::shared_ptr<UnionDataset>> UnionDataset::Make(std::shared_ptr<Schema>
new UnionDataset(std::move(schema), std::move(children)));
}

Result<std::shared_ptr<Dataset>> UnionDataset::ReplaceSchema(
    std::shared_ptr<Schema> schema) const {
  // Replace the schema of every child in turn; any child whose schema is not
  // projectable to the new one raises, aborting the whole replacement.
  DatasetVector replaced_children;
  replaced_children.reserve(children_.size());
  for (const auto& child : children_) {
    ARROW_ASSIGN_OR_RAISE(auto replaced, child->ReplaceSchema(schema));
    replaced_children.push_back(std::move(replaced));
  }

  // UnionDataset's constructor is non-public, so construct via `new`.
  return std::shared_ptr<Dataset>(
      new UnionDataset(std::move(schema), std::move(replaced_children)));
}

// Delegates to GetFragmentsFromDatasets, which produces the fragments of each
// child dataset under the given scan options.
FragmentIterator UnionDataset::GetFragmentsImpl(std::shared_ptr<ScanOptions> options) {
  return GetFragmentsFromDatasets(children_, options);
}
Expand Down
27 changes: 20 additions & 7 deletions cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
/// \brief The name identifying the kind of Dataset
virtual std::string type_name() const = 0;

/// \brief Return a copy of this Dataset with a different schema.
///
/// The copy will view the same Fragments. If the new schema is not compatible with the
/// original dataset's schema then an error will be raised.
virtual Result<std::shared_ptr<Dataset>> ReplaceSchema(
std::shared_ptr<Schema> schema) const = 0;

virtual ~Dataset() = default;

protected:
Expand Down Expand Up @@ -155,20 +162,23 @@ class ARROW_DS_EXPORT InMemoryDataset : public Dataset {
};

InMemoryDataset(std::shared_ptr<Schema> schema,
std::unique_ptr<RecordBatchGenerator> get_batches)
std::shared_ptr<RecordBatchGenerator> get_batches)
: Dataset(std::move(schema)), get_batches_(std::move(get_batches)) {}

// Convenience constructor taking a fixed list of batches
InMemoryDataset(std::shared_ptr<Schema> schema, RecordBatchVector batches);

explicit InMemoryDataset(std::shared_ptr<Table> table);

FragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

std::string type_name() const override { return "in-memory"; }

private:
std::unique_ptr<RecordBatchGenerator> get_batches_;
Result<std::shared_ptr<Dataset>> ReplaceSchema(
std::shared_ptr<Schema> schema) const override;

protected:
FragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

std::shared_ptr<RecordBatchGenerator> get_batches_;
};

/// \brief A Dataset wrapping child Datasets.
Expand All @@ -182,13 +192,16 @@ class ARROW_DS_EXPORT UnionDataset : public Dataset {
static Result<std::shared_ptr<UnionDataset>> Make(std::shared_ptr<Schema> schema,
DatasetVector children);

FragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

const DatasetVector& children() const { return children_; }

std::string type_name() const override { return "union"; }

Result<std::shared_ptr<Dataset>> ReplaceSchema(
std::shared_ptr<Schema> schema) const override;

protected:
FragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

explicit UnionDataset(std::shared_ptr<Schema> schema, DatasetVector children)
: Dataset(std::move(schema)), children_(std::move(children)) {}

Expand Down
140 changes: 128 additions & 12 deletions cpp/src/arrow/dataset/dataset_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,35 @@ TEST_F(TestInMemoryFragment, Scan) {

class TestInMemoryDataset : public DatasetFixtureMixin {};

// Exercises InMemoryDataset::ReplaceSchema: projectable schemas are accepted,
// non-projectable ones raise TypeError.
TEST_F(TestInMemoryDataset, ReplaceSchema) {
  constexpr int64_t kBatchSize = 1;
  constexpr int64_t kNumberBatches = 1;

  SetSchema({field("i32", int32()), field("f64", float64())});
  auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);

  // NOTE: the previous version also built a reader here, but this test never
  // scans the dataset, so the unused local has been dropped.
  auto dataset = std::make_shared<InMemoryDataset>(
      schema_, RecordBatchVector{static_cast<size_t>(kNumberBatches), batch});

  // drop field
  ASSERT_OK(dataset->ReplaceSchema(schema({field("i32", int32())})).status());
  // add nullable field (will be materialized as null during projection)
  ASSERT_OK(dataset->ReplaceSchema(schema({field("str", utf8())})).status());
  // incompatible type
  ASSERT_RAISES(TypeError,
                dataset->ReplaceSchema(schema({field("i32", utf8())})).status());
  // incompatible nullability
  ASSERT_RAISES(
      TypeError,
      dataset->ReplaceSchema(schema({field("f64", float64(), /*nullable=*/false)}))
          .status());
  // add non-nullable field
  ASSERT_RAISES(TypeError,
                dataset->ReplaceSchema(schema({field("str", utf8(), /*nullable=*/false)}))
                    .status());
}

TEST_F(TestInMemoryDataset, GetFragments) {
constexpr int64_t kBatchSize = 1024;
constexpr int64_t kNumberBatches = 16;
Expand All @@ -60,8 +89,6 @@ TEST_F(TestInMemoryDataset, GetFragments) {
auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
auto reader = ConstantArrayGenerator::Repeat(kNumberBatches, batch);

// It is safe to copy fragment multiple time since Scan() does not consume
// the internal array.
auto dataset = std::make_shared<InMemoryDataset>(
schema_, RecordBatchVector{static_cast<size_t>(kNumberBatches), batch});

Expand All @@ -70,6 +97,45 @@ TEST_F(TestInMemoryDataset, GetFragments) {

class TestUnionDataset : public DatasetFixtureMixin {};

// Exercises UnionDataset::ReplaceSchema: the replacement must be projectable
// from every child's schema; incompatible replacements raise TypeError.
TEST_F(TestUnionDataset, ReplaceSchema) {
  constexpr int64_t kBatchSize = 1;
  constexpr int64_t kNumberBatches = 1;

  // Two identical in-memory children over an (i32, f64) schema, each holding
  // a single one-row batch.
  SetSchema({field("i32", int32()), field("f64", float64())});
  auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);

  std::vector<std::shared_ptr<RecordBatch>> batches{static_cast<size_t>(kNumberBatches),
                                                    batch};

  DatasetVector children = {
      std::make_shared<InMemoryDataset>(schema_, batches),
      std::make_shared<InMemoryDataset>(schema_, batches),
  };

  // Sanity check before replacing: the union scans to the concatenation of
  // its children's batches.
  const int64_t total_batches = children.size() * kNumberBatches;
  auto reader = ConstantArrayGenerator::Repeat(total_batches, batch);

  ASSERT_OK_AND_ASSIGN(auto dataset, UnionDataset::Make(schema_, children));
  AssertDatasetEquals(reader.get(), dataset.get());

  // drop field
  ASSERT_OK(dataset->ReplaceSchema(schema({field("i32", int32())})).status());
  // add nullable field (will be materialized as null during projection)
  ASSERT_OK(dataset->ReplaceSchema(schema({field("str", utf8())})).status());
  // incompatible type
  ASSERT_RAISES(TypeError,
                dataset->ReplaceSchema(schema({field("i32", utf8())})).status());
  // incompatible nullability
  ASSERT_RAISES(
      TypeError,
      dataset->ReplaceSchema(schema({field("f64", float64(), /*nullable=*/false)}))
          .status());
  // add non-nullable field
  ASSERT_RAISES(TypeError,
                dataset->ReplaceSchema(schema({field("str", utf8(), /*nullable=*/false)}))
                    .status());
}

TEST_F(TestUnionDataset, GetFragments) {
constexpr int64_t kBatchSize = 1024;
constexpr int64_t kChildPerNode = 2;
Expand Down Expand Up @@ -105,9 +171,7 @@ TEST_F(TestUnionDataset, GetFragments) {
AssertDatasetEquals(reader.get(), root_dataset.get());
}

class TestDataset : public DatasetFixtureMixin {};

TEST_F(TestDataset, TrivialScan) {
TEST_F(TestUnionDataset, TrivialScan) {
constexpr int64_t kNumberBatches = 16;
constexpr int64_t kBatchSize = 1024;

Expand All @@ -129,6 +193,57 @@ TEST_F(TestDataset, TrivialScan) {
AssertDatasetEquals(reader.get(), dataset.get());
}

// Unit tests for CheckProjectable: enumerates which schema transitions are
// legal (reorder, drop, add-nullable, relax nullability) and which are not
// (add-required, tighten nullability, change type).
TEST(TestProjector, CheckProjectable) {
  // Small fluent helper wrapping CheckProjectable assertions.
  struct Assert {
    explicit Assert(FieldVector from) : from_(from) {}
    Schema from_;  // origin schema under test

    void ProjectableTo(FieldVector to) {
      ARROW_EXPECT_OK(CheckProjectable(from_, Schema(to)));
    }

    // substr is matched against the error message; the default "" matches any.
    void NotProjectableTo(FieldVector to, std::string substr = "") {
      EXPECT_RAISES_WITH_MESSAGE_THAT(TypeError, testing::HasSubstr(substr),
                                      CheckProjectable(from_, Schema(to)));
    }
  };

  auto i8 = field("i8", int8());
  auto u16 = field("u16", uint16());
  auto str = field("str", utf8());
  // "_req" variants are required (non-nullable). NOTE: the previous version
  // also declared an unused str_req field; it has been dropped.
  auto i8_req = field("i8", int8(), false);
  auto u16_req = field("u16", uint16(), false);

  // trivial
  Assert({}).ProjectableTo({});
  Assert({i8}).ProjectableTo({i8});
  Assert({i8, u16_req}).ProjectableTo({i8, u16_req});

  // reorder
  Assert({i8, u16}).ProjectableTo({u16, i8});
  Assert({i8, str, u16}).ProjectableTo({u16, i8, str});

  // drop field(s)
  Assert({i8}).ProjectableTo({});

  // add field(s): only nullable fields can be materialized as null
  Assert({}).ProjectableTo({i8});
  Assert({}).ProjectableTo({i8, u16});
  Assert({}).NotProjectableTo({u16_req},
                              "is not nullable and does not exist in origin schema");
  Assert({i8}).NotProjectableTo({u16_req, i8});

  // change nullability: relaxing is fine, tightening is not
  Assert({i8}).NotProjectableTo({i8_req},
                                "not nullable but is not required in origin schema");
  Assert({i8_req}).ProjectableTo({i8});

  // change field type
  Assert({i8}).NotProjectableTo({field("i8", utf8())},
                                "fields had matching names but differing types");
}

TEST(TestProjector, MismatchedType) {
constexpr int64_t kBatchSize = 1024;

Expand Down Expand Up @@ -229,8 +344,8 @@ TEST(TestProjector, NonTrivial) {
AssertBatchesEqual(*expected_batch, *reconciled_batch);
}

class TestEndToEnd : public TestDataset {
void SetUp() {
class TestEndToEnd : public TestUnionDataset {
void SetUp() override {
bool nullable = false;
SetSchema({
field("region", utf8(), nullable),
Expand Down Expand Up @@ -377,9 +492,9 @@ TEST_F(TestEndToEnd, EndToEndSingleDataset) {
ASSERT_OK(scanner_builder->Project(columns));

// An optional filter expression may also be specified. The filter expression
// is evaluated against input rows. Only rows for which the filter evaluates to true are
// yielded. Predicate pushdown optimizations are applied using partition information if
// available.
// is evaluated against input rows. Only rows for which the filter evaluates to true
// are yielded. Predicate pushdown optimizations are applied using partition
// information if available.
//
// This API decouples predicate pushdown from the Dataset implementation
// and partition discovery.
Expand Down Expand Up @@ -413,7 +528,7 @@ inline std::shared_ptr<Schema> SchemaFromNames(const std::vector<std::string> na
return schema(fields);
}

class TestSchemaUnification : public TestDataset {
class TestSchemaUnification : public TestUnionDataset {
public:
using i32 = util::optional<int32_t>;
using PathAndContent = std::vector<std::pair<std::string, std::string>>;
Expand Down Expand Up @@ -487,7 +602,8 @@ class TestSchemaUnification : public TestDataset {
ASSERT_OK_AND_ASSIGN(auto ds1, get_source("/dataset/alpha", {ds1_df1, ds1_df2}));
ASSERT_OK_AND_ASSIGN(auto ds2, get_source("/dataset/beta", {ds2_df1, ds2_df2}));

// FIXME(bkietz) this is a hack: allow differing schemas for the purposes of this test
// FIXME(bkietz) this is a hack: allow differing schemas for the purposes of this
// test
class DisparateSchemasUnionDataset : public UnionDataset {
public:
DisparateSchemasUnionDataset(std::shared_ptr<Schema> schema, DatasetVector children)
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Make(
std::move(filesystem), std::move(forest), std::move(partitions)));
}

Result<std::shared_ptr<Dataset>> FileSystemDataset::ReplaceSchema(
    std::shared_ptr<Schema> schema) const {
  // Reject replacements that cannot be reached from the current schema by
  // projection.
  RETURN_NOT_OK(CheckProjectable(*schema_, *schema));

  // Everything except the schema — partition expression, format, filesystem,
  // file forest and partitions — is shared with the original dataset.
  // FileSystemDataset's constructor is non-public, so construct via `new`.
  std::shared_ptr<Dataset> replaced(
      new FileSystemDataset(std::move(schema), partition_expression_, format_,
                            filesystem_, forest_, partitions_));
  return replaced;
}

std::vector<std::string> FileSystemDataset::files() const {
std::vector<std::string> files;

Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ class ARROW_DS_EXPORT FileSystemDataset : public Dataset {

std::string type_name() const override { return "filesystem"; }

Result<std::shared_ptr<Dataset>> ReplaceSchema(
std::shared_ptr<Schema> schema) const override;

const std::shared_ptr<FileFormat>& format() const { return format_; }

std::vector<std::string> files() const;
Expand Down
24 changes: 24 additions & 0 deletions cpp/src/arrow/dataset/file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,30 @@ TEST_F(TestFileSystemDataset, Basic) {
AssertFilesAre(dataset_, {"A/a", "A/B/b"});
}

// Exercises FileSystemDataset::ReplaceSchema: projectable schemas are
// accepted, non-projectable ones raise TypeError.
TEST_F(TestFileSystemDataset, ReplaceSchema) {
  auto schm = schema({field("i32", int32()), field("f64", float64())});
  auto format = std::make_shared<DummyFileFormat>(schm);
  // Built with an empty file list: ReplaceSchema only checks schema
  // compatibility, so no files are needed.
  ASSERT_OK_AND_ASSIGN(auto dataset,
                       FileSystemDataset::Make(schm, scalar(true), format, fs_, {}));

  // drop field
  ASSERT_OK(dataset->ReplaceSchema(schema({field("i32", int32())})).status());
  // add nullable field (will be materialized as null during projection)
  ASSERT_OK(dataset->ReplaceSchema(schema({field("str", utf8())})).status());
  // incompatible type
  ASSERT_RAISES(TypeError,
                dataset->ReplaceSchema(schema({field("i32", utf8())})).status());
  // incompatible nullability
  ASSERT_RAISES(
      TypeError,
      dataset->ReplaceSchema(schema({field("f64", float64(), /*nullable=*/false)}))
          .status());
  // add non-nullable field
  ASSERT_RAISES(TypeError,
                dataset->ReplaceSchema(schema({field("str", utf8(), /*nullable=*/false)}))
                    .status());
}

TEST_F(TestFileSystemDataset, RootPartitionPruning) {
auto root_partition = ("a"_ == 5).Copy();
MakeDataset({fs::File("a"), fs::File("b")}, root_partition);
Expand Down
Loading