From 865b27541f9793eeaf1afab79d36c3ddb101bfae Mon Sep 17 00:00:00 2001 From: CS <2498638+charliec443@users.noreply.github.com> Date: Sun, 5 Sep 2021 09:46:10 +1000 Subject: [PATCH 1/9] adding very hacky support for dates and timestamp formats --- python/src/types.rs | 87 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 85 insertions(+), 2 deletions(-) diff --git a/python/src/types.rs b/python/src/types.rs index bd6ef0d376e63..6943d803c861f 100644 --- a/python/src/types.rs +++ b/python/src/types.rs @@ -16,6 +16,7 @@ // under the License. use datafusion::arrow::datatypes::DataType; +use datafusion::arrow::datatypes::TimeUnit; use pyo3::{FromPyObject, PyAny, PyResult}; use crate::errors; @@ -28,13 +29,14 @@ pub struct PyDataType { impl<'source> FromPyObject<'source> for PyDataType { fn extract(ob: &'source PyAny) -> PyResult { + let str_ob = ob.to_string(); let id = ob.getattr("id")?.extract::()?; - let data_type = data_type_id(&id)?; + let data_type = data_type_id(&id, &str_ob)?; Ok(PyDataType { data_type }) } } -fn data_type_id(id: &i32) -> Result { +fn data_type_id(id: &i32, str_ob: &str) -> Result { // see https://github.com/apache/arrow/blob/3694794bdfd0677b95b8c95681e392512f1c9237/python/pyarrow/includes/libarrow.pxd // this is not ideal as it does not generalize for non-basic types // Find a way to get a unique name from the pyarrow.DataType @@ -53,6 +55,8 @@ fn data_type_id(id: &i32) -> Result { 12 => DataType::Float64, 13 => DataType::Utf8, 14 => DataType::Binary, + 16 => data_type_date(str_ob)?, + 18 => data_type_timestamp(str_ob)?, 34 => DataType::LargeUtf8, 35 => DataType::LargeBinary, other => { @@ -63,3 +67,82 @@ fn data_type_id(id: &i32) -> Result { } }) } + +fn data_type_timestamp(str_ob: &str) -> Result { + // maps to usage from apache/arrow/pyarrow/types.pxi + Ok(match str_ob.as_ref() { + "time32[s]" => DataType::Time32(TimeUnit::Second), + "time32[ms]" => DataType::Time32(TimeUnit::Millisecond), + "time64[us]" => DataType::Time64(TimeUnit::Microsecond), + "time64[ns]" => DataType::Time64(TimeUnit::Nanosecond), + "timestamp[s]" => DataType::Timestamp(TimeUnit::Second, None), + "timestamp[ms]" => DataType::Timestamp(TimeUnit::Millisecond, None), + "timestamp[us]" => DataType::Timestamp(TimeUnit::Microsecond, None), + "timestamp[ns]" => DataType::Timestamp(TimeUnit::Nanosecond, None), + _ => data_type_timestamp_infer(str_ob)?, + }) +} + +fn data_type_date(str_ob: &str) -> Result { + // maps to usage from apache/arrow/pyarrow/types.pxi + Ok(match str_ob.as_ref() { + "date32" => DataType::Date32, + "date64" => DataType::Date64, + "date32[day]" => DataType::Date32, + "date64[ms]" => DataType::Date64, + _ => { + return Err(errors::DataFusionError::Common(format!( + "invalid date {} provided", + str_ob + ))) + } + }) +} + +fn time_unit_str(unit: &str) -> Result { + Ok(match unit { + "s" => TimeUnit::Second, + "ms" => TimeUnit::Millisecond, + "us" => TimeUnit::Microsecond, + "ns" => TimeUnit::Nanosecond, + _ => { + return Err(errors::DataFusionError::Common(format!( + "invalid timestamp unit {} provided", + unit + ))) + } + }) +} + +fn data_type_timestamp_infer(str_ob: &str) -> Result { + // parse the timestamp string object - this approach is less than idea, as it requires maintaining + // this and more direct access methods are better + let chunks: Vec<_> = str_ob.split("[").collect(); + let timestamp_str: String = chunks[0].to_string(); + let mut unit_tz: String = chunks[1].to_string().replace(",", "").replace("]", ""); + + let mut tz: Option = None; + let mut unit: TimeUnit; + + if unit_tz.len() < 3 { + unit = time_unit_str(&unit_tz)?; + } else { + // manage timezones + let chunks: Vec<_> = unit_tz.split(" ").collect(); + let tz_part: Vec<_> = unit_tz.split("=").collect(); + unit = time_unit_str(&chunks[0])?; + tz = Some(tz_part[1].to_string()); + } + + Ok(match timestamp_str.as_ref() { + "time32" => DataType::Time32(unit), + "time64" => DataType::Time64(unit), + "timestamp" => DataType::Timestamp(unit, tz), + _ => { + return Err(errors::DataFusionError::Common(format!( + "invalid timestamp string {} provided", + str_ob + ))) + } + }) +} From 33211b72580babeab67361a65a9a35d02ec2f86b Mon Sep 17 00:00:00 2001 From: CS <2498638+charliec443@users.noreply.github.com> Date: Sun, 5 Sep 2021 18:34:43 +1000 Subject: [PATCH 2/9] bump --- python/src/types.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/src/types.rs b/python/src/types.rs index 6943d803c861f..24749cca3864a 100644 --- a/python/src/types.rs +++ b/python/src/types.rs @@ -119,10 +119,10 @@ fn data_type_timestamp_infer(str_ob: &str) -> Result = str_ob.split("[").collect(); let timestamp_str: String = chunks[0].to_string(); - let mut unit_tz: String = chunks[1].to_string().replace(",", "").replace("]", ""); + let unit_tz: String = chunks[1].to_string().replace(",", "").replace("]", ""); let mut tz: Option = None; - let mut unit: TimeUnit; + let unit: TimeUnit; if unit_tz.len() < 3 { unit = time_unit_str(&unit_tz)?; From 5151d6da06413e12d1a56e0480b08648ad7813b5 Mon Sep 17 00:00:00 2001 From: Charlie <2498638+charliec443@users.noreply.github.com> Date: Mon, 6 Sep 2021 05:20:33 +1000 Subject: [PATCH 3/9] Update types.rs --- python/src/types.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/src/types.rs b/python/src/types.rs index 24749cca3864a..393269f82de3b 100644 --- a/python/src/types.rs +++ b/python/src/types.rs @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -use datafusion::arrow::datatypes::DataType; -use datafusion::arrow::datatypes::TimeUnit; +use datafusion::arrow::datatypes::{DataType, TimeUnit}; use pyo3::{FromPyObject, PyAny, PyResult}; use crate::errors; From 14852b7a83f0b24da07cbe9eb6f66ce95edf0c6c Mon Sep 17 00:00:00 2001 From: CS <2498638+charliec443@users.noreply.github.com> Date: Mon, 6 Sep 2021 10:44:29 +1000 Subject: [PATCH 4/9] adding tests for dates using pyarrow only --- python/src/types.rs | 3 ++ python/tests/test_dates.py | 84 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+) create mode 100644 python/tests/test_dates.py diff --git a/python/src/types.rs b/python/src/types.rs index 393269f82de3b..a93e26fb6e40c 100644 --- a/python/src/types.rs +++ b/python/src/types.rs @@ -55,7 +55,10 @@ fn data_type_id(id: &i32, str_ob: &str) -> Result DataType::Utf8, 14 => DataType::Binary, 16 => data_type_date(str_ob)?, + 17 => data_type_date(str_ob)?, 18 => data_type_timestamp(str_ob)?, + 19 => data_type_timestamp(str_ob)?, + 20 => data_type_timestamp(str_ob)?, 34 => DataType::LargeUtf8, 35 => DataType::LargeBinary, other => { diff --git a/python/tests/test_dates.py b/python/tests/test_dates.py new file mode 100644 index 0000000000000..d99b3d5c84362 --- /dev/null +++ b/python/tests/test_dates.py @@ -0,0 +1,84 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datetime import datetime + +import pyarrow as pa +import pytest +from datafusion import ExecutionContext + + +@pytest.fixture +def ctx(): + return ExecutionContext() + + +@pytest.mark.parametrize( + ("input_values", "input_type", "output_type"), + [ + ( + [datetime(1970, 1, 1), datetime(1970, 1, 2), datetime(1970, 1, 3)], + pa.date32(), + pa.date32(), + ), + ( + [datetime(1970, 1, 1), datetime(1970, 1, 2), datetime(1970, 1, 3)], + pa.date64(), + pa.date64(), + ), + ( + [datetime(1970, 1, 1), datetime(1970, 1, 2), datetime(1970, 1, 3)], + pa.timestamp("ms"), + pa.timestamp("ms"), + ), + ( + [datetime(1970, 1, 1), datetime(1970, 1, 2), datetime(1970, 1, 3)], + pa.timestamp("s"), + pa.timestamp("s"), + ), + ( + [datetime(1970, 1, 1), datetime(1970, 1, 2), datetime(1970, 1, 3)], + pa.timestamp("us"), + pa.timestamp("us"), + ), + ( + [datetime(1970, 1, 1), datetime(1970, 1, 2), datetime(1970, 1, 3)], + pa.timestamp("ns"), + pa.timestamp("ns"), + ), + pytest.param( + [datetime(1970, 1, 1), datetime(1970, 1, 2), datetime(1970, 1, 3)], + pa.time32("s"), + pa.time32("s"), + marks=pytest.mark.xfail, + ), + pytest.param( + [datetime(1970, 1, 1), datetime(1970, 1, 2), datetime(1970, 1, 3)], + pa.time64("us"), + pa.time64("us"), + marks=pytest.mark.xfail, + ), + ], +) +def test_datetypes(ctx, input_values, input_type, output_type): + batch = pa.RecordBatch.from_arrays( + [pa.array(input_values, type=input_type)], names=["a"] + ) + + df = ctx.create_dataframe([[batch]]) + result = df.collect()[0] + assert result.column(0).type == output_type From 72e6b3093ddc4ea57c6cf35809a9effb5b4ca0a8 Mon Sep 17 00:00:00 2001 From: CS <2498638+charliec443@users.noreply.github.com> Date: Mon, 6 Sep 2021 12:25:53 +1000 Subject: [PATCH 5/9] changing time32, time64 --- python/tests/test_dates.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/tests/test_dates.py b/python/tests/test_dates.py index d99b3d5c84362..a04a1146d0283 100644 --- a/python/tests/test_dates.py +++ b/python/tests/test_dates.py @@ -18,6 +18,8 @@ from datetime import datetime import pyarrow as pa +import numpy as np +import pandas as pd import pytest from datafusion import ExecutionContext @@ -60,17 +62,15 @@ def ctx(): pa.timestamp("ns"), pa.timestamp("ns"), ), - pytest.param( - [datetime(1970, 1, 1), datetime(1970, 1, 2), datetime(1970, 1, 3)], + ( + [0, 1, 2], pa.time32("s"), pa.time32("s"), - marks=pytest.mark.xfail, ), - pytest.param( - [datetime(1970, 1, 1), datetime(1970, 1, 2), datetime(1970, 1, 3)], + ( + [0, 1, 2], pa.time64("us"), pa.time64("us"), - marks=pytest.mark.xfail, ), ], ) From 3f3d6037a48287292f7da746d3d0ce637520897b Mon Sep 17 00:00:00 2001 From: CS <2498638+charliec443@users.noreply.github.com> Date: Mon, 6 Sep 2021 12:27:43 +1000 Subject: [PATCH 6/9] checking for equality, not only types --- python/tests/test_dates.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/tests/test_dates.py b/python/tests/test_dates.py index a04a1146d0283..bc4bca11d59d4 100644 --- a/python/tests/test_dates.py +++ b/python/tests/test_dates.py @@ -82,3 +82,4 @@ def test_datetypes(ctx, input_values, input_type, output_type): df = ctx.create_dataframe([[batch]]) result = df.collect()[0] assert result.column(0).type == output_type + assert result.column(0) == batch.column(0) From e6a8d65545d12945bf72e1b4e622d9440cf0655c Mon Sep 17 00:00:00 2001 From: CS <2498638+charliec443@users.noreply.github.com> Date: Mon, 6 Sep 2021 17:01:44 +1000 Subject: [PATCH 7/9] refactoring types --- python/src/types.rs | 64 +++++++++++++++++++++----------------- python/tests/generic.py | 11 +++++++ python/tests/test_dates.py | 30 +++++++++++++++++- 3 files changed, 75 insertions(+), 30 deletions(-) diff --git a/python/src/types.rs b/python/src/types.rs index a93e26fb6e40c..7dd74b6b98954 100644 --- a/python/src/types.rs +++ b/python/src/types.rs @@ -39,35 +39,41 @@ fn data_type_id(id: &i32, str_ob: &str) -> Result DataType::Boolean, - 2 => DataType::UInt8, - 3 => DataType::Int8, - 4 => DataType::UInt16, - 5 => DataType::Int16, - 6 => DataType::UInt32, - 7 => DataType::Int32, - 8 => DataType::UInt64, - 9 => DataType::Int64, - 10 => DataType::Float16, - 11 => DataType::Float32, - 12 => DataType::Float64, - 13 => DataType::Utf8, - 14 => DataType::Binary, - 16 => data_type_date(str_ob)?, - 17 => data_type_date(str_ob)?, - 18 => data_type_timestamp(str_ob)?, - 19 => data_type_timestamp(str_ob)?, - 20 => data_type_timestamp(str_ob)?, - 34 => DataType::LargeUtf8, - 35 => DataType::LargeBinary, - other => { - return Err(errors::DataFusionError::Common(format!( - "The type {} is not valid", - other - ))) - } - }) + if str_ob.contains("date") { + Ok(data_type_date(str_ob)?) + } else if str_ob.contains("time") { + Ok(data_type_timestamp(str_ob)?) + } else { + Ok(match id { + 1 => DataType::Boolean, + 2 => DataType::UInt8, + 3 => DataType::Int8, + 4 => DataType::UInt16, + 5 => DataType::Int16, + 6 => DataType::UInt32, + 7 => DataType::Int32, + 8 => DataType::UInt64, + 9 => DataType::Int64, + 10 => DataType::Float16, + 11 => DataType::Float32, + 12 => DataType::Float64, + 13 => DataType::Utf8, + 14 => DataType::Binary, + 16 => data_type_date(str_ob)?, + 17 => data_type_date(str_ob)?, + 18 => data_type_timestamp(str_ob)?, + 19 => data_type_timestamp(str_ob)?, + 20 => data_type_timestamp(str_ob)?, + 34 => DataType::LargeUtf8, + 35 => DataType::LargeBinary, + other => { + return Err(errors::DataFusionError::Common(format!( + "The type {} is not valid", + other + ))) + } + }) + } } fn data_type_timestamp(str_ob: &str) -> Result { diff --git a/python/tests/generic.py b/python/tests/generic.py index 8d5adaaaf9563..c9e96484febc5 100644 --- a/python/tests/generic.py +++ b/python/tests/generic.py @@ -66,6 +66,17 @@ def data_date32(): ) +def data_date64(): + data = [ + datetime.date(2000, 1, 1), + datetime.date(1980, 1, 1), + datetime.date(2030, 1, 1), + ] + return pa.array( + data, type=pa.date64(), mask=np.array([False, True, False]) + ) + + def data_timedelta(f): data = [ datetime.timedelta(days=100), diff --git a/python/tests/test_dates.py b/python/tests/test_dates.py index bc4bca11d59d4..17327e077f0a8 100644 --- a/python/tests/test_dates.py +++ b/python/tests/test_dates.py @@ -15,13 +15,17 @@ # specific language governing permissions and limitations # under the License. +import datetime from datetime import datetime -import pyarrow as pa import numpy as np import pandas as pd +import pyarrow as pa import pytest from datafusion import ExecutionContext +from datafusion import functions as f + +from . import generic as helpers @pytest.fixture @@ -29,6 +33,30 @@ def ctx(): return ExecutionContext() +@pytest.fixture +def df(): + ctx = ExecutionContext() + + # create a RecordBatch and a new DataFrame from it + batch = pa.RecordBatch.from_arrays( + [helpers.data_datetime("s"), helpers.data_date32(), helpers.data_date64()], + names=["ts", "dt1", "dt2"], + ) + + return ctx.create_dataframe([[batch]]) + + +def test_select_ts_date(df): + df = df.select(f.col("ts"), f.col("dt1"), f.col("dt2")) + + # execute and collect the first (and only) batch + result = df.collect()[0] + + assert result.column(0) == helpers.data_datetime("s") + assert result.column(1) == helpers.data_date32() + assert result.column(2) == helpers.data_date64() + + @pytest.mark.parametrize( ("input_values", "input_type", "output_type"), [ From 4a345b47397cbb8ec28f392fda3f822210b85f96 Mon Sep 17 00:00:00 2001 From: CS <2498638+charliec443@users.noreply.github.com> Date: Mon, 6 Sep 2021 17:03:38 +1000 Subject: [PATCH 8/9] removing unreachable code --- python/src/types.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/python/src/types.rs b/python/src/types.rs index 7dd74b6b98954..6201ce374c642 100644 --- a/python/src/types.rs +++ b/python/src/types.rs @@ -59,11 +59,6 @@ fn data_type_id(id: &i32, str_ob: &str) -> Result DataType::Float64, 13 => DataType::Utf8, 14 => DataType::Binary, - 16 => data_type_date(str_ob)?, - 17 => data_type_date(str_ob)?, - 18 => data_type_timestamp(str_ob)?, - 19 => data_type_timestamp(str_ob)?, - 20 => data_type_timestamp(str_ob)?, 34 => DataType::LargeUtf8, 35 => DataType::LargeBinary, other => { From 9523084d23c07340d025f8753f6b488ce71e83d5 Mon Sep 17 00:00:00 2001 From: CS <2498638+charliec443@users.noreply.github.com> Date: Tue, 14 Sep 2021 04:58:35 +1000 Subject: [PATCH 9/9] fix lint issues --- python/tests/test_dates.py | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/python/tests/test_dates.py b/python/tests/test_dates.py index 17327e077f0a8..0f7ce3baa9492 100644 --- a/python/tests/test_dates.py +++ b/python/tests/test_dates.py @@ -15,11 +15,8 @@ # specific language governing permissions and limitations # under the License. -import datetime from datetime import datetime -import numpy as np -import pandas as pd import pyarrow as pa import pytest from datafusion import ExecutionContext @@ -39,7 +36,11 @@ def df(): # create a RecordBatch and a new DataFrame from it batch = pa.RecordBatch.from_arrays( - [helpers.data_datetime("s"), helpers.data_date32(), helpers.data_date64()], + [ + helpers.data_datetime("s"), + helpers.data_date32(), + helpers.data_date64(), + ], names=["ts", "dt1", "dt2"], ) @@ -90,16 +91,8 @@ def test_select_ts_date(df): pa.timestamp("ns"), pa.timestamp("ns"), ), - ( - [0, 1, 2], - pa.time32("s"), - pa.time32("s"), - ), - ( - [0, 1, 2], - pa.time64("us"), - pa.time64("us"), - ), + ([0, 1, 2], pa.time32("s"), pa.time32("s"),), + ([0, 1, 2], pa.time64("us"), pa.time64("us"),), ], ) def test_datetypes(ctx, input_values, input_type, output_type):