
Conversation

@save-buffer
Contributor

This PR contains an implementation of a multithreaded TPC-H dbgen, as well as an implementation of Q1 as a Google Benchmark. The advantage of this dbgen approach is that it is a scan node: it generates data on the fly and streams it downstream. As a result, I was able, for instance, to run Q1 at scale factor 1000 on my desktop with only 32 GB of RAM.

I did verify the results of Q1. They don't exactly match the reference results, but they are quite close and well within what I'd expect the variance between random number generators to be.

-------------------------------------------------------------
Benchmark                   Time             CPU   Iterations
-------------------------------------------------------------
BM_Tpch_Q1/SF:1     186609936 ns       268825 ns          100
BM_Tpch_Q1/SF:10   1858114140 ns       276741 ns           10
BM_Tpch_Q1/SF:100  18561088470 ns       273067 ns            1
BM_Tpch_Q1/SF:1000 186103719755 ns       289445 ns            1
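
To make the "scan node that streams data" point concrete, here is a rough sketch of consuming one generated table through an ExecPlan sink, pulling batches one at a time so memory stays bounded regardless of scale factor. The header path, the `arrow::compute::internal` namespace, and the `Lineitem()` table method are assumptions based on this thread, not necessarily the PR's final API.

```cpp
// Sketch only: consume the TPC-H generator as a streaming scan node.
// Assumed (not confirmed by this thread): tpch_node.h location, the
// internal namespace, and the Lineitem() table method.
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/tpch_node.h"  // assumed header location
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/util/async_generator.h"

namespace cp = arrow::compute;

arrow::Status StreamLineitem(double scale_factor) {
  ARROW_ASSIGN_OR_RAISE(auto plan, cp::ExecPlan::Make());
  ARROW_ASSIGN_OR_RAISE(auto gen, cp::internal::TpchGen::Make(plan.get(), scale_factor));
  ARROW_ASSIGN_OR_RAISE(cp::ExecNode* lineitem, gen->Lineitem({}));  // assumed method

  // Attach a sink and read from it through a RecordBatchReader.
  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::MakeExecNode("sink", plan.get(), {lineitem}, cp::SinkNodeOptions{&sink_gen}));
  std::shared_ptr<arrow::RecordBatchReader> reader = cp::MakeGeneratorReader(
      lineitem->output_schema(), std::move(sink_gen), arrow::default_memory_pool());

  ARROW_RETURN_NOT_OK(plan->Validate());
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  std::shared_ptr<arrow::RecordBatch> batch;
  int64_t rows = 0;
  while (true) {
    ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
    if (batch == nullptr) break;
    rows += batch->num_rows();  // process each batch, then let it be freed
  }
  return plan->finished().status();
}
```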

@github-actions

github-actions bot commented Mar 2, 2022

⚠️ Ticket has not been started in JIRA, please click 'Start Progress'.

Member

@jonkeane jonkeane left a comment

This is fantastic! I'll pull this locally and try to wire it up in R tomorrow. One comment about licensing the code snippet.

Is it possible to add tests for these generators? Even if we don't necessarily want to check in generated data alongside the tests, even just confirming that they run, produce data of the right shape, and get other details right would be fantastic.

`"' ___.,'` j,-'
`-.__.,--'
*/
// Source: https://stackoverflow.com/questions/1068849/how-do-i-determine-the-number-of-digits-of-an-integer-in-c
Member

We should confirm that this is ok to copy/paste. https://www.apache.org/legal/resolved.html#stackoverflow says no, though I thought I remembered mention of a date after which Stack Overflow content is ok.

Contributor Author

It's actually not really a copy: I have the DCHECK, I made it work with 64-bit ints, I return -1 in the unreachable scenario, and the variable and function names are different. I was mainly using it as a source to justify why this is the fastest way.
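
For context, the function in question is essentially a branch-based digit count. A minimal, illustrative sketch of that technique (not the PR's actual code; the name and the plain `assert` here are placeholders for the real DCHECK-based version) looks like this:

```cpp
// Illustrative sketch of a branch-based digit count: comparisons against
// powers of ten avoid division/log calls in the hot path.
#include <cassert>
#include <cstdint>

int NumDigits(int64_t x) {
  assert(x >= 0);  // the PR reportedly uses a DCHECK here
  if (x < 10) return 1;
  if (x < 100) return 2;
  if (x < 1000) return 3;
  if (x < 10000) return 4;
  if (x < 100000) return 5;
  if (x < 1000000) return 6;
  if (x < 10000000) return 7;
  if (x < 100000000) return 8;
  if (x < 1000000000) return 9;
  if (x < 10000000000LL) return 10;
  if (x < 100000000000LL) return 11;
  if (x < 1000000000000LL) return 12;
  if (x < 10000000000000LL) return 13;
  if (x < 100000000000000LL) return 14;
  if (x < 1000000000000000LL) return 15;
  if (x < 10000000000000000LL) return 16;
  if (x < 100000000000000000LL) return 17;
  if (x < 1000000000000000000LL) return 18;
  // Max for a non-negative int64_t; the PR's variant is described as
  // returning -1 from an unreachable branch instead.
  return 19;
}
```

Cascaded comparisons like this compile to a handful of predictable branches and avoid the division or floating-point log that naive implementations use, which is the "fastest way" argument the linked answer makes.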

Contributor Author

But yes, I'm not sure how different something like this has to be in order for it to count as "original" vs "copied and modified"...

Member

*nods* Modified is ok, and it might be changed enough, but I'll get a second opinion on it as well to ensure we're good here.

Member

Modification doesn't really matter from a legal perspective. Derivative works are only allowed if you have rights to the source.

That being said, we are in a terrible gray area here that I fear will become a can of worms and a waste of everyone's time. Technically the only "proper" approach would be a clean-room one, where @save-buffer describes what is needed to someone who has never seen the SO code and that person writes the code. However, legally documenting such a process is a headache.

In this case I think we are (at least ethically) in the clear. The answer author has this statement on their profile page:

All code I post on Stack Overflow is covered by the "Do whatever the heck you want with it" licence, the full text of which is:

Do whatever the heck you want with it.

There are a few edits by other authors, but none of them touches the code included here. My opinion would be to proceed as is (but get rid of the ASCII art; I can't abide fun).

@save-buffer
Contributor Author

Good point about tests - I will add some.

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/util/pcg_random.h"
Member

I don't think these headers (or the ones in vendored) are actually pulled in by default, at least they weren't pulled in when I built using the instructions: https://arrow.apache.org/docs/r/articles/developers/setup.html#step-2---configure-the-libarrow-build

I copied the vendored/pcg/*.hpp files over manually, but I suspect we'll need to add those to CMake somewhere so that happens as part of the build now that these are used in compute.

Member

TIL about that file!

@jonkeane
Member

jonkeane commented Mar 2, 2022

And one more thing I thought of as I was doing this: for the tables that have keys shared across them (e.g. lineitems and orders), is it possible to generate those at the same time so that the keys are the same across both? I can figure out whether generating them separately is an issue by comparing against duckdb or running the queries through arrow bench.

@save-buffer
Contributor Author

@jonkeane They get generated at the same time if you use the same TpchGen object.

@jonkeane
Member

jonkeane commented Mar 2, 2022

> @jonkeane They get generated at the same time if you use the same TpchGen object.

Cool, so then the next thing would be to figure out if there's a way to emit and write these record batch readers simultaneously. (And that might actually need to be done all in C++; the R execution paradigm might make it hard to have one plan that emits two sets of record batches and writes them simultaneously.)

@save-buffer
Contributor Author

Yeah, one thing to think about is what the R code would look like. You'd need some way of "linking" the two tables in R, right?

@jonkeane
Member

jonkeane commented Mar 2, 2022

From the R side, yeah. The hard part is that we want to take each of these Record Batch Readers and write them to parquet files, but we don't have an object in the R bindings that is designed to hold multiple record batch readers and kick off the writing for both.

@jonkeane
Member

jonkeane commented Mar 2, 2022

Though we should be able to construct a plan that includes write nodes for both of those RecordBatchReaders in C++ and then let C++ handle the rest (no need to try and shoehorn that multithreading into R, which will be reluctant to make that easy!). I'm digging into that now to see if I can wire it up myself...

@westonpace
Member

Yeah, I'm pretty sure a single plan could have one tpch gen node for each table and then each of those nodes would be attached to a dedicated write node. I have done very little testing with multiple sink nodes though so it would be an interesting experiment.
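
As a sketch of the shape such a plan could take: two table nodes from one generator, each feeding its own dataset write node. The header paths, the `Lineitem()`/`Orders()` method names, and the way the "write" factory is registered are assumptions, not a tested implementation.

```cpp
// Sketch only: one plan, one TpchGen, two tables, two write nodes.
// Assumed: tpch_node.h location, the internal namespace, and the table methods.
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/tpch_node.h"  // assumed header location
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/plan.h"
#include "arrow/result.h"

namespace cp = arrow::compute;
namespace ds = arrow::dataset;

arrow::Status WriteLineitemAndOrders(double scale_factor,
                                     ds::FileSystemDatasetWriteOptions lineitem_opts,
                                     ds::FileSystemDatasetWriteOptions orders_opts) {
  ds::internal::Initialize();  // registers the dataset "write" exec node factory
  ARROW_ASSIGN_OR_RAISE(auto plan, cp::ExecPlan::Make());
  ARROW_ASSIGN_OR_RAISE(auto gen, cp::internal::TpchGen::Make(plan.get(), scale_factor));

  // Both tables come from the same generator, so shared keys stay consistent.
  ARROW_ASSIGN_OR_RAISE(cp::ExecNode* lineitem, gen->Lineitem({}));  // assumed method
  ARROW_ASSIGN_OR_RAISE(cp::ExecNode* orders, gen->Orders({}));      // assumed method

  ARROW_RETURN_NOT_OK(cp::MakeExecNode(
      "write", plan.get(), {lineitem},
      ds::WriteNodeOptions{lineitem_opts, lineitem->output_schema()}));
  ARROW_RETURN_NOT_OK(cp::MakeExecNode(
      "write", plan.get(), {orders},
      ds::WriteNodeOptions{orders_opts, orders->output_schema()}));

  ARROW_RETURN_NOT_OK(plan->Validate());
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  return plan->finished().status();  // both writes run concurrently in one plan
}
```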

@jonkeane
Member

jonkeane commented Mar 3, 2022

Yeah that was what I was thinking. I just did a bit of tinkering with one gen node and a write node, but don’t have it working quite yet. I’ll push what I have when I get home and maybe it’ll be obvious what I’m doing wrong.

Tangentially: ideally, I would like each table to go to a single parquet file (with multiple row groups, and written batch by batch). Do we have the node machinery for that already? Or just machinery for writing to datasets?

@westonpace
Member

> Tangentially: ideally, I would like each table to go to a single parquet file (with multiple row groups, and written batch by batch). Do we have the node machinery for that already? Or just machinery for writing to datasets?

I'm not sure we need a dedicated write node for single-file writes.

What you're describing is the "default partitioning", but looking at the code I'm not sure if we default to that or if we default to a segmentation fault when no partitioning is specified. 😬 arrow::dataset::Partitioning::Default() could be used in the meantime.

@jonkeane
Member

jonkeane commented Mar 3, 2022

Well that might explain the seg fault I got! Thanks for that I’ll try piping that in.

I have some thoughts about whether that actually works, but I need to dig some more. My issues with trying to use it that way previously might have been from the R bindings and not C++, so I might be able to work around them. I know the single-file write is not standard, and unlikely to be the right thing for most people, but in this case it helps a lot with compatibility.

}

// [[arrow::export]]
void Tpch_Dbgen_Write(
Member

This is not the ultimate shape that this function will take, but it was my first attempt at using the write node. It is currently segfaulting, and I've commented in-line on some of the silly things I've done.

This function is a bit separate from the PR so if getting this working will delay merging the larger PR, I'm happy to pull it into a separate one.

Comment on lines 377 to 396
auto base_path = base_dir + "/parquet_dataset";
filesystem->CreateDir(base_path);

auto format = std::make_shared<ds::ParquetFileFormat>();

ds::FileSystemDatasetWriteOptions write_options;
write_options.file_write_options = format->DefaultWriteOptions();
write_options.existing_data_behavior = ds::ExistingDataBehavior::kDeleteMatchingPartitions;
write_options.filesystem = filesystem;
write_options.base_dir = base_path;
write_options.partitioning = arrow::dataset::Partitioning::Default();
write_options.basename_template = "part{i}.parquet";
write_options.max_partitions = 1024;
Member

A lot of this is hard-coded for now to get it working. One thing that surprised me a little bit is that we have slightly different write options for this than we do for datasets. I think I need to wire up FileSystemDatasetWriteOptions (or figure out whether it gets translated in the right way to get to that point with our R wiring of FileWriteOptions).

Comment on lines 391 to 406
// TODO: this had a checked_cast in front of it in the code I adapted it from
// but I ran into namespace issues when doing it so I took it out to see if it
// worked, but maybe that's what's causing the segfault?
const ds::WriteNodeOptions options =
ds::WriteNodeOptions{write_options, table->output_schema()};
Member

This might be exactly what's causing the segfault; the code I saw that did this had checked_cast in front of it.

void AppendNumberPaddedToNineDigits(char *out, int64_t x)
{
// We do all of this to avoid calling snprintf, which does a lot of crazy
// locale stuff. On Windows and MacOS this can get suuuuper slow
Member

We have formatting utilities in arrow/util/formatting.h, no need to reinvent them.

Member

@save-buffer looks like this comment is still unaddressed. Also, if speed is a concern, I notice ~2x faster performance with arrow::internal::detail::FormatAllDigitsLeftPadded than with AppendNumberPaddedToNineDigits
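
For anyone curious what the snprintf-free approach amounts to, here is a generic sketch of the fixed-width zero-padding technique the quoted helper is going for. This is not the arrow/util/formatting.h implementation, just the underlying idea with illustrative names.

```cpp
// Sketch: write the decimal digits of x into a fixed-width field, left-padded
// with '0', without going through snprintf (and its locale machinery).
#include <cassert>
#include <cstdint>

// Writes exactly `width` characters to `out` and returns a pointer just past
// the last character written.
char* AppendZeroPadded(char* out, uint64_t x, int width) {
  for (int i = width - 1; i >= 0; --i) {
    out[i] = static_cast<char>('0' + (x % 10));
    x /= 10;
  }
  assert(x == 0);  // x must fit in `width` digits
  return out + width;
}
```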

@nealrichardson
Member

Presumably this involves vendoring some official TPC code? If so, we should confirm its license and add a note to LICENSE.txt

@save-buffer
Contributor Author

No, this is all original content. I followed the official spec, but no code was taken.

@westonpace westonpace self-requested a review March 10, 2022 22:54
Member

@pitrou pitrou left a comment

+1. I pushed a couple minor changes.

@pitrou
Member

pitrou commented Mar 29, 2022

Hmm, there's an ASAN issue that needs to be looked at before merging this. I can do that tomorrow if you want.

static Result<TpchGen> Make(ExecPlan* plan, double scale_factor = 1.0,
int64_t batch_size = 4096,
util::optional<int64_t> seed = util::nullopt);
static Result<std::unique_ptr<TpchGen>> Make(
Contributor Author

This does not need to be a unique_ptr. The whole point of using shared_ptr for the order/lineitem and part/partsupp generators is so that this TpchGen object can safely die.
Also none of the methods in this class need to be virtual.

Member

As the commit title suggested, the point here is to hide implementation details from the .h and minimize header inclusion.
It can be a unique_ptr, a shared_ptr, or the pimpl pattern, but all of these patterns require some form of dynamic allocation.

@pitrou
Member

pitrou commented Mar 29, 2022

@save-buffer I would have appreciated it if you had asked about the motivation for the changeset instead of bluntly reverting it. The point here was to minimize the exposure of implementation details in a .h file, and consequently also to minimize the set of transitive inclusions (a common concern for C++ compilation times).

@save-buffer
Contributor Author

Apologies, I'll not revert like that in the future. We can always revert the revert.

As for the problem of increasing transitive inclusions, I agree that keeping compilation time down is a worthy goal. I guess this header file will now transitively include pcg_random.h. Maybe we can find some other way to avoid including pcg_random.h without having to make everything virtual.

I think either way we don't have to make the factory return a unique_ptr.

@pitrou
Member

pitrou commented Mar 29, 2022

> I think either way we don't have to make the factory return a unique_ptr.

Indeed, we don't. We can use the pimpl idiom instead (which might be slightly more verbose). The virtual methods seem harmless to me, given that the function call cost is not critical here, but either way is ok :-)
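
For illustration, here is a minimal sketch of the pimpl variant being discussed; the class and method names are placeholders, not the PR's API. The point is that the random-engine header stays out of the public header, nothing needs to be virtual, and the factory can still return by value.

```cpp
// Sketch of the pimpl idiom: the engine type is never named in the "header"
// half, so pcg_random.h would not leak into every includer. Names here are
// illustrative only; <random> stands in for arrow/util/pcg_random.h.
#include <cstdint>
#include <memory>
#include <random>

// ---- what would live in tpch_gen.h -----------------------------------------
class TpchGen {
 public:
  static TpchGen Make(double scale_factor = 1.0);
  ~TpchGen();
  TpchGen(TpchGen&&) noexcept;
  TpchGen& operator=(TpchGen&&) noexcept;

  int64_t NextOrderKey();  // example method that forwards to the Impl

 private:
  struct Impl;  // forward-declared only; engine type never named here
  explicit TpchGen(std::unique_ptr<Impl> impl);
  std::unique_ptr<Impl> impl_;
};

// ---- what would live in tpch_gen.cc ----------------------------------------
struct TpchGen::Impl {
  std::mt19937_64 rng;  // the real code would hold its pcg engine here
  int64_t next_key = 1;
};

TpchGen TpchGen::Make(double /*scale_factor*/) {
  return TpchGen(std::make_unique<Impl>());
}
TpchGen::TpchGen(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
TpchGen::~TpchGen() = default;
TpchGen::TpchGen(TpchGen&&) noexcept = default;
TpchGen& TpchGen::operator=(TpchGen&&) noexcept = default;
int64_t TpchGen::NextOrderKey() { return impl_->next_key++; }
```

Returning `TpchGen` by value keeps the factory signature free of smart pointers while still hiding the implementation behind the single `unique_ptr<Impl>` allocation.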

@save-buffer
Contributor Author

I guess my biggest gripe is with the unnecessary heap allocation. But I guess it's not a big deal (until we begin measuring ExecPlans-per-second 😛)
I'll just revert the revert.

westonpace pushed a commit that referenced this pull request Mar 30, 2022
This PR modifies the `SubmitTask` and `Finish` methods of MapNode in `ExecPlan` to avoid scheduling extra thread tasks.

Performed the TPC-H Benchmark developed in PR #12537 with and without the changes.
```
TPC-H Benchmark (With Extra Thread Tasks)
-------------------------------------------------------------------
Benchmark                         Time             CPU   Iterations
-------------------------------------------------------------------
BM_Tpch_Q1/ScaleFactor:1   95035633 ns       178700 ns          100

TPC-H Benchmark (Without Extra Thread Tasks)
-------------------------------------------------------------------
Benchmark                         Time             CPU   Iterations
-------------------------------------------------------------------
BM_Tpch_Q1/ScaleFactor:1   91511754 ns       182060 ns          100

```

Also, tested with the Query Tester as proposed in PR #12586
```
With Thread Tasks (batch size = 4096)
./query_tester tpch-1
Average       Duration: 0.106694s (+/- 0s)
Average Output  Rows/S: 37.4902rps
Average Output Bytes/S: 4573.81bps

Without Thread Tasks (batch size = 4096)
./query_tester tpch-1
Average       Duration: 0.104658s (+/- 0s)
Average Output  Rows/S: 38.2198rps
Average Output Bytes/S: 4662.82bps
```

Closes #12720 from sanjibansg/thread_tasks

Authored-by: Sanjiban Sengupta <sanjiban.sg@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
@pitrou
Member

pitrou commented Mar 30, 2022

Great! The s3fs failures on macOS are unrelated, so I'm gonna merge now. Thank you @save-buffer .

@pitrou pitrou closed this in 50fab73 Mar 30, 2022
@ursabot

ursabot commented Mar 30, 2022

Benchmark runs are scheduled for baseline = 0deefd1 and contender = 50fab73. 50fab73 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Finished ⬇️0.38% ⬆️0.33%] test-mac-arm
[Finished ⬇️2.14% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️0.04% ⬆️0.26%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] [50fab73d ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/415)
[Finished] [50fab73d test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/401)
[Finished] [50fab73d ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/401)
[Finished] [50fab73d ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/411)
[Finished] [0deefd18 ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/414)
[Finished] [0deefd18 test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/400)
[Finished] [0deefd18 ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/400)
[Finished] [0deefd18 ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/410)
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@save-buffer save-buffer deleted the sasha_tpch branch March 30, 2022 16:55