30 changes: 25 additions & 5 deletions cpp/src/arrow/dataset/discovery.cc
@@ -236,24 +236,44 @@ Result<std::vector<std::shared_ptr<Schema>>> FileSystemDatasetFactory::InspectSchemas(
InspectOptions options) {
std::vector<std::shared_ptr<Schema>> schemas;

ARROW_ASSIGN_OR_RAISE(auto partition_schema,
options_.partitioning.GetOrInferSchema(
StripPrefixAndFilename(files_, options_.partition_base_dir)));

const bool has_fragments_limit = options.fragments >= 0;
int fragments = options.fragments;
for (const auto& info : files_) {
if (has_fragments_limit && fragments-- == 0) break;
auto result = format_->Inspect({info, fs_});

if (ARROW_PREDICT_FALSE(!result.ok())) {
return result.status().WithMessage(
"Error creating dataset. Could not read schema from '", info.path(),
"'. Is this a '", format_->type_name(), "' file?: ", result.status().message());
}

if (partition_schema->num_fields()) {
  auto field_check =
      result->get()->CanReferenceFieldsByNames(partition_schema->field_names());

  // Partitioning fields are also present in the fragment's physical schema;
  // do a deeper check to make sure the data types match as well.
  if (ARROW_PREDICT_FALSE(field_check.ok())) {
    for (const auto& partition_field : partition_schema->fields()) {
      if (!partition_field) continue;
      const auto fragment_field =
          result->get()->GetFieldByName(partition_field->name());
      // Compare types with Equals(); raw pointer comparison is only reliable
      // for the cached primitive-type singletons.
      if (fragment_field &&
          !partition_field->type()->Equals(fragment_field->type())) {
        return Status::Invalid(
            "Error creating dataset. Partitioning field '",
            partition_field->name(),
            "' is present in a fragment with a different data type: expected ",
            partition_field->type()->ToString(), " but got ",
            fragment_field->type()->ToString(), ".");
      }
    }
  }
}

schemas.push_back(result.MoveValueUnsafe());
}

schemas.push_back(partition_schema);

return schemas;
}
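Reviewer sketch of the user-facing effect of this hunk (a minimal Python sketch, not part of the diff; the column names and the IPC format choice are illustrative assumptions): discovery should now reject a declared partitioning whose field name collides with a data column of a different type.

```python
import tempfile

import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({
    "f1": [1, 2, 3, 4],            # int64 data column
    "part": ["a", "a", "b", "b"],  # intended partition column
})

with tempfile.TemporaryDirectory() as tempdir:
    ds.write_dataset(
        table, tempdir, format="ipc",
        partitioning=ds.DirectoryPartitioning(pa.schema([("part", pa.string())])))

    # Declaring a partitioning on "f1" as string collides with the int64
    # data column "f1" stored in the fragments, so discovery should now
    # fail with ArrowInvalid instead of silently building a dataset.
    try:
        ds.dataset(tempdir, format="ipc",
                   partitioning=ds.DirectoryPartitioning(
                       pa.schema([("f1", pa.string())])))
    except pa.ArrowInvalid as exc:
        print(exc)
```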

44 changes: 34 additions & 10 deletions cpp/src/arrow/dataset/scanner.cc
@@ -170,23 +170,47 @@ Result<std::shared_ptr<Schema>> GetProjectedSchemaFromExpression(
// Scan options has a number of options that we can infer from the dataset
// schema if they are not specified.
Status NormalizeScanOptions(const std::shared_ptr<ScanOptions>& scan_options,
const std::shared_ptr<Dataset>& dataset) {
if (scan_options->dataset_schema == nullptr) {
scan_options->dataset_schema = dataset->schema();
}

if (!scan_options->filter.IsBound()) {
ARROW_ASSIGN_OR_RAISE(scan_options->filter,
scan_options->filter.Bind(*dataset->schema()));
}

// NOTE: the partition-field type check is performed once at discovery time in
// FileSystemDatasetFactory::InspectSchemas (see discovery.cc above), so it
// does not need to be repeated on every scan here.

if (!scan_options->projected_schema) {
// If the user specifies a projection expression we can maybe infer from
// that expression
if (scan_options->projection.IsBound()) {
ARROW_ASSIGN_OR_RAISE(
auto project_schema,
GetProjectedSchemaFromExpression(scan_options->projection, dataset->schema()));
if (project_schema->num_fields() > 0) {
scan_options->projected_schema = std::move(project_schema);
}
@@ -205,7 +229,7 @@ Status NormalizeScanOptions(const std::shared_ptr<ScanOptions>& scan_options,
// process resultant dataset_schema after projection
ARROW_ASSIGN_OR_RAISE(
auto projected_schema,
GetProjectedSchemaFromExpression(scan_options->projection, dataset->schema()));

if (projected_schema->num_fields() > 0) {
// create the projected schema only if the provided expressions
@@ -220,7 +244,7 @@ Status NormalizeScanOptions(const std::shared_ptr<ScanOptions>& scan_options,
// if projected_fields are not found, we default to creating the projected_schema
// and projection from the dataset_schema.
ARROW_ASSIGN_OR_RAISE(auto projection_descr,
ProjectionDescr::Default(*dataset->schema()));
scan_options->projected_schema = std::move(projection_descr.schema);
scan_options->projection = projection_descr.expression;
}
@@ -231,12 +255,12 @@ Status NormalizeScanOptions(const std::shared_ptr<ScanOptions>& scan_options,
ARROW_ASSIGN_OR_RAISE(
auto projection_descr,
ProjectionDescr::FromNames(scan_options->projected_schema->field_names(),
*dataset->schema()));
scan_options->projection = projection_descr.expression;
}

if (!scan_options->projection.IsBound()) {
auto fields = dataset->schema()->fields();
for (const auto& aug_field : kAugmentedFields) {
fields.push_back(aug_field);
}
@@ -426,7 +450,7 @@ Result<EnumeratedRecordBatch> ToEnumeratedRecordBatch(

Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
Executor* cpu_executor, bool sequence_fragments, bool use_legacy_batching) {
RETURN_NOT_OK(NormalizeScanOptions(scan_options_, dataset_));

auto exec_context =
std::make_shared<compute::ExecContext>(scan_options_->pool, cpu_executor);
@@ -992,7 +1016,7 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
auto dataset = scan_node_options.dataset;
bool require_sequenced_output = scan_node_options.require_sequenced_output;

RETURN_NOT_OK(NormalizeScanOptions(scan_options, dataset));

// using a generator for speculative forward compatibility with async fragment discovery
ARROW_ASSIGN_OR_RAISE(auto fragments_it, dataset->GetFragments(scan_options->filter));
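For contrast, a scan over a consistent dataset is unaffected by the NormalizeScanOptions signature change; a quick sketch that could sit inside the same TemporaryDirectory block as the previous example, using the matching partitioning actually declared at write time:

```python
dataset = ds.dataset(tempdir, format="ipc",
                     partitioning=ds.DirectoryPartitioning(
                         pa.schema([("part", pa.string())])))

# Projection and filter are still inferred and bound during scan-option
# normalization; only the helper's argument changed from the bare schema
# to the whole dataset.
scanner = dataset.scanner(columns=["f1"], filter=ds.field("part") == "a")
print(scanner.to_table())
```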
8 changes: 8 additions & 0 deletions python/pyarrow/tests/test_dataset.py
@@ -684,6 +684,7 @@ def test_partitioning():
names=["f1", "f2", "part"]
)
partitioning_schema = pa.schema([("part", pa.string())])
modified_partitioning_schema = pa.schema([("f1", pa.string())])
for klass in [ds.DirectoryPartitioning, ds.HivePartitioning,
ds.FilenamePartitioning]:
with tempfile.TemporaryDirectory() as tempdir:
@@ -695,6 +696,13 @@
load_back_table = load_back.to_table()
assert load_back_table.equals(table)

# Loading back with a partitioning field that collides with a data
# column of a different type should raise
with pytest.raises(pa.ArrowInvalid):
    ds.dataset(tempdir, format='ipc',
               partitioning=klass(modified_partitioning_schema))


def test_expression_arithmetic_operators():
dataset = ds.dataset(pa.table({'a': [1, 2, 3], 'b': [2, 2, 2]}))
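The new assertion can be exercised locally with `pytest python/pyarrow/tests/test_dataset.py -k test_partitioning`, assuming a pyarrow build that includes the C++ changes above.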