
Conversation

@lidavidm
Member

@lidavidm lidavidm commented Mar 2, 2021

This provides an async Parquet reader where the unit of concurrency is a single row group.

@lidavidm lidavidm force-pushed the parquet-reentrant branch 3 times, most recently from 05f8c14 to d4f60f4 Compare March 5, 2021 20:41
@lidavidm lidavidm marked this pull request as ready for review March 6, 2021 00:38
@lidavidm
Member Author

CC @westonpace here as well. AsyncGenerator<vector<shared_ptr<RecordBatch>>> might be a little annoying to work with if the target type is AsyncGenerator<shared_ptr<RecordBatch>>, thoughts?

Member

@westonpace westonpace left a comment

I might be wrong but I don't think this will quite work as written. Let me know if I've misunderstood.

Member

Answering your question: I agree it would be better to have AsyncGenerator&lt;RecordBatch&gt; for consistency with the other readers. You can use MakeVectorGenerator to get AsyncGenerator&lt;AsyncGenerator&lt;RecordBatch&gt;&gt;, then apply MakeConcatMapGenerator to the result to get AsyncGenerator&lt;RecordBatch&gt;.

Member

Would the scan task be the thing providing the row_group_indices?

Member Author

Yes, currently scan tasks know which row group index they correspond to. As part of this we may want to make scan tasks less granular than a single row group as discussed.

Member

Hmm, historically that hasn't been the precedent for the generators. They keep an ownership stake in their resources. Is there some reason the generator can't have a shared pointer to the reader?

Consider the dataset scanning example. The scan tasks will be asked for a generator, and the scanner will keep track of the generator, but the scanner will have no idea what the reader is. Who is keeping track of the reader there? What if the scanner simply discarded the scan task after it got a generator from it?

Member Author

Note that it's true for the RecordBatchReader for Parquet as well; if you look at ParquetScanTask in dataset/file_parquet.cc, there's a similar note there. I think it's solely because we don't have enable_shared_from_this for the Parquet readers, I'm not sure if there's a reason why we omit that.

Member

@westonpace westonpace Mar 10, 2021

Ok, I might be breaking this. The current Scanner::ToTable keeps the scan task alive while it iterates the batches...

ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
ARROW_ASSIGN_OR_RAISE(auto local, batch_it.ToVector());
state->Emplace(std::move(local), id);

However, the async version does not...

ARROW_ASSIGN_OR_RAISE(auto batch_gen, scan_task->ExecuteAsync());
return CollectAsyncGenerator(std::move(batch_gen))
        .Then([state, id](const RecordBatchVector& rbs) -> util::optional<bool> {
          state->Emplace(rbs, id);
          return true;
        });

Member

I can go ahead and copy the scan task into the Then callback to preserve its lifetime but can you add a comment to ScanTask explaining this requirement?

Member Author

I think that's supposed to be ok: iterating the scan task should implicitly keep the scan task alive, what I meant is that the scan task is explicitly keeping the Parquet reader alive.

Member

This will be a problem. I think this task will block (I'm assuming ReadRowGroupsImpl is synchronous and blocking?). Blocking tasks should not be on the CPU pool. You could put it on the I/O pool, but that isn't necessarily ideal either, as it complicates sizing the I/O pool.

Ideally you want something like reader_->PreBuffer(row_groups, ...).Then(NonBlockingReadRowGroupsImpl).

I think you might kind of get away with it because the task they are waiting on is on the I/O thread pool (I assume the prebuffering tasks are on the I/O pool) so you won't have the nested deadlock problem.

However, it will not have ideal performance. If you are reading a bunch of files you will have some CPU threads tied up waiting that could be doing work.

Member Author

Ah, ok - I was envisioning that the caller would explicitly buffer beforehand (since we had been talking about splitting up ScanTask in that way) but we can have this manage buffering internally as well. (Either way, either the IPC reader or the Parquet reader will need some refactoring to meet Datasets' needs.)

Member

Ok, if you plan to do that buffering elsewhere then this is fine. I guess I misunderstood then. So the point in breaking things into tasks here is to allow for parallelism?

Member Author

Yes, this is to allow for parallelism when scanning a Parquet file. Though as I think about it, maybe this isn't necessary? We can do the parallelism at the scan task level already.

Member Author

Maybe we can hold this until ARROW-7001 is through and then we can see what exactly we need to be reentrant to get the pipeline we want.

@lidavidm
Member Author

lidavidm commented Apr 1, 2021

(FWIW, I'm currently reworking this so that it gives just AsyncGenerator<RecordBatch>, and to manage pre-buffering internally so that the user doesn't have to worry about it, similar to CSV and IPC. The granularity of pre-buffering will be per-row-group which is not ideal for small row groups, but that could be improved.)

@lidavidm lidavidm force-pushed the parquet-reentrant branch from d4f60f4 to 4ffac4e Compare April 1, 2021 21:40
@lidavidm
Member Author

lidavidm commented Apr 1, 2021

What I pushed is still not quite what I want. Ideally, we'd be able to ask the read cache for a future that finishes when all I/O for the given row group has completed. That way, we can then kick off a decoding task. On master, currently, you just spawn a bunch of tasks that block and wait for I/O and then proceed (wasting threads), and in this PR, we have hijinks to manually pre-buffer each row group separately (wasting the effectiveness of pre-buffering).

That is, we should be able to say

reader->PreBuffer(row_groups, columns)
...
// I/O generator
return reader->WhenBuffered({current_row_group}, {columns});

// Decoding generator
return cpu_executor_->Transfer(io_generator()).Then([]() { return ReadRowGroup(current_row_group); });

and this will let us coalesce read ranges across row groups while only performing work on the CPU pool when it's truly ready.

Also, the range cache will have to be swappable for something that just does normal file I/O for the non-S3 case so that local file scans are still reasonable.

@lidavidm
Member Author

lidavidm commented Apr 2, 2021

Ok, something wonky is going on here, but…

--------------------------------------------------------------------------------------------
Benchmark                                  Time             CPU   Iterations UserCounters...
--------------------------------------------------------------------------------------------
BM_ReadMultipleRowGroups            38393176 ns     38392591 ns           14 bytes_per_second=2.0349G/s items_per_second=2.18495G/s
BM_ReadMultipleRowGroupsGenerator   37526258 ns       123436 ns          100 bytes_per_second=632.918G/s items_per_second=679.59G/s

I confirmed that both benchmarks read the same number of rows/columns, but this still seems unbelievable; there must be some sort of measurement error.

@lidavidm
Member Author

lidavidm commented Apr 2, 2021

The benchmark discrepancy is simply because the generator was putting each row group on its own thread. The difference goes away if we don't force transfer onto a background thread.

(snipped when I realized I had benchmarked the wrong code for this PR)

@lidavidm
Member Author

lidavidm commented Apr 2, 2021

Okay, now that I've actually checked out the right branch…

So long as pre-buffering is enabled, this PR in conjunction with ARROW-7001 is either a big win (for S3) or no effect (locally). Hence I'd argue we should just always enable pre-buffer. (The reason is that without refactoring the Parquet reader heavily, without enabling pre-buffer, the generator is effectively synchronous. I could go through and do the refactor, but pre-buffering gives us an 'easy' way to convert the I/O to be async. If we want, we could change the read range cache to optionally be lazy, which would effectively be the same as refactoring the Parquet reader.)

Also, this changes the ParquetScanTask so that it manages intra-file concurrency internally. Hence, ParquetFileFragment only needs to generate one scan task now and doesn't have to do anything complicated around pre-buffering.

(benchmark charts: Local Median Scan Time, S3 Median Scan Time)

@lidavidm
Member Author

lidavidm commented Apr 6, 2021

Properly implementing OpenAsync mostly fixes the discrepancy between prebuffer/no prebuffer on local files, so it seems most of the bottleneck there was just synchronously opening files.

(benchmark charts: Local Median Scan Time, S3 Median Scan Time)

@lidavidm
Member Author

lidavidm commented Apr 6, 2021

Ah, this exposes a deadlock in Python; now that we're using ReadAsync, if the underlying file is backed by Python, we'll deadlock over the GIL. Probably PyReadableFile needs to override ReadAsync.

@lidavidm
Member Author

Apart from the non-pre-buffered single-file case, this brings us back on par with or improves upon the threaded scanner.

(benchmark chart: Median Parquet Scan Time, seconds)

@lidavidm lidavidm force-pushed the parquet-reentrant branch from 51fbc61 to 6ebbc7c Compare April 23, 2021 17:50
@lidavidm
Member Author

I'm also unable to replicate the failed Parquet encryption test on Windows. This PR already changes things to explicitly close the file handle, but I do feel like it's flakier than before.

@lidavidm lidavidm force-pushed the parquet-reentrant branch from 6ebbc7c to d7732a3 Compare April 26, 2021 18:40
Member

@westonpace westonpace left a comment

Just checking my understanding: This approach does all of the decryption and decoding on the I/O pool? I think that's fine since prebuffering is already creating a sort of dedicated I/O thread so we don't have to worry about decryption/decoding leaving the next read idle.

Member

I'm a little bit surprised this works at all honestly. Futures aren't really compatible with move-only types. I thought you would have gotten a compile error. This reminded me to create ARROW-12559. As long as this works I think you're ok. There is only one callback being added to reader_fut and you don't access the value that gets passed in as an arg here.

Member Author

In this case it works since Future internally heap-allocates its implementation. Yes, this is rather iffy, but it works since this is the only callback. I'll add a TODO referencing ARROW-12559.

Member

I don't think we need to look at enable_parallel_column_conversion anymore since we're async?

Member Author

While benchmarking I noticed it got stuck due to this, so there's still nested parallelism coming in somewhere - I'll try to figure this out.

Member

Ah, actually, that makes sense. It would get stuck here: https://github.com/lidavidm/arrow/blob/parquet-reentrant/cpp/src/parquet/arrow/reader.cc#L959

ParallelFor does a blocking wait.

Member

We could add ParallelForAsync which returns a Future but that can be done in a follow-up.

Member

@westonpace westonpace Apr 29, 2021

This could be a nice win for async in the HDD/SSD space (still recommending a follow-up PR).

Member Author

I fixed this and added a TODO for ARROW-12597.

Member

/*lazy=*/false? I'm not really sure what the rules are for those inline names.

Member Author

Note I split this out into ARROW-12522/#10145 so I'll fix things there and rebase here.

Member

&*it -> it unless I'm missing something

Member Author

This is turning the iterator type into an actual pointer.

Member

Ah, ok, a new trick for me to learn then :)

Member

GetFuture is not a very clear name. I'm not sure what a better one would be. Maybe GetOrIssueRead?

Member

I don't know the rules around PARQUET_CATCH_EXCEPTIONS but will it be a problem that ReadOneRowGroup runs (presumably) outside of this guard?

Member Author

It just expands to a try { } catch block. However, I'll audit these again; I dislike working in this module because we're mixing exceptions and Status, and the OpenAsync work has just exacerbated that problem a lot.

Member

Yes, I haven't really had to think through those problems yet since exceptions are a bug everywhere else. It may be there are utilities we could add to the async code to help here. Feel free to create JIRAs and assign them to me if that is the case.

Member

Naming nit: Maybe DecodeRowGroup?

Member

I could be sold on using Item for brevity since it is constrained to this file but maybe Batches or Items instead of Item?

Member

Does this function throw exceptions? It looks to me like that got changed when it changed to future.

Member Author

I'm going to try and rework what's here as mixing exceptions/Status/Future is painful + confusing.

Member

Is this all to allow unique_ptr to work? If so, maybe add a todo comment reference ARROW-12559

Member Author

@lidavidm lidavidm left a comment

Thanks for the review. Yes, the assumption is that parsing/decrypting the footer is cheap enough that it can be done inline. However now that you point it out, I don't think this is always the case (notably, I think Parquet is superlinear in the number of columns - or at least the implementation is - as there have been bug reports filed in the past from people with 1000-10000 columns) so I'll have to test that as well.

Member Author

The issue is when the main thread holds the GIL and calls back into libarrow, and then libarrow tries to call back into Python. This is OK if it's all on the same thread but not if it's on different threads. Really, this is an implementation error (any such bindings in pyarrow should release the GIL) but I remember it bit me somewhere during this PR.

Member Author

I'll change the API to have the user pass in the pointer. I'd rather have shared_from_this but I guess that won't work since it's typically used as a unique_ptr.

@lidavidm
Member Author

By the way, if it's easier, I can split the generator/OpenAsync/ScanBatchesAsync parts of this PR from each other. I think the OpenAsync part deserves some extra scrutiny in any case.

@lidavidm lidavidm force-pushed the parquet-reentrant branch from d7732a3 to 7592eb7 Compare April 27, 2021 17:28
@lidavidm
Member Author

This is now rebased on top of ARROW-12522 / #10145. I've reworked ParseMetadata so that the synchronous and asynchronous paths are truly synchronous/asynchronous, and to avoid mixing error handling methods (all parsing code uses exceptions, and the asynchronous path catches them and converts to Status).

@lidavidm lidavidm force-pushed the parquet-reentrant branch 4 times, most recently from 8ccda2b to b8b605d Compare May 18, 2021 20:21
@lidavidm
Member Author

Sorry to bother you @emkornfield, but would you have time to review the changes here under src/parquet/reader.{cc,h}, since it refactors the footer parsing? Otherwise, I'll ask Antoine to take a look when he's back from leave. (I know there's a lot of other stuff in this PR, which Weston has looked through already.)

@lidavidm lidavidm force-pushed the parquet-reentrant branch 2 times, most recently from 1cd7e5b to 1c923ae Compare May 27, 2021 13:23
@lidavidm
Member Author

@pitrou would you have time to review the Parquet reader parts of this PR? Particularly where the footer reading code has been refactored.

Member

I don't understand this comment. Where is the workaround?

Member Author

The workaround is using AddCallback and marking a future complete manually instead of using Then.

Member

This is not async, rename this function? For example ParseMetadataBuffer.

Member

footer_buffer and footer_read_size don't seem used here?

Member

Should explain whether the executor is meant for IO or CPU work.

Member

Use auto?

Member

Can you open a JIRA to reuse @westonpace 's work to always transfer the future? (we don't want CPU-heavy Parquet decoding to happen on an IO thread)

Member Author

I filed ARROW-12916.

Member

So you always have to pass columns explicitly?

Member Author

Yes, I figured this is a lower-level API. We could add overloads to ease this.

Member

This doesn't seem to actually check the contents read from file. Could you do that?

Member Author

I've adjusted the test to check equality below.

@pitrou
Member

pitrou commented Jun 1, 2021

Also, can you avoid using the misleading term "reentrant" here? I don't think there is any reentrant code path in this PR.

@lidavidm lidavidm changed the title ARROW-11843: [C++] Provide reentrant Parquet reader ARROW-11843: [C++] Provide async Parquet reader Jun 1, 2021
@lidavidm lidavidm force-pushed the parquet-reentrant branch 3 times, most recently from 7269cd7 to 6d1acfa Compare June 2, 2021 12:59
@lidavidm
Member Author

lidavidm commented Jun 3, 2021

(benchmark chart: S3 Median Scan Time, seconds)

I re-tested S3 and also compared against the threaded reader with pre-buffering. I also corrected a logic error in file_parquet.cc that was leading to us needlessly spawning a thread.

I also ran Conbench on the EC2 instance. It seems with that dataset, async doesn't make much of a difference. (Note that the bucket is in a different zone than my instance, though both are in the same region.)

Threaded/pre-buffer:

        "iterations": 10,
        "max": "10.805500",
        "mean": "10.117983",
        "median": "10.099560",
        "min": "9.663432",
        "q1": "9.919561",
        "q3": "10.197374",
        "stdev": "0.309781",

Async/pre-buffer:

        "iterations": 10,
        "max": "10.660384",
        "mean": "9.774189",
        "median": "9.626739",
        "min": "9.297837",
        "q1": "9.391290",
        "q3": "10.127940",
        "run_id": "c3a226feb28e4025bc70fd2e90e405d4",
        "stdev": "0.460085",

@pitrou pitrou force-pushed the parquet-reentrant branch from 0f974b2 to 95241bb Compare June 7, 2021 15:05
@pitrou
Member

pitrou commented Jun 7, 2021

Rebased, will merge. Thank you!
