From f50c489d02e5dadf01cd8819800ac31d0bd45b50 Mon Sep 17 00:00:00 2001
From: Dmitriy Ignatov
Date: Sun, 18 May 2025 17:03:47 +0300
Subject: [PATCH 1/4] [JS] Define compression registry

---
 js/src/ipc/compression.ts | 48 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 48 insertions(+)
 create mode 100644 js/src/ipc/compression.ts

diff --git a/js/src/ipc/compression.ts b/js/src/ipc/compression.ts
new file mode 100644
index 00000000000..2c663809d07
--- /dev/null
+++ b/js/src/ipc/compression.ts
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import { CompressionType } from '../fb/compression-type.js';
+
+export const LENGTH_NO_COMPRESSED_DATA: number = -1;
+export const LENGTH_OF_PREFIX_DATA: number = 8;
+
+export interface Codec {
+    encode?(data: Uint8Array): Uint8Array;
+    decode?(data: Uint8Array): Uint8Array;
+}
+
+class _CompressionRegistry {
+    protected declare registry: { [key in CompressionType]?: Codec };
+
+    constructor() {
+        this.registry = {};
+    }
+
+    set(compression: CompressionType, codec: Codec) {
+        this.registry[compression] = codec;
+    }
+
+    get(compression?: CompressionType): Codec | null {
+        if (compression !== undefined) {
+            return this.registry?.[compression] || null;
+        }
+        return null;
+    }
+
+}
+
+export const compressionRegistry = new _CompressionRegistry();

From e7a71458cb6b9ee4845996ff3de22ddbd3d3b6c1 Mon Sep 17 00:00:00 2001
From: Dmitriy Ignatov
Date: Sun, 18 May 2025 17:11:04 +0300
Subject: [PATCH 2/4] [JS] Implement RecordBatch body decompression

---
 js/src/fb/body-compression.ts  |  2 +-
 js/src/ipc/metadata/json.ts    |  3 +-
 js/src/ipc/metadata/message.ts | 22 ++++++++---
 js/src/ipc/reader.ts           | 68 ++++++++++++++++++++++++++++++++--
 js/src/ipc/writer.ts           |  4 +-
 5 files changed, 87 insertions(+), 12 deletions(-)

diff --git a/js/src/fb/body-compression.ts b/js/src/fb/body-compression.ts
index ae3e5b4097e..8bb1324c983 100644
--- a/js/src/fb/body-compression.ts
+++ b/js/src/fb/body-compression.ts
@@ -33,7 +33,7 @@ static getSizePrefixedRootAsBodyCompression(bb:flatbuffers.ByteBuffer, obj?:Body
  * Compressor library.
  * For LZ4_FRAME, each compressed buffer must consist of a single frame.
  */
-codec():CompressionType {
+codec(): CompressionType {
   const offset = this.bb!.__offset(this.bb_pos, 4);
   return offset ?
this.bb!.readInt8(this.bb_pos + offset) : CompressionType.LZ4_FRAME; } diff --git a/js/src/ipc/metadata/json.ts b/js/src/ipc/metadata/json.ts index 8dc81ced3ff..9f5ac42adea 100644 --- a/js/src/ipc/metadata/json.ts +++ b/js/src/ipc/metadata/json.ts @@ -42,7 +42,8 @@ export function recordBatchFromJSON(b: any) { return new RecordBatch( b['count'], fieldNodesFromJSON(b['columns']), - buffersFromJSON(b['columns']) + buffersFromJSON(b['columns']), + null ); } diff --git a/js/src/ipc/metadata/message.ts b/js/src/ipc/metadata/message.ts index 552c4d846e8..21adaa74e71 100644 --- a/js/src/ipc/metadata/message.ts +++ b/js/src/ipc/metadata/message.ts @@ -42,6 +42,7 @@ import { FixedSizeBinary as _FixedSizeBinary } from '../../fb/fixed-size-binary. import { FixedSizeList as _FixedSizeList } from '../../fb/fixed-size-list.js'; import { Map as _Map } from '../../fb/map.js'; import { Message as _Message } from '../../fb/message.js'; +import { CompressionType as _CompressionType } from '../../fb/compression-type.js'; import { Schema, Field } from '../../schema.js'; import { toUint8Array } from '../../util/buffer.js'; @@ -151,13 +152,21 @@ export class RecordBatch { protected _length: number; protected _nodes: FieldNode[]; protected _buffers: BufferRegion[]; + protected _compression: _CompressionType | null; public get nodes() { return this._nodes; } public get length() { return this._length; } public get buffers() { return this._buffers; } - constructor(length: bigint | number, nodes: FieldNode[], buffers: BufferRegion[]) { + public get compression() { return this._compression; } + constructor( + length: bigint | number, + nodes: FieldNode[], + buffers: BufferRegion[], + compression: _CompressionType | null + ) { this._nodes = nodes; this._buffers = buffers; this._length = bigIntToNumber(length); + this._compression = compression; } } @@ -298,10 +307,13 @@ function decodeSchema(_schema: _Schema, dictionaries: Map = ne /** @ignore */ function decodeRecordBatch(batch: _RecordBatch, version = MetadataVersion.V5) { - if (batch.compression() !== null) { - throw new Error('Record batch compression not implemented'); - } - return new RecordBatch(batch.length(), decodeFieldNodes(batch), decodeBuffers(batch, version)); + const recordBatch = new RecordBatch( + batch.length(), + decodeFieldNodes(batch), + decodeBuffers(batch, version), + batch.compression()?.codec() ?? 
null + ); + return recordBatch; } /** @ignore */ diff --git a/js/src/ipc/reader.ts b/js/src/ipc/reader.ts index f84fe83db08..e9d544c02af 100644 --- a/js/src/ipc/reader.ts +++ b/js/src/ipc/reader.ts @@ -46,8 +46,13 @@ import { isFileHandle, isFetchResponse, isReadableDOMStream, isReadableNodeStream } from '../util/compat.js'; +import { Codec, compressionRegistry, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA } from './compression.js'; import type { DuplexOptions, Duplex } from 'node:stream'; +import { bigIntToNumber } from './../util/bigint.js'; +import * as flatbuffers from 'flatbuffers'; + +const DEFAULT_ALIGNMENT = 8; /** @ignore */ export type FromArg0 = ArrowJSONLike; /** @ignore */ export type FromArg1 = PromiseLike; @@ -354,12 +359,27 @@ abstract class RecordBatchReaderImpl implements RecordB return this; } - protected _loadRecordBatch(header: metadata.RecordBatch, body: any) { + protected _loadRecordBatch(header: metadata.RecordBatch, body: Uint8Array): RecordBatch { + if (header.compression != null ) { + const codec = compressionRegistry.get(header.compression); + if (codec?.decode && typeof codec.decode === 'function') { + const {decommpressedBody, buffers} = this._decompressBuffers(header, body, codec); + body = decommpressedBody; + header = new metadata.RecordBatch( + header.length, + header.nodes, + buffers, + null + ); + } else { + throw new Error('Record batch is compressed but codec not found'); + } + } const children = this._loadVectors(header, body, this.schema.fields); const data = makeData({ type: new Struct(this.schema.fields), length: header.length, children }); return new RecordBatch(this.schema, data); } - protected _loadDictionaryBatch(header: metadata.DictionaryBatch, body: any) { + protected _loadDictionaryBatch(header: metadata.DictionaryBatch, body: Uint8Array) { const { id, isDelta } = header; const { dictionaries, schema } = this; const dictionary = dictionaries.get(id); @@ -369,9 +389,51 @@ abstract class RecordBatchReaderImpl implements RecordB new Vector(data)) : new Vector(data)).memoize() as Vector; } - protected _loadVectors(header: metadata.RecordBatch, body: any, types: (Field | DataType)[]) { + protected _loadVectors(header: metadata.RecordBatch, body: Uint8Array, types: (Field | DataType)[]) { return new VectorLoader(body, header.nodes, header.buffers, this.dictionaries, this.schema.metadataVersion).visitMany(types); } + + private _decompressBuffers(header: metadata.RecordBatch, body: Uint8Array, codec: Codec): { decommpressedBody: Uint8Array, buffers: metadata.BufferRegion[] } { + const decompressedBuffers: Uint8Array[] = []; + const newBufferRegions: metadata.BufferRegion[] = []; + + let currentOffset = 0; + for (const {offset, length } of header.buffers) { + if (length === 0) { + decompressedBuffers.push(new Uint8Array(0)); + newBufferRegions.push(new metadata.BufferRegion(currentOffset, 0)); + continue; + } + const byteBuf = new flatbuffers.ByteBuffer(body.subarray(offset, offset + length)) + let uncompressedLenth = bigIntToNumber(byteBuf.readInt64(0)); + + + const bytes = byteBuf.bytes().subarray(LENGTH_OF_PREFIX_DATA); + + let decompressed = (uncompressedLenth === LENGTH_NO_COMPRESSED_DATA) + ? 
bytes + : codec.decode!(bytes); + + decompressedBuffers.push(decompressed); + + const padding = (DEFAULT_ALIGNMENT - (currentOffset % DEFAULT_ALIGNMENT)) % DEFAULT_ALIGNMENT; + currentOffset += padding; + newBufferRegions.push(new metadata.BufferRegion(currentOffset, decompressed.length)); + currentOffset += decompressed.length; + } + + const totalSize = currentOffset; + const combined = new Uint8Array(totalSize); + + for (let i = 0; i < decompressedBuffers.length; i++) { + combined.set(decompressedBuffers[i], newBufferRegions[i].offset); + } + + return { + decommpressedBody: combined, + buffers: newBufferRegions + } + } } /** @ignore */ diff --git a/js/src/ipc/writer.ts b/js/src/ipc/writer.ts index cb74fe6fb67..cf5674cc417 100644 --- a/js/src/ipc/writer.ts +++ b/js/src/ipc/writer.ts @@ -252,7 +252,7 @@ export class RecordBatchWriter extends ReadableInterop< protected _writeRecordBatch(batch: RecordBatch) { const { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(batch); - const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, bufferRegions); + const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, bufferRegions, null); const message = Message.from(recordBatch, byteLength); return this ._writeDictionaries(batch) @@ -262,7 +262,7 @@ export class RecordBatchWriter extends ReadableInterop< protected _writeDictionaryBatch(dictionary: Data, id: number, isDelta = false) { const { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(new Vector([dictionary])); - const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, bufferRegions); + const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, bufferRegions, null); const dictionaryBatch = new metadata.DictionaryBatch(recordBatch, id, isDelta); const message = Message.from(dictionaryBatch, byteLength); return this From 6d3366653bfec7d53d163f2a11309b146371705b Mon Sep 17 00:00:00 2001 From: Dmitriy Ignatov Date: Sun, 18 May 2025 17:11:52 +0300 Subject: [PATCH 3/4] [JS] Export compressionType, Codec and compressionRegistry --- js/src/Arrow.dom.ts | 1 + js/src/Arrow.ts | 2 ++ 2 files changed, 3 insertions(+) diff --git a/js/src/Arrow.dom.ts b/js/src/Arrow.dom.ts index b6e3fbce6b5..eb03bf2bf99 100644 --- a/js/src/Arrow.dom.ts +++ b/js/src/Arrow.dom.ts @@ -76,6 +76,7 @@ export { RecordBatch, util, Builder, makeBuilder, builderThroughIterable, builderThroughAsyncIterable, + compressionRegistry, CompressionType, } from './Arrow.js'; export { diff --git a/js/src/Arrow.ts b/js/src/Arrow.ts index f31f91a74c3..23015888ba0 100644 --- a/js/src/Arrow.ts +++ b/js/src/Arrow.ts @@ -16,6 +16,7 @@ // under the License. 
export { MessageHeader } from './fb/message-header.js'; +export { CompressionType } from './fb/compression-type.js'; export { Type, @@ -92,6 +93,7 @@ export type { ReadableSource, WritableSink } from './io/stream.js'; export { RecordBatchReader, RecordBatchFileReader, RecordBatchStreamReader, AsyncRecordBatchFileReader, AsyncRecordBatchStreamReader } from './ipc/reader.js'; export { RecordBatchWriter, RecordBatchFileWriter, RecordBatchStreamWriter, RecordBatchJSONWriter } from './ipc/writer.js'; export { tableToIPC, tableFromIPC } from './ipc/serialization.js'; +export { compressionRegistry } from './ipc/compression.js'; export { MessageReader, AsyncMessageReader, JSONMessageReader } from './ipc/message.js'; export { Message } from './ipc/metadata/message.js'; export { RecordBatch } from './recordbatch.js'; From 1276a519df6d7baad21671bfc5c689d310491970 Mon Sep 17 00:00:00 2001 From: Dmitriy Ignatov Date: Sun, 18 May 2025 23:03:41 +0300 Subject: [PATCH 4/4] Lint --- js/src/ipc/metadata/message.ts | 8 ++++---- js/src/ipc/reader.ts | 22 +++++++++++----------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/js/src/ipc/metadata/message.ts b/js/src/ipc/metadata/message.ts index 21adaa74e71..fb1b9bfdb53 100644 --- a/js/src/ipc/metadata/message.ts +++ b/js/src/ipc/metadata/message.ts @@ -158,8 +158,8 @@ export class RecordBatch { public get buffers() { return this._buffers; } public get compression() { return this._compression; } constructor( - length: bigint | number, - nodes: FieldNode[], + length: bigint | number, + nodes: FieldNode[], buffers: BufferRegion[], compression: _CompressionType | null ) { @@ -308,8 +308,8 @@ function decodeSchema(_schema: _Schema, dictionaries: Map = ne /** @ignore */ function decodeRecordBatch(batch: _RecordBatch, version = MetadataVersion.V5) { const recordBatch = new RecordBatch( - batch.length(), - decodeFieldNodes(batch), + batch.length(), + decodeFieldNodes(batch), decodeBuffers(batch, version), batch.compression()?.codec() ?? 
null ); diff --git a/js/src/ipc/reader.ts b/js/src/ipc/reader.ts index e9d544c02af..a58b6622220 100644 --- a/js/src/ipc/reader.ts +++ b/js/src/ipc/reader.ts @@ -370,7 +370,7 @@ abstract class RecordBatchReaderImpl implements RecordB header.nodes, buffers, null - ); + ); } else { throw new Error('Record batch is compressed but codec not found'); } @@ -393,7 +393,7 @@ abstract class RecordBatchReaderImpl implements RecordB return new VectorLoader(body, header.nodes, header.buffers, this.dictionaries, this.schema.metadataVersion).visitMany(types); } - private _decompressBuffers(header: metadata.RecordBatch, body: Uint8Array, codec: Codec): { decommpressedBody: Uint8Array, buffers: metadata.BufferRegion[] } { + private _decompressBuffers(header: metadata.RecordBatch, body: Uint8Array, codec: Codec): { decommpressedBody: Uint8Array; buffers: metadata.BufferRegion[] } { const decompressedBuffers: Uint8Array[] = []; const newBufferRegions: metadata.BufferRegion[] = []; @@ -404,13 +404,13 @@ abstract class RecordBatchReaderImpl implements RecordB newBufferRegions.push(new metadata.BufferRegion(currentOffset, 0)); continue; } - const byteBuf = new flatbuffers.ByteBuffer(body.subarray(offset, offset + length)) - let uncompressedLenth = bigIntToNumber(byteBuf.readInt64(0)); - - + const byteBuf = new flatbuffers.ByteBuffer(body.subarray(offset, offset + length)); + const uncompressedLenth = bigIntToNumber(byteBuf.readInt64(0)); + + const bytes = byteBuf.bytes().subarray(LENGTH_OF_PREFIX_DATA); - - let decompressed = (uncompressedLenth === LENGTH_NO_COMPRESSED_DATA) + + const decompressed = (uncompressedLenth === LENGTH_NO_COMPRESSED_DATA) ? bytes : codec.decode!(bytes); @@ -425,14 +425,14 @@ abstract class RecordBatchReaderImpl implements RecordB const totalSize = currentOffset; const combined = new Uint8Array(totalSize); - for (let i = 0; i < decompressedBuffers.length; i++) { - combined.set(decompressedBuffers[i], newBufferRegions[i].offset); + for (const [i, decompressedBuffer] of decompressedBuffers.entries()) { + combined.set(decompressedBuffer, newBufferRegions[i].offset); } return { decommpressedBody: combined, buffers: newBufferRegions - } + }; } }
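
Usage note (illustrative sketch, not part of the patch series): once patch 3
exports compressionRegistry and CompressionType from the package entry point,
an application that needs to read LZ4_FRAME-compressed IPC data can register a
decoder before calling any reader API. The lz4 module below is an assumption;
any implementation that turns a complete LZ4 frame into a Uint8Array will do.
Only CompressionType, compressionRegistry and tableFromIPC come from Arrow.

// Hypothetical wiring of an LZ4-frame decoder into the new registry.
// The module name 'lz4js' and its decompress() signature are assumptions.
import * as lz4 from 'lz4js';
import { readFileSync } from 'node:fs';
import { CompressionType, compressionRegistry, tableFromIPC } from 'apache-arrow';

// Decode-only codec: the reader only calls decode(); encode stays optional.
compressionRegistry.set(CompressionType.LZ4_FRAME, {
    decode(data: Uint8Array): Uint8Array {
        // Each Arrow buffer is compressed as a single LZ4 frame.
        return new Uint8Array(lz4.decompress(data));
    }
});

// Reader entry points now decompress record batch bodies transparently.
// 'compressed.arrows' is a placeholder path for this example.
const table = tableFromIPC(readFileSync('compressed.arrows'));
console.log(table.numRows);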
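
For reference, _decompressBuffers in patch 2 relies on the Arrow IPC framing
for compressed bodies: every buffer starts with an 8-byte little-endian int64
holding the uncompressed length (LENGTH_OF_PREFIX_DATA), and a value of -1
(LENGTH_NO_COMPRESSED_DATA) marks a payload that was stored uncompressed. A
minimal sketch of just that prefix handling follows; readBodyBuffer is a
made-up helper name, not part of the patch.

// Standalone illustration of the per-buffer framing assumed by the reader.
const LENGTH_OF_PREFIX_DATA = 8;       // size of the int64 length prefix
const LENGTH_NO_COMPRESSED_DATA = -1;  // sentinel: payload left uncompressed

function readBodyBuffer(raw: Uint8Array, decode: (d: Uint8Array) => Uint8Array): Uint8Array {
    const view = new DataView(raw.buffer, raw.byteOffset, raw.byteLength);
    // The prefix is written little-endian, matching flatbuffers' readInt64.
    const uncompressedLength = Number(view.getBigInt64(0, true));
    const payload = raw.subarray(LENGTH_OF_PREFIX_DATA);
    return uncompressedLength === LENGTH_NO_COMPRESSED_DATA ? payload : decode(payload);
}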
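
The rebuilt body also keeps Arrow's 8-byte buffer alignment: after each
decompressed buffer is placed, the next offset is rounded up to a multiple of
DEFAULT_ALIGNMENT before the following BufferRegion is assigned. The padding
arithmetic from the patch, shown on its own with a worked example:

// Round an offset up to the next multiple of the IPC buffer alignment,
// mirroring the (DEFAULT_ALIGNMENT - offset % DEFAULT_ALIGNMENT) % DEFAULT_ALIGNMENT
// padding computed in _decompressBuffers.
const DEFAULT_ALIGNMENT = 8;

function align(offset: number, alignment: number = DEFAULT_ALIGNMENT): number {
    return offset + (alignment - (offset % alignment)) % alignment;
}

// A 13-byte decompressed buffer placed at offset 0 pushes the next buffer to 16.
console.log(align(0 + 13)); // 16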