From 530a256880f451f5eed9b1f4bac047e940b098f3 Mon Sep 17 00:00:00 2001 From: Max Burke Date: Wed, 25 Mar 2020 19:45:18 -0700 Subject: [PATCH 1/5] [rust] Rust Arrow IPC reader must respect continuation markers. A message's size prefix may begin with a continuation marker (the 32-bit value 0xffffffff), which is used to keep the data that follows aligned to an 8-byte boundary. When the reader encounters the marker where it expects the metadata length, it must skip over it and read the actual length from the next four bytes. --- rust/arrow/src/ipc/reader.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs index 632c3540b76..803561160ee 100644 --- a/rust/arrow/src/ipc/reader.rs +++ b/rust/arrow/src/ipc/reader.rs @@ -33,6 +33,8 @@ use crate::ipc; use crate::record_batch::{RecordBatch, RecordBatchReader}; use DataType::*; +const CONTINUATION_MARKER: u32 = 0xffff_ffff; + /// Read a buffer based on offset and length fn read_buffer(buf: &ipc::Buffer, a_data: &Vec) -> Buffer { let start_offset = buf.offset() as usize; @@ -730,6 +732,12 @@ impl StreamReader { let mut meta_buffer = vec![0; meta_len as usize]; reader.read_exact(&mut meta_buffer)?; + // If a continuation marker is encountered, skip over it and read + // the size from the next four bytes. + if u32::from_le_bytes(meta_size) == CONTINUATION_MARKER { + reader.read_exact(&mut meta_size)?; + } + let vecs = &meta_buffer.to_vec(); let message = ipc::get_root_as_message(vecs); // message header is a Schema, so read it @@ -762,7 +770,18 @@ impl StreamReader { // determine metadata length let mut meta_size: [u8; 4] = [0; 4]; self.reader.read_exact(&mut meta_size)?; - let meta_len = u32::from_le_bytes(meta_size); + let meta_len = { + let meta_len = u32::from_le_bytes(meta_size); + + // If a continuation marker is encountered, skip over it and read + // the size from the next four bytes. 
+ if meta_len == CONTINUATION_MARKER { + self.reader.read_exact(&mut meta_size)?; + u32::from_le_bytes(meta_size) + } else { + meta_len + } + }; if meta_len == 0 { // the stream has ended, mark the reader as finished From 7949e689250f4175291f47abcb1706bcf52d392e Mon Sep 17 00:00:00 2001 From: Max Burke Date: Thu, 26 Mar 2020 12:30:04 -0700 Subject: [PATCH 2/5] lint --- rust/datafusion/src/logicalplan.rs | 4 ++-- rust/datafusion/src/sql/planner.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/datafusion/src/logicalplan.rs b/rust/datafusion/src/logicalplan.rs index 48fbae96ef7..2b690020608 100644 --- a/rust/datafusion/src/logicalplan.rs +++ b/rust/datafusion/src/logicalplan.rs @@ -820,8 +820,8 @@ mod tests { .build()?; let expected = "Projection: #0, #1 AS total_salary\ - \n Aggregate: groupBy=[[#0]], aggr=[[SUM(#1)]]\ - \n TableScan: employee.csv projection=Some([3, 4])"; + \n Aggregate: groupBy=[[#0]], aggr=[[SUM(#1)]]\ + \n TableScan: employee.csv projection=Some([3, 4])"; assert_eq!(expected, format!("{:?}", plan)); diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 000dd49f9ab..61151463367 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -603,7 +603,7 @@ mod tests { fn select_aliased_scalar_func() { let sql = "SELECT sqrt(age) AS square_people FROM person"; let expected = "Projection: sqrt(CAST(#3 AS Float64)) AS square_people\ - \n TableScan: person projection=None"; + \n TableScan: person projection=None"; quick_test(sql, expected); } From 46fe554343c5d4c23342dd0cdda6e19046421b0d Mon Sep 17 00:00:00 2001 From: Max Burke Date: Thu, 26 Mar 2020 13:07:12 -0700 Subject: [PATCH 3/5] formatting --- rust/datafusion/src/sql/planner.rs | 7 ++++--- testing | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 61151463367..9d16b481716 100644 --- 
a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -272,9 +272,10 @@ impl SqlToRel { Ok(Expr::Literal(ScalarValue::Utf8(s.clone()))) } - ASTNode::SQLAliasedExpr(ref expr, ref alias) => { - Ok(Alias(Arc::new(self.sql_to_rex(&expr, schema)?), alias.to_owned())) - } + ASTNode::SQLAliasedExpr(ref expr, ref alias) => Ok(Alias( + Arc::new(self.sql_to_rex(&expr, schema)?), + alias.to_owned(), + )), ASTNode::SQLIdentifier(ref id) => { match schema.fields().iter().position(|c| c.name().eq(id)) { diff --git a/testing b/testing index 58b29c418f8..5ebe03165fc 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 58b29c418f86ffc1f34ea3f3b20532067795d483 +Subproject commit 5ebe03165fc41061e253479798816fe4da111cde From 708a51b278ec9ea343c7d28b529304fadc6b3d6e Mon Sep 17 00:00:00 2001 From: Max Burke Date: Thu, 26 Mar 2020 13:07:12 -0700 Subject: [PATCH 4/5] formatting --- rust/datafusion/src/sql/planner.rs | 7 ++++--- testing | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 61151463367..9d16b481716 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -272,9 +272,10 @@ impl SqlToRel { Ok(Expr::Literal(ScalarValue::Utf8(s.clone()))) } - ASTNode::SQLAliasedExpr(ref expr, ref alias) => { - Ok(Alias(Arc::new(self.sql_to_rex(&expr, schema)?), alias.to_owned())) - } + ASTNode::SQLAliasedExpr(ref expr, ref alias) => Ok(Alias( + Arc::new(self.sql_to_rex(&expr, schema)?), + alias.to_owned(), + )), ASTNode::SQLIdentifier(ref id) => { match schema.fields().iter().position(|c| c.name().eq(id)) { diff --git a/testing b/testing index 58b29c418f8..b46e3a709c9 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 58b29c418f86ffc1f34ea3f3b20532067795d483 +Subproject commit b46e3a709c948ef77584635098e5f921fe05700c From b935c3413e8baca732add7db2ed81dd680c856ce Mon Sep 17 00:00:00 2001 From: Max Burke Date: Thu, 
26 Mar 2020 16:28:17 -0700 Subject: [PATCH 5/5] Update testing submodule --- testing | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing b/testing index 5ebe03165fc..b46e3a709c9 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 5ebe03165fc41061e253479798816fe4da111cde +Subproject commit b46e3a709c948ef77584635098e5f921fe05700c