2 changes: 1 addition & 1 deletion cpp/submodules/parquet-testing
3 changes: 2 additions & 1 deletion rust/arrow/src/ipc/convert.rs
@@ -518,11 +518,12 @@ pub(crate) fn get_fb_field_type<'a: 'b, 'b>(
)
}
List(ref list_type) => {
let field_name = fbb.create_string("list"); // field schema requires name to be not None
let inner_types = get_fb_field_type(list_type, fbb);
let child = ipc::Field::create(
fbb,
&ipc::FieldArgs {
name: None,
name: Some(field_name),
nullable: false,
type_type: inner_types.0,
type_: Some(inner_types.1),
116 changes: 115 additions & 1 deletion rust/arrow/src/record_batch.rs
@@ -300,6 +300,25 @@ mod tests {
.len(4)
.add_buffer(Buffer::from([42, 28, 19, 31].to_byte_slice()))
.build();

// Construct a value array
let value_data = ArrayData::builder(DataType::Int32)
.len(9)
.add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7, 8].to_byte_slice()))
.build();

// Construct a buffer for value offsets, for the nested array:
// [[0, 1, 2], [3, 4, 5], [6, 7], [8]]
let value_offsets = Buffer::from(&[0, 3, 6, 8, 9].to_byte_slice());

// Construct a list array from the above two
let list_data_type = DataType::List(Box::new(DataType::Int32));
let list_data = ArrayData::builder(list_data_type.clone())
.len(4)
.add_buffer(value_offsets.clone())
.add_child_data(value_data.clone())
.build();

let struct_array = StructArray::from(vec![
(
Field::new("b", DataType::Boolean, false),
@@ -310,16 +329,111 @@
Field::new("c", DataType::Int32, false),
Arc::new(Int32Array::from(vec![42, 28, 19, 31])),
),
(
Field::new("d", DataType::List(Box::new(DataType::Int32)), false),
Arc::new(ListArray::from(list_data.clone())),
),
]);

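// Building a record batch from the struct array should expose the struct's
// fields as the batch schema and its child arrays (b, c, d) as columns.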
let batch = RecordBatch::from(&struct_array);
assert_eq!(2, batch.num_columns());
assert_eq!(3, batch.num_columns());
assert_eq!(4, batch.num_rows());
assert_eq!(
struct_array.data_type(),
&DataType::Struct(batch.schema().fields().to_vec())
);
assert_eq!(batch.column(0).data(), boolean_data);
assert_eq!(batch.column(1).data(), int_data);
assert_eq!(batch.column(2).data(), list_data);
}

#[test]
fn create_record_batch_with_list_column() {
let schema = Schema::new(vec![Field::new(
"a",
DataType::List(Box::new(DataType::Int32)),
false,
)]);

// Construct a value array
let value_data = ArrayData::builder(DataType::Int32)
.len(8)
.add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice()))
.build();

// Construct a buffer for value offsets, for the nested array:
// [[0, 1, 2], [3, 4, 5], [6, 7]]
let value_offsets = Buffer::from(&[0, 3, 6, 8].to_byte_slice());

// Construct a list array from the above two
let list_data_type = DataType::List(Box::new(DataType::Int32));
let list_data = ArrayData::builder(list_data_type.clone())
.len(3)
.add_buffer(value_offsets.clone())
.add_child_data(value_data.clone())
.build();
let a = ListArray::from(list_data);

let record_batch =
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

assert_eq!(3, record_batch.num_rows());
assert_eq!(1, record_batch.num_columns());
assert_eq!(
&DataType::List(Box::new(DataType::Int32)),
record_batch.schema().field(0).data_type()
);
assert_eq!(3, record_batch.column(0).data().len());
}

#[test]
fn create_record_batch_with_list_column_nulls() {
let schema = Schema::new(vec![Field::new(
"a",
DataType::List(Box::new(DataType::Int32)),
false,
)]);

let values_builder = PrimitiveBuilder::<Int32Type>::new(10);
let mut builder = ListBuilder::new(values_builder);

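// `values().append_null()` pushes a null into the child values array,
// `append(true)` closes the current (non-null) list entry, and
// `append(false)` emits a null list entry.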
builder.values().append_null().unwrap();
builder.values().append_null().unwrap();
builder.append(true).unwrap();
builder.append(false).unwrap();
builder.append(true).unwrap();

// [[null, null], null, []]
let list_array = builder.finish();

let record_batch =
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap();

assert_eq!(3, record_batch.num_rows());
assert_eq!(1, record_batch.num_columns());
assert_eq!(
&DataType::List(Box::new(DataType::Int32)),
record_batch.schema().field(0).data_type()
);
assert_eq!(3, record_batch.column(0).data().len());

assert_eq!(false, record_batch.column(0).is_null(0));
assert_eq!(true, record_batch.column(0).is_null(1));
assert_eq!(false, record_batch.column(0).is_null(2));

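// Downcast the column back to a ListArray and check the individual entries:
// entry 0 holds two null values, entry 2 is an empty list.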
let col_as_list_array = record_batch
.column(0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();

assert_eq!(2, col_as_list_array.value(0).len());
assert_eq!(0, col_as_list_array.value(2).len());

let sublist_0_val = col_as_list_array.value(0);
let sublist_0 = sublist_0_val.as_any().downcast_ref::<Int32Array>().unwrap();

assert_eq!(true, sublist_0.is_null(0));
assert_eq!(true, sublist_0.is_null(1));
}
}
17 changes: 17 additions & 0 deletions rust/arrow/src/util/pretty.rs
@@ -79,6 +79,22 @@ macro_rules! make_string {
}};
}

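// Renders the list value at `$row` as `[..]`, recursively formatting each
// child element with `array_value_to_string`.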
macro_rules! make_string_from_list {
($column: ident, $row: ident) => {{
let list = $column
.as_any()
.downcast_ref::<array::ListArray>()
.ok_or(ArrowError::InvalidArgumentError(format!(
"Repl error: could not convert list column to list array."
)))?
.value($row);
let string_values = (0..list.len())
.map(|i| array_value_to_string(list.clone(), i))
.collect::<Result<Vec<String>>>()?;
Ok(format!("[{}]", string_values.join(", ")))
}};
}

/// Get the value at the given row in an array as a string
fn array_value_to_string(column: array::ArrayRef, row: usize) -> Result<String> {
match column.data_type() {
@@ -125,6 +141,7 @@ fn array_value_to_string(column: array::ArrayRef, row: usize) -> Result<String>
DataType::Time64(unit) if *unit == TimeUnit::Nanosecond => {
make_string!(array::Time64NanosecondArray, column, row)
}
DataType::List(_) => make_string_from_list!(column, row),
_ => Err(ArrowError::InvalidArgumentError(format!(
"Unsupported {:?} type for repl.",
column.data_type()
4 changes: 2 additions & 2 deletions rust/datafusion/src/execution/physical_plan/expressions.rs
@@ -39,9 +39,9 @@ use arrow::compute;
use arrow::compute::kernels::arithmetic::{add, divide, multiply, subtract};
use arrow::compute::kernels::boolean::{and, or};
use arrow::compute::kernels::cast::cast;
use arrow::compute::kernels::comparison::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow::compute::kernels::comparison::{
eq_utf8, gt_eq_utf8, gt_utf8, like_utf8, lt_eq_utf8, lt_utf8, neq_utf8, nlike_utf8,
eq, eq_utf8, gt, gt_eq, gt_eq_utf8, gt_utf8, like_utf8, lt, lt_eq, lt_eq_utf8,
lt_utf8, neq, neq_utf8, nlike_utf8,
};
use arrow::datatypes::{DataType, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
4 changes: 2 additions & 2 deletions rust/datafusion/src/logicalplan.rs
@@ -1006,8 +1006,8 @@ mod tests {
.build()?;

let expected = "Projection: #state, #total_salary\
\n Aggregate: groupBy=[[#state]], aggr=[[SUM(#salary) AS total_salary]]\
\n TableScan: employee.csv projection=Some([3, 4])";
\n Aggregate: groupBy=[[#state]], aggr=[[SUM(#salary) AS total_salary]]\
\n TableScan: employee.csv projection=Some([3, 4])";

assert_eq!(expected, format!("{:?}", plan));

16 changes: 8 additions & 8 deletions rust/datafusion/src/optimizer/projection_push_down.rs
@@ -332,7 +332,7 @@ mod tests {
.build()?;

let expected = "Aggregate: groupBy=[[]], aggr=[[MAX(#0)]]\
\n TableScan: test projection=Some([1])";
\n TableScan: test projection=Some([1])";

assert_optimized_plan_eq(&plan, expected);

@@ -348,7 +348,7 @@
.build()?;

let expected = "Aggregate: groupBy=[[#1]], aggr=[[MAX(#0)]]\
\n TableScan: test projection=Some([1, 2])";
\n TableScan: test projection=Some([1, 2])";

assert_optimized_plan_eq(&plan, expected);

@@ -365,8 +365,8 @@
.build()?;

let expected = "Aggregate: groupBy=[[]], aggr=[[MAX(#0)]]\
\n Selection: #1\
\n TableScan: test projection=Some([1, 2])";
\n Selection: #1\
\n TableScan: test projection=Some([1, 2])";

assert_optimized_plan_eq(&plan, expected);

@@ -385,7 +385,7 @@
.build()?;

let expected = "Projection: CAST(#0 AS Float64)\
\n TableScan: test projection=Some([2])";
\n TableScan: test projection=Some([2])";

assert_optimized_plan_eq(&projection, expected);

@@ -405,7 +405,7 @@
assert_fields_eq(&plan, vec!["a", "b"]);

let expected = "Projection: #0, #1\
\n TableScan: test projection=Some([0, 1])";
\n TableScan: test projection=Some([0, 1])";

assert_optimized_plan_eq(&plan, expected);

@@ -426,8 +426,8 @@
assert_fields_eq(&plan, vec!["c", "a"]);

let expected = "Limit: UInt32(5)\
\n Projection: #1, #0\
\n TableScan: test projection=Some([0, 2])";
\n Projection: #1, #0\
\n TableScan: test projection=Some([0, 2])";

assert_optimized_plan_eq(&plan, expected);

2 changes: 1 addition & 1 deletion rust/datafusion/src/sql/planner.rs
@@ -579,7 +579,7 @@ mod tests {
quick_test(
"SELECT * from person",
"Projection: #0, #1, #2, #3, #4, #5, #6\
\n TableScan: person projection=None",
\n TableScan: person projection=None",
);
}

97 changes: 96 additions & 1 deletion rust/datafusion/tests/sql.rs
@@ -15,14 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use std::convert::TryFrom;
use std::env;
use std::sync::Arc;

extern crate arrow;
extern crate datafusion;

use arrow::array::*;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::datatypes::{DataType, Field, Int64Type, Schema};
use arrow::record_batch::RecordBatch;

use datafusion::datasource::csv::CsvReadOptions;
@@ -119,6 +120,100 @@ fn parquet_single_nan_schema() {
}
}

#[test]
fn parquet_list_columns() {
let mut ctx = ExecutionContext::new();
let testdata = env::var("PARQUET_TEST_DATA").expect("PARQUET_TEST_DATA not defined");
ctx.register_parquet(
"list_columns",
&format!("{}/list_columns.parquet", testdata),
)
.unwrap();

let schema = Arc::new(Schema::new(vec![
Field::new(
"int64_list",
DataType::List(Box::new(DataType::Int64)),
true,
),
Field::new("utf8_list", DataType::List(Box::new(DataType::Utf8)), true),
]));

let sql = "SELECT int64_list, utf8_list FROM list_columns";
let plan = ctx.create_logical_plan(&sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
let plan = ctx.create_physical_plan(&plan, DEFAULT_BATCH_SIZE).unwrap();
let results = ctx.collect(plan.as_ref()).unwrap();

// int64_list utf8_list
// 0 [1, 2, 3] [abc, efg, hij]
// 1 [None, 1] None
// 2 [4] [efg, None, hij, xyz]

assert_eq!(1, results.len());
let batch = &results[0];
assert_eq!(3, batch.num_rows());
assert_eq!(2, batch.num_columns());
assert_eq!(&schema, batch.schema());

let int_list_array = batch
.column(0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let utf8_list_array = batch
.column(1)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();

assert_eq!(
int_list_array
.value(0)
.as_any()
.downcast_ref::<PrimitiveArray<Int64Type>>()
.unwrap(),
&PrimitiveArray::<Int64Type>::from(vec![Some(1), Some(2), Some(3),])
);

assert_eq!(
utf8_list_array
.value(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap(),
&StringArray::try_from(vec![Some("abc"), Some("efg"), Some("hij"),]).unwrap()
);

assert_eq!(
int_list_array
.value(1)
.as_any()
.downcast_ref::<PrimitiveArray<Int64Type>>()
.unwrap(),
&PrimitiveArray::<Int64Type>::from(vec![None, Some(1),])
);

assert!(utf8_list_array.is_null(1));

assert_eq!(
int_list_array
.value(2)
.as_any()
.downcast_ref::<PrimitiveArray<Int64Type>>()
.unwrap(),
&PrimitiveArray::<Int64Type>::from(vec![Some(4),])
);

let result = utf8_list_array.value(2);
let result = result.as_any().downcast_ref::<StringArray>().unwrap();

assert_eq!(result.value(0), "efg");
assert!(result.is_null(1));
assert_eq!(result.value(2), "hij");
assert_eq!(result.value(3), "xyz");
}

#[test]
fn csv_count_star() -> Result<()> {
let mut ctx = ExecutionContext::new();