ARROW-3200: [C++] Support dictionaries in Flight streams #4113
Conversation
Force-pushed from aa70508 to 370c832.
cpp/src/arrow/ipc/writer.cc
This may work but it's highly irregular. For clarity it might be worthwhile to make a deep copy of the schema, i.e. replace

```cpp
std::shared_ptr<Schema> schema_ptr(const_cast<Schema*>(&schema), deleter);
```

with

```cpp
auto schema_ptr = arrow::schema(schema.fields(), schema.metadata());
```
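As a side note, here is a minimal sketch contrasting the two approaches; it assumes the original `deleter` was a no-op (an illustration, not the PR's actual code):

```cpp
#include <memory>

#include "arrow/type.h"  // arrow::Schema, arrow::schema()

void Example(const arrow::Schema& schema) {
  // Irregular: alias the const reference through a shared_ptr with a no-op
  // deleter. It works, but the pointer must not outlive `schema`.
  std::shared_ptr<arrow::Schema> aliased(const_cast<arrow::Schema*>(&schema),
                                         [](arrow::Schema*) {});

  // Clearer: build an independent Schema that shares the immutable fields.
  std::shared_ptr<arrow::Schema> copy =
      arrow::schema(schema.fields(), schema.metadata());

  (void)aliased;  // silence unused-variable warnings in this sketch
  (void)copy;
}
```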
I will try to spend a good amount of time reviewing this before I leave town on Monday.
cpp/src/arrow/flight/client.cc
You can get the underlying gRPC error via ClientWriter.Finish: https://grpc.io/grpc/cpp/classgrpc_1_1_client_writer.html#ab6c58e110c289ac0f76ae1fdb8fe24a3
As documented, it will return right away if writing failed (as it would have here): https://grpc.io/grpc/cpp/classgrpc_1_1internal_1_1_client_streaming_interface.html#a47321b8c130947bcef5c793329a54619
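For reference, a rough sketch of surfacing that gRPC error after a failed Write(); the helper name and the mapping onto arrow::Status are illustrative, not the PR's code:

```cpp
#include <grpcpp/grpcpp.h>

#include "arrow/status.h"

// If a Write() on the client stream failed, Finish() returns immediately
// with the underlying grpc::Status explaining why.
template <typename T>
arrow::Status FinishAndCheck(grpc::ClientWriter<T>* writer) {
  grpc::Status rpc_status = writer->Finish();
  if (!rpc_status.ok()) {
    return arrow::Status::IOError("gRPC stream failed: " +
                                  rpc_status.error_message());
  }
  return arrow::Status::OK();
}
```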
ghost left a comment
The Flight changes look good to me (and it's much cleaner now!), but it seems to have broken compatibility with Java - it looks like schema messages can now carry body buffers? (And Java would need to be updated to account for dictionary-batch-type messages anyway.) Is the plan to follow up with that change, or should I or someone else follow up with it?
That sounds unexpected. What do the body buffers contain?
It is a single empty buffer. It's the same problem as ARROW-4213 - the body tag gets written even though there is no body. It looks like the IpcPayload's body length field is corrupted when the IpcPayload is allocated in … This diff fixes it:

```diff
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -535,7 +535,7 @@ Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst,
 Status GetSchemaPayloads(const Schema& schema, MemoryPool* pool, DictionaryMemo* out_memo,
                          std::vector<IpcPayload>* out_payloads) {
   DictionaryMemo dictionary_memo;
-  IpcPayload payload;
+  IpcPayload payload{};
   out_payloads->clear();
   payload.type = Message::SCHEMA;
```
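For context, a tiny standalone illustration (not Arrow code) of why the `{}` matters: default-initializing a struct whose members have no initializers leaves built-in members indeterminate, while value-initialization zeroes them.

```cpp
#include <cstdint>
#include <iostream>

struct Payload {
  int64_t body_length;  // no default member initializer
};

int main() {
  Payload a;    // default-initialized: body_length holds an indeterminate value
  Payload b{};  // value-initialized: body_length is zeroed
  (void)a;      // silence unused-variable warning; reading a.body_length would be UB
  std::cout << b.body_length << std::endl;  // prints 0
  return 0;
}
```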
Thanks @lihalite. I'll try it out.

I have a lull in the airport today so I'll spend some time reviewing this since it's important.
Force-pushed from 370c832 to f9f458d.
Rebased and hopefully fixed the schema-message-with-body issue.
wesm left a comment
The code looks OK, and the refactoring to use common interfaces is good.
The meta-problem that isn't discussed anywhere in this PR is that the dictionaries are being serialized and written multiple times. When you have dictionaries, they are serialized and sent in the FlightGetInfo message
https://github.com/apache/arrow/blob/master/format/Flight.proto#L220
It is safe to assume that the Schema message is of a reasonable size (< 1 MB), but it isn't safe to assume the same of the entire schema including dictionaries.
I would suggest adding a benchmark with one or more large dictionaries (e.g. 32MB-128MB). The issue should immediately present itself.
I had commented in the JIRA "Some work is needed to handle schemas sent separately from their dictionaries" and there is also ARROW-3144 "Better solution for cases where dictionaries are unknown at schema reconstruction time, or for delta dictionaries".
cpp/src/arrow/flight/flight-test.cc
We could theoretically encode some standardized structure in the error messages so that the actual problem can be "unboxed".
Right... Though we would first have to standardize errors across all Arrow implementations ;-)
cpp/src/arrow/flight/server.cc
Does Finish() return it here?
AFAICT, Finish() is only available on client-side streams :-(
cpp/src/arrow/ipc/test-common.cc
Probably a lot of this file can be moved to arrow/testing/*, as there is some redundancy in the data generation
I created https://issues.apache.org/jira/browse/ARROW-5141 for it.
cpp/src/arrow/ipc/test-common.cc
Do we have random dictionary generation yet? Might be useful
It doesn't look like it. Also one should check if the IPC-testing dict payloads have some specifically useful characteristics.
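For what it's worth, a rough sketch of what such a generator could look like, assuming a recent Arrow C++ API (the helper name is made up, and DictionaryArray::FromArrays is used in its Result-returning form):

```cpp
#include <cstdint>
#include <memory>
#include <random>
#include <string>

#include "arrow/api.h"

// Build a random string dictionary of `dict_size` values and `length` random
// indices into it, then assemble a DictionaryArray.
arrow::Result<std::shared_ptr<arrow::Array>> MakeRandomDictionaryArray(
    int64_t dict_size, int64_t length, uint32_t seed = 42) {
  std::mt19937 rng(seed);

  // Random dictionary values
  arrow::StringBuilder dict_builder;
  for (int64_t i = 0; i < dict_size; ++i) {
    ARROW_RETURN_NOT_OK(dict_builder.Append("value-" + std::to_string(rng())));
  }
  std::shared_ptr<arrow::Array> dictionary;
  ARROW_RETURN_NOT_OK(dict_builder.Finish(&dictionary));

  // Random indices into the dictionary
  std::uniform_int_distribution<int32_t> dist(0, static_cast<int32_t>(dict_size) - 1);
  arrow::Int32Builder index_builder;
  for (int64_t i = 0; i < length; ++i) {
    ARROW_RETURN_NOT_OK(index_builder.Append(dist(rng)));
  }
  std::shared_ptr<arrow::Array> indices;
  ARROW_RETURN_NOT_OK(index_builder.Finish(&indices));

  auto type = arrow::dictionary(arrow::int32(), arrow::utf8());
  return arrow::DictionaryArray::FromArrays(type, indices, dictionary);
}
```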
cpp/src/arrow/ipc/writer.cc
Not a bad idea. However in most cases this shouldn't allocate a lot of memory (unless there are a lot of sliced bitmaps that have to be "fixed"). Are we optimizing for reuse of padding buffers yet (Java does this) -- it is probably not an issue in C++ since we are pretty good about being 8-byte padded?
I'm not sure what you mean by padding buffers. There's a static constexpr uint8_t kPaddingBytes[kArrowAlignment] declaration in ipc/util.h (the constexpr looks a bit unexpected here btw).
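As a rough illustration of what a reusable padding buffer does in an IPC-style writer (a sketch in the spirit of kPaddingBytes, not the actual writer code; the alignment value is an assumption for the example):

```cpp
#include <cstdint>

#include "arrow/io/interfaces.h"
#include "arrow/status.h"

namespace {
// Reusable block of zeros, written whenever the stream needs padding.
constexpr int64_t kAlignment = 8;  // illustration only
const uint8_t kZeros[kAlignment] = {0};
}  // namespace

// Write `nbytes` from `data`, then pad the stream with zeros up to the next
// multiple of kAlignment.
arrow::Status WriteAligned(arrow::io::OutputStream* out, const uint8_t* data,
                           int64_t nbytes) {
  ARROW_RETURN_NOT_OK(out->Write(data, nbytes));
  const int64_t remainder = nbytes % kAlignment;
  if (remainder != 0) {
    ARROW_RETURN_NOT_OK(out->Write(kZeros, kAlignment - remainder));
  }
  return arrow::Status::OK();
}
```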
cpp/src/arrow/ipc/writer.cc
Aside, we should decide on an evolution of DCHECK_OK:
- Always evaluates: EVAL_AND_DCHECK_OK
- Some variant that only runs and checks the statement in debug mode -- here is a good example: DEBUG_CHECK_OK(statement) or something
Yes, the current situation is a bit unfortunate. I think most (all?) uses of DCHECK_OK should be replaced with ABORT_NOT_OK, which doesn't ignore the status return in release mode.
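For illustration, a self-contained sketch of the two flavors discussed above; the macro names come from the comments and are not existing Arrow macros, and the Status type here is a stand-in:

```cpp
#include <cassert>

// Stand-in for arrow::Status, just for this illustration.
struct Status {
  bool ok_;
  bool ok() const { return ok_; }
};

// Always evaluates `expr`; only checks the result in debug builds.
#ifdef NDEBUG
#define EVAL_AND_DCHECK_OK(expr) ((void)(expr))
#else
#define EVAL_AND_DCHECK_OK(expr) assert((expr).ok())
#endif

// Evaluates and checks `expr` only in debug builds; a no-op in release builds.
#ifdef NDEBUG
#define DEBUG_CHECK_OK(expr) ((void)0)
#else
#define DEBUG_CHECK_OK(expr) assert((expr).ok())
#endif

Status DoWork() { return Status{true}; }

int main() {
  EVAL_AND_DCHECK_OK(DoWork());  // DoWork() runs in both build modes
  DEBUG_CHECK_OK(DoWork());      // DoWork() runs only in debug builds
  return 0;
}
```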
I see... This is because of …
Codecov Report
```diff
@@            Coverage Diff             @@
##           master    #4113      +/-   ##
==========================================
- Coverage   87.84%   87.77%   -0.08%
==========================================
  Files          739      741       +2
  Lines        90980    91159     +179
  Branches      1252     1252
==========================================
+ Hits         79921    80013      +92
- Misses       10938    11029      +91
+ Partials       121      117       -4
```
Continue to review full report at Codecov.
As another general comment about dictionaries: in many applications, dictionary encoding is a data compression technique, and two tables or two arrays having different dictionaries are considered semantically "the same". This is often the case with analytic databases -- we are already seeing this issue when reading Parquet files as dictionary-encoded (the dictionaries may differ between row groups). We will likely be forced by this practical consideration to evolve the C++ API around this.

To make the problem more concrete for Flight -- if a dataset is distributed, the nodes may not "agree" about what the dictionary is. I do not think it is practical to go about some kind of "dictionary consensus" step. The only point at which a "consensus" dictionary is really required is when you do analytics. Then you have various cases:

Unfortunately, the case of "semantically the same data, but different dictionaries" is poorly modeled in the C++ library, so we should think a bit about this. I plan to work on Parquet stuff a bunch when I'm back from vacation, so I intend to address the dictionary-encoded / categorical issue as part of this.
BTW in the case of analytics, it is generally most practical to address dictionary / categorical normalization at the last possible moment. When the cardinality is not large, this is also a lot cheaper. (Imagine a billion-row dataset split into 1000 dictionary-encoded pieces, each having 1 million elements and a different dictionary.)
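As a concrete (if simplistic) picture of that late normalization, here is a plain-C++ sketch that merges one piece's string dictionary into a running global dictionary and remaps the piece's indices; it is illustrative only, not an Arrow API:

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Merge a piece's dictionary into a global one and remap its indices.
// `lookup` maps dictionary values to their index in `global_dict`.
std::vector<int32_t> RemapPiece(const std::vector<std::string>& piece_dict,
                                const std::vector<int32_t>& piece_indices,
                                std::vector<std::string>* global_dict,
                                std::unordered_map<std::string, int32_t>* lookup) {
  // old index in the piece -> new index in the global dictionary
  std::vector<int32_t> transpose(piece_dict.size());
  for (size_t i = 0; i < piece_dict.size(); ++i) {
    auto it = lookup->find(piece_dict[i]);
    if (it == lookup->end()) {
      const int32_t new_index = static_cast<int32_t>(global_dict->size());
      global_dict->push_back(piece_dict[i]);
      it = lookup->emplace(piece_dict[i], new_index).first;
    }
    transpose[i] = it->second;
  }
  // Rewrite the piece's indices against the global dictionary.
  std::vector<int32_t> remapped(piece_indices.size());
  for (size_t i = 0; i < piece_indices.size(); ++i) {
    remapped[i] = transpose[piece_indices[i]];
  }
  return remapped;
}
```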
@wesm @lihalite I'm gonna merge this soon if you don't object.
I am good with merging, thanks for the heads up. The only thing I might want is to see if we can stop skipping the dictionary case in the integration tests.
I'm not comfortable dealing with the integration tests now. Would you like to tackle that in a separate issue?
Of course - filed ARROW-5143.