diff --git a/cpp/src/arrow/array/util.cc b/cpp/src/arrow/array/util.cc
index edd547bb558..915482bdd96 100644
--- a/cpp/src/arrow/array/util.cc
+++ b/cpp/src/arrow/array/util.cc
@@ -317,13 +317,10 @@ class RepeatedArrayFactory {
 
   Status Visit(const DictionaryType& type) {
     const auto& value = checked_cast<const DictionaryScalar&>(scalar_).value;
-    ARROW_ASSIGN_OR_RAISE(auto dictionary, MakeArrayFromScalar(*value, 1, pool_));
-
-    ARROW_ASSIGN_OR_RAISE(auto zero, MakeScalar(type.index_type(), 0));
-    ARROW_ASSIGN_OR_RAISE(auto indices, MakeArrayFromScalar(*zero, length_, pool_));
-
+    ARROW_ASSIGN_OR_RAISE(auto indices,
+                          MakeArrayFromScalar(*value.index, length_, pool_));
     out_ = std::make_shared<DictionaryArray>(scalar_.type, std::move(indices),
-                                             std::move(dictionary));
+                                             value.dictionary);
     return Status::OK();
   }
 
diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc
index 3b215ba392b..b2b26d16b13 100644
--- a/cpp/src/arrow/compare.cc
+++ b/cpp/src/arrow/compare.cc
@@ -943,7 +943,10 @@ class ScalarEqualsVisitor {
   Status Visit(const UnionScalar& left) { return Status::NotImplemented("union"); }
 
   Status Visit(const DictionaryScalar& left) {
-    return Status::NotImplemented("dictionary");
+    const auto& right = checked_cast<const DictionaryScalar&>(right_);
+    result_ = left.value.index->Equals(right.value.index) &&
+              left.value.dictionary->Equals(right.value.dictionary);
+    return Status::OK();
   }
 
   Status Visit(const ExtensionScalar& left) {
diff --git a/cpp/src/arrow/compute/kernels/scalar_set_lookup.cc b/cpp/src/arrow/compute/kernels/scalar_set_lookup.cc
index 1eb810a3960..734dfe6eab9 100644
--- a/cpp/src/arrow/compute/kernels/scalar_set_lookup.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_set_lookup.cc
@@ -211,9 +211,25 @@ struct MatchVisitor {
   }
 };
 
+void ExecArrayOrScalar(KernelContext* ctx, const Datum& in, Datum* out,
+                       std::function<Status(const ArrayData&)> array_impl) {
+  if (in.is_array()) {
+    KERNEL_RETURN_IF_ERROR(ctx, array_impl(*in.array()));
+    return;
+  }
+
+  std::shared_ptr<Array> in_array;
+  std::shared_ptr<Scalar> out_scalar;
+  KERNEL_RETURN_IF_ERROR(ctx, MakeArrayFromScalar(*in.scalar(), 1).Value(&in_array));
+  KERNEL_RETURN_IF_ERROR(ctx, array_impl(*in_array->data()));
+  KERNEL_RETURN_IF_ERROR(ctx, out->make_array()->GetScalar(0).Value(&out_scalar));
+  *out = std::move(out_scalar);
+}
+
 void ExecMatch(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
-  MatchVisitor dispatch(ctx, *batch[0].array(), out);
-  ctx->SetStatus(dispatch.Execute());
+  ExecArrayOrScalar(ctx, batch[0], out, [&](const ArrayData& in) {
+    return MatchVisitor(ctx, in, out).Execute();
+  });
 }
 
 // ----------------------------------------------------------------------
@@ -284,8 +300,9 @@ struct IsInVisitor {
 };
 
 void ExecIsIn(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
-  IsInVisitor dispatch(ctx, *batch[0].array(), out);
-  ctx->SetStatus(dispatch.Execute());
+  ExecArrayOrScalar(ctx, batch[0], out, [&](const ArrayData& in) {
+    return IsInVisitor(ctx, in, out).Execute();
+  });
 }
 
 // Unary set lookup kernels available for the following input types
@@ -302,7 +319,7 @@ void AddBasicSetLookupKernels(ScalarKernel kernel, ScalarFunction* func) {
   auto AddKernels = [&](const std::vector<std::shared_ptr<DataType>>& types) {
     for (const std::shared_ptr<DataType>& ty : types) {
-      kernel.signature = KernelSignature::Make({InputType::Array(ty)}, out_ty);
+      kernel.signature = KernelSignature::Make({ty}, out_ty);
       DCHECK_OK(func->AddKernel(kernel));
     }
   };
@@ -331,7 +348,7 @@ void RegisterScalarSetLookup(FunctionRegistry* registry) {
   AddBasicSetLookupKernels(isin_base, /*output_type=*/boolean(), isin.get());
 
-  isin_base.signature = KernelSignature::Make({InputType::Array(null())}, boolean());
+  isin_base.signature = KernelSignature::Make({null()}, boolean());
   isin_base.null_handling = NullHandling::COMPUTED_PREALLOCATE;
   DCHECK_OK(isin->AddKernel(isin_base));
   DCHECK_OK(registry->AddFunction(isin));
@@ -347,7 +364,7 @@ void RegisterScalarSetLookup(FunctionRegistry* registry) {
   auto match = std::make_shared<ScalarFunction>("match", Arity::Unary());
   AddBasicSetLookupKernels(match_base, /*output_type=*/int32(), match.get());
 
-  match_base.signature = KernelSignature::Make({InputType::Array(null())}, int32());
+  match_base.signature = KernelSignature::Make({null()}, int32());
   DCHECK_OK(match->AddKernel(match_base));
   DCHECK_OK(registry->AddFunction(match));
 }
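// Editor's note (illustrative sketch, not part of the patch; function name is
// hypothetical, assumes the arrow namespace): ExecArrayOrScalar above reuses
// the existing array kernels for scalar inputs by promoting the scalar to a
// length-1 array and extracting element 0 of the result back out as a scalar.
// The same MakeArrayFromScalar / GetScalar calls the patch uses, in isolation:
Result<std::shared_ptr<Scalar>> ExampleScalarRoundTrip(const Scalar& in) {
  // Promote the scalar to a one-element array.
  ARROW_ASSIGN_OR_RAISE(auto as_array, MakeArrayFromScalar(in, 1));
  // (An array-only kernel would run here.)
  // Pull the single element back out as a scalar.
  return as_array->GetScalar(0);
}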
diff --git a/cpp/src/arrow/dataset/dataset_test.cc b/cpp/src/arrow/dataset/dataset_test.cc
index 4be6e33e2c0..ce5ed457468 100644
--- a/cpp/src/arrow/dataset/dataset_test.cc
+++ b/cpp/src/arrow/dataset/dataset_test.cc
@@ -760,10 +760,11 @@ TEST_F(TestSchemaUnification, SelectMixedColumnsAndFilter) {
 TEST(TestDictPartitionColumn, SelectPartitionColumnFilterPhysicalColumn) {
   auto partition_field = field("part", dictionary(int32(), utf8()));
   auto path = "/dataset/part=one/data.json";
+  auto dictionary = ArrayFromJSON(utf8(), R"(["one"])");
 
   auto mock_fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
   ARROW_EXPECT_OK(mock_fs->CreateFile(path, R"([ {"phy_1": 111, "phy_2": 211} ])",
-                                      /* recursive */ true));
+                                      /*recursive=*/true));
 
   auto physical_schema = SchemaFromNames({"phy_1", "phy_2"});
   auto format = std::make_shared<JSONRecordBatchFileFormat>(
@@ -771,7 +772,8 @@ TEST(TestDictPartitionColumn, SelectPartitionColumnFilterPhysicalColumn) {
 
   FileSystemFactoryOptions options;
   options.partition_base_dir = "/dataset";
-  options.partitioning = std::make_shared<HivePartitioning>(schema({partition_field}));
+  options.partitioning = std::make_shared<HivePartitioning>(schema({partition_field}),
+                                                            ArrayVector{dictionary});
 
   ASSERT_OK_AND_ASSIGN(auto factory,
                        FileSystemDatasetFactory::Make(mock_fs, {path}, format, options));
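// Editor's note (illustration, not part of the patch; function name is
// hypothetical, ArrayFromJSON is the test helper used throughout this diff):
// the array/util.cc change above is what makes materializing a virtual
// partition column cheap — repeating a DictionaryScalar reuses the scalar's
// own dictionary and repeats only its index.
std::shared_ptr<Array> ExampleRepeatDictionaryScalar() {
  auto dict = ArrayFromJSON(utf8(), R"(["one"])");
  DictionaryScalar::ValueType value{std::make_shared<Int32Scalar>(0), dict};
  auto scalar = std::make_shared<DictionaryScalar>(value, dictionary(int32(), utf8()));
  // Length-5 DictionaryArray: indices [0, 0, 0, 0, 0], dictionary ["one"].
  return MakeArrayFromScalar(*scalar, 5).ValueOrDie();
}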
diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index f2d8488276b..adfd56fc512 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -19,12 +19,15 @@
 
 #include <algorithm>
 #include <chrono>
-#include <map>
 #include <memory>
+#include <set>
 #include <string>
 #include <utility>
 #include <vector>
 
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/compute/api_scalar.h"
 #include "arrow/dataset/dataset_internal.h"
 #include "arrow/dataset/file_base.h"
 #include "arrow/dataset/filter.h"
@@ -135,20 +138,53 @@ Status KeyValuePartitioning::SetDefaultValuesFromKeys(const Expression& expr,
 }
 
 Result<std::shared_ptr<Expression>> KeyValuePartitioning::ConvertKey(
-    const Key& key, const Schema& schema) {
-  ARROW_ASSIGN_OR_RAISE(auto field, FieldRef(key.name).GetOneOrNone(schema));
-  if (field == nullptr) {
+    const Key& key, const Schema& schema, const ArrayVector& dictionaries) {
+  ARROW_ASSIGN_OR_RAISE(auto match, FieldRef(key.name).FindOneOrNone(schema));
+  if (match.indices().empty()) {
     return scalar(true);
   }
 
-  ARROW_ASSIGN_OR_RAISE(auto converted, Scalar::Parse(field->type(), key.value));
+  auto field_index = match.indices()[0];
+  auto field = schema.field(field_index);
+
+  std::shared_ptr<Scalar> converted;
+
+  if (field->type()->id() == Type::DICTIONARY) {
+    if (dictionaries.empty() || dictionaries[field_index] == nullptr) {
+      return Status::Invalid("No dictionary provided for dictionary field ",
+                             field->ToString());
+    }
+
+    DictionaryScalar::ValueType value;
+    value.dictionary = dictionaries[field_index];
+
+    if (!value.dictionary->type()->Equals(
+            checked_cast<const DictionaryType&>(*field->type()).value_type())) {
+      return Status::TypeError("Dictionary supplied for field ", field->ToString(),
+                               " had incorrect type ",
+                               value.dictionary->type()->ToString());
+    }
+
+    // look up the partition value in the dictionary
+    ARROW_ASSIGN_OR_RAISE(converted, Scalar::Parse(value.dictionary->type(), key.value));
+    ARROW_ASSIGN_OR_RAISE(auto index, compute::Match(converted, value.dictionary));
+    value.index = index.scalar();
+    if (!value.index->is_valid) {
+      return Status::Invalid("Dictionary supplied for field ", field->ToString(),
+                             " does not contain '", key.value, "'");
+    }
+    converted = std::make_shared<DictionaryScalar>(std::move(value), field->type());
+  } else {
+    ARROW_ASSIGN_OR_RAISE(converted, Scalar::Parse(field->type(), key.value));
+  }
+
   return equal(field_ref(field->name()), scalar(converted));
 }
 
 Result<std::shared_ptr<Expression>> KeyValuePartitioning::Parse(
     const std::string& segment, int i) const {
   if (auto key = ParseKey(segment, i)) {
-    return ConvertKey(*key, *schema_);
+    return ConvertKey(*key, *schema_, dictionaries_);
   }
 
   return scalar(true);
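// Editor's note (usage sketch, not part of the patch; the field name and
// dictionary values are hypothetical, ArrayFromJSON is the test helper used in
// this diff): with a dictionary-typed partition field, parsing a path now
// produces a comparison against a DictionaryScalar whose index points into the
// supplied dictionary, rather than against a plain utf8 scalar.
Result<std::shared_ptr<Expression>> ExampleParseDictionaryKey() {
  auto part_field = field("part", dictionary(int32(), utf8()));
  auto dict = ArrayFromJSON(utf8(), R"(["one", "two"])");
  HivePartitioning partitioning(schema({part_field}), ArrayVector{dict});
  // Yields ("part" == dictionary scalar {index: 0, dictionary: dict}).
  return partitioning.Parse("/part=one");
}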
@@ -202,8 +238,12 @@ Result<std::string> DirectoryPartitioning::FormatKey(const Key& key, int i) const
 
 class KeyValuePartitioningInspectImpl {
  public:
-  static Result<std::shared_ptr<DataType>> InferType(
-      const std::string& name, const std::vector<std::string>& reprs) {
+  explicit KeyValuePartitioningInspectImpl(const PartitioningFactoryOptions& options)
+      : options_(options) {}
+
+  static Result<std::shared_ptr<DataType>> InferType(const std::string& name,
+                                                     const std::set<std::string>& reprs,
+                                                     int max_partition_dictionary_size) {
     if (reprs.empty()) {
       return Status::Invalid("No segments were available for field '", name,
                              "'; couldn't infer type");
@@ -218,7 +258,11 @@ class KeyValuePartitioningInspectImpl {
       return int32();
     }
 
-    return utf8();
+    if (reprs.size() > static_cast<size_t>(max_partition_dictionary_size)) {
+      return utf8();
+    }
+
+    return dictionary(int32(), utf8());
   }
 
   int GetOrInsertField(const std::string& name) {
@@ -235,18 +279,28 @@ class KeyValuePartitioningInspectImpl {
     InsertRepr(GetOrInsertField(name), std::move(repr));
   }
 
-  void InsertRepr(int index, std::string repr) {
-    values_[index].push_back(std::move(repr));
-  }
+  void InsertRepr(int index, std::string repr) { values_[index].insert(std::move(repr)); }
+
+  Result<std::shared_ptr<Schema>> Finish(ArrayVector* dictionaries) {
+    if (options_.max_partition_dictionary_size != 0) {
+      dictionaries->resize(name_to_index_.size());
+    }
 
-  Result<std::shared_ptr<Schema>> Finish() {
     std::vector<std::shared_ptr<Field>> fields(name_to_index_.size());
 
     for (const auto& name_index : name_to_index_) {
       const auto& name = name_index.first;
       auto index = name_index.second;
-      ARROW_ASSIGN_OR_RAISE(auto type, InferType(name, values_[index]));
-      fields[index] = field(name, type);
+      ARROW_ASSIGN_OR_RAISE(auto type,
+                            InferType(name, values_[index],
+                                      options_.max_partition_dictionary_size));
+      if (type->id() == Type::DICTIONARY) {
+        StringBuilder builder;
+        for (const auto& repr : values_[index]) {
+          RETURN_NOT_OK(builder.Append(repr));
+        }
+        RETURN_NOT_OK(builder.Finish(&dictionaries->at(index)));
+      }
+      fields[index] = field(name, std::move(type));
     }
 
     return ::arrow::schema(std::move(fields));
@@ -254,19 +308,21 @@ class KeyValuePartitioningInspectImpl {
 
  private:
   std::unordered_map<std::string, int> name_to_index_;
-  std::vector<std::vector<std::string>> values_;
+  std::vector<std::set<std::string>> values_;
+  const PartitioningFactoryOptions& options_;
 };
 
 class DirectoryPartitioningFactory : public PartitioningFactory {
  public:
-  explicit DirectoryPartitioningFactory(std::vector<std::string> field_names)
-      : field_names_(std::move(field_names)) {}
+  DirectoryPartitioningFactory(std::vector<std::string> field_names,
+                               PartitioningFactoryOptions options)
+      : field_names_(std::move(field_names)), options_(options) {}
 
   std::string type_name() const override { return "schema"; }
 
   Result<std::shared_ptr<Schema>> Inspect(
-      const std::vector<std::string>& paths) const override {
-    KeyValuePartitioningInspectImpl impl;
+      const std::vector<std::string>& paths) override {
+    KeyValuePartitioningInspectImpl impl(options_);
 
     for (const auto& name : field_names_) {
       impl.GetOrInsertField(name);
@@ -281,7 +337,7 @@ class DirectoryPartitioningFactory : public PartitioningFactory {
       }
     }
 
-    return impl.Finish();
+    return impl.Finish(&dictionaries_);
   }
 
   Result<std::shared_ptr<Partitioning>> Finish(
@@ -294,7 +350,7 @@ class DirectoryPartitioningFactory : public PartitioningFactory {
     // drop fields which aren't in field_names_
    auto out_schema = SchemaFromColumnNames(schema, field_names_);
 
-    return std::make_shared<DirectoryPartitioning>(std::move(out_schema));
+    return std::make_shared<DirectoryPartitioning>(std::move(out_schema), dictionaries_);
   }
 
   struct MakeWritePlanImpl;
@@ -308,6 +364,8 @@ class DirectoryPartitioningFactory : public PartitioningFactory {
 
  private:
   std::vector<std::string> field_names_;
+  ArrayVector dictionaries_;
+  PartitioningFactoryOptions options_;
 };
 
 struct DirectoryPartitioningFactory::MakeWritePlanImpl {
@@ -550,9 +608,9 @@ Result<WritePlan> DirectoryPartitioningFactory::MakeWritePlan(
 }
 
 std::shared_ptr<PartitioningFactory> DirectoryPartitioning::MakeFactory(
-    std::vector<std::string> field_names) {
+    std::vector<std::string> field_names, PartitioningFactoryOptions options) {
   return std::shared_ptr<PartitioningFactory>(
-      new DirectoryPartitioningFactory(std::move(field_names)));
+      new DirectoryPartitioningFactory(std::move(field_names), options));
 }
 
 util::optional<KeyValuePartitioning::Key> HivePartitioning::ParseKey(
@@ -571,11 +629,14 @@ Result<std::string> HivePartitioning::FormatKey(const Key& key, int i) const {
 
 class HivePartitioningFactory : public PartitioningFactory {
  public:
+  explicit HivePartitioningFactory(PartitioningFactoryOptions options)
+      : options_(options) {}
+
   std::string type_name() const override { return "hive"; }
 
   Result<std::shared_ptr<Schema>> Inspect(
-      const std::vector<std::string>& paths) const override {
-    KeyValuePartitioningInspectImpl impl;
+      const std::vector<std::string>& paths) override {
+    KeyValuePartitioningInspectImpl impl(options_);
 
     for (auto path : paths) {
       for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
@@ -585,17 +646,22 @@ class HivePartitioningFactory : public PartitioningFactory {
       }
     }
 
-    return impl.Finish();
+    return impl.Finish(&dictionaries_);
  }
 
   Result<std::shared_ptr<Partitioning>> Finish(
       const std::shared_ptr<Schema>& schema) const override {
-    return std::shared_ptr<Partitioning>(new HivePartitioning(schema));
+    return std::shared_ptr<Partitioning>(new HivePartitioning(schema, dictionaries_));
   }
+
+ private:
+  ArrayVector dictionaries_;
+  PartitioningFactoryOptions options_;
 };
 
-std::shared_ptr<PartitioningFactory> HivePartitioning::MakeFactory() {
-  return std::shared_ptr<PartitioningFactory>(new HivePartitioningFactory());
+std::shared_ptr<PartitioningFactory> HivePartitioning::MakeFactory(
+    PartitioningFactoryOptions options) {
+  return std::shared_ptr<PartitioningFactory>(new HivePartitioningFactory(options));
 }
 
 std::string StripPrefixAndFilename(const std::string& path, const std::string& prefix) {
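// Editor's note (usage sketch, not part of the patch; function name and paths
// are hypothetical): discovering a partitioning with the new
// PartitioningFactoryOptions. With max_partition_dictionary_size = 2, a string
// field with at most two unique segment values is inferred as dictionary-typed.
Result<std::shared_ptr<Schema>> ExampleDictionaryInference() {
  PartitioningFactoryOptions options;
  options.max_partition_dictionary_size = 2;
  auto factory = DirectoryPartitioning::MakeFactory({"alpha"}, options);
  // Two unique values -> {alpha: dictionary(int32(), utf8())};
  // a third unique value would fall back to {alpha: utf8()}.
  return factory->Inspect({"/a", "/b"});
}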
diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h
index 56a8db965e1..85973ab315e 100644
--- a/cpp/src/arrow/dataset/partition.h
+++ b/cpp/src/arrow/dataset/partition.h
@@ -86,6 +86,17 @@ class ARROW_DS_EXPORT Partitioning {
   std::shared_ptr<Schema> schema_;
 };
 
+struct PartitioningFactoryOptions {
+  /// When inferring a schema for partition fields, string fields may be inferred as
+  /// a dictionary type instead. This can be more efficient when materializing virtual
+  /// columns. If the number of discovered unique values of a string field exceeds
+  /// max_partition_dictionary_size, it will instead be inferred as a string.
+  ///
+  /// max_partition_dictionary_size = 0: No fields will be inferred as dictionary.
+  /// max_partition_dictionary_size = -1: All fields will be inferred as dictionary.
+  int max_partition_dictionary_size = 0;
+};
+
 /// \brief PartitioningFactory provides creation of a partitioning when the
 /// specific schema must be inferred from available paths (no explicit schema is known).
 class ARROW_DS_EXPORT PartitioningFactory {
@@ -96,8 +107,9 @@ class ARROW_DS_EXPORT PartitioningFactory {
   virtual std::string type_name() const = 0;
 
   /// Get the schema for the resulting Partitioning.
+  /// This may reset internal state, for example dictionaries of unique representations.
   virtual Result<std::shared_ptr<Schema>> Inspect(
-      const std::vector<std::string>& paths) const = 0;
+      const std::vector<std::string>& paths) = 0;
 
   /// Create a partitioning using the provided schema
   /// (fields may be dropped).
@@ -169,7 +181,8 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning {
   /// Convert a Key to a full expression.
   /// If the field referenced in key is absent from the schema, it will be ignored.
   static Result<std::shared_ptr<Expression>> ConvertKey(const Key& key,
-                                                        const Schema& schema);
+                                                        const Schema& schema,
+                                                        const ArrayVector& dictionaries);
 
   /// Extract a partition key from a path segment.
   virtual util::optional<Key> ParseKey(const std::string& segment, int i) const = 0;
@@ -182,7 +195,14 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning {
   Result<std::string> Format(const Expression& expr, int i) const override;
 
  protected:
-  using Partitioning::Partitioning;
+  KeyValuePartitioning(std::shared_ptr<Schema> schema, ArrayVector dictionaries)
+      : Partitioning(std::move(schema)), dictionaries_(std::move(dictionaries)) {
+    if (dictionaries_.empty()) {
+      dictionaries_.resize(schema_->num_fields());
+    }
+  }
+
+  ArrayVector dictionaries_;
 };
@@ -193,8 +213,11 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning {
 /// \brief DirectoryPartitioning parses one segment of a path for each field in its
 /// schema
 ///
 /// For example given schema<year:int16, month:int8> the path "/2009/11" would be
 /// parsed to ("year"_ == 2009 and "month"_ == 11)
 class ARROW_DS_EXPORT DirectoryPartitioning : public KeyValuePartitioning {
  public:
-  explicit DirectoryPartitioning(std::shared_ptr<Schema> schema)
-      : KeyValuePartitioning(std::move(schema)) {}
+  // If a field in schema is of dictionary type, the corresponding element of
+  // dictionaries must contain the dictionary of values for that field.
+  explicit DirectoryPartitioning(std::shared_ptr<Schema> schema,
+                                 ArrayVector dictionaries = {})
+      : KeyValuePartitioning(std::move(schema), std::move(dictionaries)) {}
 
   std::string type_name() const override { return "schema"; }
 
@@ -203,7 +226,7 @@ class ARROW_DS_EXPORT DirectoryPartitioning : public KeyValuePartitioning {
   Result<std::string> FormatKey(const Key& key, int i) const override;
 
   static std::shared_ptr<PartitioningFactory> MakeFactory(
-      std::vector<std::string> field_names);
+      std::vector<std::string> field_names, PartitioningFactoryOptions = {});
 };
 
 /// \brief Multi-level, directory based partitioning
@@ -217,8 +240,10 @@ class ARROW_DS_EXPORT DirectoryPartitioning : public KeyValuePartitioning {
 /// For example given schema<year:int16, month:int8, day:int8> the path
 /// "/day=321/ignored=3.4/year=2009" parses to ("year"_ == 2009 and "day"_ == 321)
 class ARROW_DS_EXPORT HivePartitioning : public KeyValuePartitioning {
  public:
-  explicit HivePartitioning(std::shared_ptr<Schema> schema)
-      : KeyValuePartitioning(std::move(schema)) {}
+  // If a field in schema is of dictionary type, the corresponding element of
+  // dictionaries must contain the dictionary of values for that field.
+  explicit HivePartitioning(std::shared_ptr<Schema> schema, ArrayVector dictionaries = {})
+      : KeyValuePartitioning(std::move(schema), std::move(dictionaries)) {}
 
   std::string type_name() const override { return "hive"; }
 
@@ -230,7 +255,8 @@ class ARROW_DS_EXPORT HivePartitioning : public KeyValuePartitioning {
 
   static util::optional<Key> ParseKey(const std::string& segment);
 
-  static std::shared_ptr<PartitioningFactory> MakeFactory();
+  static std::shared_ptr<PartitioningFactory> MakeFactory(
+      PartitioningFactoryOptions = {});
 };
 
 /// \brief Implementation provided by lambda or other callable
@@ -268,40 +294,28 @@ ARROW_DS_EXPORT std::vector<std::string> StripPrefixAndFilename(
 class ARROW_DS_EXPORT PartitioningOrFactory {
  public:
   explicit PartitioningOrFactory(std::shared_ptr<Partitioning> partitioning)
-      : variant_(std::move(partitioning)) {}
+      : partitioning_(std::move(partitioning)) {}
 
   explicit PartitioningOrFactory(std::shared_ptr<PartitioningFactory> factory)
-      : variant_(std::move(factory)) {}
+      : factory_(std::move(factory)) {}
 
   PartitioningOrFactory& operator=(std::shared_ptr<Partitioning> partitioning) {
-    variant_ = std::move(partitioning);
-    return *this;
+    return *this = PartitioningOrFactory(std::move(partitioning));
   }
 
   PartitioningOrFactory& operator=(std::shared_ptr<PartitioningFactory> factory) {
-    variant_ = std::move(factory);
-    return *this;
+    return *this = PartitioningOrFactory(std::move(factory));
   }
 
-  std::shared_ptr<Partitioning> partitioning() const {
-    if (util::holds_alternative<std::shared_ptr<Partitioning>>(variant_)) {
-      return util::get<std::shared_ptr<Partitioning>>(variant_);
-    }
-    return NULLPTR;
-  }
+  const std::shared_ptr<Partitioning>& partitioning() const { return partitioning_; }
 
-  std::shared_ptr<PartitioningFactory> factory() const {
-    if (util::holds_alternative<std::shared_ptr<PartitioningFactory>>(variant_)) {
-      return util::get<std::shared_ptr<PartitioningFactory>>(variant_);
-    }
-    return NULLPTR;
-  }
+  const std::shared_ptr<PartitioningFactory>& factory() const { return factory_; }
 
   Result<std::shared_ptr<Schema>> GetOrInferSchema(const std::vector<std::string>& paths);
 
  private:
-  util::variant<std::shared_ptr<PartitioningFactory>, std::shared_ptr<Partitioning>>
-      variant_;
+  std::shared_ptr<PartitioningFactory> factory_;
+  std::shared_ptr<Partitioning> partitioning_;
 };
 
 }  // namespace dataset
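// Editor's note (illustration, not part of the patch; function name is
// hypothetical): with the util::variant replaced by two members above,
// assignment constructs a fresh PartitioningOrFactory, so at most one of the
// two accessors returns a non-null pointer at any time.
void ExamplePartitioningOrFactory() {
  PartitioningOrFactory pof(HivePartitioning::MakeFactory());
  // pof.factory() is non-null; pof.partitioning() is null.
  pof = std::make_shared<HivePartitioning>(schema({field("part", utf8())}));
  // Now pof.partitioning() is non-null and pof.factory() has been reset to null.
}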
diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc
index fad81cc4a7a..093cc424c5d 100644
--- a/cpp/src/arrow/dataset/partition_test.cc
+++ b/cpp/src/arrow/dataset/partition_test.cc
@@ -59,7 +59,7 @@ class TestPartitioning : public ::testing::Test {
                      const std::vector<std::shared_ptr<Field>>& expected) {
     ASSERT_OK_AND_ASSIGN(auto actual, factory_->Inspect(paths));
     ASSERT_EQ(*actual, Schema(expected));
-    ASSERT_OK(factory_->Finish(actual).status());
+    ASSERT_OK_AND_ASSIGN(partitioning_, factory_->Finish(actual));
   }
 
   void AssertInspectError(const std::vector<std::string>& paths) {
@@ -75,6 +75,10 @@ class TestPartitioning : public ::testing::Test {
     return field(std::move(name), utf8());
   }
 
+  static std::shared_ptr<Field> Dict(std::string name) {
+    return field(std::move(name), dictionary(int32(), utf8()));
+  }
+
   std::shared_ptr<Partitioning> partitioning_;
   std::shared_ptr<PartitioningFactory> factory_;
 };
@@ -139,6 +143,48 @@ TEST_F(TestPartitioning, DiscoverSchema) {
   AssertInspect({"/0/1", "/hello"}, {Str("alpha"), Int("beta")});
 }
 
+TEST_F(TestPartitioning, DictionaryInference) {
+  PartitioningFactoryOptions options;
+  options.max_partition_dictionary_size = 2;
+  factory_ = DirectoryPartitioning::MakeFactory({"alpha", "beta"}, options);
+
+  // type is still int32 if possible
+  AssertInspect({"/0/1"}, {Int("alpha"), Int("beta")});
+
+  // successful dictionary inference
+  AssertInspect({"/a/0"}, {Dict("alpha"), Int("beta")});
+  AssertInspect({"/a/0", "/a/1"}, {Dict("alpha"), Int("beta")});
+  AssertInspect({"/a/0", "/b/0", "/a/1", "/b/1"}, {Dict("alpha"), Int("beta")});
+
+  // fall back to string if max dictionary size is exceeded
+  AssertInspect({"/a/0", "/b/0", "/c/1", "/d/1"}, {Str("alpha"), Int("beta")});
+}
+
+TEST_F(TestPartitioning, DictionaryHasUniqueValues) {
+  PartitioningFactoryOptions options;
+  options.max_partition_dictionary_size = -1;
+  factory_ = DirectoryPartitioning::MakeFactory({"alpha"}, options);
+
+  auto alpha = Dict("alpha");
+  AssertInspect({"/a", "/b", "/a", "/b", "/c", "/a"}, {alpha});
+  ASSERT_OK_AND_ASSIGN(auto partitioning, factory_->Finish(schema({alpha})));
+
+  auto expected_dictionary = internal::checked_pointer_cast<StringArray>(
+      ArrayFromJSON(utf8(), R"(["a", "b", "c"])"));
+
+  for (int32_t i = 0; i < expected_dictionary->length(); ++i) {
+    DictionaryScalar::ValueType index_and_dictionary{std::make_shared<Int32Scalar>(i),
+                                                     expected_dictionary};
+    auto dictionary_scalar =
+        std::make_shared<DictionaryScalar>(index_and_dictionary, alpha->type());
+
+    auto path = "/" + expected_dictionary->GetString(i);
+    AssertParse(path, "alpha"_ == dictionary_scalar);
+  }
+
+  AssertParseError("/yosemite");  // not in inspected dictionary
+}
+
 TEST_F(TestPartitioning, DiscoverSchemaSegfault) {
   // ARROW-7638
   factory_ = DirectoryPartitioning::MakeFactory({"alpha", "beta"});
@@ -185,6 +231,53 @@ TEST_F(TestPartitioning, DiscoverHiveSchema) {
                 {Int("alpha"), Int("beta"), Str("gamma")});
 }
 
+TEST_F(TestPartitioning, HiveDictionaryInference) {
+  PartitioningFactoryOptions options;
+  options.max_partition_dictionary_size = 2;
+  factory_ = HivePartitioning::MakeFactory(options);
+
+  // type is still int32 if possible
+  AssertInspect({"/alpha=0/beta=1"}, {Int("alpha"), Int("beta")});
+
+  // successful dictionary inference
+  AssertInspect({"/alpha=a/beta=0"}, {Dict("alpha"), Int("beta")});
+  AssertInspect({"/alpha=a/beta=0", "/alpha=a/1"}, {Dict("alpha"), Int("beta")});
+  AssertInspect(
+      {"/alpha=a/beta=0", "/alpha=b/beta=0", "/alpha=a/beta=1", "/alpha=b/beta=1"},
+      {Dict("alpha"), Int("beta")});
+
+  // fall back to string if max dictionary size is exceeded
+  AssertInspect(
+      {"/alpha=a/beta=0", "/alpha=b/beta=0", "/alpha=c/beta=1", "/alpha=d/beta=1"},
+      {Str("alpha"), Int("beta")});
+}
+
+TEST_F(TestPartitioning, HiveDictionaryHasUniqueValues) {
+  PartitioningFactoryOptions options;
+  options.max_partition_dictionary_size = -1;
+  factory_ = HivePartitioning::MakeFactory(options);
+
+  auto alpha = Dict("alpha");
+  AssertInspect({"/alpha=a", "/alpha=b", "/alpha=a", "/alpha=b", "/alpha=c", "/alpha=a"},
+                {alpha});
+  ASSERT_OK_AND_ASSIGN(auto partitioning, factory_->Finish(schema({alpha})));
+
+  auto expected_dictionary = internal::checked_pointer_cast<StringArray>(
+      ArrayFromJSON(utf8(), R"(["a", "b", "c"])"));
+
+  for (int32_t i = 0; i < expected_dictionary->length(); ++i) {
+    DictionaryScalar::ValueType index_and_dictionary{std::make_shared<Int32Scalar>(i),
+                                                     expected_dictionary};
+    auto dictionary_scalar =
+        std::make_shared<DictionaryScalar>(index_and_dictionary, alpha->type());
+
+    auto path = "/alpha=" + expected_dictionary->GetString(i);
+    AssertParse(path, "alpha"_ == dictionary_scalar);
+  }
+
+  AssertParseError("/alpha=yosemite");  // not in inspected dictionary
+}
+
 TEST_F(TestPartitioning, EtlThenHive) {
   DirectoryPartitioning etl_part(schema({field("year", int16()), field("month", int8()),
                                          field("day", int8()), field("hour", int8())}));
diff --git a/cpp/src/arrow/scalar.cc b/cpp/src/arrow/scalar.cc
index 9bde7d1e3fb..29e8b85307c 100644
--- a/cpp/src/arrow/scalar.cc
+++ b/cpp/src/arrow/scalar.cc
@@ -22,6 +22,7 @@
 #include <utility>
 
 #include "arrow/array.h"
+#include "arrow/array/util.h"
 #include "arrow/buffer.h"
 #include "arrow/compare.h"
 #include "arrow/type.h"
@@ -179,9 +180,10 @@ Result<std::shared_ptr<Scalar>> StructScalar::field(FieldRef ref) const {
 
 DictionaryScalar::DictionaryScalar(std::shared_ptr<DataType> type)
     : Scalar(std::move(type)),
-      value(
-          MakeNullScalar(checked_cast<const DictionaryType&>(*this->type).value_type())) {
-}
+      value{MakeNullScalar(checked_cast<const DictionaryType&>(*this->type).index_type()),
+            MakeArrayOfNull(checked_cast<const DictionaryType&>(*this->type).value_type(),
+                            0)
+                .ValueOrDie()} {}
 
 template <typename T>
 using scalar_constructor_has_arrow_type =
@@ -226,6 +228,11 @@ std::string Scalar::ToString() const {
   if (!this->is_valid) {
     return "null";
   }
+  if (type->id() == Type::DICTIONARY) {
+    auto dict_scalar = checked_cast<const DictionaryScalar*>(this);
+    return dict_scalar->value.dictionary->ToString() + "[" +
+           dict_scalar->value.index->ToString() + "]";
+  }
   auto maybe_repr = CastTo(utf8());
   if (maybe_repr.ok()) {
     return checked_cast<const StringScalar&>(*maybe_repr.ValueOrDie()).value->ToString();
diff --git a/cpp/src/arrow/scalar.h b/cpp/src/arrow/scalar.h
index b9dc792208a..b8db124d14c 100644
--- a/cpp/src/arrow/scalar.h
+++ b/cpp/src/arrow/scalar.h
@@ -395,8 +395,10 @@ struct ARROW_EXPORT DenseUnionScalar : public UnionScalar {
 
 struct ARROW_EXPORT DictionaryScalar : public Scalar {
   using TypeClass = DictionaryType;
-  using ValueType = std::shared_ptr<Scalar>;
-  ValueType value;
+  struct ValueType {
+    std::shared_ptr<Scalar> index;
+    std::shared_ptr<Array> dictionary;
+  } value;
 
   explicit DictionaryScalar(std::shared_ptr<DataType> type);
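// Editor's note (illustration, not part of the patch; function name is
// hypothetical, ArrayFromJSON is the test helper used in this diff):
// constructing a scalar under the new DictionaryScalar::ValueType layout.
// Carrying the dictionary alongside the index is what lets scalar equality
// (compare.cc above) and Scalar::ToString() work without external context.
std::shared_ptr<DictionaryScalar> ExampleDictionaryScalar() {
  auto dict = ArrayFromJSON(utf8(), R"(["a", "b", "c"])");
  DictionaryScalar::ValueType value{std::make_shared<Int32Scalar>(1), dict};
  // ToString() renders as dict->ToString() + "[1]".
  return std::make_shared<DictionaryScalar>(value, dictionary(int32(), utf8()));
}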
""" cdef: - PartitioningFactory factory + CPartitioningFactoryOptions options vector[c_string] c_field_names + + if max_partition_dictionary_size is None: + max_partition_dictionary_size = -1 + + options.max_partition_dictionary_size = \ + int(max_partition_dictionary_size) + c_field_names = [tobytes(s) for s in field_names] - factory = PartitioningFactory.wrap( - CDirectoryPartitioning.MakeFactory(c_field_names)) - return factory + return PartitioningFactory.wrap( + CDirectoryPartitioning.MakeFactory(c_field_names, options)) cdef class HivePartitioning(Partitioning): @@ -1243,20 +1254,34 @@ cdef class HivePartitioning(Partitioning): self.hive_partitioning = sp.get() @staticmethod - def discover(): + def discover(object max_partition_dictionary_size=0): """ Discover a HivePartitioning. + Params + ------ + max_partition_dictionary_size : int or None, default 0 + The maximum number of unique values to consider for dictionary + encoding. By default no field will be inferred as dictionary + encoded. If -1 is provided dictionary encoding will be used for + every string field. + Returns ------- PartitioningFactory To be used in the FileSystemFactoryOptions. """ cdef: - PartitioningFactory factory - factory = PartitioningFactory.wrap( - CHivePartitioning.MakeFactory()) - return factory + CPartitioningFactoryOptions options + + if max_partition_dictionary_size is None: + max_partition_dictionary_size = -1 + + options.max_partition_dictionary_size = \ + int(max_partition_dictionary_size) + + return PartitioningFactory.wrap( + CHivePartitioning.MakeFactory(options)) cdef class DatasetFactory: diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index f80fdb9aba0..eb46d568c5b 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -267,6 +267,10 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CResult[shared_ptr[CExpression]] Parse(const c_string & path) const const shared_ptr[CSchema] & schema() + cdef cppclass CPartitioningFactoryOptions \ + "arrow::dataset::PartitioningFactoryOptions": + int max_partition_dictionary_size + cdef cppclass CPartitioningFactory "arrow::dataset::PartitioningFactory": pass @@ -276,14 +280,15 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: @staticmethod shared_ptr[CPartitioningFactory] MakeFactory( - vector[c_string] field_names) + vector[c_string] field_names, CPartitioningFactoryOptions) cdef cppclass CHivePartitioning \ "arrow::dataset::HivePartitioning"(CPartitioning): CHivePartitioning(shared_ptr[CSchema] schema) @staticmethod - shared_ptr[CPartitioningFactory] MakeFactory() + shared_ptr[CPartitioningFactory] MakeFactory( + CPartitioningFactoryOptions) cdef cppclass CPartitioningOrFactory \ "arrow::dataset::PartitioningOrFactory": diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 4bf419284d1..2bfaae3ff0e 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -810,6 +810,39 @@ def test_partitioning_factory(mockfs): assert isinstance(hive_partitioning_factory, ds.PartitioningFactory) +def test_partitioning_factory_dictionary(mockfs): + paths_or_selector = fs.FileSelector('subdir', recursive=True) + format = ds.ParquetFileFormat() + options = ds.FileSystemFactoryOptions('subdir') + + max_size_to_inferred_type = { + 0: pa.string(), + 1: pa.string(), + 2: pa.dictionary(pa.int32(), pa.string()), + 64: pa.dictionary(pa.int32(), 
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 4bf419284d1..2bfaae3ff0e 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -810,6 +810,39 @@ def test_partitioning_factory(mockfs):
     assert isinstance(hive_partitioning_factory, ds.PartitioningFactory)
 
 
+def test_partitioning_factory_dictionary(mockfs):
+    paths_or_selector = fs.FileSelector('subdir', recursive=True)
+    format = ds.ParquetFileFormat()
+    options = ds.FileSystemFactoryOptions('subdir')
+
+    max_size_to_inferred_type = {
+        0: pa.string(),
+        1: pa.string(),
+        2: pa.dictionary(pa.int32(), pa.string()),
+        64: pa.dictionary(pa.int32(), pa.string()),
+        None: pa.dictionary(pa.int32(), pa.string()),
+    }
+
+    for max_size, expected_type in max_size_to_inferred_type.items():
+        options.partitioning_factory = ds.DirectoryPartitioning.discover(
+            ['group', 'key'],
+            max_partition_dictionary_size=max_size)
+
+        factory = ds.FileSystemDatasetFactory(
+            mockfs, paths_or_selector, format, options)
+
+        inferred_schema = factory.inspect()
+        assert inferred_schema.field('key').type == expected_type
+
+        if expected_type == pa.string():
+            continue
+
+        table = factory.finish().to_table().combine_chunks()
+        actual = table.column('key').chunk(0)
+        expected = pa.array(['xxx'] * 5 + ['yyy'] * 5).dictionary_encode()
+        assert actual.equals(expected)
+
+
 def test_partitioning_function():
     schema = pa.schema([("year", pa.int16()), ("month", pa.int8())])
     names = ["year", "month"]
diff --git a/r/tests/testthat/test-expression.R b/r/tests/testthat/test-expression.R
index df5b87ca890..f75926eb8d3 100644
--- a/r/tests/testthat/test-expression.R
+++ b/r/tests/testthat/test-expression.R
@@ -48,7 +48,6 @@ test_that("C++ expressions", {
   ts <- Expression$scalar(as.POSIXct("2020-01-17 11:11:11"))
   i64 <- Expression$scalar(bit64::as.integer64(42))
   time <- Expression$scalar(hms::hms(56, 34, 12))
-  dict <- Expression$scalar(factor("a"))
 
   expect_is(f == g, "Expression")
   expect_is(f == 4, "Expression")
@@ -57,7 +56,6 @@ test_that("C++ expressions", {
   expect_is(f == date, "Expression")
   expect_is(f == i64, "Expression")
   expect_is(f == time, "Expression")
-  expect_is(f == dict, "Expression")
   # can't seem to make this work right now because of R Ops.method dispatch
   # expect_is(f == as.Date("2020-01-15"), "Expression")
   expect_is(f == ts, "Expression")