2 changes: 1 addition & 1 deletion js/DEVELOP.md
@@ -82,7 +82,7 @@ You can change the target you want to test by changing the imports in `perf/inde

# Testing Bundling

The bunldes use `apache-arrow` so make sure to build it with `yarn build -t apache-arrow`. To bundle with a variety of bundlers, run `yarn test:bundle` or `yarn gulp bundle`.
The bundles use `apache-arrow` so make sure to build it with `yarn build -t apache-arrow`. To bundle with a variety of bundlers, run `yarn test:bundle` or `yarn gulp bundle`.

Run `yarn gulp bundle:webpack:analyze` to open [Webpack Bundle Analyzer](https://github.com/webpack-contrib/webpack-bundle-analyzer).

1 change: 1 addition & 0 deletions js/package.json
@@ -99,6 +99,7 @@
"jest": "27.5.1",
"jest-silent-reporter": "0.5.0",
"lerna": "4.0.0",
"lz4js": "0.2.0",

This is intended to only be registered in tests, and users need to declare the dependency and register in their own applications if they want?

We should include code snippets and instructions for how to do that for each codec needed to support the full Arrow spec. The error message could even link to the docs on exactly how to enable a given compression codec.
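For example, a registration snippet for LZ4 could look something like the sketch below (a minimal sketch, assuming the `compressionRegistry`/`CompressionType` exports added in this PR and the `lz4js` package added above; the exact `lz4js` call signatures and the need for a type shim should be double-checked, and a ZSTD codec could be registered the same way):

```ts
// Hypothetical user-side registration of an LZ4-frame codec.
// `lz4js` ships without type declarations, so the import may need a
// declaration shim or `require`.
import * as lz4js from 'lz4js';
import { compressionRegistry, CompressionType } from 'apache-arrow';

compressionRegistry.set(CompressionType.LZ4_FRAME, {
    // The Codec interface works in Uint8Array; lz4js.compress/decompress are
    // assumed to accept a Uint8Array and return an array of bytes.
    encode: (data: Uint8Array) => new Uint8Array(lz4js.compress(data)),
    decode: (data: Uint8Array) => new Uint8Array(lz4js.decompress(data)),
});
```

The "codec not found" error in the reader could then link to a docs page carrying one of these snippets per codec.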

"memfs": "3.4.1",
"mkdirp": "1.0.4",
"multistream": "4.1.0",
1 change: 1 addition & 0 deletions js/src/Arrow.dom.ts
@@ -74,6 +74,7 @@ export {
RecordBatch,
util,
Builder, makeBuilder, builderThroughIterable, builderThroughAsyncIterable,
compressionRegistry, CompressionType,
} from './Arrow.js';

export {
2 changes: 2 additions & 0 deletions js/src/Arrow.ts
@@ -16,6 +16,7 @@
// under the License.

export { MessageHeader } from './fb/message-header.js';
export { CompressionType } from './fb/compression-type.js';

export {
Type,
@@ -88,6 +89,7 @@ export type { ReadableSource, WritableSink } from './io/stream.js';
export { RecordBatchReader, RecordBatchFileReader, RecordBatchStreamReader, AsyncRecordBatchFileReader, AsyncRecordBatchStreamReader } from './ipc/reader.js';
export { RecordBatchWriter, RecordBatchFileWriter, RecordBatchStreamWriter, RecordBatchJSONWriter } from './ipc/writer.js';
export { tableToIPC, tableFromIPC } from './ipc/serialization.js';
export { compressionRegistry } from './ipc/compression.js';
export { MessageReader, AsyncMessageReader, JSONMessageReader } from './ipc/message.js';
export { Message } from './ipc/metadata/message.js';
export { RecordBatch } from './recordbatch.js';
44 changes: 44 additions & 0 deletions js/src/ipc/compression.ts
@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import { CompressionType } from '../fb/compression-type.js';

export interface Codec {
encode?(data: Uint8Array): Uint8Array;
decode?(data: Uint8Array): Uint8Array;
}

class _CompressionRegistry {
protected declare registry: { [key in CompressionType]?: Codec };

constructor() {
this.registry = {};
}

set(compression: CompressionType, codec: Codec) {
this.registry[compression] = codec;
}

get(compression?: CompressionType): Codec | null {
if (compression !== undefined) {
return this.registry?.[compression] || null;
}
return null;
}
}

export const compressionRegistry = new _CompressionRegistry();
3 changes: 2 additions & 1 deletion js/src/ipc/metadata/json.ts
@@ -42,7 +42,8 @@ export function recordBatchFromJSON(b: any) {
return new RecordBatch(
b['count'],
fieldNodesFromJSON(b['columns']),
buffersFromJSON(b['columns'])
buffersFromJSON(b['columns']),
null
);
}

Expand Down
11 changes: 6 additions & 5 deletions js/src/ipc/metadata/message.ts
@@ -41,6 +41,7 @@ import { FixedSizeBinary as _FixedSizeBinary } from '../../fb/fixed-size-binary.
import { FixedSizeList as _FixedSizeList } from '../../fb/fixed-size-list.js';
import { Map as _Map } from '../../fb/map.js';
import { Message as _Message } from '../../fb/message.js';
import { CompressionType as _CompressionType } from '../../fb/compression-type.js';

import { Schema, Field } from '../../schema.js';
import { toUint8Array } from '../../util/buffer.js';
@@ -150,13 +151,16 @@ export class RecordBatch {
protected _length: number;
protected _nodes: FieldNode[];
protected _buffers: BufferRegion[];
protected _compression: _CompressionType | null;
public get nodes() { return this._nodes; }
public get length() { return this._length; }
public get buffers() { return this._buffers; }
constructor(length: Long | number, nodes: FieldNode[], buffers: BufferRegion[]) {
public get compression() { return this._compression; }
constructor(length: Long | number, nodes: FieldNode[], buffers: BufferRegion[], compression: _CompressionType | null) {
this._nodes = nodes;
this._buffers = buffers;
this._length = typeof length === 'number' ? length : length.low;
this._compression = compression;
}
}

@@ -297,10 +301,7 @@ function decodeSchema(_schema: _Schema, dictionaries: Map<number, DataType> = ne

/** @ignore */
function decodeRecordBatch(batch: _RecordBatch, version = MetadataVersion.V4) {
if (batch.compression() !== null) {
throw new Error('Record batch compression not implemented');
}
return new RecordBatch(batch.length(), decodeFieldNodes(batch), decodeBuffers(batch, version));
return new RecordBatch(batch.length(), decodeFieldNodes(batch), decodeBuffers(batch, version), batch.compression()?.codec() || null);
}

/** @ignore */
19 changes: 16 additions & 3 deletions js/src/ipc/reader.ts
@@ -46,6 +46,7 @@ import {
isFileHandle, isFetchResponse,
isReadableDOMStream, isReadableNodeStream
} from '../util/compat.js';
import { compressionRegistry } from './compression.js';

/** @ignore */ export type FromArg0 = ArrowJSONLike;
/** @ignore */ export type FromArg1 = PromiseLike<ArrowJSONLike>;
@@ -352,12 +353,24 @@ abstract class RecordBatchReaderImpl<T extends TypeMap = any> implements RecordB
return this;
}

protected _loadRecordBatch(header: metadata.RecordBatch, body: any) {
protected _loadRecordBatch(header: metadata.RecordBatch, body: Uint8Array) {
if (header.compression) {
const codec = compressionRegistry.get(header.compression);
if (codec?.decode) {
// TODO: does this need to be offset by 8 bytes? Since the uncompressed length is
// written in the first 8 bytes:
// https://github.com/apache/arrow/blob/1fc251f18d5b48f0c9fe8af8168237e7e6d05a45/format/Message.fbs#L59-L65

Confusing that here the Uint8Array is the "body" but it might correspond to the "buffer" in that spec. I'm not sure offhand which is actually here?

I started digging into this, although being unfamiliar with both Arrow and ZSTD it has been a bit of a struggle.

The first 8 bytes are not part of the compressed section. You can see this corrected for in other places, like here: https://github.com/apache/arrow/pull/9137/files#diff-38c3da9d6c7d521df83f59ee7d8cfced00951e8dcd38f39c7c36d26b6feee3d6R75

If you encode something with ZSTD you'll see it starts with [40, 181, 47, 253, 32], which comes after those 8 bytes. Here I'm just encoding:

const testVal = compress(Buffer.from(new Uint8Array([1, 0, 0, 0, 0, 0, 0, 0])));

Uint8Array(17) [
  40, 181, 47, 253, 32, 8, 65,
   0,  0,  1,   0,  0, 0,  0,
   0,  0,  0
]

Within the body here, this is:

const compressedMessage = new Uint8Array([
  8, 0, 0, 0, 0, 0, 0, 0,
  40, 181, 47, 253, 32, 8, 65, 0,
  0, 1, 0, 0, 0, 0, 0, 0,
  0, 0, 0, 0, 0, 0, 0, 0
]);

Now there's another issue: there appears to be padding at the end and between the fields, which I'm very uncertain how to detect besides brute-forcing in reverse between the LZ4 header and the end.

console.log(decompress(compressedMessage.slice(8, 17 + 8)));

Uint8Array([
  40, 181, 47, 253, 
  32, 8, 65, 0,
  0, 1, 0, 0,
  0, 0, 0, 0,
  0
]);

Member Author:

Thanks for doing some digging! This PR isn't currently at the top of my list of things to work on, but I would love for this support to be added to Arrow JS!

body = codec.decode(body);
} else {
throw new Error('Record batch is compressed but codec not found');

Should this error include the codec ID?

}
}

const children = this._loadVectors(header, body, this.schema.fields);
const data = makeData({ type: new Struct(this.schema.fields), length: header.length, children });
return new RecordBatch(this.schema, data);
}
protected _loadDictionaryBatch(header: metadata.DictionaryBatch, body: any) {
protected _loadDictionaryBatch(header: metadata.DictionaryBatch, body: Uint8Array) {
const { id, isDelta } = header;
const { dictionaries, schema } = this;
const dictionary = dictionaries.get(id);
@@ -370,7 +383,7 @@ abstract class RecordBatchReaderImpl<T extends TypeMap = any> implements RecordB
}
return dictionary.memoize();
}
protected _loadVectors(header: metadata.RecordBatch, body: any, types: (Field | DataType)[]) {
protected _loadVectors(header: metadata.RecordBatch, body: Uint8Array, types: (Field | DataType)[]) {
return new VectorLoader(body, header.nodes, header.buffers, this.dictionaries).visitMany(types);
}
}
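For reference on the body layout discussed above (not something this PR implements): the Arrow IPC `BodyCompression` spec compresses each constituent buffer separately, prefixing every compressed buffer with its uncompressed length as a little-endian int64, where -1 means that buffer is stored uncompressed. Because the `header.buffers` regions bound each compressed buffer, trailing padding does not have to be guessed at. A rough buffer-wise decoding sketch, assuming the `Codec` interface from `compression.ts` and the `header`/`body` names used in `_loadRecordBatch`, might look like:

```ts
// Sketch only: decompress each buffer region of the record batch body
// individually, per the Arrow BodyCompression spec (BUFFER granularity).
function decompressBuffers(header: metadata.RecordBatch, body: Uint8Array, codec: Codec): Uint8Array[] {
    return header.buffers.map(({ offset, length }) => {
        const region = body.subarray(offset, offset + length);
        // First 8 bytes: uncompressed length as little-endian int64 (-1 = stored uncompressed).
        const uncompressedLength = new DataView(region.buffer, region.byteOffset, 8).getBigInt64(0, true);
        return uncompressedLength === -1n
            ? region.subarray(8)
            : codec.decode!(region.subarray(8));
    });
}
```

The decompressed buffers would then need to be handed to the `VectorLoader` per buffer rather than as one contiguous `body`, since the original `BufferRegion` offsets no longer apply after decompression.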
4 changes: 2 additions & 2 deletions js/src/ipc/writer.ts
@@ -248,7 +248,7 @@ export class RecordBatchWriter<T extends TypeMap = any> extends ReadableInterop<

protected _writeRecordBatch(batch: RecordBatch<T>) {
const { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(batch);
const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, bufferRegions);
const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, bufferRegions, null);
const message = Message.from(recordBatch, byteLength);
return this
._writeDictionaries(batch)
@@ -259,7 +259,7 @@
protected _writeDictionaryBatch(dictionary: Data, id: number, isDelta = false) {
this._dictionaryDeltaOffsets.set(id, dictionary.length + (this._dictionaryDeltaOffsets.get(id) || 0));
const { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(new Vector([dictionary]));
const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, bufferRegions);
const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, bufferRegions, null);
const dictionaryBatch = new metadata.DictionaryBatch(recordBatch, id, isDelta);
const message = Message.from(dictionaryBatch, byteLength);
return this
5 changes: 5 additions & 0 deletions js/yarn.lock
@@ -6516,6 +6516,11 @@ lunr@^2.3.9:
resolved "https://registry.yarnpkg.com/lunr/-/lunr-2.3.9.tgz#18b123142832337dd6e964df1a5a7707b25d35e1"
integrity sha512-zTU3DaZaF3Rt9rhN3uBMGQD3dD2/vFQqnvZCDv4dl5iOzq2IZQqTxu90r4E5J+nP70J3ilqVCrbho2eWaeW8Ow==

lz4js@0.2.0:
version "0.2.0"
resolved "https://registry.yarnpkg.com/lz4js/-/lz4js-0.2.0.tgz#09f1a397cb2158f675146c3351dde85058cb322f"
integrity sha1-CfGjl8shWPZ1FGwzUd3oUFjLMi8=

make-dir@^2.1.0:
version "2.1.0"
resolved "https://registry.yarnpkg.com/make-dir/-/make-dir-2.1.0.tgz#5f0310e18b8be898cc07009295a30ae41e91e6f5"