1 change: 1 addition & 0 deletions js/src/Arrow.dom.ts
@@ -76,6 +76,7 @@ export {
     RecordBatch,
     util,
     Builder, makeBuilder, builderThroughIterable, builderThroughAsyncIterable,
+    compressionRegistry, CompressionType,
 } from './Arrow.js';

 export {
2 changes: 2 additions & 0 deletions js/src/Arrow.ts
@@ -16,6 +16,7 @@
 // under the License.

 export { MessageHeader } from './fb/message-header.js';
+export { CompressionType } from './fb/compression-type.js';

 export {
     Type,
@@ -92,6 +93,7 @@ export type { ReadableSource, WritableSink } from './io/stream.js';
 export { RecordBatchReader, RecordBatchFileReader, RecordBatchStreamReader, AsyncRecordBatchFileReader, AsyncRecordBatchStreamReader } from './ipc/reader.js';
 export { RecordBatchWriter, RecordBatchFileWriter, RecordBatchStreamWriter, RecordBatchJSONWriter } from './ipc/writer.js';
 export { tableToIPC, tableFromIPC } from './ipc/serialization.js';
+export { compressionRegistry } from './ipc/compression.js';
 export { MessageReader, AsyncMessageReader, JSONMessageReader } from './ipc/message.js';
 export { Message } from './ipc/metadata/message.js';
 export { RecordBatch } from './recordbatch.js';
2 changes: 1 addition & 1 deletion js/src/fb/body-compression.ts
@@ -33,7 +33,7 @@ static getSizePrefixedRootAsBodyCompression(bb:flatbuffers.ByteBuffer, obj?:Body
  * Compressor library.
  * For LZ4_FRAME, each compressed buffer must consist of a single frame.
  */
-codec():CompressionType {
+codec(): CompressionType {
     const offset = this.bb!.__offset(this.bb_pos, 4);
     return offset ? this.bb!.readInt8(this.bb_pos + offset) : CompressionType.LZ4_FRAME;
 }
48 changes: 48 additions & 0 deletions js/src/ipc/compression.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import { CompressionType } from '../fb/compression-type.js';

export const LENGTH_NO_COMPRESSED_DATA: number = -1;
export const LENGTH_OF_PREFIX_DATA: number = 8;

export interface Codec {
encode?(data: Uint8Array): Uint8Array;
decode?(data: Uint8Array): Uint8Array;
}

class _CompressionRegistry {
protected declare registry: { [key in CompressionType]?: Codec };

constructor() {
this.registry = {};
}

set(compression: CompressionType, codec: Codec) {
this.registry[compression] = codec;
}

get(compression?: CompressionType): Codec | null {
if (compression !== undefined) {
return this.registry?.[compression] || null;
}
return null;
}

}

export const compressionRegistry = new _CompressionRegistry();
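
The registry ships empty; applications opt in by supplying a codec for each compression type they need. A minimal sketch of how a consumer might wire one up — the `lz4js` module and the exact shape of its `compress`/`decompress` functions are assumptions for illustration, not part of this change:

import * as lz4js from 'lz4js';
import { compressionRegistry, CompressionType } from 'apache-arrow';

// Hypothetical wiring: lz4js is assumed to expose compress/decompress over byte arrays.
compressionRegistry.set(CompressionType.LZ4_FRAME, {
    encode: (data: Uint8Array) => new Uint8Array(lz4js.compress(data)),
    decode: (data: Uint8Array) => new Uint8Array(lz4js.decompress(data)),
});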
3 changes: 2 additions & 1 deletion js/src/ipc/metadata/json.ts
@@ -42,7 +42,8 @@ export function recordBatchFromJSON(b: any) {
     return new RecordBatch(
         b['count'],
         fieldNodesFromJSON(b['columns']),
-        buffersFromJSON(b['columns'])
+        buffersFromJSON(b['columns']),
+        null
     );
 }
22 changes: 17 additions & 5 deletions js/src/ipc/metadata/message.ts
@@ -42,6 +42,7 @@ import { FixedSizeBinary as _FixedSizeBinary } from '../../fb/fixed-size-binary.
 import { FixedSizeList as _FixedSizeList } from '../../fb/fixed-size-list.js';
 import { Map as _Map } from '../../fb/map.js';
 import { Message as _Message } from '../../fb/message.js';
+import { CompressionType as _CompressionType } from '../../fb/compression-type.js';

 import { Schema, Field } from '../../schema.js';
 import { toUint8Array } from '../../util/buffer.js';
@@ -151,13 +152,21 @@ export class RecordBatch {
     protected _length: number;
     protected _nodes: FieldNode[];
     protected _buffers: BufferRegion[];
+    protected _compression: _CompressionType | null;
     public get nodes() { return this._nodes; }
     public get length() { return this._length; }
     public get buffers() { return this._buffers; }
-    constructor(length: bigint | number, nodes: FieldNode[], buffers: BufferRegion[]) {
+    public get compression() { return this._compression; }
+    constructor(
+        length: bigint | number,
+        nodes: FieldNode[],
+        buffers: BufferRegion[],
+        compression: _CompressionType | null
+    ) {
         this._nodes = nodes;
         this._buffers = buffers;
         this._length = bigIntToNumber(length);
+        this._compression = compression;
     }
 }
@@ -298,10 +307,13 @@ function decodeSchema(_schema: _Schema, dictionaries: Map<number, DataType> = ne

 /** @ignore */
 function decodeRecordBatch(batch: _RecordBatch, version = MetadataVersion.V5) {
-    if (batch.compression() !== null) {
-        throw new Error('Record batch compression not implemented');
-    }
-    return new RecordBatch(batch.length(), decodeFieldNodes(batch), decodeBuffers(batch, version));
+    const recordBatch = new RecordBatch(
+        batch.length(),
+        decodeFieldNodes(batch),
+        decodeBuffers(batch, version),
+        batch.compression()?.codec() ?? null
+    );
+    return recordBatch;
 }

 /** @ignore */
68 changes: 65 additions & 3 deletions js/src/ipc/reader.ts
@@ -46,8 +46,13 @@ import {
     isFileHandle, isFetchResponse,
     isReadableDOMStream, isReadableNodeStream
 } from '../util/compat.js';
+import { Codec, compressionRegistry, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA } from './compression.js';

 import type { DuplexOptions, Duplex } from 'node:stream';
+import { bigIntToNumber } from '../util/bigint.js';
+import * as flatbuffers from 'flatbuffers';
+
+const DEFAULT_ALIGNMENT = 8;

 /** @ignore */ export type FromArg0 = ArrowJSONLike;
 /** @ignore */ export type FromArg1 = PromiseLike<ArrowJSONLike>;
@@ -354,12 +359,27 @@ abstract class RecordBatchReaderImpl<T extends TypeMap = any> implements RecordB
         return this;
     }

-    protected _loadRecordBatch(header: metadata.RecordBatch, body: any) {
+    protected _loadRecordBatch(header: metadata.RecordBatch, body: Uint8Array): RecordBatch<T> {
+        if (header.compression != null) {
+            const codec = compressionRegistry.get(header.compression);
+            if (typeof codec?.decode === 'function') {
+                const { decompressedBody, buffers } = this._decompressBuffers(header, body, codec);
+                body = decompressedBody;
+                header = new metadata.RecordBatch(
+                    header.length,
+                    header.nodes,
+                    buffers,
+                    null
+                );
+            } else {
+                throw new Error('Record batch is compressed but codec not found');
+            }
+        }
         const children = this._loadVectors(header, body, this.schema.fields);
         const data = makeData({ type: new Struct(this.schema.fields), length: header.length, children });
         return new RecordBatch(this.schema, data);
     }
-    protected _loadDictionaryBatch(header: metadata.DictionaryBatch, body: any) {
+    protected _loadDictionaryBatch(header: metadata.DictionaryBatch, body: Uint8Array) {
         const { id, isDelta } = header;
         const { dictionaries, schema } = this;
         const dictionary = dictionaries.get(id);
@@ -369,9 +389,51 @@ abstract class RecordBatchReaderImpl<T extends TypeMap = any> implements RecordB
             new Vector(data)) :
             new Vector(data)).memoize() as Vector;
     }
-    protected _loadVectors(header: metadata.RecordBatch, body: any, types: (Field | DataType)[]) {
+    protected _loadVectors(header: metadata.RecordBatch, body: Uint8Array, types: (Field | DataType)[]) {
         return new VectorLoader(body, header.nodes, header.buffers, this.dictionaries, this.schema.metadataVersion).visitMany(types);
     }
+
+    private _decompressBuffers(header: metadata.RecordBatch, body: Uint8Array, codec: Codec): { decompressedBody: Uint8Array; buffers: metadata.BufferRegion[] } {
+        const decompressedBuffers: Uint8Array[] = [];
+        const newBufferRegions: metadata.BufferRegion[] = [];
+
+        let currentOffset = 0;
+        for (const { offset, length } of header.buffers) {
+            if (length === 0) {
+                decompressedBuffers.push(new Uint8Array(0));
+                newBufferRegions.push(new metadata.BufferRegion(currentOffset, 0));
+                continue;
+            }
+            const byteBuf = new flatbuffers.ByteBuffer(body.subarray(offset, offset + length));
+            // The first 8 bytes hold the uncompressed length as a little-endian int64.
+            const uncompressedLength = bigIntToNumber(byteBuf.readInt64(0));
+
+            const bytes = byteBuf.bytes().subarray(LENGTH_OF_PREFIX_DATA);
+
+            // A length of -1 marks a buffer whose payload was written uncompressed.
+            const decompressed = (uncompressedLength === LENGTH_NO_COMPRESSED_DATA)
+                ? bytes
+                : codec.decode!(bytes);
+
+            decompressedBuffers.push(decompressed);
+
+            // Re-align each decompressed buffer to the 8-byte boundary the loader expects.
+            const padding = (DEFAULT_ALIGNMENT - (currentOffset % DEFAULT_ALIGNMENT)) % DEFAULT_ALIGNMENT;
+            currentOffset += padding;
+            newBufferRegions.push(new metadata.BufferRegion(currentOffset, decompressed.length));
+            currentOffset += decompressed.length;
+        }
+
+        const combined = new Uint8Array(currentOffset);
+        for (const [i, decompressedBuffer] of decompressedBuffers.entries()) {
+            combined.set(decompressedBuffer, newBufferRegions[i].offset);
+        }
+
+        return {
+            decompressedBody: combined,
+            buffers: newBufferRegions
+        };
+    }
 }

 /** @ignore */
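
For reference, `_decompressBuffers` walks the layout the Arrow IPC format prescribes for compressed bodies: each buffer starts with an 8-byte little-endian signed integer (`LENGTH_OF_PREFIX_DATA`) holding the uncompressed length, and a value of -1 (`LENGTH_NO_COMPRESSED_DATA`) marks a payload that was left uncompressed. A standalone sketch of that prefix handling, under the same assumptions as the reader code:

import * as flatbuffers from 'flatbuffers';

// Sketch: split one IPC body buffer into its length prefix and payload.
function splitCompressedBuffer(buf: Uint8Array) {
    const bb = new flatbuffers.ByteBuffer(buf);
    // First 8 bytes: uncompressed length as a little-endian int64.
    const uncompressedLength = Number(bb.readInt64(0));
    const payload = buf.subarray(8);
    // -1 means the payload is stored as-is and must bypass the codec.
    return { payload, uncompressedLength, isCompressed: uncompressedLength !== -1 };
}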
4 changes: 2 additions & 2 deletions js/src/ipc/writer.ts
@@ -252,7 +252,7 @@ export class RecordBatchWriter<T extends TypeMap = any> extends ReadableInterop<

     protected _writeRecordBatch(batch: RecordBatch<T>) {
         const { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(batch);
-        const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, bufferRegions);
+        const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, bufferRegions, null);
         const message = Message.from(recordBatch, byteLength);
         return this
             ._writeDictionaries(batch)
@@ -262,7 +262,7 @@ export class RecordBatchWriter<T extends TypeMap = any> extends ReadableInterop<

     protected _writeDictionaryBatch(dictionary: Data, id: number, isDelta = false) {
         const { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(new Vector([dictionary]));
-        const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, bufferRegions);
+        const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, bufferRegions, null);
         const dictionaryBatch = new metadata.DictionaryBatch(recordBatch, id, isDelta);
         const message = Message.from(dictionaryBatch, byteLength);
         return this
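
Both writer call sites pass `null` explicitly, so this change is read-side only: the writer still emits uncompressed bodies, while files compressed by other Arrow implementations become readable once a codec is registered. A hedged end-to-end sketch — the file name is illustrative, and an LZ4_FRAME codec is assumed to have been registered as in the earlier sketch:

import { readFileSync } from 'node:fs';
import { tableFromIPC } from 'apache-arrow';

// Reading an LZ4_FRAME-compressed Arrow IPC file; decompression happens in _loadRecordBatch.
const table = tableFromIPC(readFileSync('data/compressed.arrow'));
console.log(table.numRows);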