
[Python] [Dataset] Add metadata_collector option to ds.write_dataset() #18615

Description

@asfimport

The legacy pq.write_to_dataset() has a metadata_collector option which collects the metadata of each written file into a list when writing partitioned data.

    import pyarrow.parquet as pq

    # Collect the FileMetaData of every parquet file written by the legacy writer.
    collector = []
    pq.write_to_dataset(
        table=table,
        root_path=output_path,
        use_legacy_dataset=True,
        metadata_collector=collector,
    )
    # Rebuild the full path of each written file from the collected metadata.
    fragments = []
    for piece in collector:
        fragments.append(filesystem.sep.join([output_path, piece.row_group(0).column(0).file_path]))

This allows me to save a list of the specific parquet files which were created when writing the partitions to storage. I use this when scheduling tasks with Airflow.

Task A downloads data and partitions it -> Task B reads the file fragments which were just saved and transforms them -> Task C builds a list of dataset filters from the transformed fragments, reads each filter into a table, processes the data further (normally dropping duplicates or selecting a subset of the columns), and saves it for visualization.

    fragments = [
        'dev/date_id=20180111/transform-split-20210301013200-68.parquet',
        'dev/date_id=20180114/transform-split-20210301013200-69.parquet',
        'dev/date_id=20180128/transform-split-20210301013200-57.parquet',
    ]

I can use this list downstream to do two things:

  1. I can read the list of fragments directly as a new dataset and transform the data:

         ds.dataset(fragments)

  2. I can generate filters from the file fragments which were saved, using ds._get_partition_keys(). This allows me to query the dataset and retrieve all fragments within the partition. For example, if I partition by date and process data every 30 minutes, I might have 48 individual file fragments within a single partition, so I need to query the entire partition instead of reading a single fragment. The helper below builds the list of unique filters:

    def consolidate_filters(fragments):
        """Retrieve the partition expressions from a list of dataset fragments and build a list of unique filters."""
        filters = []
        for frag in fragments:
            partitions = ds._get_partition_keys(frag.partition_expression)
            fragment_filter = [(k, "==", v) for k, v in partitions.items()]
            if fragment_filter not in filters:
                filters.append(fragment_filter)
        return filters

    filter_expression = pq._filters_to_expression(
        filters=consolidate_filters(fragments=fragments)
    )
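
For context, here is a minimal sketch of how that consolidated filter expression is applied downstream; the "hive" partitioning flavor and the final drop_duplicates() step are assumptions for illustration:

    # Open the full dataset and pull back every fragment in the partitions
    # touched upstream, then do the typical post-processing. The "hive"
    # partitioning flavor and the pandas round-trip are assumptions.
    import pyarrow.dataset as ds

    dataset = ds.dataset(output_path, partitioning="hive", filesystem=filesystem)
    table = dataset.to_table(filter=filter_expression)
    result = table.to_pandas().drop_duplicates()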

My current problem is that when I use ds.write_dataset(), I do not have a convenient way to generate a list of the file fragments I just saved. My only choice is to set a basename_template and then use fs.glob() to find the files matching that pattern. This is much slower and wastes file-listing calls on blob storage. There is a related Stack Overflow question with the basis of the approach I am using now.
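
For reference, a minimal sketch of that workaround, assuming an fsspec-compatible filesystem object and a timestamped basename_template (the template, partitioning object, and paths are placeholders):

    # Current workaround: tag the written files with a unique basename_template,
    # then glob for that pattern afterwards, which requires listing the dataset
    # on blob storage just to discover what was written.
    import pyarrow.dataset as ds

    basename_template = "transform-split-20210301013200-{i}.parquet"
    ds.write_dataset(
        data=table,
        base_dir=output_path,
        basename_template=basename_template,
        format="parquet",
        partitioning=partitioning,
        filesystem=filesystem,
    )
    fragments = filesystem.glob(
        output_path + "/**/" + basename_template.replace("{i}", "*")
    )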

Environment: Ubuntu 18.04
Reporter: Lance Dacey / @ldacey
Assignee: Weston Pace / @westonpace

Related issues:

PRs and other links:

Note: This issue was originally created as ARROW-12364. Please see the migration documentation for further details.
