18 changes: 14 additions & 4 deletions js/src/ipc/message.ts
@@ -40,6 +40,11 @@ export class MessageReader implements IterableIterator<Message> {
public next(): IteratorResult<Message> {
let r;
if ((r = this.readMetadataLength()).done) { return ITERATOR_DONE; }
+    // ARROW-6313: If the first 4 bytes are the continuation indicator (-1), read
+    // the next 4 for the 32-bit metadata length. Otherwise, assume this is a
+    // pre-v0.15 message, where the first 4 bytes are the metadata length.
+    if ((r.value === -1) &&
+        (r = this.readMetadataLength()).done) { return ITERATOR_DONE; }
if ((r = this.readMetadata(r.value)).done) { return ITERATOR_DONE; }
return (<any> r) as IteratorResult<Message>;
}
@@ -76,8 +81,8 @@ export class MessageReader implements IterableIterator<Message> {
protected readMetadataLength(): IteratorResult<number> {
const buf = this.source.read(PADDING);
const bb = buf && new ByteBuffer(buf);
-    const len = +(bb && bb.readInt32(0))!;
-    return { done: len <= 0, value: len };
+    const len = bb && bb.readInt32(0) || 0;
+    return { done: len === 0, value: len };

Member:
Should we have some kind of assertion somewhere that verifies length > 0? What happens if a corrupt file has that value?

Contributor Author:
That’s effectively handled in the subsequent call. We use the length read here as the number of bytes to consume from the stream, which should be the flatbuffer metadata.

If the length is incorrect, the flatbuffer metadata created from the buffer will be empty. If the length is too long, the stream will read as many bytes as it can, and close automatically. Both cases are interpreted by the stream reader as bad input, and ignored.
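
A minimal sketch of the two framings this reader has to accept (illustrative names, not the library's API), showing where the length read above fits in:

```ts
// Two on-the-wire framings for an encapsulated IPC message:
//
//   pre-0.15:  <int32 metadataLength> <metadata flatbuffer> <message body>
//   >= 0.15:   <int32 -1> <int32 metadataLength> <metadata flatbuffer> <message body>
//
// Returns the metadata length and the offset just past the prefix.
function readPrefix(view: DataView, offset: number): { length: number; next: number } {
    const first = view.getInt32(offset, true); // IPC length prefixes are little-endian
    return first === -1
        ? { length: view.getInt32(offset + 4, true), next: offset + 8 } // new format
        : { length: first, next: offset + 4 };                          // legacy format
}
```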

}
protected readMetadata(metadataLength: number): IteratorResult<Message> {
const buf = this.source.read(metadataLength);
@@ -104,6 +109,11 @@ export class AsyncMessageReader implements AsyncIterableIterator<Message> {
public async next(): Promise<IteratorResult<Message>> {
let r;
if ((r = await this.readMetadataLength()).done) { return ITERATOR_DONE; }
+    // ARROW-6313: If the first 4 bytes are the continuation indicator (-1), read
+    // the next 4 for the 32-bit metadata length. Otherwise, assume this is a
+    // pre-v0.15 message, where the first 4 bytes are the metadata length.
+    if ((r.value === -1) &&
+        (r = await this.readMetadataLength()).done) { return ITERATOR_DONE; }

Member:
It's too bad we have to duplicate this code. Could the readers be combined somehow? Not in this PR, just curious.

Contributor Author:
Unfortunately not. Async functions are implemented via promises, which don’t allow parameterizing the scheduler. Async functions force the caller to handle the returned promise, guaranteeing the function doesn’t finish its work synchronously, even if it could.

This is a key difference between JS promises and Python’s asyncio module, which does execute synchronously if no actual async work is performed.
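
A tiny sketch of the constraint described above (not from the PR): an async function's result is only observable on a later microtask, even when its body does no real async work, so the synchronous reader can't simply delegate to the async one.

```ts
async function readLength(): Promise<number> {
    return 4; // the value is computed synchronously...
}

let value: number | undefined;
readLength().then((n) => { value = n; });
// ...but it is only delivered on a later microtask:
console.log(value); // undefined
```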

if ((r = await this.readMetadata(r.value)).done) { return ITERATOR_DONE; }
return (<any> r) as IteratorResult<Message>;
}
@@ -140,8 +150,8 @@ export class AsyncMessageReader implements AsyncIterableIterator<Message> {
protected async readMetadataLength(): Promise<IteratorResult<number>> {
const buf = await this.source.read(PADDING);
const bb = buf && new ByteBuffer(buf);
-    const len = +(bb && bb.readInt32(0))!;
-    return { done: len <= 0, value: len };
+    const len = bb && bb.readInt32(0) || 0;
+    return { done: len === 0, value: len };
}
protected async readMetadata(metadataLength: number): Promise<IteratorResult<Message>> {
const buf = await this.source.read(metadataLength);
55 changes: 41 additions & 14 deletions js/src/ipc/writer.ts
@@ -32,7 +32,21 @@ import { JSONVectorAssembler } from '../visitor/jsonvectorassembler';
import { ArrayBufferViewInput, toUint8Array } from '../util/buffer';
import { RecordBatch, _InternalEmptyPlaceholderRecordBatch } from '../recordbatch';
import { Writable, ReadableInterop, ReadableDOMStreamOptions } from '../io/interfaces';
-import { isPromise, isAsyncIterable, isWritableDOMStream, isWritableNodeStream, isIterable } from '../util/compat';
+import { isPromise, isAsyncIterable, isWritableDOMStream, isWritableNodeStream, isIterable, isObject } from '../util/compat';

+export interface RecordBatchStreamWriterOptions {
+    /**
+     * A flag indicating whether the RecordBatchWriter should close its underlying
+     * sink when the current stream ends, or reset so that additional tables can be
+     * written to the same output stream. Defaults to true.
+     */
+    autoDestroy?: boolean;
+    /**
+     * A flag indicating whether the RecordBatchWriter should construct pre-0.15.0
+     * encapsulated IPC Messages, which reserve 4 bytes for the Message metadata
+     * length instead of 8.
+     * @see https://issues.apache.org/jira/browse/ARROW-6313
+     */
+    writeLegacyIpcFormat?: boolean;
+}
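
A usage sketch for these options (assuming the public `apache-arrow` entry point; table construction omitted). The `writeAll` overloads updated below accept the same options object:

```ts
import { Table, RecordBatchStreamWriter } from 'apache-arrow';

declare const table: Table;

// Default: the new (v0.15+) framing with the -1 continuation indicator.
const writer = RecordBatchStreamWriter.writeAll(table);

// Opt back in to the pre-0.15 framing for older readers.
const legacy = RecordBatchStreamWriter.writeAll(table, { writeLegacyIpcFormat: true });
```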

export class RecordBatchWriter<T extends { [key: string]: DataType } = any> extends ReadableInterop<Uint8Array> implements Writable<RecordBatch<T>> {

@@ -51,14 +65,17 @@ export class RecordBatchWriter<T extends { [key: string]: DataType } = any> extends ReadableInterop<Uint8Array> implements Writable<RecordBatch<T>> {
throw new Error(`"throughDOM" not available in this environment`);
}

-    constructor(options?: { autoDestroy: boolean }) {
+    constructor(options?: RecordBatchStreamWriterOptions) {
super();
-        this._autoDestroy = options && (typeof options.autoDestroy === 'boolean') ? options.autoDestroy : true;
+        isObject(options) || (options = { autoDestroy: true, writeLegacyIpcFormat: false });
+        this._autoDestroy = (typeof options.autoDestroy === 'boolean') ? options.autoDestroy : true;
+        this._writeLegacyIpcFormat = (typeof options.writeLegacyIpcFormat === 'boolean') ? options.writeLegacyIpcFormat : false;
}

protected _position = 0;
protected _started = false;
protected _autoDestroy: boolean;
+    protected _writeLegacyIpcFormat: boolean;
// @ts-ignore
protected _sink = new AsyncByteQueue();
protected _schema: Schema | null = null;
@@ -178,17 +195,22 @@ export class RecordBatchWriter<T extends { [key: string]: DataType } = any> extends ReadableInterop<Uint8Array> implements Writable<RecordBatch<T>> {
const a = alignment - 1;
const buffer = Message.encode(message);
const flatbufferSize = buffer.byteLength;
-        const alignedSize = (flatbufferSize + 4 + a) & ~a;
-        const nPaddingBytes = alignedSize - flatbufferSize - 4;
+        const prefixSize = !this._writeLegacyIpcFormat ? 8 : 4;
+        const alignedSize = (flatbufferSize + prefixSize + a) & ~a;
+        const nPaddingBytes = alignedSize - flatbufferSize - prefixSize;

if (message.headerType === MessageHeader.RecordBatch) {
this._recordBatchBlocks.push(new FileBlock(alignedSize, message.bodyLength, this._position));
} else if (message.headerType === MessageHeader.DictionaryBatch) {
this._dictionaryBlocks.push(new FileBlock(alignedSize, message.bodyLength, this._position));
}

+        // If not in legacy pre-0.15.0 mode, write the stream continuation indicator
+        if (!this._writeLegacyIpcFormat) {
+            this._write(Int32Array.of(-1));
+        }
// Write the flatbuffer size prefix including padding
-        this._write(Int32Array.of(alignedSize - 4));
+        this._write(Int32Array.of(alignedSize - prefixSize));
// Write the flatbuffer
if (flatbufferSize > 0) { this._write(buffer); }
// Write any padding
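
A worked example of the alignment arithmetic above (the byte counts are illustrative):

```ts
// Suppose the encoded flatbuffer is 42 bytes, alignment is 8 (so a = 7), and
// we're writing the new format (prefixSize = 8):
const flatbufferSize = 42, a = 7, prefixSize = 8;
const alignedSize = (flatbufferSize + prefixSize + a) & ~a;       // (42 + 8 + 7) & ~7 === 56
const nPaddingBytes = alignedSize - flatbufferSize - prefixSize;  // 56 - 42 - 8 === 6

// Bytes emitted: Int32(-1) continuation (4), Int32(48) length prefix (4),
// 42 flatbuffer bytes, 6 padding bytes: 56 in total, a multiple of 8.
```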
@@ -212,7 +234,10 @@ export class RecordBatchWriter<T extends { [key: string]: DataType } = any> extends ReadableInterop<Uint8Array> implements Writable<RecordBatch<T>> {

// @ts-ignore
protected _writeFooter(schema: Schema<T>) {
-        return this._writePadding(4); // eos bytes
+        // eos bytes
+        return this._writeLegacyIpcFormat
+            ? this._write(Int32Array.of(0))
+            : this._write(Int32Array.of(-1, 0));
}

protected _writeMagic() {
@@ -275,12 +300,12 @@ export class RecordBatchWriter<T extends { [key: string]: DataType } = any> extends ReadableInterop<Uint8Array> implements Writable<RecordBatch<T>> {

/** @ignore */
export class RecordBatchStreamWriter<T extends { [key: string]: DataType } = any> extends RecordBatchWriter<T> {
-    public static writeAll<T extends { [key: string]: DataType } = any>(input: Table<T> | Iterable<RecordBatch<T>>, options?: { autoDestroy: true }): RecordBatchStreamWriter<T>;
-    public static writeAll<T extends { [key: string]: DataType } = any>(input: AsyncIterable<RecordBatch<T>>, options?: { autoDestroy: true }): Promise<RecordBatchStreamWriter<T>>;
-    public static writeAll<T extends { [key: string]: DataType } = any>(input: PromiseLike<AsyncIterable<RecordBatch<T>>>, options?: { autoDestroy: true }): Promise<RecordBatchStreamWriter<T>>;
-    public static writeAll<T extends { [key: string]: DataType } = any>(input: PromiseLike<Table<T> | Iterable<RecordBatch<T>>>, options?: { autoDestroy: true }): Promise<RecordBatchStreamWriter<T>>;
+    public static writeAll<T extends { [key: string]: DataType } = any>(input: Table<T> | Iterable<RecordBatch<T>>, options?: RecordBatchStreamWriterOptions): RecordBatchStreamWriter<T>;
+    public static writeAll<T extends { [key: string]: DataType } = any>(input: AsyncIterable<RecordBatch<T>>, options?: RecordBatchStreamWriterOptions): Promise<RecordBatchStreamWriter<T>>;
+    public static writeAll<T extends { [key: string]: DataType } = any>(input: PromiseLike<AsyncIterable<RecordBatch<T>>>, options?: RecordBatchStreamWriterOptions): Promise<RecordBatchStreamWriter<T>>;
+    public static writeAll<T extends { [key: string]: DataType } = any>(input: PromiseLike<Table<T> | Iterable<RecordBatch<T>>>, options?: RecordBatchStreamWriterOptions): Promise<RecordBatchStreamWriter<T>>;
/** @nocollapse */
-    public static writeAll<T extends { [key: string]: DataType } = any>(input: any, options?: { autoDestroy: true }) {
+    public static writeAll<T extends { [key: string]: DataType } = any>(input: any, options?: RecordBatchStreamWriterOptions) {
const writer = new RecordBatchStreamWriter<T>(options);
if (isPromise<any>(input)) {
return input.then((x) => writer.writeAll(x));
@@ -323,8 +348,8 @@ export class RecordBatchFileWriter<T extends { [key: string]: DataType } = any>
schema, MetadataVersion.V4,
this._recordBatchBlocks, this._dictionaryBlocks
));
-        return this
-            ._writePadding(4) // EOS bytes for sequential readers
+        return super
+            ._writeFooter(schema) // EOS bytes for sequential readers
._write(buffer) // Write the flatbuffer
._write(Int32Array.of(buffer.byteLength)) // then the footer size suffix
._writeMagic(); // then the magic suffix
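
For reference, a sketch of the file-format tail these calls produce (per the Arrow file format; widths in bytes):

```ts
// ...record batches...
// <EOS>                 // Int32(-1, 0) in the new format, Int32(0) in legacy (8 or 4)
// <footer flatbuffer>   // schema plus recordBatch/dictionaryBatch file blocks
// <int32 footerLength>  // byteLength of the footer flatbuffer (4)
// "ARROW1"              // magic suffix
```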
@@ -355,6 +380,8 @@ export class RecordBatchJSONWriter<T extends { [key: string]: DataType } = any>
}

protected _writeMessage() { return this; }
+    // @ts-ignore
+    protected _writeFooter(schema: Schema<T>) { return this; }
protected _writeSchema(schema: Schema<T>) {
return this._write(`{\n "schema": ${
JSON.stringify({ fields: schema.fields.map(fieldToJSON) }, null, 2)
21 changes: 14 additions & 7 deletions js/test/unit/ipc/writer/stream-writer-tests.ts
@@ -22,6 +22,7 @@ import {

import * as generate from '../../../generate-test-data';
import { validateRecordBatchIterator } from '../validate';
+import { RecordBatchStreamWriterOptions } from '../../../../src/ipc/writer';
import { DictionaryVector, Dictionary, Uint32, Int32 } from '../../../Arrow';
import { Table, Schema, Chunked, Builder, RecordBatch, RecordBatchReader, RecordBatchStreamWriter } from '../../../Arrow';

@@ -31,15 +32,21 @@ describe('RecordBatchStreamWriter', () => {
const type = generate.sparseUnion(0, 0).vector.type;
const schema = Schema.new({ 'dictSparseUnion': type });
const table = generate.table([10, 20, 30], schema).table;
-    testStreamWriter(table, `[${table.schema.fields.join(', ')}]`);
+    const testName = `[${table.schema.fields.join(', ')}]`;
+    testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
+    testStreamWriter(table, testName, { writeLegacyIpcFormat: false });
})();

for (const table of generateRandomTables([10, 20, 30])) {
-    testStreamWriter(table, `[${table.schema.fields.join(', ')}]`);
+    const testName = `[${table.schema.fields.join(', ')}]`;
+    testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
+    testStreamWriter(table, testName, { writeLegacyIpcFormat: false });
}

for (const table of generateDictionaryTables([10, 20, 30])) {
-    testStreamWriter(table, `${table.schema.fields[0]}`);
+    const testName = `${table.schema.fields[0]}`;
+    testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
+    testStreamWriter(table, testName, { writeLegacyIpcFormat: false });
}

it(`should write multiple tables to the same output stream`, async () => {
@@ -98,14 +105,14 @@
});
});

-function testStreamWriter(table: Table, name: string) {
+function testStreamWriter(table: Table, name: string, options: RecordBatchStreamWriterOptions) {
describe(`should write the Arrow IPC stream format (${name})`, () => {
-        test(`Table`, validateTable.bind(0, table));
+        test(`Table`, validateTable.bind(0, table, options));
});
}

-async function validateTable(source: Table) {
-    const writer = RecordBatchStreamWriter.writeAll(source);
+async function validateTable(source: Table, options: RecordBatchStreamWriterOptions) {
+    const writer = RecordBatchStreamWriter.writeAll(source, options);
const result = await Table.from(writer.toUint8Array());
validateRecordBatchIterator(3, source.chunks);
expect(result).toEqualTable(source);