Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion rust/arrow/src/ipc/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use crate::ipc;
use crate::record_batch::{RecordBatch, RecordBatchReader};
use DataType::*;

const CONTINUATION_MARKER: u32 = 0xffff_ffff;

/// Read a buffer based on offset and length
fn read_buffer(buf: &ipc::Buffer, a_data: &Vec<u8>) -> Buffer {
let start_offset = buf.offset() as usize;
Expand Down Expand Up @@ -730,6 +732,12 @@ impl<R: Read> StreamReader<R> {
let mut meta_buffer = vec![0; meta_len as usize];
reader.read_exact(&mut meta_buffer)?;

// If a continuation marker is encountered, skip over it and read
// the size from the next four bytes.
if u32::from_le_bytes(meta_size) == CONTINUATION_MARKER {
reader.read_exact(&mut meta_size)?;
}

let vecs = &meta_buffer.to_vec();
let message = ipc::get_root_as_message(vecs);
// message header is a Schema, so read it
Expand Down Expand Up @@ -762,7 +770,18 @@ impl<R: Read> StreamReader<R> {
// determine metadata length
let mut meta_size: [u8; 4] = [0; 4];
self.reader.read_exact(&mut meta_size)?;
let meta_len = u32::from_le_bytes(meta_size);
let meta_len = {
let meta_len = u32::from_le_bytes(meta_size);

// If a continuation marker is encountered, skip over it and read
// the size from the next four bytes.
if meta_len == CONTINUATION_MARKER {
self.reader.read_exact(&mut meta_size)?;
u32::from_le_bytes(meta_size)
} else {
meta_len
}
};

if meta_len == 0 {
// the stream has ended, mark the reader as finished
Expand Down
4 changes: 2 additions & 2 deletions rust/datafusion/src/logicalplan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,8 +820,8 @@ mod tests {
.build()?;

let expected = "Projection: #0, #1 AS total_salary\
\n Aggregate: groupBy=[[#0]], aggr=[[SUM(#1)]]\
\n TableScan: employee.csv projection=Some([3, 4])";
\n Aggregate: groupBy=[[#0]], aggr=[[SUM(#1)]]\
\n TableScan: employee.csv projection=Some([3, 4])";

assert_eq!(expected, format!("{:?}", plan));

Expand Down
9 changes: 5 additions & 4 deletions rust/datafusion/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,10 @@ impl<S: SchemaProvider> SqlToRel<S> {
Ok(Expr::Literal(ScalarValue::Utf8(s.clone())))
}

ASTNode::SQLAliasedExpr(ref expr, ref alias) => {
Ok(Alias(Arc::new(self.sql_to_rex(&expr, schema)?), alias.to_owned()))
}
ASTNode::SQLAliasedExpr(ref expr, ref alias) => Ok(Alias(
Arc::new(self.sql_to_rex(&expr, schema)?),
alias.to_owned(),
)),

ASTNode::SQLIdentifier(ref id) => {
match schema.fields().iter().position(|c| c.name().eq(id)) {
Expand Down Expand Up @@ -603,7 +604,7 @@ mod tests {
fn select_aliased_scalar_func() {
let sql = "SELECT sqrt(age) AS square_people FROM person";
let expected = "Projection: sqrt(CAST(#3 AS Float64)) AS square_people\
\n TableScan: person projection=None";
\n TableScan: person projection=None";
quick_test(sql, expected);
}

Expand Down