Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cpp/src/arrow/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1569,6 +1569,20 @@ class RepeatedArrayFactory {
return FinishFixedWidth(value.data(), value.size());
}

// Builds a DictionaryArray of length_ slots that all refer to the single value
// carried by the DictionaryScalar: a one-element dictionary plus an index
// array filled with zeros.
Status Visit(const DictionaryType& type) {
  // Dictionary: the scalar's underlying value, repeated once.
  const auto& dict_scalar = checked_cast<const DictionaryScalar&>(scalar_);
  std::shared_ptr<Array> dictionary;
  RETURN_NOT_OK(MakeArrayFromScalar(pool_, *dict_scalar.value, 1, &dictionary));

  // Indices: length_ copies of 0, expressed in the dictionary's index type.
  ARROW_ASSIGN_OR_RAISE(auto zero_index, MakeScalar(type.index_type(), 0));
  std::shared_ptr<Array> indices;
  RETURN_NOT_OK(MakeArrayFromScalar(pool_, *zero_index, length_, &indices));

  *out_ = std::make_shared<DictionaryArray>(scalar_.type, std::move(indices),
                                            std::move(dictionary));
  return Status::OK();
}

// Catch-all overload taking the base DataType: reports that building an array
// from a scalar of this type is not implemented. The message names the
// scalar's own type (scalar_.type), not the `type` argument.
Status Visit(const DataType& type) {
return Status::NotImplemented("construction from scalar of type ", *scalar_.type);
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ class MemoryPool;
// Helper functions

struct DictionaryBuilderCase {
template <typename ValueType>
Status Visit(const ValueType&, typename ValueType::c_type* = nullptr) {
template <typename ValueType, typename Enable = typename ValueType::c_type>
Status Visit(const ValueType&) {
return CreateFor<ValueType>();
}

Expand Down
60 changes: 47 additions & 13 deletions cpp/src/arrow/dataset/dataset_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -403,13 +403,21 @@ TEST_F(TestEndToEnd, EndToEndSingleDataset) {
AssertTablesEqual(*expected, *table, false, true);
}

inline std::shared_ptr<Schema> SchemaFromNames(const std::vector<std::string> names) {
std::vector<std::shared_ptr<Field>> fields;
for (const auto& name : names) {
fields.push_back(field(name, int32()));
}

return schema(fields);
}

class TestSchemaUnification : public TestDataset {
public:
using i32 = util::optional<int32_t>;
using PathAndContent = std::vector<std::pair<std::string, std::string>>;

void SetUp() {
using PathAndContent = std::vector<std::pair<std::string, std::string>>;

void SetUp() override {
// The following test creates 2 sources with divergent but compatible
// schemas. Each source have a common partitioning where the
// fields are not materialized in the data fragments.
Expand Down Expand Up @@ -443,7 +451,7 @@ class TestSchemaUnification : public TestDataset {
auto get_source =
[this](std::string base,
std::vector<std::string> paths) -> Result<std::shared_ptr<Dataset>> {
auto resolver = [this](const FileSource& source) -> std::shared_ptr<Schema> {
auto resolver = [](const FileSource& source) -> std::shared_ptr<Schema> {
auto path = source.path();
// A different schema for each data fragment.
if (path == ds1_df1) {
Expand Down Expand Up @@ -488,15 +496,6 @@ class TestSchemaUnification : public TestDataset {
std::make_shared<DisparateSchemasUnionDataset>(schema_, DatasetVector{ds1, ds2});
}

std::shared_ptr<Schema> SchemaFromNames(const std::vector<std::string> names) {
std::vector<std::shared_ptr<Field>> fields;
for (const auto& name : names) {
fields.push_back(field(name, int32()));
}

return schema(fields);
}

template <typename TupleType>
void AssertScanEquals(std::shared_ptr<Scanner> scanner,
const std::vector<TupleType>& expected_rows) {
Expand Down Expand Up @@ -642,5 +641,40 @@ TEST_F(TestSchemaUnification, SelectMixedColumnsAndFilter) {
AssertBuilderEquals(scan_builder, rows);
}

// Scans a one-file dataset whose only partition field ("part") is
// dictionary-typed, projecting that partition column while filtering on a
// physical column, and checks the projected column is a dictionary array.
TEST(TestDictPartitionColumn, SelectPartitionColumnFilterPhysicalColumn) {
  auto part_field = field("part", dictionary(int32(), utf8()));
  auto file_path = "/dataset/part=one/data.json";

  // A mock filesystem holding a single JSON file with one row; the value of
  // "part" appears only in the file's path, not in its content.
  auto filesystem = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
  ARROW_EXPECT_OK(filesystem->CreateFile(file_path,
                                         R"([ {"phy_1": 111, "phy_2": 211} ])",
                                         /* recursive */ true));

  // Every fragment resolves to the same two-column physical schema.
  auto file_schema = SchemaFromNames({"phy_1", "phy_2"});
  auto json_format = std::make_shared<JSONRecordBatchFileFormat>(
      [file_schema](const FileSource&) { return file_schema; });

  FileSystemFactoryOptions factory_options;
  factory_options.partition_base_dir = "/dataset";
  factory_options.partitioning =
      std::make_shared<HivePartitioning>(schema({part_field}));

  ASSERT_OK_AND_ASSIGN(auto dataset_factory,
                       FileSystemDatasetFactory::Make(filesystem, {file_path},
                                                      json_format, factory_options));
  ASSERT_OK_AND_ASSIGN(auto inspected_schema, dataset_factory->Inspect());
  ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish(inspected_schema));

  // Project only the partition column, filtering on a physical column.
  ASSERT_OK_AND_ASSIGN(auto builder, dataset->NewScan());
  ASSERT_OK(builder->Filter("phy_1"_ == 111));
  ASSERT_OK(builder->Project({"part"}));

  ASSERT_OK_AND_ASSIGN(auto scanner, builder->Finish());
  ASSERT_OK_AND_ASSIGN(auto result_table, scanner->ToTable());

  auto expected_column = DictArrayFromJSON(part_field->type(), "[0]", "[\"one\"]");
  AssertArraysEqual(*result_table->column(0)->chunk(0), *expected_column);
}

} // namespace dataset
} // namespace arrow
29 changes: 23 additions & 6 deletions cpp/src/arrow/ipc/json_simple.cc
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ class StructConverter final : public ConcreteConverter<StructConverter> {
};

// ------------------------------------------------------------------------
// Converter for struct arrays
// Converter for union arrays

class UnionConverter final : public ConcreteConverter<UnionConverter> {
public:
Expand Down Expand Up @@ -701,9 +701,8 @@ class UnionConverter final : public ConcreteConverter<UnionConverter> {
return builder_->AppendNull();
}

// Append a JSON value that is either an array of N elements in order
// or an object mapping struct names to values (omitted struct members
// are mapped to null).
// Append a JSON value that must be a 2-long array, containing the type_id
// and value of the UnionArray's slot.
Status AppendValue(const rj::Value& json_obj) override {
if (json_obj.IsNull()) {
return AppendNull();
Expand Down Expand Up @@ -822,8 +821,8 @@ Status GetConverter(const std::shared_ptr<DataType>& type,
return Status::OK();
}

Status ArrayFromJSON(const std::shared_ptr<DataType>& type,
const util::string_view& json_string, std::shared_ptr<Array>* out) {
Status ArrayFromJSON(const std::shared_ptr<DataType>& type, util::string_view json_string,
std::shared_ptr<Array>* out) {
std::shared_ptr<Converter> converter;
RETURN_NOT_OK(GetConverter(type, &converter));

Expand All @@ -849,6 +848,24 @@ Status ArrayFromJSON(const std::shared_ptr<DataType>& type, const char* json_str
return ArrayFromJSON(type, util::string_view(json_string), out);
}

// Builds a DictionaryArray from two JSON documents: `indices_json` is parsed
// with the dictionary type's index type and `dictionary_json` with its value
// type; the two arrays are then combined via DictionaryArray::FromArrays.
// Returns TypeError when `type` is not a dictionary type, and otherwise
// propagates any error from parsing or from FromArrays.
Status DictArrayFromJSON(const std::shared_ptr<DataType>& type,
                         util::string_view indices_json,
                         util::string_view dictionary_json, std::shared_ptr<Array>* out) {
  // Guard before the cast below, which assumes a DictionaryType.
  if (type->id() != Type::DICTIONARY) {
    return Status::TypeError("DictArrayFromJSON requires dictionary type, got ", *type);
  }
  const auto& dict_type = checked_cast<const DictionaryType&>(*type);

  std::shared_ptr<Array> parsed_indices;
  RETURN_NOT_OK(ArrayFromJSON(dict_type.index_type(), indices_json, &parsed_indices));

  std::shared_ptr<Array> parsed_dictionary;
  RETURN_NOT_OK(
      ArrayFromJSON(dict_type.value_type(), dictionary_json, &parsed_dictionary));

  return DictionaryArray::FromArrays(type, std::move(parsed_indices),
                                     std::move(parsed_dictionary), out);
}

} // namespace json
} // namespace internal
} // namespace ipc
Expand Down
11 changes: 6 additions & 5 deletions cpp/src/arrow/ipc/json_simple.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

// Implement a simple JSON representation format for arrays

#ifndef ARROW_IPC_JSON_SIMPLE_H
#define ARROW_IPC_JSON_SIMPLE_H
#pragma once

#include <memory>
#include <string>
Expand All @@ -41,16 +40,18 @@ Status ArrayFromJSON(const std::shared_ptr<DataType>&, const std::string& json,
std::shared_ptr<Array>* out);

ARROW_EXPORT
Status ArrayFromJSON(const std::shared_ptr<DataType>&, const util::string_view& json,
Status ArrayFromJSON(const std::shared_ptr<DataType>&, util::string_view json,
std::shared_ptr<Array>* out);

ARROW_EXPORT
Status ArrayFromJSON(const std::shared_ptr<DataType>&, const char* json,
std::shared_ptr<Array>* out);

// Build a dictionary array from JSON.
//
// The first argument must be a dictionary type, otherwise a TypeError is
// returned. `indices_json` is parsed as an array of the dictionary's index
// type and `dictionary_json` as an array of its value type; the resulting
// dictionary array is written to `out`.
ARROW_EXPORT
Status DictArrayFromJSON(const std::shared_ptr<DataType>&, util::string_view indices_json,
util::string_view dictionary_json, std::shared_ptr<Array>* out);

} // namespace json
} // namespace internal
} // namespace ipc
} // namespace arrow

#endif // ARROW_IPC_JSON_SIMPLE_H
40 changes: 34 additions & 6 deletions cpp/src/arrow/ipc/json_simple_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1179,10 +1179,14 @@ TEST(TestDenseUnion, Errors) {
std::shared_ptr<DataType> type = union_({field_a, field_b}, {4, 8}, UnionMode::DENSE);
std::shared_ptr<Array> array;

ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[\"\"]", &array));
ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[[0, 8]]", &array));
ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[\"not a valid type_id\"]", &array));
ASSERT_RAISES(Invalid,
ArrayFromJSON(type, "[[0, 99]]", &array)); // 0 is not one of {4, 8}
ASSERT_RAISES(Invalid,
ArrayFromJSON(type, "[[4, \"\"]]", &array)); // "" is not a valid int8()

ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[\"not a pair\"]", &array));
ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[[0]]", &array));
ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[[4, \"\"]]", &array));
ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[[8, true, 1]]", &array));
}

Expand All @@ -1192,13 +1196,37 @@ TEST(TestSparseUnion, Errors) {
std::shared_ptr<DataType> type = union_({field_a, field_b}, {4, 8}, UnionMode::SPARSE);
std::shared_ptr<Array> array;

ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[\"\"]", &array));
ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[[0, 8]]", &array));
ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[[0]]", &array));
ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[\"not a valid type_id\"]", &array));
ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[[0, 99]]", &array));
ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[[4, \"\"]]", &array));

ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[\"not a pair\"]", &array));
ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[[0]]", &array));
ASSERT_RAISES(Invalid, ArrayFromJSON(type, "[[8, true, 1]]", &array));
}

// DictArrayFromJSON must produce the same array as assembling a
// DictionaryArray by hand from separately parsed indices and dictionary.
TEST(TestDictionary, Basics) {
  auto dict_type = dictionary(int32(), utf8());
  auto actual = DictArrayFromJSON(dict_type, "[null, 2, 1, 0]",
                                  R"(["whiskey", "tango", "foxtrot"])");

  auto indices = ArrayFromJSON(int32(), "[null, 2, 1, 0]");
  auto values = ArrayFromJSON(utf8(), R"(["whiskey", "tango", "foxtrot"])");
  DictionaryArray expected(dict_type, indices, values);

  ASSERT_ARRAYS_EQUAL(expected, *actual);
}

// Malformed input on either side (indices or dictionary) must be rejected
// with Invalid.
TEST(TestDictionary, Errors) {
  std::shared_ptr<Array> out;
  auto dict_type = dictionary(int32(), utf8());

  // A string is not a valid int32 index.
  ASSERT_RAISES(Invalid,
                DictArrayFromJSON(dict_type, "[\"not a valid index\"]", "[\"\"]", &out));

  // dict value isn't string
  ASSERT_RAISES(Invalid, DictArrayFromJSON(dict_type, "[0, 1]", "[1]", &out));
}

} // namespace json
} // namespace internal
} // namespace ipc
Expand Down
79 changes: 47 additions & 32 deletions cpp/src/arrow/scalar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,25 +127,31 @@ size_t Scalar::Hash::hash(const Scalar& scalar) { return ScalarHashImpl(scalar).
// Constructs a StringScalar from a std::string: the string is moved into a
// Buffer via Buffer::FromString and construction is delegated to the
// buffer-taking constructor.
StringScalar::StringScalar(std::string s)
: StringScalar(Buffer::FromString(std::move(s))) {}

FixedSizeBinaryScalar::FixedSizeBinaryScalar(const std::shared_ptr<Buffer>& value,
const std::shared_ptr<DataType>& type)
: BinaryScalar(value, type) {
ARROW_CHECK_EQ(checked_cast<const FixedSizeBinaryType&>(*type).byte_width(),
value->size());
FixedSizeBinaryScalar::FixedSizeBinaryScalar(std::shared_ptr<Buffer> value,
std::shared_ptr<DataType> type)
: BinaryScalar(std::move(value), std::move(type)) {
ARROW_CHECK_EQ(checked_cast<const FixedSizeBinaryType&>(*this->type).byte_width(),
this->value->size());
}

BaseListScalar::BaseListScalar(const std::shared_ptr<Array>& value,
const std::shared_ptr<DataType>& type)
: Scalar{type, true}, value(value) {}
BaseListScalar::BaseListScalar(std::shared_ptr<Array> value,
std::shared_ptr<DataType> type)
: Scalar{std::move(type), true}, value(std::move(value)) {}

BaseListScalar::BaseListScalar(const std::shared_ptr<Array>& value)
: BaseListScalar(value, value->type()) {}
BaseListScalar::BaseListScalar(std::shared_ptr<Array> value)
: Scalar(value->type(), true), value(std::move(value)) {}

FixedSizeListScalar::FixedSizeListScalar(const std::shared_ptr<Array>& value,
const std::shared_ptr<DataType>& type)
: BaseListScalar(value, type) {
ARROW_CHECK_EQ(value->length(),
checked_cast<const FixedSizeListType*>(type.get())->list_size());
FixedSizeListScalar::FixedSizeListScalar(std::shared_ptr<Array> value,
std::shared_ptr<DataType> type)
: BaseListScalar(std::move(value), std::move(type)) {
ARROW_CHECK_EQ(this->value->length(),
checked_cast<const FixedSizeListType&>(*this->type).list_size());
}

// Constructs a DictionaryScalar of the given type whose `value` is a null
// scalar of the dictionary's value type (via MakeNullScalar).
//
// `type` is moved into the Scalar base, which is initialized before `value`.
// The value type is therefore read back through this->type, not through the
// (moved-from) parameter. The checked_cast assumes `type` is a DictionaryType.
DictionaryScalar::DictionaryScalar(std::shared_ptr<DataType> type)
: Scalar(std::move(type)),
value(
MakeNullScalar(checked_cast<const DictionaryType&>(*this->type).value_type())) {
}

template <typename T>
Expand Down Expand Up @@ -173,15 +179,18 @@ struct MakeNullImpl {
return Status::OK();
}

const std::shared_ptr<DataType>& type_;
std::shared_ptr<Scalar> Finish() && {
// Should not fail.
DCHECK_OK(VisitTypeInline(*type_, this));
return std::move(out_);
}

std::shared_ptr<DataType> type_;
std::shared_ptr<Scalar> out_;
};

std::shared_ptr<Scalar> MakeNullScalar(const std::shared_ptr<DataType>& type) {
MakeNullImpl impl = {type, nullptr};
// Should not fail.
DCHECK_OK(VisitTypeInline(*type, &impl));
return std::move(impl.out_);
std::shared_ptr<Scalar> MakeNullScalar(std::shared_ptr<DataType> type) {
return MakeNullImpl{std::move(type), nullptr}.Finish();
}

std::string Scalar::ToString() const {
Expand All @@ -207,34 +216,40 @@ struct ScalarParseImpl {

Status Visit(const LargeBinaryType&) { return FinishWithBuffer(); }

Status Visit(const FixedSizeBinaryType& t) { return FinishWithBuffer(); }
Status Visit(const FixedSizeBinaryType&) { return FinishWithBuffer(); }

// Dictionary scalars are parsed by parsing the string as a scalar of the
// dictionary's value type, then wrapping the result through Finish().
Status Visit(const DictionaryType& t) {
  ARROW_ASSIGN_OR_RAISE(auto parsed_value, Scalar::Parse(t.value_type(), s_));
  return Finish(std::move(parsed_value));
}

// Catch-all overload taking the base DataType: reports that parsing a scalar
// of type `t` from a string is not implemented.
Status Visit(const DataType& t) {
return Status::NotImplemented("parsing scalars of type ", t);
}

template <typename Arg>
Status Finish(Arg&& arg) {
return MakeScalar(type_, std::forward<Arg>(arg)).Value(out_);
return MakeScalar(std::move(type_), std::forward<Arg>(arg)).Value(&out_);
}

Status FinishWithBuffer() { return Finish(Buffer::FromString(s_.to_string())); }

ScalarParseImpl(const std::shared_ptr<DataType>& type, util::string_view s,
std::shared_ptr<Scalar>* out)
: type_(type), s_(s), out_(out) {}
// Runs this visitor over type_ and returns the scalar left in out_, or the
// error Status if the visit failed.
// Rvalue-qualified because out_ is moved out of this object on success.
Result<std::shared_ptr<Scalar>> Finish() && {
RETURN_NOT_OK(VisitTypeInline(*type_, this));
return std::move(out_);
}

ScalarParseImpl(std::shared_ptr<DataType> type, util::string_view s)
: type_(std::move(type)), s_(s) {}

const std::shared_ptr<DataType>& type_;
std::shared_ptr<DataType> type_;
util::string_view s_;
std::shared_ptr<Scalar>* out_;
std::shared_ptr<Scalar> out_;
};

Result<std::shared_ptr<Scalar>> Scalar::Parse(const std::shared_ptr<DataType>& type,
util::string_view s) {
std::shared_ptr<Scalar> out;
ScalarParseImpl impl = {type, s, &out};
RETURN_NOT_OK(VisitTypeInline(*type, &impl));
return out;
return ScalarParseImpl{type, s}.Finish();
}

namespace internal {
Expand Down
Loading