Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/arrow/src/ipc/gen/File.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl Block {
self.metaDataLength_.from_little_endian()
}
/// Length of the data (this is aligned so there can be a gap between this and
/// the metatdata).
/// the metadata).
pub fn bodyLength<'a>(&'a self) -> i64 {
self.bodyLength_.from_little_endian()
}
Expand Down
255 changes: 255 additions & 0 deletions rust/arrow/src/ipc/gen/Message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,123 @@ use flatbuffers::EndianScalar;
use std::{cmp::Ordering, mem};
// automatically generated by the FlatBuffers compiler, do not modify

#[allow(non_camel_case_types)]
#[repr(i8)]
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub enum CompressionType {
LZ4_FRAME = 0,
ZSTD = 1,
}

const ENUM_MIN_COMPRESSION_TYPE: i8 = 0;
const ENUM_MAX_COMPRESSION_TYPE: i8 = 1;

impl<'a> flatbuffers::Follow<'a> for CompressionType {
type Inner = Self;
#[inline]
fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
flatbuffers::read_scalar_at::<Self>(buf, loc)
}
}

impl flatbuffers::EndianScalar for CompressionType {
#[inline]
fn to_little_endian(self) -> Self {
let n = i8::to_le(self as i8);
let p = &n as *const i8 as *const CompressionType;
unsafe { *p }
}
#[inline]
fn from_little_endian(self) -> Self {
let n = i8::from_le(self as i8);
let p = &n as *const i8 as *const CompressionType;
unsafe { *p }
}
}

impl flatbuffers::Push for CompressionType {
type Output = CompressionType;
#[inline]
fn push(&self, dst: &mut [u8], _rest: &[u8]) {
flatbuffers::emplace_scalar::<CompressionType>(dst, *self);
}
}

#[allow(non_camel_case_types)]
const ENUM_VALUES_COMPRESSION_TYPE: [CompressionType; 2] =
[CompressionType::LZ4_FRAME, CompressionType::ZSTD];

#[allow(non_camel_case_types)]
const ENUM_NAMES_COMPRESSION_TYPE: [&'static str; 2] = ["LZ4_FRAME", "ZSTD"];

pub fn enum_name_compression_type(e: CompressionType) -> &'static str {
let index = e as i8;
ENUM_NAMES_COMPRESSION_TYPE[index as usize]
}

/// Provided for forward compatibility in case we need to support different
/// strategies for compressing the IPC message body (like whole-body
/// compression rather than buffer-level) in the future
#[allow(non_camel_case_types)]
#[repr(i8)]
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub enum BodyCompressionMethod {
/// Each constituent buffer is first compressed with the indicated
/// compressor, and then written with the uncompressed length in the first 8
/// bytes as a 64-bit little-endian signed integer followed by the compressed
/// buffer bytes (and then padding as required by the protocol). The
/// uncompressed length may be set to -1 to indicate that the data that
/// follows is not compressed, which can be useful for cases where
/// compression does not yield appreciable savings.
BUFFER = 0,
}

const ENUM_MIN_BODY_COMPRESSION_METHOD: i8 = 0;
const ENUM_MAX_BODY_COMPRESSION_METHOD: i8 = 0;

impl<'a> flatbuffers::Follow<'a> for BodyCompressionMethod {
type Inner = Self;
#[inline]
fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
flatbuffers::read_scalar_at::<Self>(buf, loc)
}
}

impl flatbuffers::EndianScalar for BodyCompressionMethod {
#[inline]
fn to_little_endian(self) -> Self {
let n = i8::to_le(self as i8);
let p = &n as *const i8 as *const BodyCompressionMethod;
unsafe { *p }
}
#[inline]
fn from_little_endian(self) -> Self {
let n = i8::from_le(self as i8);
let p = &n as *const i8 as *const BodyCompressionMethod;
unsafe { *p }
}
}

impl flatbuffers::Push for BodyCompressionMethod {
type Output = BodyCompressionMethod;
#[inline]
fn push(&self, dst: &mut [u8], _rest: &[u8]) {
flatbuffers::emplace_scalar::<BodyCompressionMethod>(dst, *self);
}
}

#[allow(non_camel_case_types)]
const ENUM_VALUES_BODY_COMPRESSION_METHOD: [BodyCompressionMethod; 1] =
[BodyCompressionMethod::BUFFER];

#[allow(non_camel_case_types)]
const ENUM_NAMES_BODY_COMPRESSION_METHOD: [&'static str; 1] = ["BUFFER"];

pub fn enum_name_body_compression_method(e: BodyCompressionMethod) -> &'static str {
let index = e as i8;
ENUM_NAMES_BODY_COMPRESSION_METHOD[index as usize]
}

/// ----------------------------------------------------------------------
/// The root Message type
/// This union enables us to easily send different message types without
Expand Down Expand Up @@ -184,6 +301,118 @@ impl FieldNode {
}
}

pub enum BodyCompressionOffset {}
#[derive(Copy, Clone, Debug, PartialEq)]

/// Optional compression for the memory buffers constituting IPC message
/// bodies. Intended for use with RecordBatch but could be used for other
/// message types
pub struct BodyCompression<'a> {
pub _tab: flatbuffers::Table<'a>,
}

impl<'a> flatbuffers::Follow<'a> for BodyCompression<'a> {
type Inner = BodyCompression<'a>;
#[inline]
fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self {
_tab: flatbuffers::Table { buf: buf, loc: loc },
}
}
}

impl<'a> BodyCompression<'a> {
#[inline]
pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
BodyCompression { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>,
args: &'args BodyCompressionArgs,
) -> flatbuffers::WIPOffset<BodyCompression<'bldr>> {
let mut builder = BodyCompressionBuilder::new(_fbb);
builder.add_method(args.method);
builder.add_codec(args.codec);
builder.finish()
}

pub const VT_CODEC: flatbuffers::VOffsetT = 4;
pub const VT_METHOD: flatbuffers::VOffsetT = 6;

/// Compressor library
#[inline]
pub fn codec(&self) -> CompressionType {
self._tab
.get::<CompressionType>(
BodyCompression::VT_CODEC,
Some(CompressionType::LZ4_FRAME),
)
.unwrap()
}
/// Indicates the way the record batch body was compressed
#[inline]
pub fn method(&self) -> BodyCompressionMethod {
self._tab
.get::<BodyCompressionMethod>(
BodyCompression::VT_METHOD,
Some(BodyCompressionMethod::BUFFER),
)
.unwrap()
}
}

pub struct BodyCompressionArgs {
pub codec: CompressionType,
pub method: BodyCompressionMethod,
}
impl<'a> Default for BodyCompressionArgs {
#[inline]
fn default() -> Self {
BodyCompressionArgs {
codec: CompressionType::LZ4_FRAME,
method: BodyCompressionMethod::BUFFER,
}
}
}
pub struct BodyCompressionBuilder<'a: 'b, 'b> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b> BodyCompressionBuilder<'a, 'b> {
#[inline]
pub fn add_codec(&mut self, codec: CompressionType) {
self.fbb_.push_slot::<CompressionType>(
BodyCompression::VT_CODEC,
codec,
CompressionType::LZ4_FRAME,
);
}
#[inline]
pub fn add_method(&mut self, method: BodyCompressionMethod) {
self.fbb_.push_slot::<BodyCompressionMethod>(
BodyCompression::VT_METHOD,
method,
BodyCompressionMethod::BUFFER,
);
}
#[inline]
pub fn new(
_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>,
) -> BodyCompressionBuilder<'a, 'b> {
let start = _fbb.start_table();
BodyCompressionBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<BodyCompression<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}

pub enum RecordBatchOffset {}
#[derive(Copy, Clone, Debug, PartialEq)]

Expand Down Expand Up @@ -216,6 +445,9 @@ impl<'a> RecordBatch<'a> {
) -> flatbuffers::WIPOffset<RecordBatch<'bldr>> {
let mut builder = RecordBatchBuilder::new(_fbb);
builder.add_length(args.length);
if let Some(x) = args.compression {
builder.add_compression(x);
}
if let Some(x) = args.buffers {
builder.add_buffers(x);
}
Expand All @@ -228,6 +460,7 @@ impl<'a> RecordBatch<'a> {
pub const VT_LENGTH: flatbuffers::VOffsetT = 4;
pub const VT_NODES: flatbuffers::VOffsetT = 6;
pub const VT_BUFFERS: flatbuffers::VOffsetT = 8;
pub const VT_COMPRESSION: flatbuffers::VOffsetT = 10;

/// number of records / rows. The arrays in the batch should all have this
/// length
Expand Down Expand Up @@ -262,12 +495,22 @@ impl<'a> RecordBatch<'a> {
)
.map(|v| v.safe_slice())
}
/// Optional compression of the message body
#[inline]
pub fn compression(&self) -> Option<BodyCompression<'a>> {
self._tab
.get::<flatbuffers::ForwardsUOffset<BodyCompression<'a>>>(
RecordBatch::VT_COMPRESSION,
None,
)
}
}

pub struct RecordBatchArgs<'a> {
pub length: i64,
pub nodes: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, FieldNode>>>,
pub buffers: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, Buffer>>>,
pub compression: Option<flatbuffers::WIPOffset<BodyCompression<'a>>>,
}
impl<'a> Default for RecordBatchArgs<'a> {
#[inline]
Expand All @@ -276,6 +519,7 @@ impl<'a> Default for RecordBatchArgs<'a> {
length: 0,
nodes: None,
buffers: None,
compression: None,
}
}
}
Expand Down Expand Up @@ -308,6 +552,17 @@ impl<'a: 'b, 'b> RecordBatchBuilder<'a, 'b> {
);
}
#[inline]
pub fn add_compression(
&mut self,
compression: flatbuffers::WIPOffset<BodyCompression<'b>>,
) {
self.fbb_
.push_slot_always::<flatbuffers::WIPOffset<BodyCompression>>(
RecordBatch::VT_COMPRESSION,
compression,
);
}
#[inline]
pub fn new(
_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>,
) -> RecordBatchBuilder<'a, 'b> {
Expand Down
Loading