-
Notifications
You must be signed in to change notification settings - Fork 4k
ARROW-8311: [C++] Add push style stream format reader #6804
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thank you. I think that we should get the API as general as possible, so I would suggest the following: class ARROW_EXPORT Receiver {
public:
// Subclasses should override the methods they're interested in.
// Default implementations return NotImplemented.
virtual Status RecordBatchReceived(std::shared_ptr<RecordBatch>);
virtual Status TensorReceived(std::shared_ptr<Tensor>);
virtual Status SparseTensorReceived(std::shared_ptr<SparseTensor>);
}; |
|
(this will also be useful for Flight @lidavidm ) |
|
This will be very useful! Once this lands I'll see about wiring this up to the gRPC async APIs. |
|
@pitrou Thanks for the suggestion! It's a good idea. |
|
@lidavidm Thanks! I can help you when you work on it. The code will look like the followings: void on_read(const uint8_t* data, size_t data_size) {
std::shared_ptr<Buffer> chunk;
arrow::Buffer(data, data_size).Copy(0, data_size, &chunk);
emitter_.Consume(chunk);
while (!chunks_.empty()) {
if (chunks_[0].use_count() > 1) {
break;
}
chunks_.erase(chunks_.begin());
}
if (chunk.use_count() > 1) {
chunks_.push_back(std::move(chunk));
}
} |
d46b706 to
6784278
Compare
|
Sorry about not reviewing this yet, it's on my "short list". |
|
Thanks for the update @kou. I don't think it makes sense to have both Once we agree on the basic abstraction, I will make a more thorough review. |
|
I thought you suggested that we add a general receiver API like existing If we have a receiver API for We don't have data format that mixes RecordBatch, Tensor and SparseTensor for now. Users will want to implement one
This pull request implements the following push style readers:
For while (true) {
auto tensor = arrow::ipc::ReadTensor(input);
if (!tensor.status().ok()) {
break; // tensor.status() will be arrow::Status::Invalid
}
// process tensor
}Users can implement push style class TensorProcessor : public arrow::Receiver {
arrow::Status MessageReceive(arrow::unique_ptr<Message> message) override {
ARROW_ASSIGN_OR_RAISE(auto tensor, arrow::ipc::ReadTensor(*message));
// process tensor
}
};
TensorProcesor processor;
arrow::ipc::MessageEmitter emitter(&processor);
while (emitter.state() != arrow::ipc::MessageEmitter::State::EOS) {
emitter.Consume(data, data_size);
}Normally, users should not use Do you prefer the following API? // only for arrow::ipc::Message
class ARROW_EXPORT MessageReceiver {
virtual Status Receive(std::unique_ptr<Message> message) = 0;
};
// for others
class ARROW_EXPORT Receiver {
// Default implementations return NotImplemented.
virtual Status RecordBatchReceived(std::shared_ptr<RecordBatch> record_batch);
virtual Status TensorReceived(std::shared_ptr<Tensor> tensor);
virtual Status SparseTensorReceived(std::shared_ptr<SparseTensor> tensor);
}; |
6784278 to
b31977d
Compare
Yes. This is what I meant. Either you decode messages yourself and you implement |
|
OK. I've changed to use the API. |
|
I started reviewing, will try to finish soon |
wesm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall this looks good, thanks for working on this -- I think this will make it easier to implement delta dictionaries and dictionary replacements. Some minor stylistic comments
cpp/src/arrow/ipc/message.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The meaning of this parameter is not totally clear. Maybe "the number of bytes needed"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
I've changed to use "the number of bytes needed" for description.
Should we also improve parameter name (next_required_size)?
cpp/src/arrow/ipc/message.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this function need to retain ownership of the Buffer (versus const Buffer&)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
If the given buffer doesn't have enough data, emitter keeps the buffer in chunks_ instead of using it immediately. If we doesn't retain ownership of the given buffer, the buffer may be destructed when emitter uses it.
cpp/src/arrow/util/receiver.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Style choice: We could use the same function name for all the receivers, like Receive, but with different input argument types. Not sure if all compilers would be happy about that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean the following API, right?
class ARROW_EXPORT Receiver {
virtual Status Received(std::shared_ptr<RecordBatch> record_batch);
virtual Status Received(std::shared_ptr<Tensor> tensor);
virtual Status Received(std::shared_ptr<SparseTensor> sparse_tensor);
};I don't have a preference for this.
@pitrou What do you think about this API?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No preference, but Received alone sounds weird.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @bkietz
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't use this API if we use Receiver::EosReceived(). Because EosReceived() has no argument.
kou
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wesm Thanks for your review!
I've fixed most of problems.
What do you think about next_required_size name?
cpp/src/arrow/ipc/message.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
I've changed to use "the number of bytes needed" for description.
Should we also improve parameter name (next_required_size)?
cpp/src/arrow/ipc/message.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
If the given buffer doesn't have enough data, emitter keeps the buffer in chunks_ instead of using it immediately. If we doesn't retain ownership of the given buffer, the buffer may be destructed when emitter uses it.
cpp/src/arrow/util/receiver.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean the following API, right?
class ARROW_EXPORT Receiver {
virtual Status Received(std::shared_ptr<RecordBatch> record_batch);
virtual Status Received(std::shared_ptr<Tensor> tensor);
virtual Status Received(std::shared_ptr<SparseTensor> sparse_tensor);
};I don't have a preference for this.
@pitrou What do you think about this API?
pitrou
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still only looking at the API.
cpp/src/arrow/ipc/reader.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would call this StreamDecoder or something. At some point we'll add other methods to Receiver, so it won't emit just record batches.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I'll rename this to arrow::ipc::StreamDecoder.
You mean that we will extend https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format later to support more data type such as tensor. Right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, we would add TensorReceived (or OnTensor or whatever the chosing naming is :-)).
cpp/src/arrow/ipc/reader.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead this could be a EosReceived method on Receiver or something.
(note: the terminology I'm proposing is inspired by https://docs.python.org/3/library/asyncio-protocol.html#streaming-protocols , but YMMV)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add Receiver::EosReceived() and remove is_eos().
cpp/src/arrow/ipc/reader.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand what this means. What is the next action?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's "advancing the state of the emitter". So if you just read the metadata size prefix then this would return the size of the metadata, or the size of the body
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it mean you're expected to give exactly that number of bytes to Consume? Or does the emitter do its own buffering inside? The docs should probably make that clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you feed the emitter too much data, it will retain a slice of it internally, yes. This can be made more clear in the docs indeed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add more documentations. Could you confirm it?
cpp/src/arrow/ipc/message.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name of this function is okay with me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pitrou Thanks for your review!
I've applied your suggestion except StreamDecoder. I want to confirm what you meant.
cpp/src/arrow/ipc/reader.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I'll rename this to arrow::ipc::StreamDecoder.
You mean that we will extend https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format later to support more data type such as tensor. Right?
cpp/src/arrow/ipc/reader.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add more documentations. Could you confirm it?
cpp/src/arrow/ipc/reader.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add Receiver::EosReceived() and remove is_eos().
cpp/src/arrow/util/receiver.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't use this API if we use Receiver::EosReceived(). Because EosReceived() has no argument.
|
Bunch of CI jobs failed with "no space left on device". Overall this patch looks good to me. I'll await @pitrou to make a final review / sign off per the comments above |
|
If we may have any not-received callback such as a callback that is called on error and a callback that is called on dictionary updated, |
pitrou
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll let @wesm comment on the naming.
High-level question: is it possible to reimplement RecordBatchStreamReader and MessageReader on top of this infrastructure? It's not terrific to duplicate the decoding logic in several places.
cpp/src/arrow/ipc/message.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of this, I think it would be better to have an EosReceived method on MessageReceiver.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added EOS callback but I want to keep this method.
Because this information can be used to optimize performance. I've added documentation for this.
|
I'm looking again. This needs to be rebased now after ARROW-7233, I'll see if I can perform the rebase |
9e280d0 to
247e118
Compare
Unless I'm missing something, that's exactly what this patch does. MessageReader just calls I'm looking at the other naming issues |
b23a193 to
f80f953
Compare
|
Assorted thoughts:
|
|
It seems |
|
I agree it would be good to improve that (persisting the emitter between calls to |
|
By the way, since this is a new API, perhaps it should incorporate per-message metadata as well? (the virtual Status RecordBatchReceived(
std::shared_ptr<RecordBatch> record_batch,
std::shared_ptr<KeyValueMetadata> custom_metadata);
virtual Status SchemaReceived(
std::shared_ptr<Schema> schema,
std::shared_ptr<KeyValueMetadata> custom_metadata); |
This change adds the following push style reader classes:
* ipc::MessageEmitter
* ipc::RecordBatchStreamEmitter
Push style readers don't read data from stream directly. They receive
already read data by users. This style is useful with event driven
style IO API. We can't read data from stream directly in event driven
style IO API. We just receive already read data from event driven style
IO API like:
void on_read(const uint8_t* data, size_t data_size) {
process_data(data, data_size);
}
register_read_event(on_read);
run_event_loop();
We can't use the current reader API with event driven style IO API but
we can use this push style reader with event driven style IO API.
The current Message reader is changed to use ipc::MessageEmitter
internally. So we don't have duplicated reader implementation. And no
performance regression with our benchmark.
Before:
Running release/arrow-ipc-read-write-benchmark
Run on (12 X 4600 MHz CPU s)
CPU Caches:
L1 Data 32K (x6)
L1 Instruction 32K (x6)
L2 Unified 256K (x6)
L3 Unified 12288K (x1)
Load Average: 0.85, 0.84, 0.65
-----------------------------------------------------------------------------------------
Benchmark Time CPU Iterations UserCounters...
-----------------------------------------------------------------------------------------
ReadRecordBatch/1/real_time 886 ns 886 ns 774286 bytes_per_second=1102.15G/s
ReadRecordBatch/4/real_time 1601 ns 1601 ns 436258 bytes_per_second=610.078G/s
ReadRecordBatch/16/real_time 4819 ns 4820 ns 143568 bytes_per_second=202.663G/s
ReadRecordBatch/64/real_time 18291 ns 18296 ns 38586 bytes_per_second=53.3893G/s
ReadRecordBatch/256/real_time 84852 ns 84872 ns 8317 bytes_per_second=11.5091G/s
ReadRecordBatch/1024/real_time 341091 ns 341168 ns 2049 bytes_per_second=2.86306G/s
ReadRecordBatch/4096/real_time 1368049 ns 1368361 ns 511 bytes_per_second=730.968M/s
ReadRecordBatch/8192/real_time 2676778 ns 2677341 ns 265 bytes_per_second=373.584M/s
After:
Running release/arrow-ipc-read-write-benchmark
Run on (12 X 4600 MHz CPU s)
CPU Caches:
L1 Data 32K (x6)
L1 Instruction 32K (x6)
L2 Unified 256K (x6)
L3 Unified 12288K (x1)
Load Average: 0.88, 0.85, 0.66
-----------------------------------------------------------------------------------------
Benchmark Time CPU Iterations UserCounters...
-----------------------------------------------------------------------------------------
ReadRecordBatch/1/real_time 891 ns 891 ns 769579 bytes_per_second=1095.57G/s
ReadRecordBatch/4/real_time 1599 ns 1599 ns 435756 bytes_per_second=610.746G/s
ReadRecordBatch/16/real_time 4834 ns 4835 ns 144374 bytes_per_second=202.027G/s
ReadRecordBatch/64/real_time 18204 ns 18206 ns 38190 bytes_per_second=53.6465G/s
ReadRecordBatch/256/real_time 84142 ns 84154 ns 8309 bytes_per_second=11.6061G/s
ReadRecordBatch/1024/real_time 343105 ns 343148 ns 2035 bytes_per_second=2.84625G/s
ReadRecordBatch/4096/real_time 1399287 ns 1399484 ns 511 bytes_per_second=714.65M/s
ReadRecordBatch/8192/real_time 2641529 ns 2641845 ns 263 bytes_per_second=378.569M/s
Fix format
Fix lint errors
Fix lint errors
Fix sanitizer errors
Use AllocateBuffer to create empty 64-bit aligned buffer
Introduce general Receiver API
Add missing include
Fix error type
Use new Receiver API
Fix format
Split MessageReceiver again
Remove duplicated comments
Fix style
Don't use deprecated API
Don't use deprecated API
Add missing slice for non CPU buffer
Fix next_required_size parameter description
Use ABORT_NOT_OK()
Remove needless forward declaration
Use different test suite name
Fix include location
Fix a bug that next_required_size() doesn't care buffered_size_
Use std::shared_ptr<Receiver>
Add SchemaReceived
Add more documentation for next_required_size()
Use EosReceived() instead of is_eos()
Rebase
Remove unused variable from test
f80f953 to
7fab0e3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've applied all suggestions except per-message metadata.
Summary:
- Renamed emitter to decoder
- Marked new APIs experimental
- Moved
Receiverfromarrow/utiltoarrow/ipc - Renamed
ReceivertoListener MessageReaderreuses decoder
For per-message metadata, I'm not sure which message's metadata is used when any dictionary batch message exists. Schema message's metadata? Should we merge all metadata in schema message and dictionary batch messages?
Can we do it as a follow-up task?
For RecordBatchStreamReader, we can't use StreamDecoder in RecordBatchStreamReader internally. Because we have RecordBatchStreamReader::Open(std::unique_ptr<MessageReader> message_reader, ...) API. If we use StreamDecoder, we don't use MessageReader.
(We can use StreamDecoder with this API by extracting InputStreamMessageReader::stream_ from MessageReader and creating StreamDecoder from the extracted stream. Should we do this?)
Most of core logics are shared with RecordBatchStreamReader and StreamDecoder in this pull request. Should we reimplement RecordBatchStreamReader by StreamDecoder in this pull request? Or can we do it as a follow-up task?
cpp/src/arrow/ipc/message.h
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added EOS callback but I want to keep this method.
Because this information can be used to optimize performance. I've added documentation for this.
|
Thank you @kou
My initial thought was that we should only propagate the metadata from the |
|
+1, merging. The CI failure https://github.com/apache/arrow/pull/6804/checks?check_run_id=575674263 does not appear to be related to me |
This change adds the following push style reader classes:
Push style readers don't read data from stream directly. They receive
already read data by users. This style is useful with event driven
style IO API. We can't read data from stream directly in event driven
style IO API. We just receive already read data from event driven style
IO API like:
We can't use the current reader API with event driven style IO API but
we can use this push style reader with event driven style IO API.
The current Message reader is changed to use ipc::MessageEmitter
internally. So we don't have duplicated reader implementation. And no
performance regression with our benchmark.
Before:
After: