-
Notifications
You must be signed in to change notification settings - Fork 4k
ARROW-8674: [JS] Implement IPC RecordBatch body buffer compression #13076
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
|
| // https://github.com/apache/arrow/blob/1fc251f18d5b48f0c9fe8af8168237e7e6d05a45/format/Message.fbs#L59-L65 | ||
| body = codec.decode(body); | ||
| } else { | ||
| throw new Error('Record batch is compressed but codec not found'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this error include the codec ID?
| if (codec?.decode) { | ||
| // TODO: does this need to be offset by 8 bytes? Since the uncompressed length is | ||
| // written in the first 8 bytes: | ||
| // https://github.com/apache/arrow/blob/1fc251f18d5b48f0c9fe8af8168237e7e6d05a45/format/Message.fbs#L59-L65 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Confusing that here the uint8array is the "body" but it might correspond to the "buffer" in that spec. I'm not sure off hand which is actually here?
| "jest": "27.5.1", | ||
| "jest-silent-reporter": "0.5.0", | ||
| "lerna": "4.0.0", | ||
| "lz4js": "0.2.0", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is intended to only be registered in tests, and users need to declare the dependency and register in their own applications if they want?
We should include code snippets and instructions for how to do that for each codec needed to support the full Arrow spec. The error message could even link to the docs on exactly how to enable a given compression codec.
| if (codec?.decode) { | ||
| // TODO: does this need to be offset by 8 bytes? Since the uncompressed length is | ||
| // written in the first 8 bytes: | ||
| // https://github.com/apache/arrow/blob/1fc251f18d5b48f0c9fe8af8168237e7e6d05a45/format/Message.fbs#L59-L65 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I started digging into this although being unfamiliar to arrow and zstd both it has been a bit of a struggle.
The first 8 bytes are not part of the compressed section. You can see it corrected for in other places like here: https://github.com/apache/arrow/pull/9137/files#diff-38c3da9d6c7d521df83f59ee7d8cfced00951e8dcd38f39c7c36d26b6feee3d6R75
If you encode something with ZSTD you'll see it starts with [40, 181, 47, 253, 32] which comes after those 8 bytes. I'e just encoding:
const testVal = compress(Buffer.from(new Uint8Array([1, 0, 0, 0, 0, 0, 0, 0])));
Uint8Array(17) [
40, 181, 47, 253, 32, 8, 65,
0, 0, 1, 0, 0, 0, 0,
0, 0, 0
]
This within the body here is:
const compressedMessage = new Uint8Array([
8, 0, 0, 0,
0, 0, 0, 0,
40, 181, 47,
253, 32, 8, 65,
0, 0, 1, 0,
0, 0, 0, 0,
0, 0, 0, 0,
0, 0, 0, 0,
0
]);
Now there's another issue where which is there appears to be padding at the end and between the fields which I'm very uncertain on how to detect besides brute forcing in reverse between the LZ4 header and the end.
console.log(decompress(compressedMessage.slice(8, 17 + 8)));
Uint8Array([
40, 181, 47, 253,
32, 8, 65, 0,
0, 1, 0, 0,
0, 0, 0, 0,
0
]);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for doing some digging! This PR isn't currently at the top of my list of things to work on, but would love for this support to be added to arrow JS!
|
Hey @kylebarron, super nice work on this PR so far! If you're busy with other things, can you briefly elaborate on what's left to do in your opinion such that we may consider moving it forward from our side. Thanks! |
No I have no imminent plans to work on this, but I'd love to see support for this added.
|
### Rationale for this change This change introduces support for reading compressed Arrow IPC streams in JavaScript. The primary motivation is the need to read Arrow IPC Stream in the browser when they are transmitted over the network in a compressed format to reduce network load. Several reasons support this enhancement: - Personal need in other project to read compressed arrow IPC stream. - Community demand, as seen in [Issue #109](#109). - A similar implementation was attempted in [PR apache/arrow#13076](apache/arrow#13076) but was never merged. I am very grateful to @kylebarron . - Other language implementations (e.g., C++, Python, Rust) already support IPC compression. ### What changes are included in this PR? - Support for decoding compressed RecordBatch buffers during reading. - Each buffer is decompressed individually, offsets are recalculated with 8-byte alignment, and a new metadata. RecordBatch is constructed before loading vectors. - Only decompression is implemented; compression (writing) is not supported yet. - Currently tested with the lz4 codec using the lz4js library. lz4-wasm was evaluated but rejected due to incompatibility with LZ4 Frame format. - The decompression logic is isolated to _loadRecordBatch() in the RecordBatchReaderImpl class. - A codec.decode function is retrieved from the compressionRegistry and applied per-buffer. So users can choose suitable lib. #### Additional notes: 1. Codec compatibility caveats Not all JavaScript LZ4 libraries are compatible with the Arrow IPC format. For example: - lz4js works correctly as it supports the LZ4 Frame Format. - lz4-wasm is not compatible, as it expects raw LZ4 blocks and fails to decompress LZ4 frame data. This can result in silent or cryptic errors. To improve developer experience, we could: - Wrap codec.decode calls in try/catch and surface a clearer error message if decompression fails. - Add an optional check in compressionRegistry.set() to validate that the codec supports LZ4 Frame Format. One way would be to compress dummy data and inspect the first 4 bytes for the expected LZ4 Frame magic header (0x04 0x22 0x4D 0x18). 2. Reconstruction of metadata.RecordBatch After decompressing the buffers, new BufferRegion entries are calculated to match the uncompressed data layout. A new metadata.RecordBatch is constructed with the updated buffer regions and passed into _loadVectors(). This introduces a mutation-like pattern that may break assumptions in the current design. However, it's necessary because: - _loadVectors() depends strictly on the offsets in header.buffers, which no longer match the decompressed buffer layout. - Without changing either _loadVectors() or metadata.RecordBatch, the current approach is the least intrusive. 3. Setting compression = null in new RecordBatch When reconstructing the metadata, the compression field is explicitly set to null, since the data is already decompressed in memory. This decision is somewhat debatable — feedback is welcome on whether it's better to retain the original compression metadata or to reflect the current state of the buffer (uncompressed). The current implementation assumes the latter. ### Are these changes tested? - The changes were tested in the own project using LZ4-compressed Arrow stream. - Test uncompressed, compressed and pseudo compressed(uncompressed data length = -1) data. - No unit tests are included in this PR yet. - The decompression was verified with real-world data and the lz4js codec (lz4-wasm is not compatible). - No issues were observed with alignment, vector loading, or decompression integrity. - Exception handling is not yet added around codec.decode. This may be useful for catching codec incompatibility and providing better user feedback. ### Are there any user-facing changes? Yes, Arrow JS users can now read compressed IPC stream, assuming they register an appropriate codec using compressionRegistry.set(). Example: ```ts import { Codec, compressionRegistry } from 'apache-arrow'; import * as lz4 from 'lz4js'; const lz4Codec: Codec = { encode(data: Uint8Array): Uint8Array { return lz4js.compress(data) }, decode(data: Uint8Array): Uint8Array { return lz4js.decompress(data) } }; compressionRegistry.set(CompressionType.LZ4_FRAME, lz4Codec); ``` This change does not affect writing or serialization. **This PR includes breaking changes to public APIs.** No. The change adds functionality but does not modify any existing API behavior. **This PR contains a "Critical Fix".** No. This is a new feature, not a critical fix. ### Checklist - [x] All tests pass (`yarn test`) - [x] Build completes (`yarn build`) - [x] I have added a new test for compressed batches Closes #109.
|
Closing in favor of apache/arrow-js#14 |
Haven't quite gotten this to work yet, attempting with a sample IPC file from PyArrow, but putting up for visibility.