
Conversation

@westonpace
Member

Created writer_post_finish (similar to writer_pre_finish) to visit dataset-created files after Finish. Added a similar file_visitor concept to pyarrow which maps to writer_post_finish. Connected the legacy metadata_collector to the file_visitor so that parquet datasets created with use_legacy_dataset=True can support metadata_collector.

@westonpace westonpace changed the title ARROW-12364: [C++][Dataset][Python] Add a callback to visit file writers just before Finish() ARROW-12364: [Python] [Dataset] Add metadata_collector option to ds.write_dataset() Jun 30, 2021
@westonpace
Member Author

westonpace commented Jun 30, 2021

This PR now includes #10619. Some notes for review:

  • Ben and I took parallel approaches to this. Ben's approach was to mirror the C++ API and create a FileWriter to wrap CFileWriter. My approach was to create a WrittenFile class which is just the path & metadata (if present) and expose that as file_visitor (a usage sketch follows these notes). I'm happy to switch if we feel the other is better. My rationale was "FileWriter is an internal class, best to hide the concept and only expose what is needed."
  • ARROW-10440 added a visitor to be called right before finish was called on a file. For metadata_collector to work I needed to create a visitor that is called right after finish is called on the file so I added the second visitor as part of this PR.
  • The existing metadata_collector is a bit clunky when working with partitioned datasets. The _metadata file does not contain the partition columns. This does appear to be the intent (with common_metadata, if it exists, containing the full schema), but without a working Spark/Hadoop setup I can't be completely certain.
  • The existing tests for metadata_collector were calling write_dataset on the same directory multiple times and expecting multiple files to be created (since the legacy writer uses a GUID for naming). This seems somewhat related to ARROW-12358. I just updated the test to call write_dataset once with a partition column.
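
A minimal usage sketch of the resulting API (the table, directory, and partitioning below are invented for illustration; the file_visitor keyword and the path/metadata attributes of the visited object are the pieces this PR adds):

    import pyarrow as pa
    import pyarrow.dataset as ds

    table = pa.table({"part": ["a", "a", "b", "b"], "value": [1, 2, 3, 4]})

    visited_paths = []
    collected_metadata = []

    def file_visitor(written_file):
        # The visitor receives an object with a path and, for parquet
        # output, the parquet FileMetaData of the file that was written.
        visited_paths.append(written_file.path)
        if written_file.metadata is not None:
            collected_metadata.append(written_file.metadata)

    ds.write_dataset(
        table, "/tmp/example_dataset", format="parquet",
        partitioning=ds.partitioning(
            pa.schema([("part", pa.string())]), flavor="hive"),
        file_visitor=file_visitor,
    )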

@westonpace
Member Author

@bkietz @jorisvandenbossche would appreciate any review if you have time.

Member

@bkietz bkietz left a comment

Looks good, some minor comments

"FileWriter is an internal class, best to hide the concept and only expose what is needed."

That sounds good to me. I exposed FileWriter but this was speculative generality: I was envisioning appending things to an IPC file's custom metadata (like maybe a representation of accumulated statistics) but that's definitely not something we need to worry about for this PR.

For metadata_collector to work I needed to create a visitor that is called right after finish is called on the file so I added the second visitor as part of this PR.

Is it worth noting anywhere in this patch that this is because ParquetFileWriter doesn't finalize statistics until the writer is Close()d?
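
(As a standalone illustration of why the post-finish hook matters: the row-group statistics only show up on the FileMetaData once the writer has been closed. The file path and data in this sketch are arbitrary.)

    import pyarrow as pa
    import pyarrow.parquet as pq

    pq.write_table(pa.table({"x": [1, 2, 3]}), "/tmp/example.parquet")
    # read_metadata returns the same kind of FileMetaData object that the
    # post-finish visitor hands to metadata_collector / file_visitor
    md = pq.read_metadata("/tmp/example.parquet")
    stats = md.row_group(0).column(0).statistics
    print(stats.min, stats.max, stats.null_count)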

I just updated the test to call write_dataset once with a partition column.

Alternatively, you could tweak FileSystemDatasetWriteOptions::basename_template between writes.
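
(A sketch of what that alternative could look like from Python; the directory and data are invented, and whether repeated writes into an existing directory behave as desired also depends on the append behavior tracked in ARROW-12358.)

    import pyarrow as pa
    import pyarrow.dataset as ds

    for i in range(4):
        ds.write_dataset(
            pa.table({"f1": [i] * 10, "f2": list(range(10))}),
            "/tmp/flat_dataset", format="parquet",
            # a distinct template per call keeps the writes from colliding;
            # "{i}" is filled in with a per-call file counter
            basename_template=f"part-{i}-{{i}}.parquet",
        )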

@westonpace westonpace force-pushed the feature/ARROW-12364--python-dataset-add-metadata_collector-option-t branch 2 times, most recently from e707e45 to 52347c9 on July 2, 2021 22:04
@westonpace
Member Author

Thanks for the review @bkietz. I've addressed the changes you requested.

@westonpace westonpace requested a review from bkietz July 2, 2021 22:35
Member

@jorisvandenbossche jorisvandenbossche left a comment

Thanks a lot for looking into this!

Mainly some questions about the testing (related to the schema, including the partition column or not)

Comment on lines 735 to 736
If set, this function will be called with a WrittenFile instance
for each file created during the call.
Member

Since "WrittenFile" is not a generally known pyarrow class, I would give a bit more detail on this (e.g. the fact that you can get the path and (if parquet) the metadata).

And maybe also give an example use case, something like "For example, this enables collecting the paths or metadata of all written files".

Member

Should we also mention that the Parquet metadata has been updated with the written file path?

Member Author

I added a bit more detail as suggested. I added the bit about the parquet metadata and the written file path in the WrittenFile.metadata docstring.
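
(A sketch of the downstream use this enables, not code from this PR; the directory and table are invented. Because each collected FileMetaData already carries the written file's path, the pieces can be combined into a dataset-level _metadata file.)

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.parquet as pq

    collected = []
    table = pa.table({"f1": [1, 2, 3, 4], "f2": [10, 20, 30, 40]})
    ds.write_dataset(
        table, "/tmp/flat_ds", format="parquet",
        file_visitor=lambda written_file: collected.append(written_file.metadata),
    )
    # Combine the per-file metadata (file paths included) into _metadata.
    # For a partitioned dataset the schema passed here would need to drop
    # the partition columns; see the ARROW-13269 discussion further down.
    pq.write_metadata(table.schema, "/tmp/flat_ds/_metadata",
                      metadata_collector=collected)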

     if metadata_collector is not None:
-        raise ValueError(msg.format("metadata_collector"))
+        def file_visitor(written_file):
+            if written_file.metadata:
Member

Is this if check needed (in theory)? Since these will always be parquet files in this case

Member Author

I suppose not; I have removed it.

Member

It could instead be an assert


 metadata_path = str(root_path / '_metadata')
 # write _metadata file
 pq.write_metadata(
-    table.schema, metadata_path,
+    partitionless_schema, metadata_path,
Member

Is this change needed?

Member Author

See the comment below. Ideally no, but this will be addressed in ARROW-13269.

Member

Nit: for consistency, shouldn't this be called "physical_schema"?

Member

With my suggestion above, those changes can all be reverted, I think (it was changing the intent of the tests)

 dataset = ds.parquet_dataset(metadata_path)
-assert dataset.schema.equals(table.schema)
+assert dataset.schema.equals(partitionless_schema)
Member

This doesn't seem right to me? I think the dataset's schema should include the partition columns?

Member Author

I agree with you that it isn't right (and now there is https://stackoverflow.com/questions/68277701/write-pandas-dataframe-parquet-metadata-with-partition-columns#comment120671321_68277701 ). However, that was the legacy behavior, and I'd rather not tackle it as part of this PR. I have opened up ARROW-13269 to address it.

Member

But then I don't fully understand what this PR changed that causes this?

@@ -2810,11 +2819,11 @@ def test_parquet_dataset_lazy_filtering(tempdir, open_logging_fs):

     # filtering fragments should not open any file
     with assert_opens([]):
-        list(dataset.get_fragments(ds.field("f1") > 15))
+        list(dataset.get_fragments(ds.field("f2") > 15))
Member

The "f2" column is random normal around 0, so this filter will never yield anything

Member Author

I changed f2 to be int64 values 0, 10, 20, 30. f1 is no longer usable because it was removed from the schema as a partition column.

Member

@bkietz bkietz left a comment

LGTM

Member

@jorisvandenbossche jorisvandenbossche left a comment

I looked a bit closer at the tests, and I think the reason the changes were needed (and my confusion around them) is that they were originally written for a non-partitioned dataset (and you changed the creation to a partitioned one). See my inline comments.

It might be useful to add an additional test to cover what you observed / reported in ARROW-13269 (but I still need to think that through)

Comment on lines 2680 to 2688
f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
f2_vals = [item*10 for chunk in range(4) for item in [chunk] * 10]

table = pa.table({'f1': f1_vals, 'f2': f2_vals})
pq.write_to_dataset(
    table, str(root_path), partition_cols=['f1'],
    use_legacy_dataset=use_legacy_dataset,
    metadata_collector=metadata_collector
)
Member

Suggested change
-f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
-f2_vals = [item*10 for chunk in range(4) for item in [chunk] * 10]
-table = pa.table({'f1': f1_vals, 'f2': f2_vals})
-pq.write_to_dataset(
-    table, str(root_path), partition_cols=['f1'],
-    use_legacy_dataset=use_legacy_dataset,
-    metadata_collector=metadata_collector
-)
+root_path.mkdir()
+for i in range(4):
+    table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
+    path = root_path / f"part-{i}.parquet"
+    pq.write_table(table, str(path), metadata_collector=metadata_collector)
+    # set the file path relative to the root of the partitioned dataset
+    metadata_collector[-1].set_file_path(f"part-{i}.parquet")

Looking a bit more in detail, the goal of this helper method is actually to create a non-partitioned dataset (just a directory with flat files), since the partitioned case is tested a bit further below with the helper function _create_parquet_dataset_partitioned. So my code suggestion here adapts it to use plain write_table, to overcome the issue that write_to_dataset generates identical file names (and thus overwrites).

Member

Actually, this way it is of course not using the non-legacy version of write_to_dataset, so if we are not doing that, we could also keep the original without passing the use_legacy_dataset keyword.

Member

There is also a _create_parquet_dataset_partitioned a little further below, and I think it is that one that should be parametrized with use_legacy_dataset=True/False.

Member Author

I also ended up spinning myself around a few times here. What I wanted was a test to ensure that we can round trip table -> _metadata -> factory -> dataset -> table using the metadata_collector. However, changing use_legacy_dataset here fails (since the new dataset doesn't support append, a.k.a. ARROW-12358). And both use_legacy_dataset versions fail when there is a partition, because of ARROW-13269.

So I created a new, simple test which covers the round trip with a single file, no append, and no partitioning (sketched below). Tests for the other scenarios can be fleshed out when those issues are addressed. I then restored all the old tests as they were. Hopefully this works.
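
(Roughly, the round trip being tested looks like the following sketch; the paths and data are invented and this is not the actual test code.)

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.parquet as pq

    table = pa.table({"f1": [0, 1, 2, 3], "f2": [0, 10, 20, 30]})
    root_path = "/tmp/simple_dataset"

    # table -> files + collected metadata
    metadata_collector = []
    pq.write_to_dataset(table, root_path,
                        metadata_collector=metadata_collector)

    # collected metadata -> _metadata file
    metadata_path = root_path + "/_metadata"
    pq.write_metadata(table.schema, metadata_path,
                      metadata_collector=metadata_collector)

    # _metadata -> factory -> dataset -> table
    dataset = ds.parquet_dataset(metadata_path)
    assert dataset.to_table().equals(table)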

@@ -2672,47 +2672,57 @@ def test_feather_format(tempdir, dataset_reader):
     dataset_reader.to_table(ds.dataset(basedir, format="feather"))


-def _create_parquet_dataset_simple(root_path):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset=True):
Member

Suggested change
-def _create_parquet_dataset_simple(root_path, use_legacy_dataset=True):
+def _create_parquet_dataset_simple(root_path, use_legacy_dataset=True):
+    """
+    Creates a simple (flat files, no nested partitioning) Parquet dataset
+    """

(to clarify the intent)

Member Author

Added.


 metadata_path = str(root_path / '_metadata')
 # write _metadata file
 pq.write_metadata(
-    table.schema, metadata_path,
+    partitionless_schema, metadata_path,
Member

With my suggestion above, those changes can all be reverted, I think (it was changing the intent of the tests)

file_visitor : Function
    If set, this function will be called with a WrittenFile instance
    for each file created during the call. This object will contain
    the path and (if the dataset is a parquet dataset) the parquet
Member

The WrittenFile class is (currently) not exposed in the pyarrow.dataset namespace (and I think it is good to keep it that way, to not have users rely on the specific class), so I think we still need to be more explicit: e.g. "contain the path" -> "have a path attribute".

A small example might also help to illustrate, e.g. this one from the tests:

    visited_paths = []

    def file_visitor(written_file):
        visited_paths.append(written_file.path)

Member Author

Ah, I thought WrittenFile was exposed. I've improved the docstring here. For my education, what is the concern with users relying on this class? It seems less brittle than users relying on a snippet of documentation.

Member

For my education, what is the concern with users relying on this class? It seems less brittle than users relying on a snippet of documentation.

It just "locks us in" on using exactly this class (as users could start relying on the actual specific class (e.g. isinstance(obj, WrittenFile), although that shouldn't be useful in practice), instead of the interface of the class (the fact that it has path and metadata attributes)). Without publicly exposing the class, it gives us the freedom in the future to change this (e.g. expose the actual FileWriter) without having to worry about possibly breaking code, as long as it still has the path and metadata attributes.

I personally follow Ben's comment about this basically being a namedtuple, but since Cython doesn't support namedtuples, a simple class seems a good alternative (a difference with e.g. ReadOptions is that a user never creates a WrittenFile themselves).

Now, I don't care that much about it, and we could also simply expose it publicly :) (i.e. import it in the pyarrow.dataset namespace and add the class to the API reference docs)
But I think with your current updated docstring of write_dataset, this is clear enough.

@westonpace westonpace force-pushed the feature/ARROW-12364--python-dataset-add-metadata_collector-option-t branch from 1081361 to 63ebb03 on July 9, 2021 00:48
Member

@jorisvandenbossche jorisvandenbossche left a comment

Thanks for updating the tests! Just two small comments on the docstring

westonpace and others added 2 commits July 9, 2021 09:47
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
@westonpace
Member Author

@jorisvandenbossche Thank you for all the review comments. I have incorporated your changes.

@bkietz bkietz closed this in 7b66f97 Jul 14, 2021
@westonpace westonpace deleted the feature/ARROW-12364--python-dataset-add-metadata_collector-option-t branch January 6, 2022 08:16