diff --git a/js/DEVELOP.md b/js/DEVELOP.md index 06699830b93..51fe316c975 100644 --- a/js/DEVELOP.md +++ b/js/DEVELOP.md @@ -82,7 +82,7 @@ You can change the target you want to test by changing the imports in `perf/inde # Testing Bundling -The bunldes use `apache-arrow` so make sure to build it with `yarn build -t apache-arrow`. To bundle with a variety of bundlers, run `yarn test:bundle` or `yarn gulp bundle`. +The bundles use `apache-arrow` so make sure to build it with `yarn build -t apache-arrow`. To bundle with a variety of bundlers, run `yarn test:bundle` or `yarn gulp bundle`. Run `yarn gulp bundle:webpack:analyze` to open [Webpack Bundle Analyzer](https://github.com/webpack-contrib/webpack-bundle-analyzer). diff --git a/js/package.json b/js/package.json index 5b7d23ad0a3..8ab0d91803b 100644 --- a/js/package.json +++ b/js/package.json @@ -99,6 +99,7 @@ "jest": "27.5.1", "jest-silent-reporter": "0.5.0", "lerna": "4.0.0", + "lz4js": "0.2.0", "memfs": "3.4.1", "mkdirp": "1.0.4", "multistream": "4.1.0", diff --git a/js/src/Arrow.dom.ts b/js/src/Arrow.dom.ts index 2fdef60c1fb..4938daf1680 100644 --- a/js/src/Arrow.dom.ts +++ b/js/src/Arrow.dom.ts @@ -74,6 +74,7 @@ export { RecordBatch, util, Builder, makeBuilder, builderThroughIterable, builderThroughAsyncIterable, + compressionRegistry, CompressionType, } from './Arrow.js'; export { diff --git a/js/src/Arrow.ts b/js/src/Arrow.ts index dc44e10b920..f6002f5006e 100644 --- a/js/src/Arrow.ts +++ b/js/src/Arrow.ts @@ -16,6 +16,7 @@ // under the License. 
export { MessageHeader } from './fb/message-header.js'; +export { CompressionType } from './fb/compression-type.js'; export { Type, @@ -88,6 +89,7 @@ export type { ReadableSource, WritableSink } from './io/stream.js'; export { RecordBatchReader, RecordBatchFileReader, RecordBatchStreamReader, AsyncRecordBatchFileReader, AsyncRecordBatchStreamReader } from './ipc/reader.js'; export { RecordBatchWriter, RecordBatchFileWriter, RecordBatchStreamWriter, RecordBatchJSONWriter } from './ipc/writer.js'; export { tableToIPC, tableFromIPC } from './ipc/serialization.js'; +export { compressionRegistry } from './ipc/compression.js'; export { MessageReader, AsyncMessageReader, JSONMessageReader } from './ipc/message.js'; export { Message } from './ipc/metadata/message.js'; export { RecordBatch } from './recordbatch.js'; diff --git a/js/src/ipc/compression.ts b/js/src/ipc/compression.ts new file mode 100644 index 00000000000..0e9f3c447cb --- /dev/null +++ b/js/src/ipc/compression.ts @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import { CompressionType } from '../fb/compression-type.js'; + +export interface Codec { + encode?(data: Uint8Array): Uint8Array; + decode?(data: Uint8Array): Uint8Array; +} + +class _CompressionRegistry { + protected declare registry: { [key in CompressionType]?: Codec }; + + constructor() { + this.registry = {}; + } + + set(compression: CompressionType, codec: Codec) { + this.registry[compression] = codec; + } + + get(compression?: CompressionType): Codec | null { + if (compression !== undefined) { + return this.registry?.[compression] || null; + } + return null; + } +} + +export const compressionRegistry = new _CompressionRegistry(); diff --git a/js/src/ipc/metadata/json.ts b/js/src/ipc/metadata/json.ts index 0fc1ca08019..117dd07fd18 100644 --- a/js/src/ipc/metadata/json.ts +++ b/js/src/ipc/metadata/json.ts @@ -42,7 +42,8 @@ export function recordBatchFromJSON(b: any) { return new RecordBatch( b['count'], fieldNodesFromJSON(b['columns']), - buffersFromJSON(b['columns']) + buffersFromJSON(b['columns']), + null ); } diff --git a/js/src/ipc/metadata/message.ts b/js/src/ipc/metadata/message.ts index 53501d55177..4e95675b9f6 100644 --- a/js/src/ipc/metadata/message.ts +++ b/js/src/ipc/metadata/message.ts @@ -41,6 +41,7 @@ import { FixedSizeBinary as _FixedSizeBinary } from '../../fb/fixed-size-binary. 
import { FixedSizeList as _FixedSizeList } from '../../fb/fixed-size-list.js'; import { Map as _Map } from '../../fb/map.js'; import { Message as _Message } from '../../fb/message.js'; +import { CompressionType as _CompressionType } from '../../fb/compression-type.js'; import { Schema, Field } from '../../schema.js'; import { toUint8Array } from '../../util/buffer.js'; @@ -150,13 +151,16 @@ export class RecordBatch { protected _length: number; protected _nodes: FieldNode[]; protected _buffers: BufferRegion[]; + protected _compression: _CompressionType | null; public get nodes() { return this._nodes; } public get length() { return this._length; } public get buffers() { return this._buffers; } - constructor(length: Long | number, nodes: FieldNode[], buffers: BufferRegion[]) { + public get compression() { return this._compression; } + constructor(length: Long | number, nodes: FieldNode[], buffers: BufferRegion[], compression: _CompressionType | null) { this._nodes = nodes; this._buffers = buffers; this._length = typeof length === 'number' ? 
length : length.low; + this._compression = compression; } } @@ -297,10 +301,7 @@ function decodeSchema(_schema: _Schema, dictionaries: Map = ne /** @ignore */ function decodeRecordBatch(batch: _RecordBatch, version = MetadataVersion.V4) { - if (batch.compression() !== null) { - throw new Error('Record batch compression not implemented'); - } - return new RecordBatch(batch.length(), decodeFieldNodes(batch), decodeBuffers(batch, version), batch.compression()?.codec() ?? null); + return new RecordBatch(batch.length(), decodeFieldNodes(batch), decodeBuffers(batch, version), batch.compression()?.codec() ?? null); } /** @ignore */ diff --git a/js/src/ipc/reader.ts b/js/src/ipc/reader.ts index 5b949322d92..e7e92dd2bbb 100644 --- a/js/src/ipc/reader.ts +++ b/js/src/ipc/reader.ts @@ -46,6 +46,7 @@ import { isFileHandle, isFetchResponse, isReadableDOMStream, isReadableNodeStream } from '../util/compat.js'; +import { compressionRegistry } from './compression.js'; /** @ignore */ export type FromArg0 = ArrowJSONLike; /** @ignore */ export type FromArg1 = PromiseLike; @@ -352,12 +353,24 @@ abstract class RecordBatchReaderImpl implements RecordB return this; } - protected _loadRecordBatch(header: metadata.RecordBatch, body: any) { + protected _loadRecordBatch(header: metadata.RecordBatch, body: Uint8Array) { + // NOTE(review): must compare against null explicitly — CompressionType.LZ4_FRAME === 0, + // so a truthiness check (`if (header.compression)`) would silently skip decompression + // for LZ4-framed batches. Same reason `?? null` (not `|| null`) is used when decoding + // the batch metadata above. + if (header.compression !== null) { + const codec = compressionRegistry.get(header.compression); + if (codec?.decode) { + // TODO: does this need to be offset by 8 bytes?
Since the uncompressed length is + // written in the first 8 bytes: + // https://github.com/apache/arrow/blob/1fc251f18d5b48f0c9fe8af8168237e7e6d05a45/format/Message.fbs#L59-L65 + body = codec.decode(body); + } else { + throw new Error('Record batch is compressed but codec not found'); + } + } + const children = this._loadVectors(header, body, this.schema.fields); const data = makeData({ type: new Struct(this.schema.fields), length: header.length, children }); return new RecordBatch(this.schema, data); } - protected _loadDictionaryBatch(header: metadata.DictionaryBatch, body: any) { + protected _loadDictionaryBatch(header: metadata.DictionaryBatch, body: Uint8Array) { const { id, isDelta } = header; const { dictionaries, schema } = this; const dictionary = dictionaries.get(id); @@ -370,7 +383,7 @@ abstract class RecordBatchReaderImpl implements RecordB } return dictionary.memoize(); } - protected _loadVectors(header: metadata.RecordBatch, body: any, types: (Field | DataType)[]) { + protected _loadVectors(header: metadata.RecordBatch, body: Uint8Array, types: (Field | DataType)[]) { return new VectorLoader(body, header.nodes, header.buffers, this.dictionaries).visitMany(types); } } diff --git a/js/src/ipc/writer.ts b/js/src/ipc/writer.ts index 5f5cc37562b..97f5cf91be9 100644 --- a/js/src/ipc/writer.ts +++ b/js/src/ipc/writer.ts @@ -248,7 +248,7 @@ export class RecordBatchWriter extends ReadableInterop< protected _writeRecordBatch(batch: RecordBatch) { const { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(batch); - const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, bufferRegions); + const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, bufferRegions, null); const message = Message.from(recordBatch, byteLength); return this ._writeDictionaries(batch) @@ -259,7 +259,7 @@ export class RecordBatchWriter extends ReadableInterop< protected _writeDictionaryBatch(dictionary: Data, id: number, isDelta = false) { 
this._dictionaryDeltaOffsets.set(id, dictionary.length + (this._dictionaryDeltaOffsets.get(id) || 0)); const { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(new Vector([dictionary])); - const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, bufferRegions); + const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, bufferRegions, null); const dictionaryBatch = new metadata.DictionaryBatch(recordBatch, id, isDelta); const message = Message.from(dictionaryBatch, byteLength); return this diff --git a/js/yarn.lock b/js/yarn.lock index 37e477adba5..27f0cf20400 100644 --- a/js/yarn.lock +++ b/js/yarn.lock @@ -6516,6 +6516,11 @@ lunr@^2.3.9: resolved "https://registry.yarnpkg.com/lunr/-/lunr-2.3.9.tgz#18b123142832337dd6e964df1a5a7707b25d35e1" integrity sha512-zTU3DaZaF3Rt9rhN3uBMGQD3dD2/vFQqnvZCDv4dl5iOzq2IZQqTxu90r4E5J+nP70J3ilqVCrbho2eWaeW8Ow== +lz4js@0.2.0: + version "0.2.0" + resolved "https://registry.yarnpkg.com/lz4js/-/lz4js-0.2.0.tgz#09f1a397cb2158f675146c3351dde85058cb322f" + integrity sha1-CfGjl8shWPZ1FGwzUd3oUFjLMi8= + make-dir@^2.1.0: version "2.1.0" resolved "https://registry.yarnpkg.com/make-dir/-/make-dir-2.1.0.tgz#5f0310e18b8be898cc07009295a30ae41e91e6f5"