diff --git a/js/src/ipc/message.ts b/js/src/ipc/message.ts
index c8d3b766620..47136b7a6c0 100644
--- a/js/src/ipc/message.ts
+++ b/js/src/ipc/message.ts
@@ -40,6 +40,11 @@ export class MessageReader implements IterableIterator<Message> {
     public next(): IteratorResult<Message> {
         let r;
         if ((r = this.readMetadataLength()).done) { return ITERATOR_DONE; }
+        // ARROW-6313: If the first 4 bytes are the continuation indicator (-1), read
+        // the next 4 for the 32-bit metadata length. Otherwise, assume this is a
+        // pre-v0.15 message, where the first 4 bytes are the metadata length.
+        if ((r.value === -1) &&
+            (r = this.readMetadataLength()).done) { return ITERATOR_DONE; }
         if ((r = this.readMetadata(r.value)).done) { return ITERATOR_DONE; }
         return (<any> r) as IteratorResult<Message>;
     }
@@ -76,8 +81,8 @@ export class MessageReader implements IterableIterator<Message> {
     protected readMetadataLength(): IteratorResult<number> {
         const buf = this.source.read(PADDING);
         const bb = buf && new ByteBuffer(buf);
-        const len = +(bb && bb.readInt32(0))!;
-        return { done: len <= 0, value: len };
+        const len = bb && bb.readInt32(0) || 0;
+        return { done: len === 0, value: len };
     }
     protected readMetadata(metadataLength: number): IteratorResult<Message> {
         const buf = this.source.read(metadataLength);
@@ -104,6 +109,11 @@ export class AsyncMessageReader implements AsyncIterableIterator<Message> {
     public async next(): Promise<IteratorResult<Message>> {
         let r;
         if ((r = await this.readMetadataLength()).done) { return ITERATOR_DONE; }
+        // ARROW-6313: If the first 4 bytes are the continuation indicator (-1), read
+        // the next 4 for the 32-bit metadata length. Otherwise, assume this is a
+        // pre-v0.15 message, where the first 4 bytes are the metadata length.
+        if ((r.value === -1) &&
+            (r = await this.readMetadataLength()).done) { return ITERATOR_DONE; }
         if ((r = await this.readMetadata(r.value)).done) { return ITERATOR_DONE; }
         return (<any> r) as IteratorResult<Message>;
     }
@@ -140,8 +150,8 @@ export class AsyncMessageReader implements AsyncIterableIterator<Message> {
     protected async readMetadataLength(): Promise<IteratorResult<number>> {
         const buf = await this.source.read(PADDING);
         const bb = buf && new ByteBuffer(buf);
-        const len = +(bb && bb.readInt32(0))!;
-        return { done: len <= 0, value: len };
+        const len = bb && bb.readInt32(0) || 0;
+        return { done: len === 0, value: len };
     }
     protected async readMetadata(metadataLength: number): Promise<IteratorResult<Message>> {
         const buf = await this.source.read(metadataLength);
diff --git a/js/src/ipc/writer.ts b/js/src/ipc/writer.ts
index 2e60c5c9388..4441b174da8 100644
--- a/js/src/ipc/writer.ts
+++ b/js/src/ipc/writer.ts
@@ -32,7 +32,21 @@ import { JSONVectorAssembler } from '../visitor/jsonvectorassembler';
 import { ArrayBufferViewInput, toUint8Array } from '../util/buffer';
 import { RecordBatch, _InternalEmptyPlaceholderRecordBatch } from '../recordbatch';
 import { Writable, ReadableInterop, ReadableDOMStreamOptions } from '../io/interfaces';
-import { isPromise, isAsyncIterable, isWritableDOMStream, isWritableNodeStream, isIterable } from '../util/compat';
+import { isPromise, isAsyncIterable, isWritableDOMStream, isWritableNodeStream, isIterable, isObject } from '../util/compat';
+
+export interface RecordBatchStreamWriterOptions {
+    /**
+     * A flag indicating whether the RecordBatchWriter should close the underlying sink once all RecordBatches have been written.
+     */
+    autoDestroy?: boolean;
+    /**
+     * A flag indicating whether the RecordBatchWriter should construct pre-0.15.0
+     * encapsulated IPC Messages, which reserves 4 bytes for the Message metadata
+     * length instead of 8.
+     * @see https://issues.apache.org/jira/browse/ARROW-6313
+     */
+    writeLegacyIpcFormat?: boolean;
+}
 
 export class RecordBatchWriter<T extends { [key: string]: DataType } = any> extends ReadableInterop<Uint8Array> implements Writable<RecordBatch<T>> {
@@ -51,14 +65,17 @@ export class RecordBatchWriter<T extends { [key: string]: DataType } = any> exte
         throw new Error(`"throughDOM" not available in this environment`);
     }
 
-    constructor(options?: { autoDestroy: boolean }) {
+    constructor(options?: RecordBatchStreamWriterOptions) {
         super();
-        this._autoDestroy = options && (typeof options.autoDestroy === 'boolean') ? options.autoDestroy : true;
+        isObject(options) || (options = { autoDestroy: true, writeLegacyIpcFormat: false });
+        this._autoDestroy = (typeof options.autoDestroy === 'boolean') ? options.autoDestroy : true;
+        this._writeLegacyIpcFormat = (typeof options.writeLegacyIpcFormat === 'boolean') ? options.writeLegacyIpcFormat : false;
     }
 
     protected _position = 0;
     protected _started = false;
     protected _autoDestroy: boolean;
+    protected _writeLegacyIpcFormat: boolean;
     // @ts-ignore
     protected _sink = new AsyncByteQueue();
     protected _schema: Schema | null = null;
@@ -178,8 +195,9 @@ export class RecordBatchWriter<T extends { [key: string]: DataType } = any> exte
         const a = alignment - 1;
         const buffer = Message.encode(message);
         const flatbufferSize = buffer.byteLength;
-        const alignedSize = (flatbufferSize + 4 + a) & ~a;
-        const nPaddingBytes = alignedSize - flatbufferSize - 4;
+        const prefixSize = !this._writeLegacyIpcFormat ? 8 : 4;
+        const alignedSize = (flatbufferSize + prefixSize + a) & ~a;
+        const nPaddingBytes = alignedSize - flatbufferSize - prefixSize;
 
         if (message.headerType === MessageHeader.RecordBatch) {
             this._recordBatchBlocks.push(new FileBlock(alignedSize, message.bodyLength, this._position));
@@ -187,8 +205,12 @@ export class RecordBatchWriter<T extends { [key: string]: DataType } = any> exte
             this._dictionaryBlocks.push(new FileBlock(alignedSize, message.bodyLength, this._position));
         }
 
+        // If not in legacy pre-0.15.0 mode, write the stream continuation indicator
+        if (!this._writeLegacyIpcFormat) {
+            this._write(Int32Array.of(-1));
+        }
         // Write the flatbuffer size prefix including padding
-        this._write(Int32Array.of(alignedSize - 4));
+        this._write(Int32Array.of(alignedSize - prefixSize));
         // Write the flatbuffer
         if (flatbufferSize > 0) { this._write(buffer); }
         // Write any padding
@@ -212,7 +234,10 @@ export class RecordBatchWriter<T extends { [key: string]: DataType } = any> exte
 
     // @ts-ignore
     protected _writeFooter(schema: Schema) {
-        return this._writePadding(4); // eos bytes
+        // eos bytes
+        return this._writeLegacyIpcFormat
+            ? this._write(Int32Array.of(0))
+            : this._write(Int32Array.of(-1, 0));
     }
 
     protected _writeMagic() {
@@ -275,12 +300,12 @@ export class RecordBatchWriter<T extends { [key: string]: DataType } = any> exte
 
 /** @ignore */
 export class RecordBatchStreamWriter<T extends { [key: string]: DataType } = any> extends RecordBatchWriter<T> {
-    public static writeAll<T extends { [key: string]: DataType } = any>(input: Table<T> | Iterable<RecordBatch<T>>, options?: { autoDestroy: true }): RecordBatchStreamWriter<T>;
-    public static writeAll<T extends { [key: string]: DataType } = any>(input: AsyncIterable<RecordBatch<T>>, options?: { autoDestroy: true }): Promise<RecordBatchStreamWriter<T>>;
-    public static writeAll<T extends { [key: string]: DataType } = any>(input: PromiseLike<AsyncIterable<RecordBatch<T>>>, options?: { autoDestroy: true }): Promise<RecordBatchStreamWriter<T>>;
-    public static writeAll<T extends { [key: string]: DataType } = any>(input: PromiseLike<Table<T> | Iterable<RecordBatch<T>>>, options?: { autoDestroy: true }): Promise<RecordBatchStreamWriter<T>>;
+    public static writeAll<T extends { [key: string]: DataType } = any>(input: Table<T> | Iterable<RecordBatch<T>>, options?: RecordBatchStreamWriterOptions): RecordBatchStreamWriter<T>;
+    public static writeAll<T extends { [key: string]: DataType } = any>(input: AsyncIterable<RecordBatch<T>>, options?: RecordBatchStreamWriterOptions): Promise<RecordBatchStreamWriter<T>>;
+    public static writeAll<T extends { [key: string]: DataType } = any>(input: PromiseLike<AsyncIterable<RecordBatch<T>>>, options?: RecordBatchStreamWriterOptions): Promise<RecordBatchStreamWriter<T>>;
+    public static writeAll<T extends { [key: string]: DataType } = any>(input: PromiseLike<Table<T> | Iterable<RecordBatch<T>>>, options?: RecordBatchStreamWriterOptions): Promise<RecordBatchStreamWriter<T>>;
     /** @nocollapse */
-    public static writeAll<T extends { [key: string]: DataType } = any>(input: any, options?: { autoDestroy: true }) {
+    public static writeAll<T extends { [key: string]: DataType } = any>(input: any, options?: RecordBatchStreamWriterOptions) {
         const writer = new RecordBatchStreamWriter<T>(options);
         if (isPromise(input)) {
             return input.then((x) => writer.writeAll(x));
@@ -323,8 +348,8 @@ export class RecordBatchFileWriter
             schema, MetadataVersion.V4, this._recordBatchBlocks, this._dictionaryBlocks
         ));
-        return this
-            ._writePadding(4) // EOS bytes for sequential readers
+        return super
+            ._writeFooter(schema) // EOS bytes for sequential readers
             ._write(buffer) // Write the flatbuffer
             ._write(Int32Array.of(buffer.byteLength)) // then the footer size suffix
             ._writeMagic(); // then the magic suffix
@@ -355,6 +380,8 @@ export class RecordBatchJSONWriter
     }
     protected _writeMessage() { return this; }
+    // @ts-ignore
+    protected _writeFooter(schema: Schema) { return this; }
     protected _writeSchema(schema: Schema) {
         return this._write(`{\n  "schema": ${
             JSON.stringify({ fields: schema.fields.map(fieldToJSON) }, null, 2)
diff --git a/js/test/unit/ipc/writer/stream-writer-tests.ts b/js/test/unit/ipc/writer/stream-writer-tests.ts
index ec6f581548d..48cc36526da 100644
--- a/js/test/unit/ipc/writer/stream-writer-tests.ts
+++ b/js/test/unit/ipc/writer/stream-writer-tests.ts
@@ -22,6 +22,7 @@ import {
 
 import * as generate from '../../../generate-test-data';
 import { validateRecordBatchIterator } from '../validate';
+import { RecordBatchStreamWriterOptions } from '../../../../src/ipc/writer';
 import { DictionaryVector, Dictionary, Uint32, Int32 } from '../../../Arrow';
 import { Table, Schema, Chunked, Builder, RecordBatch, RecordBatchReader, RecordBatchStreamWriter } from '../../../Arrow';
 
@@ -31,15 +32,21 @@ describe('RecordBatchStreamWriter', () => {
         const type = generate.sparseUnion(0, 0).vector.type;
         const schema = Schema.new({ 'dictSparseUnion': type });
         const table = generate.table([10, 20, 30], schema).table;
-        testStreamWriter(table, `[${table.schema.fields.join(', ')}]`);
+        const testName = `[${table.schema.fields.join(', ')}]`;
+        testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
+        testStreamWriter(table, testName, { writeLegacyIpcFormat: false });
     })();
 
     for (const table of generateRandomTables([10, 20, 30])) {
-        testStreamWriter(table, `[${table.schema.fields.join(', ')}]`);
+        const testName = `[${table.schema.fields.join(', ')}]`;
+        testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
+        testStreamWriter(table, testName, { writeLegacyIpcFormat: false });
     }
 
     for (const table of generateDictionaryTables([10, 20, 30])) {
-        testStreamWriter(table, `${table.schema.fields[0]}`);
+        const testName = `${table.schema.fields[0]}`;
+        testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
+        testStreamWriter(table, testName, { writeLegacyIpcFormat: false });
     }
 
     it(`should write multiple tables to the same output stream`, async () => {
@@ -98,14 +105,14 @@ describe('RecordBatchStreamWriter', () => {
     });
 });
 
-function testStreamWriter(table: Table, name: string) {
+function testStreamWriter(table: Table, name: string, options: RecordBatchStreamWriterOptions) {
     describe(`should write the Arrow IPC stream format (${name})`, () => {
-        test(`Table`, validateTable.bind(0, table));
+        test(`Table`, validateTable.bind(0, table, options));
     });
 }
 
-async function validateTable(source: Table) {
-    const writer = RecordBatchStreamWriter.writeAll(source);
+async function validateTable(source: Table, options: RecordBatchStreamWriterOptions) {
+    const writer = RecordBatchStreamWriter.writeAll(source, options);
     const result = await Table.from(writer.toUint8Array());
     validateRecordBatchIterator(3, source.chunks);
     expect(result).toEqualTable(source);
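
For reference, a minimal usage sketch of the option this patch introduces. This is not part of the patch: the one-column table is hypothetical example data, and it assumes the public `apache-arrow` package entry point with the `Table.new` and `Int32Vector.from` helpers from the same library version.

import { Table, Int32Vector, RecordBatchStreamWriter } from 'apache-arrow';

(async () => {
    // Hypothetical one-column table to serialize.
    const table = Table.new([Int32Vector.from([1, 2, 3])], ['nums']);

    // Default (v0.15+) framing: each message starts with the 4-byte
    // continuation indicator (-1), followed by the 4-byte metadata length.
    const modern = await RecordBatchStreamWriter.writeAll(table).toUint8Array();

    // Legacy (pre-0.15) framing: the first 4 bytes are the metadata length itself.
    const legacy = await RecordBatchStreamWriter
        .writeAll(table, { writeLegacyIpcFormat: true })
        .toUint8Array();

    // The first Int32 (little-endian) of the stream distinguishes the two formats.
    const first = (buf: Uint8Array) => new DataView(buf.buffer, buf.byteOffset).getInt32(0, true);
    console.log(first(modern)); // -1 (continuation indicator)
    console.log(first(legacy)); // positive: the schema message's metadata length
})();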