4 changes: 2 additions & 2 deletions rust/arrow-flight/src/utils.rs
@@ -57,7 +57,7 @@ pub fn flight_schema_from_arrow_schema(
options: &IpcWriteOptions,
) -> SchemaResult {
let data_gen = writer::IpcDataGenerator::default();
let schema_bytes = data_gen.schema_to_bytes(schema, &options);
let schema_bytes = data_gen.schema_to_bytes(schema, &options, &None);

SchemaResult {
schema: schema_bytes.ipc_message,
@@ -70,7 +70,7 @@ pub fn flight_data_from_arrow_schema(
options: &IpcWriteOptions,
) -> FlightData {
let data_gen = writer::IpcDataGenerator::default();
let schema = data_gen.schema_to_bytes(schema, &options);
let schema = data_gen.schema_to_bytes(schema, &options, &None);
FlightData {
flight_descriptor: None,
app_metadata: vec![],
6 changes: 6 additions & 0 deletions rust/arrow/src/datatypes.rs
@@ -22,6 +22,7 @@
//! * [`Field`](crate::datatypes::Field) to describe one field within a schema.
//! * [`DataType`](crate::datatypes::DataType) to describe the type of a field.

use std::collections::BTreeMap;
use std::collections::HashMap;
use std::default::Default;
use std::fmt;
@@ -183,6 +184,11 @@ pub enum IntervalUnit {
DayTime,
}

/// `CustomMetaData` is an alias for `BTreeMap<String, String>`, which (unlike `HashMap`)
/// implements the `Hash`, `PartialOrd` and `Ord` traits. It holds the custom metadata
/// (key-value pairs) that the Arrow format defines for fields, file footers, messages and schemas.
pub type CustomMetaData = BTreeMap<String, String>;

/// Contains the meta-data for a single relative type.
///
/// The `Schema` object is an ordered collection of `Field` objects.
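As a small illustration of the alias added above, a `CustomMetaData` value is built like any other `BTreeMap<String, String>`; the keys and values below are made up for the sketch:

```rust
use arrow::datatypes::CustomMetaData;

fn example_metadata() -> CustomMetaData {
    // `CustomMetaData` is just a `BTreeMap<String, String>`; the entries are illustrative only.
    let mut metadata = CustomMetaData::new();
    metadata.insert("creator".to_string(), "my-tool".to_string());
    metadata.insert("created_at".to_string(), "2021-01-01T00:00:00Z".to_string());
    metadata
}
```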
6 changes: 3 additions & 3 deletions rust/arrow/src/ipc/reader.rs
@@ -1068,9 +1068,9 @@ mod tests {
let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
// create stream writer
let file = File::create("target/debug/testdata/float.stream").unwrap();
let mut stream_writer =
crate::ipc::writer::StreamWriter::try_new(file, &schema).unwrap();
stream_writer.write(&batch).unwrap();
let mut stream_writer = crate::ipc::writer::StreamWriter::new(file, &schema);
stream_writer.write_schema().unwrap();
stream_writer.write_record_batch(&batch).unwrap();
stream_writer.finish().unwrap();

// read stream back
202 changes: 155 additions & 47 deletions rust/arrow/src/ipc/writer.rs
@@ -107,20 +107,44 @@ impl IpcDataGenerator {
&self,
schema: &Schema,
write_options: &IpcWriteOptions,
custom_metadata: &Option<CustomMetaData>,
) -> EncodedData {
let mut fbb = FlatBufferBuilder::new();
let schema = {
let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema);
fb.as_union_value()
};

let mut message = ipc::MessageBuilder::new(&mut fbb);
message.add_version(write_options.metadata_version);
message.add_header_type(ipc::MessageHeader::Schema);
message.add_bodyLength(0);
message.add_header(schema);
// TODO: custom metadata
let data = message.finish();
// Optional custom metadata.
let mut fb_metadata = None;
if let Some(md) = custom_metadata {
if !md.is_empty() {
let mut kv_vec = vec![];
for (k, v) in md {
let kv_args = ipc::KeyValueArgs {
key: Some(fbb.create_string(k.as_str())),
value: Some(fbb.create_string(v.as_str())),
};
let kv_offset = ipc::KeyValue::create(&mut fbb, &kv_args);
kv_vec.push(kv_offset);
}

fb_metadata = Some(fbb.create_vector(&kv_vec));
}
}

let message_args = ipc::MessageArgs {
version: write_options.metadata_version,
header_type: ipc::MessageHeader::Schema,
header: Some(schema),
bodyLength: 0,
custom_metadata: fb_metadata,
};

// NOTE: as of crate `flatbuffers` 0.8.0, building this message with the builder API
// (`ipc::MessageBuilder::new(&mut fbb)`) hits a compilation error from taking multiple
// mutable references to `fbb`, so the message is built with `ipc::Message::create` instead.
let data = ipc::Message::create(&mut fbb, &message_args);
fbb.finish(data, None);

let data = fbb.finished_data();
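For reference, here is a minimal sketch of calling the new three-argument `schema_to_bytes` directly, assuming the signature lands as shown in this hunk; the field name, metadata entries and write options are illustrative:

```rust
use arrow::datatypes::{CustomMetaData, DataType, Field, Schema};
use arrow::ipc::writer::{IpcDataGenerator, IpcWriteOptions};

fn encode_schema_with_metadata() -> Vec<u8> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let options = IpcWriteOptions::default();

    let mut metadata = CustomMetaData::new();
    metadata.insert("origin".to_string(), "example".to_string());

    let data_gen = IpcDataGenerator::default();
    // `&Some(metadata)` is embedded in the schema message's custom_metadata field,
    // while `&None` (as in the arrow-flight call sites above) skips it.
    let encoded = data_gen.schema_to_bytes(&schema, &options, &Some(metadata));
    encoded.ipc_message
}
```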
@@ -339,6 +363,8 @@ pub struct FileWriter<W: Write> {
write_options: IpcWriteOptions,
/// A reference to the schema, used in validating record batches
schema: Schema,
/// Optional custom metadata.
custom_metadata: Option<CustomMetaData>,
/// The number of bytes between each block of bytes, as an offset for random access
block_offsets: usize,
/// Dictionary blocks that will be written as part of the IPC footer
@@ -355,41 +381,83 @@ pub struct FileWriter<W: Write> {

impl<W: Write> FileWriter<W> {
/// Try create a new writer, with the schema written as part of the header
#[deprecated(
since = "3.0.0",
note = "This method is deprecated in favour of `let writer = FileWriter::new(writer, schema); writer.write_header_schema().unwrap();`"
)]
pub fn try_new(writer: W, schema: &Schema) -> Result<Self> {
let write_options = IpcWriteOptions::default();
Self::try_new_with_options(writer, schema, write_options)
let mut me = FileWriter::new(writer, schema);
me.write_header_schema()?;
Ok(me)
}

/// Try create a new writer with IpcWriteOptions
#[deprecated(
since = "3.0.0",
note = "This method is deprecated in favour of `let writer = FileWriter::new(writer, schema).with_write_options(opt); w.write_header_schema().unwrap();`"
)]
pub fn try_new_with_options(
writer: W,
schema: &Schema,
write_options: IpcWriteOptions,
) -> Result<Self> {
let data_gen = IpcDataGenerator::default();
let mut writer = BufWriter::new(writer);
// write magic to header
writer.write_all(&super::ARROW_MAGIC[..])?;
// create an 8-byte boundary after the header
writer.write_all(&[0, 0])?;
// write the schema, set the written bytes to the schema + header
let encoded_message = data_gen.schema_to_bytes(schema, &write_options);
let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
Ok(Self {
writer,
write_options,
let mut me = FileWriter::new(writer, schema);
me.write_options = write_options;
me.write_header_schema()?;
Ok(me)
}

/// Creates the FileWriter.
pub fn new(writer: W, schema: &Schema) -> Self {
Contributor:

The major downside with a `new()` that doesn't create the header is that it creates a burden on end-users to remember to call `write_header_schema()`. I would prefer to stick with `try_new()` and `try_new_with_options()` (happy to rename the latter), where we now amend the latter to also take other parameters.

I don't mind us using a builder pattern, but we shouldn't change `try_new` in the process.

Contributor Author:

Agree, so add the metadata to `IpcWriteOptions`?

FileWriter {
writer: BufWriter::new(writer),
write_options: IpcWriteOptions::default(),
schema: schema.clone(),
block_offsets: meta + data + 8,
custom_metadata: None,
block_offsets: 0,
dictionary_blocks: vec![],
record_blocks: vec![],
finished: false,
dictionary_tracker: DictionaryTracker::new(true),
data_gen,
})
data_gen: IpcDataGenerator::default(),
}
}

/// Set optional write options and return `&mut Self`.
pub fn with_write_options(&mut self, write_options: IpcWriteOptions) -> &mut Self {
self.write_options = write_options;
self
}

/// Set optional custom metadata and return `&mut Self`.
pub fn with_custom_metadata(&mut self, metadata: CustomMetaData) -> &mut Self {
self.custom_metadata = Some(metadata);
self
}

/// Write header and schema to the file.
/// Must be called before `write_record_batch`.
pub fn write_header_schema(&mut self) -> Result<()> {
// write magic to header
self.writer.write_all(&super::ARROW_MAGIC[..])?;
// create an 8-byte boundary after the header
self.writer.write_all(&[0, 0])?;
// write the schema, set the written bytes to the schema + header
let encoded_message = self.data_gen.schema_to_bytes(
&self.schema,
&self.write_options,
&self.custom_metadata,
);
let (meta, data) =
write_message(&mut self.writer, encoded_message, &self.write_options)?;

self.block_offsets = meta + data + 8;

Ok(())
}

/// Write a record batch to the file
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
pub fn write_record_batch(&mut self, batch: &RecordBatch) -> Result<()> {
if self.finished {
return Err(ArrowError::IoError(
"Cannot write record batch to file writer as it is closed".to_string(),
@@ -472,6 +540,8 @@ pub struct StreamWriter<W: Write> {
write_options: IpcWriteOptions,
/// A reference to the schema, used in validating record batches
schema: Schema,
/// Optional custom metadata.
custom_metadata: Option<CustomMetaData>,
/// Whether the writer footer has been written, and the writer is finished
finished: bool,
/// Keeps track of dictionaries that have been written
@@ -482,33 +552,68 @@ pub struct StreamWriter<W: Write> {

impl<W: Write> StreamWriter<W> {
/// Try create a new writer, with the schema written as part of the header
#[deprecated(
since = "3.0.0",
note = "This method is deprecated in favour of `let writer = StreamWriter::new(writer, schema); writer.write_schema().unwrap();`"
)]
pub fn try_new(writer: W, schema: &Schema) -> Result<Self> {
let write_options = IpcWriteOptions::default();
Self::try_new_with_options(writer, schema, write_options)
let mut me = StreamWriter::new(writer, schema);
me.write_schema()?;
Ok(me)
}

#[deprecated(
since = "3.0.0",
note = "This method is deprecated in favour of `let writer = StreamWriter::new(writer, schema).with_write_options(opt); w.write_schema().unwrap();`"
)]
pub fn try_new_with_options(
writer: W,
schema: &Schema,
write_options: IpcWriteOptions,
) -> Result<Self> {
let data_gen = IpcDataGenerator::default();
let mut writer = BufWriter::new(writer);
// write the schema, set the written bytes to the schema
let encoded_message = data_gen.schema_to_bytes(schema, &write_options);
write_message(&mut writer, encoded_message, &write_options)?;
Ok(Self {
writer,
write_options,
let mut me = StreamWriter::new(writer, schema);
me.write_options = write_options;
me.write_schema()?;
Ok(me)
}

pub fn new(writer: W, schema: &Schema) -> Self {
StreamWriter {
writer: BufWriter::new(writer),
write_options: IpcWriteOptions::default(),
schema: schema.clone(),
custom_metadata: None,
finished: false,
dictionary_tracker: DictionaryTracker::new(false),
data_gen,
})
data_gen: IpcDataGenerator::default(),
}
}

/// Set optional write options and return `&Self`.
pub fn with_write_options(&mut self, write_options: IpcWriteOptions) -> &Self {
self.write_options = write_options;
self
}

/// Set optional custom metadata and return `&Self`.
pub fn with_custom_metadata(&mut self, metadata: CustomMetaData) -> &Self {
self.custom_metadata = Some(metadata);
self
}

/// Write schema to the stream.
pub fn write_schema(&mut self) -> Result<()> {
let encoded_message = self.data_gen.schema_to_bytes(
&self.schema,
&self.write_options,
&self.custom_metadata,
);
write_message(&mut self.writer, encoded_message, &self.write_options)?;
Ok(())
}

/// Write a record batch to the stream
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
pub fn write_record_batch(&mut self, batch: &RecordBatch) -> Result<()> {
if self.finished {
return Err(ArrowError::IoError(
"Cannot write record batch to stream writer as it is closed".to_string(),
@@ -766,9 +871,10 @@ mod tests {
.unwrap();
{
let file = File::create("target/debug/testdata/arrow.arrow_file").unwrap();
let mut writer = FileWriter::try_new(file, &schema).unwrap();
let mut writer = FileWriter::new(file, &schema);
writer.write_header_schema().unwrap();

writer.write(&batch).unwrap();
writer.write_record_batch(&batch).unwrap();
// this is inside a block to test the implicit finishing of the file on `Drop`
}

@@ -815,9 +921,9 @@ mod tests {
.unwrap();
{
let file = File::create("target/debug/testdata/nulls.arrow_file").unwrap();
let mut writer = FileWriter::try_new(file, &schema).unwrap();

writer.write(&batch).unwrap();
let mut writer = FileWriter::new(file, &schema);
writer.write_header_schema().unwrap();
writer.write_record_batch(&batch).unwrap();
// this is inside a block to test the implicit finishing of the file on `Drop`
}

@@ -867,9 +973,10 @@ mod tests {
let file =
File::create(format!("target/debug/testdata/{}.arrow_file", path))
.unwrap();
let mut writer = FileWriter::try_new(file, &reader.schema()).unwrap();
let mut writer = FileWriter::new(file, &reader.schema());
writer.write_header_schema().unwrap();
while let Some(Ok(batch)) = reader.next() {
writer.write(&batch).unwrap();
writer.write_record_batch(&batch).unwrap();
}
writer.finish().unwrap();
}
@@ -911,9 +1018,10 @@
{
let file = File::create(format!("target/debug/testdata/{}.stream", path))
.unwrap();
let mut writer = StreamWriter::try_new(file, &reader.schema()).unwrap();
let mut writer = StreamWriter::new(file, &reader.schema());
writer.write_schema().unwrap();
reader.for_each(|batch| {
writer.write(&batch.unwrap()).unwrap();
writer.write_record_batch(&batch.unwrap()).unwrap();
});
writer.finish().unwrap();
}
6 changes: 3 additions & 3 deletions rust/integration-testing/src/bin/arrow-file-to-stream.rs
@@ -31,11 +31,11 @@ fn main() -> Result<()> {
let mut reader = FileReader::try_new(reader)?;
let schema = reader.schema();

let mut writer = StreamWriter::try_new(io::stdout(), &schema)?;

let mut writer = StreamWriter::new(io::stdout(), &schema);
writer.write_schema()?;
reader.try_for_each(|batch| {
let batch = batch?;
writer.write(&batch)
writer.write_record_batch(&batch)
})?;
writer.finish()?;

@@ -84,10 +84,11 @@ fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()>
let json_file = read_json_file(json_name)?;

let arrow_file = File::create(arrow_name)?;
let mut writer = FileWriter::try_new(arrow_file, &json_file.schema)?;
let mut writer = FileWriter::new(arrow_file, &json_file.schema);
writer.write_header_schema()?;

for b in json_file.batches {
writer.write(&b)?;
writer.write_record_batch(&b)?;
}

Ok(())
5 changes: 3 additions & 2 deletions rust/integration-testing/src/bin/arrow-stream-to-file.rs
@@ -25,9 +25,10 @@ fn main() -> Result<()> {
let mut arrow_stream_reader = StreamReader::try_new(io::stdin())?;
let schema = arrow_stream_reader.schema();

let mut writer = FileWriter::try_new(io::stdout(), &schema)?;
let mut writer = FileWriter::new(io::stdout(), &schema);
writer.write_header_schema()?;

arrow_stream_reader.try_for_each(|batch| writer.write(&batch?))?;
arrow_stream_reader.try_for_each(|batch| writer.write_record_batch(&batch?))?;
writer.finish()?;

Ok(())