Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions cpp/src/arrow/array/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,10 @@ class RepeatedArrayFactory {

Status Visit(const DictionaryType& type) {
const auto& value = checked_cast<const DictionaryScalar&>(scalar_).value;
ARROW_ASSIGN_OR_RAISE(auto dictionary, MakeArrayFromScalar(*value, 1, pool_));

ARROW_ASSIGN_OR_RAISE(auto zero, MakeScalar(type.index_type(), 0));
ARROW_ASSIGN_OR_RAISE(auto indices, MakeArrayFromScalar(*zero, length_, pool_));

ARROW_ASSIGN_OR_RAISE(auto indices,
MakeArrayFromScalar(*value.index, length_, pool_));
out_ = std::make_shared<DictionaryArray>(scalar_.type, std::move(indices),
std::move(dictionary));
value.dictionary);
return Status::OK();
}

Expand Down
5 changes: 4 additions & 1 deletion cpp/src/arrow/compare.cc
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,10 @@ class ScalarEqualsVisitor {
Status Visit(const UnionScalar& left) { return Status::NotImplemented("union"); }

Status Visit(const DictionaryScalar& left) {
return Status::NotImplemented("dictionary");
const auto& right = checked_cast<const DictionaryScalar&>(right_);
result_ = left.value.index->Equals(right.value.index) &&
left.value.dictionary->Equals(right.value.dictionary);
return Status::OK();
}

Status Visit(const ExtensionScalar& left) {
Expand Down
31 changes: 24 additions & 7 deletions cpp/src/arrow/compute/kernels/scalar_set_lookup.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,25 @@ struct MatchVisitor {
}
};

// Runs a set-lookup kernel implementation over either an array or a scalar
// input Datum.
//
// For an array input, `array_impl` is applied directly to its ArrayData.
// For a scalar input, the scalar is first broadcast into a length-1 array so
// the same array implementation can be reused; element 0 of the array result
// is then unwrapped back into a scalar Datum stored in `*out`.
//
// NOTE(review): this assumes `array_impl` places an array result in `*out`
// (the callers below pass lambdas capturing `out`) — confirm before reusing
// with other implementations.
//
// Errors are reported through `ctx` by KERNEL_RETURN_IF_ERROR, which also
// returns early from this function on failure.
void ExecArrayOrScalar(KernelContext* ctx, const Datum& in, Datum* out,
                       std::function<Status(const ArrayData&)> array_impl) {
  if (in.is_array()) {
    KERNEL_RETURN_IF_ERROR(ctx, array_impl(*in.array()));
    return;
  }

  // Scalar path: broadcast to a one-element array, evaluate, then extract
  // the single result element as the scalar output.
  std::shared_ptr<Array> in_array;
  std::shared_ptr<Scalar> out_scalar;
  KERNEL_RETURN_IF_ERROR(ctx, MakeArrayFromScalar(*in.scalar(), 1).Value(&in_array));
  KERNEL_RETURN_IF_ERROR(ctx, array_impl(*in_array->data()));
  KERNEL_RETURN_IF_ERROR(ctx, out->make_array()->GetScalar(0).Value(&out_scalar));
  *out = std::move(out_scalar);
}

void ExecMatch(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
MatchVisitor dispatch(ctx, *batch[0].array(), out);
ctx->SetStatus(dispatch.Execute());
ExecArrayOrScalar(ctx, batch[0], out, [&](const ArrayData& in) {
return MatchVisitor(ctx, in, out).Execute();
});
}

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -284,8 +300,9 @@ struct IsInVisitor {
};

void ExecIsIn(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
IsInVisitor dispatch(ctx, *batch[0].array(), out);
ctx->SetStatus(dispatch.Execute());
ExecArrayOrScalar(ctx, batch[0], out, [&](const ArrayData& in) {
return IsInVisitor(ctx, in, out).Execute();
});
}

// Unary set lookup kernels available for the following input types
Expand All @@ -302,7 +319,7 @@ void AddBasicSetLookupKernels(ScalarKernel kernel,
ScalarFunction* func) {
auto AddKernels = [&](const std::vector<std::shared_ptr<DataType>>& types) {
for (const std::shared_ptr<DataType>& ty : types) {
kernel.signature = KernelSignature::Make({InputType::Array(ty)}, out_ty);
kernel.signature = KernelSignature::Make({ty}, out_ty);
DCHECK_OK(func->AddKernel(kernel));
}
};
Expand Down Expand Up @@ -331,7 +348,7 @@ void RegisterScalarSetLookup(FunctionRegistry* registry) {

AddBasicSetLookupKernels(isin_base, /*output_type=*/boolean(), isin.get());

isin_base.signature = KernelSignature::Make({InputType::Array(null())}, boolean());
isin_base.signature = KernelSignature::Make({null()}, boolean());
isin_base.null_handling = NullHandling::COMPUTED_PREALLOCATE;
DCHECK_OK(isin->AddKernel(isin_base));
DCHECK_OK(registry->AddFunction(isin));
Expand All @@ -347,7 +364,7 @@ void RegisterScalarSetLookup(FunctionRegistry* registry) {
auto match = std::make_shared<ScalarFunction>("match", Arity::Unary());
AddBasicSetLookupKernels(match_base, /*output_type=*/int32(), match.get());

match_base.signature = KernelSignature::Make({InputType::Array(null())}, int32());
match_base.signature = KernelSignature::Make({null()}, int32());
DCHECK_OK(match->AddKernel(match_base));
DCHECK_OK(registry->AddFunction(match));
}
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/dataset/dataset_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -760,18 +760,20 @@ TEST_F(TestSchemaUnification, SelectMixedColumnsAndFilter) {
TEST(TestDictPartitionColumn, SelectPartitionColumnFilterPhysicalColumn) {
auto partition_field = field("part", dictionary(int32(), utf8()));
auto path = "/dataset/part=one/data.json";
auto dictionary = ArrayFromJSON(utf8(), R"(["one"])");

auto mock_fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
ARROW_EXPECT_OK(mock_fs->CreateFile(path, R"([ {"phy_1": 111, "phy_2": 211} ])",
/* recursive */ true));
/*recursive=*/true));

auto physical_schema = SchemaFromNames({"phy_1", "phy_2"});
auto format = std::make_shared<JSONRecordBatchFileFormat>(
[=](const FileSource&) { return physical_schema; });

FileSystemFactoryOptions options;
options.partition_base_dir = "/dataset";
options.partitioning = std::make_shared<HivePartitioning>(schema({partition_field}));
options.partitioning = std::make_shared<HivePartitioning>(schema({partition_field}),
ArrayVector{dictionary});

ASSERT_OK_AND_ASSIGN(auto factory,
FileSystemDatasetFactory::Make(mock_fs, {path}, format, options));
Expand Down
126 changes: 96 additions & 30 deletions cpp/src/arrow/dataset/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@

#include <algorithm>
#include <chrono>
#include <map>
#include <memory>
#include <set>
#include <stack>
#include <utility>
#include <vector>

#include "arrow/array/array_base.h"
#include "arrow/array/builder_binary.h"
#include "arrow/compute/api_scalar.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/filter.h"
Expand Down Expand Up @@ -135,20 +138,53 @@ Status KeyValuePartitioning::SetDefaultValuesFromKeys(const Expression& expr,
}

Result<std::shared_ptr<Expression>> KeyValuePartitioning::ConvertKey(
const Key& key, const Schema& schema) {
ARROW_ASSIGN_OR_RAISE(auto field, FieldRef(key.name).GetOneOrNone(schema));
if (field == nullptr) {
const Key& key, const Schema& schema, const ArrayVector& dictionaries) {
ARROW_ASSIGN_OR_RAISE(auto match, FieldRef(key.name).FindOneOrNone(schema));
if (match.indices().empty()) {
return scalar(true);
}

ARROW_ASSIGN_OR_RAISE(auto converted, Scalar::Parse(field->type(), key.value));
auto field_index = match.indices()[0];
auto field = schema.field(field_index);

std::shared_ptr<Scalar> converted;

if (field->type()->id() == Type::DICTIONARY) {
if (dictionaries.empty() || dictionaries[field_index] == nullptr) {
return Status::Invalid("No dictionary provided for dictionary field ",
field->ToString());
}

DictionaryScalar::ValueType value;
value.dictionary = dictionaries[field_index];

if (!value.dictionary->type()->Equals(
checked_cast<const DictionaryType&>(*field->type()).value_type())) {
return Status::TypeError("Dictionary supplied for field ", field->ToString(),
" had incorrect type ",
value.dictionary->type()->ToString());
}

// look up the partition value in the dictionary
ARROW_ASSIGN_OR_RAISE(converted, Scalar::Parse(value.dictionary->type(), key.value));
ARROW_ASSIGN_OR_RAISE(auto index, compute::Match(converted, value.dictionary));
value.index = index.scalar();
if (!value.index->is_valid) {
return Status::Invalid("Dictionary supplied for field ", field->ToString(),
" does not contain '", key.value, "'");
}
converted = std::make_shared<DictionaryScalar>(std::move(value), field->type());
} else {
ARROW_ASSIGN_OR_RAISE(converted, Scalar::Parse(field->type(), key.value));
}

return equal(field_ref(field->name()), scalar(converted));
}

Result<std::shared_ptr<Expression>> KeyValuePartitioning::Parse(
const std::string& segment, int i) const {
if (auto key = ParseKey(segment, i)) {
return ConvertKey(*key, *schema_);
return ConvertKey(*key, *schema_, dictionaries_);
}

return scalar(true);
Expand Down Expand Up @@ -202,8 +238,12 @@ Result<std::string> DirectoryPartitioning::FormatKey(const Key& key, int i) cons

class KeyValuePartitioningInspectImpl {
public:
static Result<std::shared_ptr<DataType>> InferType(
const std::string& name, const std::vector<std::string>& reprs) {
explicit KeyValuePartitioningInspectImpl(const PartitioningFactoryOptions& options)
: options_(options) {}

static Result<std::shared_ptr<DataType>> InferType(const std::string& name,
const std::set<std::string>& reprs,
int max_partition_dictionary_size) {
if (reprs.empty()) {
return Status::Invalid("No segments were available for field '", name,
"'; couldn't infer type");
Expand All @@ -218,7 +258,11 @@ class KeyValuePartitioningInspectImpl {
return int32();
}

return utf8();
if (reprs.size() > static_cast<size_t>(max_partition_dictionary_size)) {
return utf8();
}

return dictionary(int32(), utf8());
}

int GetOrInsertField(const std::string& name) {
Expand All @@ -235,38 +279,50 @@ class KeyValuePartitioningInspectImpl {
InsertRepr(GetOrInsertField(name), std::move(repr));
}

void InsertRepr(int index, std::string repr) {
values_[index].push_back(std::move(repr));
}
void InsertRepr(int index, std::string repr) { values_[index].insert(std::move(repr)); }

Result<std::shared_ptr<Schema>> Finish(ArrayVector* dictionaries) {
if (options_.max_partition_dictionary_size != 0) {
dictionaries->resize(name_to_index_.size());
}

Result<std::shared_ptr<Schema>> Finish() {
std::vector<std::shared_ptr<Field>> fields(name_to_index_.size());

for (const auto& name_index : name_to_index_) {
const auto& name = name_index.first;
auto index = name_index.second;
ARROW_ASSIGN_OR_RAISE(auto type, InferType(name, values_[index]));
fields[index] = field(name, type);
ARROW_ASSIGN_OR_RAISE(auto type, InferType(name, values_[index],
options_.max_partition_dictionary_size));
if (type->id() == Type::DICTIONARY) {
StringBuilder builder;
for (const auto& repr : values_[index]) {
RETURN_NOT_OK(builder.Append(repr));
}
RETURN_NOT_OK(builder.Finish(&dictionaries->at(index)));
}
fields[index] = field(name, std::move(type));
}

return ::arrow::schema(std::move(fields));
}

private:
std::unordered_map<std::string, int> name_to_index_;
std::vector<std::vector<std::string>> values_;
std::vector<std::set<std::string>> values_;
const PartitioningFactoryOptions& options_;
};

class DirectoryPartitioningFactory : public PartitioningFactory {
public:
explicit DirectoryPartitioningFactory(std::vector<std::string> field_names)
: field_names_(std::move(field_names)) {}
DirectoryPartitioningFactory(std::vector<std::string> field_names,
PartitioningFactoryOptions options)
: field_names_(std::move(field_names)), options_(options) {}

std::string type_name() const override { return "schema"; }

Result<std::shared_ptr<Schema>> Inspect(
const std::vector<std::string>& paths) const override {
KeyValuePartitioningInspectImpl impl;
const std::vector<std::string>& paths) override {
KeyValuePartitioningInspectImpl impl(options_);

for (const auto& name : field_names_) {
impl.GetOrInsertField(name);
Expand All @@ -281,7 +337,7 @@ class DirectoryPartitioningFactory : public PartitioningFactory {
}
}

return impl.Finish();
return impl.Finish(&dictionaries_);
}

Result<std::shared_ptr<Partitioning>> Finish(
Expand All @@ -294,7 +350,7 @@ class DirectoryPartitioningFactory : public PartitioningFactory {
// drop fields which aren't in field_names_
auto out_schema = SchemaFromColumnNames(schema, field_names_);

return std::make_shared<DirectoryPartitioning>(std::move(out_schema));
return std::make_shared<DirectoryPartitioning>(std::move(out_schema), dictionaries_);
}

struct MakeWritePlanImpl;
Expand All @@ -308,6 +364,8 @@ class DirectoryPartitioningFactory : public PartitioningFactory {

private:
std::vector<std::string> field_names_;
ArrayVector dictionaries_;
PartitioningFactoryOptions options_;
};

struct DirectoryPartitioningFactory::MakeWritePlanImpl {
Expand Down Expand Up @@ -550,9 +608,9 @@ Result<WritePlan> DirectoryPartitioningFactory::MakeWritePlan(
}

std::shared_ptr<PartitioningFactory> DirectoryPartitioning::MakeFactory(
std::vector<std::string> field_names) {
std::vector<std::string> field_names, PartitioningFactoryOptions options) {
return std::shared_ptr<PartitioningFactory>(
new DirectoryPartitioningFactory(std::move(field_names)));
new DirectoryPartitioningFactory(std::move(field_names), options));
}

util::optional<KeyValuePartitioning::Key> HivePartitioning::ParseKey(
Expand All @@ -571,11 +629,14 @@ Result<std::string> HivePartitioning::FormatKey(const Key& key, int i) const {

class HivePartitioningFactory : public PartitioningFactory {
public:
explicit HivePartitioningFactory(PartitioningFactoryOptions options)
: options_(options) {}

std::string type_name() const override { return "hive"; }

Result<std::shared_ptr<Schema>> Inspect(
const std::vector<std::string>& paths) const override {
KeyValuePartitioningInspectImpl impl;
const std::vector<std::string>& paths) override {
KeyValuePartitioningInspectImpl impl(options_);

for (auto path : paths) {
for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
Expand All @@ -585,17 +646,22 @@ class HivePartitioningFactory : public PartitioningFactory {
}
}

return impl.Finish();
return impl.Finish(&dictionaries_);
}

Result<std::shared_ptr<Partitioning>> Finish(
const std::shared_ptr<Schema>& schema) const override {
return std::shared_ptr<Partitioning>(new HivePartitioning(schema));
return std::shared_ptr<Partitioning>(new HivePartitioning(schema, dictionaries_));
}

private:
ArrayVector dictionaries_;
PartitioningFactoryOptions options_;
};

std::shared_ptr<PartitioningFactory> HivePartitioning::MakeFactory() {
return std::shared_ptr<PartitioningFactory>(new HivePartitioningFactory());
std::shared_ptr<PartitioningFactory> HivePartitioning::MakeFactory(
PartitioningFactoryOptions options) {
return std::shared_ptr<PartitioningFactory>(new HivePartitioningFactory(options));
}

std::string StripPrefixAndFilename(const std::string& path, const std::string& prefix) {
Expand Down
Loading