Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
2c3cfb0
fix: use StripPrefix() for FilenamePartitioning
sanjibansg Apr 24, 2022
ba752aa
test: roundtrip test for partitioning
sanjibansg Apr 24, 2022
d666ce9
feat: modify Parse() to accept both directory and filename prefix
sanjibansg Apr 26, 2022
8da2d5e
feat: ARROW_EXPORT PartitionPathFormat
sanjibansg Apr 27, 2022
ab1e387
fix: remove shadowed variable fixed_path
sanjibansg Apr 27, 2022
52ec5ee
feat: use PartitionPathFormat as argument to Parse methods
sanjibansg Apr 28, 2022
ecaffb7
test: Parse tests to use PartitionPathArgument object
sanjibansg Apr 29, 2022
9d277cd
feat: modify Parse() method in PyArrow to use PartitionPathFormat
sanjibansg Apr 29, 2022
6d8c4f2
docs: update docstring of StripPrefixAndFilename
sanjibansg Apr 29, 2022
6876f21
refactor: rename prefix to filename in PartitionPathFormat
sanjibansg Apr 29, 2022
ec93579
feat: Inspect() methods to use PartitionPathFormat
sanjibansg Apr 29, 2022
83ea5fd
fix: use PartitionPathFormat in dataset___PartitioningFactory__Inspec…
sanjibansg Apr 29, 2022
5b5cf95
fix: using PartitionPathFormat in arrowExports for R
sanjibansg May 3, 2022
c526065
fix: creating PartitionPathFormat object only in dataset___Partitioni…
sanjibansg May 4, 2022
c7d1136
fix: Remove R implementation, instead put check in InspectSchemas
sanjibansg May 5, 2022
267ac94
fix: GetOrInferSchema() to have PartitionPathFormat in argument
sanjibansg May 5, 2022
084fd96
refactor: remove including partition.h in r/src/arrow_types.h
sanjibansg May 5, 2022
6133121
fix: cpp lint
sanjibansg May 5, 2022
14b2711
feat: Parse() method to take the complete path
sanjibansg May 23, 2022
a524acc
feat: [Python] Parse method to take the entire path
sanjibansg May 23, 2022
e08b2e5
fix: python test changes for new parse method signature
sanjibansg May 23, 2022
1a61b0c
fix: cpp & python lint
sanjibansg May 23, 2022
2ae2ea3
fix: modify tests to have correct parse strings
sanjibansg May 23, 2022
718c924
fix: processing segments in ParseKeys method of FunctionPartitioning
sanjibansg May 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ Result<std::shared_ptr<Dataset>> FileSystemDatasetFactory::Finish(FinishOptions

std::vector<std::shared_ptr<FileFragment>> fragments;
for (const auto& info : files_) {
auto fixed_path = StripPrefixAndFilename(info.path(), options_.partition_base_dir);
auto fixed_path = StripPrefix(info.path(), options_.partition_base_dir);
ARROW_ASSIGN_OR_RAISE(auto partition, partitioning->Parse(fixed_path));
ARROW_ASSIGN_OR_RAISE(auto fragment, format_->MakeFragment({info, fs_}, partition));
fragments.push_back(fragment);
Expand Down
20 changes: 10 additions & 10 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,11 @@ Future<> FileWriter::Finish() {

namespace {

Status WriteBatch(std::shared_ptr<RecordBatch> batch, compute::Expression guarantee,
FileSystemDatasetWriteOptions write_options,
std::function<Status(std::shared_ptr<RecordBatch>,
const Partitioning::PartitionPathFormat&)>
write) {
Status WriteBatch(
std::shared_ptr<RecordBatch> batch, compute::Expression guarantee,
FileSystemDatasetWriteOptions write_options,
std::function<Status(std::shared_ptr<RecordBatch>, const PartitionPathFormat&)>
write) {
ARROW_ASSIGN_OR_RAISE(auto groups, write_options.partitioning->Partition(batch));
batch.reset(); // drop to hopefully conserve memory

Expand All @@ -292,7 +292,7 @@ Status WriteBatch(std::shared_ptr<RecordBatch> batch, compute::Expression guaran
for (std::size_t index = 0; index < groups.batches.size(); index++) {
auto partition_expression = and_(groups.expressions[index], guarantee);
auto next_batch = groups.batches[index];
Partitioning::PartitionPathFormat destination;
PartitionPathFormat destination;
ARROW_ASSIGN_OR_RAISE(destination,
write_options.partitioning->Format(partition_expression));
RETURN_NOT_OK(write(next_batch, destination));
Expand Down Expand Up @@ -337,10 +337,10 @@ class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {
return WriteBatch(
batch, guarantee, write_options_,
[this](std::shared_ptr<RecordBatch> next_batch,
const Partitioning::PartitionPathFormat& destination) {
const PartitionPathFormat& destination) {
return task_group_.AddTask([this, next_batch, destination] {
Future<> has_room = dataset_writer_->WriteRecordBatch(
next_batch, destination.directory, destination.prefix);
next_batch, destination.directory, destination.filename);
if (!has_room.is_finished()) {
// We don't have to worry about sequencing backpressure here since
// task_group_ serves as our sequencer. If batches continue to arrive after
Expand Down Expand Up @@ -481,11 +481,11 @@ class TeeNode : public compute::MapNode {
compute::Expression guarantee) {
return WriteBatch(batch, guarantee, write_options_,
[this](std::shared_ptr<RecordBatch> next_batch,
const Partitioning::PartitionPathFormat& destination) {
const PartitionPathFormat& destination) {
return task_group_.AddTask([this, next_batch, destination] {
util::tracing::Span span;
Future<> has_room = dataset_writer_->WriteRecordBatch(
next_batch, destination.directory, destination.prefix);
next_batch, destination.directory, destination.filename);
if (!has_room.is_finished()) {
this->Pause();
return has_room.Then([this] { this->Resume(); });
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,7 @@ ParquetDatasetFactory::CollectParquetFragments(const Partitioning& partitioning)
auto row_groups = Iota(metadata_subset->num_row_groups());

auto partition_expression =
partitioning.Parse(StripPrefixAndFilename(path, options_.partition_base_dir))
partitioning.Parse(StripPrefix(path, options_.partition_base_dir))
.ValueOr(compute::literal(true));

ARROW_ASSIGN_OR_RAISE(
Expand Down
32 changes: 19 additions & 13 deletions cpp/src/arrow/dataset/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ Result<compute::Expression> KeyValuePartitioning::Parse(const std::string& path)
return and_(std::move(expressions));
}

Result<Partitioning::PartitionPathFormat> KeyValuePartitioning::Format(
Result<PartitionPathFormat> KeyValuePartitioning::Format(
const compute::Expression& expr) const {
ScalarVector values{static_cast<size_t>(schema_->num_fields()), nullptr};

Expand Down Expand Up @@ -377,7 +377,8 @@ DirectoryPartitioning::DirectoryPartitioning(std::shared_ptr<Schema> schema,

Result<std::vector<KeyValuePartitioning::Key>> DirectoryPartitioning::ParseKeys(
const std::string& path) const {
std::vector<std::string> segments = fs::internal::SplitAbstractPath(path);
std::vector<std::string> segments =
fs::internal::SplitAbstractPath(fs::internal::GetAbstractPathParent(path).first);
return ParsePartitionSegments(segments);
}

Expand All @@ -390,23 +391,24 @@ FilenamePartitioning::FilenamePartitioning(std::shared_ptr<Schema> schema,

Result<std::vector<KeyValuePartitioning::Key>> FilenamePartitioning::ParseKeys(
const std::string& path) const {
std::vector<std::string> segments =
fs::internal::SplitAbstractPath(StripNonPrefix(path), kFilenamePartitionSep);
std::vector<std::string> segments = fs::internal::SplitAbstractPath(
StripNonPrefix(fs::internal::GetAbstractPathParent(path).second),
kFilenamePartitionSep);
return ParsePartitionSegments(segments);
}

Result<Partitioning::PartitionPathFormat> DirectoryPartitioning::FormatValues(
Result<PartitionPathFormat> DirectoryPartitioning::FormatValues(
const ScalarVector& values) const {
std::vector<std::string> segments;
ARROW_ASSIGN_OR_RAISE(segments, FormatPartitionSegments(values));
return PartitionPathFormat{fs::internal::JoinAbstractPath(std::move(segments)), ""};
}

Result<Partitioning::PartitionPathFormat> FilenamePartitioning::FormatValues(
Result<PartitionPathFormat> FilenamePartitioning::FormatValues(
const ScalarVector& values) const {
std::vector<std::string> segments;
ARROW_ASSIGN_OR_RAISE(segments, FormatPartitionSegments(values));
return Partitioning::PartitionPathFormat{
return PartitionPathFormat{
"", fs::internal::JoinAbstractPath(std::move(segments), kFilenamePartitionSep) +
kFilenamePartitionSep};
}
Expand Down Expand Up @@ -721,7 +723,8 @@ Result<std::vector<KeyValuePartitioning::Key>> HivePartitioning::ParseKeys(
const std::string& path) const {
std::vector<Key> keys;

for (const auto& segment : fs::internal::SplitAbstractPath(path)) {
for (const auto& segment :
fs::internal::SplitAbstractPath(fs::internal::GetAbstractPathParent(path).first)) {
ARROW_ASSIGN_OR_RAISE(auto maybe_key, ParseKey(segment, hive_options_));
if (auto key = maybe_key) {
keys.push_back(std::move(*key));
Expand All @@ -731,7 +734,7 @@ Result<std::vector<KeyValuePartitioning::Key>> HivePartitioning::ParseKeys(
return keys;
}

Result<Partitioning::PartitionPathFormat> HivePartitioning::FormatValues(
Result<PartitionPathFormat> HivePartitioning::FormatValues(
const ScalarVector& values) const {
std::vector<std::string> segments(static_cast<size_t>(schema_->num_fields()));

Expand All @@ -749,8 +752,7 @@ Result<Partitioning::PartitionPathFormat> HivePartitioning::FormatValues(
}
}

return Partitioning::PartitionPathFormat{
fs::internal::JoinAbstractPath(std::move(segments)), ""};
return PartitionPathFormat{fs::internal::JoinAbstractPath(std::move(segments)), ""};
}

class HivePartitioningFactory : public KeyValuePartitioningFactory {
Expand Down Expand Up @@ -805,9 +807,14 @@ std::shared_ptr<PartitioningFactory> HivePartitioning::MakeFactory(
return std::shared_ptr<PartitioningFactory>(new HivePartitioningFactory(options));
}

std::string StripPrefixAndFilename(const std::string& path, const std::string& prefix) {
/// \brief Strip `prefix` from the front of `path`.
///
/// Delegates to fs::internal::RemoveAncestor(prefix, path). When that helper
/// yields no value, `path` is returned unchanged. Unlike
/// StripPrefixAndFilename, the trailing filename component is kept.
std::string StripPrefix(const std::string& path, const std::string& prefix) {
  if (auto remainder = fs::internal::RemoveAncestor(prefix, path)) {
    // Materialize the helper's result as an owning string for the caller.
    return std::string(*remainder);
  }
  // Nothing could be stripped: hand the input back as-is.
  return path;
}

/// \brief Strip `prefix` from `path`, then drop the trailing filename.
///
/// The prefix-less path is split with fs::internal::GetAbstractPathParent and
/// only the `.first` element of the split (the parent directory part) is
/// returned.
std::string StripPrefixAndFilename(const std::string& path, const std::string& prefix) {
  const std::string relative_path = StripPrefix(path, prefix);
  return fs::internal::GetAbstractPathParent(relative_path).first;
}
Expand Down Expand Up @@ -837,7 +844,6 @@ Result<std::shared_ptr<Schema>> PartitioningOrFactory::GetOrInferSchema(
if (auto part = partitioning()) {
return part->schema();
}

return factory()->Inspect(paths);
}

Expand Down
16 changes: 10 additions & 6 deletions cpp/src/arrow/dataset/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ namespace dataset {

constexpr char kFilenamePartitionSep = '_';

/// \brief The pair of path components produced by Partitioning::Format.
///
/// Either component may be empty: directory-style partitionings fill only
/// `directory`, filename-style partitionings fill only `filename`.
struct ARROW_DS_EXPORT PartitionPathFormat {
  /// Directory part of the formatted partition path.
  std::string directory;
  /// Filename part of the formatted partition path.
  std::string filename;
};

// ----------------------------------------------------------------------
// Partitioning

Expand Down Expand Up @@ -78,10 +82,6 @@ class ARROW_DS_EXPORT Partitioning {
/// \brief Parse a path into a partition expression
virtual Result<compute::Expression> Parse(const std::string& path) const = 0;

struct PartitionPathFormat {
std::string directory, prefix;
};

virtual Result<PartitionPathFormat> Format(const compute::Expression& expr) const = 0;

/// \brief A default Partitioning which always yields scalar(true)
Expand Down Expand Up @@ -358,9 +358,13 @@ class ARROW_DS_EXPORT FilenamePartitioning : public KeyValuePartitioning {
Result<PartitionPathFormat> FormatValues(const ScalarVector& values) const override;
};

/// \brief Remove a prefix from a path, keeping the filename.
///
/// If the prefix cannot be removed from `path`, `path` is returned unchanged.
ARROW_DS_EXPORT std::string StripPrefix(const std::string& path,
const std::string& prefix);

/// \brief Remove the prefix and the trailing filename of a path, returning
/// the remaining directory portion as a single string.
///
/// e.g., `StripPrefixAndFilename("/data/year=2019/c.txt", "/data") -> "year=2019"`
ARROW_DS_EXPORT std::string StripPrefixAndFilename(const std::string& path,
const std::string& prefix);

Expand Down
Loading