diff --git a/ci/docker/conda-python-spark.dockerfile b/ci/docker/conda-python-spark.dockerfile
index a20f1ff3521..4aed6db0fa0 100644
--- a/ci/docker/conda-python-spark.dockerfile
+++ b/ci/docker/conda-python-spark.dockerfile
@@ -40,6 +40,10 @@ RUN /arrow/ci/scripts/install_spark.sh ${spark} /spark
 COPY ci/etc/integration_spark_ARROW-9438.patch /arrow/ci/etc/
 RUN patch -d /spark -p1 -i /arrow/ci/etc/integration_spark_ARROW-9438.patch
 
+# patch spark to handle struct timestamps with tzinfo
+COPY ci/etc/integration_spark_ARROW-9223.patch /arrow/ci/etc/
+RUN patch -d /spark -p1 -i /arrow/ci/etc/integration_spark_ARROW-9223.patch
+
 # build cpp with tests
 ENV CC=gcc \
     CXX=g++ \
diff --git a/ci/etc/integration_spark_ARROW-9223.patch b/ci/etc/integration_spark_ARROW-9223.patch
new file mode 100644
index 00000000000..39bcc0bac3a
--- /dev/null
+++ b/ci/etc/integration_spark_ARROW-9223.patch
@@ -0,0 +1,37 @@
+diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
+index 42562e1fb9..d00b67e99b 100644
+--- a/python/pyspark/sql/pandas/serializers.py
++++ b/python/pyspark/sql/pandas/serializers.py
+@@ -120,15 +120,30 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
+ 
+     def arrow_to_pandas(self, arrow_column):
+         from pyspark.sql.pandas.types import _check_series_localize_timestamps
+-        import pyarrow
++        import pyarrow as pa
+ 
+         # If the given column is a date type column, creates a series of datetime.date directly
+         # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
+         # datetime64[ns] type handling.
+         s = arrow_column.to_pandas(date_as_object=True)
+ 
+-        if pyarrow.types.is_timestamp(arrow_column.type):
++        if pa.types.is_timestamp(arrow_column.type):
+             return _check_series_localize_timestamps(s, self._timezone)
++        elif pa.types.is_struct(arrow_column.type):
++            if isinstance(arrow_column, pa.ChunkedArray):
++                arrow_column = pa.concat_arrays(arrow_column.iterchunks())
++            children = []
++            names = []
++            for f in arrow_column.type:
++                child = arrow_column.field(f.name)
++                if pa.types.is_timestamp(child.type):
++                    child_series = child.to_pandas()
++                    child_series = _check_series_localize_timestamps(child_series, self._timezone)
++                    child = pa.array(child_series, type=pa.timestamp('us'))
++                children.append(child)
++                names.append(f.name)
++            arr = pa.StructArray.from_arrays(children, names)
++            return arr.to_pandas(date_as_object=True)
+         else:
+             return s
+ 
diff --git a/cpp/src/arrow/python/inference.cc b/cpp/src/arrow/python/inference.cc
index c2fc06e554c..3df9a5dca56 100644
--- a/cpp/src/arrow/python/inference.cc
+++ b/cpp/src/arrow/python/inference.cc
@@ -295,10 +295,7 @@ class TypeInferrer {
         int_count_(0),
         date_count_(0),
         time_count_(0),
-        timestamp_second_count_(0),
-        timestamp_milli_count_(0),
         timestamp_micro_count_(0),
-        timestamp_nano_count_(0),
         duration_count_(0),
         float_count_(0),
         binary_count_(0),
@@ -332,6 +329,13 @@ class TypeInferrer {
       ++int_count_;
     } else if (PyDateTime_Check(obj)) {
       ++timestamp_micro_count_;
+      OwnedRef tzinfo(PyObject_GetAttrString(obj, "tzinfo"));
+      if (tzinfo.obj() != nullptr && tzinfo.obj() != Py_None && timezone_.empty()) {
+        // From the public docs on array construction:
+        // "Localized timestamps will currently be returned
+        // as UTC representation."
+        timezone_ = "UTC";
+      }
       *keep_going = make_unions_;
     } else if (PyDelta_Check(obj)) {
       ++duration_count_;
@@ -458,14 +462,8 @@ class TypeInferrer {
       *out = date32();
     } else if (time_count_) {
       *out = time64(TimeUnit::MICRO);
-    } else if (timestamp_nano_count_) {
-      *out = timestamp(TimeUnit::NANO);
     } else if (timestamp_micro_count_) {
-      *out = timestamp(TimeUnit::MICRO);
-    } else if (timestamp_milli_count_) {
-      *out = timestamp(TimeUnit::MILLI);
-    } else if (timestamp_second_count_) {
-      *out = timestamp(TimeUnit::SECOND);
+      *out = timestamp(TimeUnit::MICRO, timezone_);
     } else if (duration_count_) {
       *out = duration(TimeUnit::MICRO);
     } else if (bool_count_) {
@@ -597,10 +595,8 @@ class TypeInferrer {
   int64_t int_count_;
   int64_t date_count_;
   int64_t time_count_;
-  int64_t timestamp_second_count_;
-  int64_t timestamp_milli_count_;
+  std::string timezone_;
   int64_t timestamp_micro_count_;
-  int64_t timestamp_nano_count_;
   int64_t duration_count_;
   int64_t float_count_;
   int64_t binary_count_;
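Taken together, the inference.cc hunks above collapse the per-unit timestamp counters into a single microsecond counter and record a "UTC" timezone as soon as one tz-aware datetime is seen. A minimal sketch of the intended Python-visible behavior with this patch applied (values and zone are illustrative, not from the patch itself):

import datetime

import pytz
import pyarrow as pa

# Naive datetimes still infer a timezone-less microsecond timestamp.
naive = pa.array([datetime.datetime(2020, 7, 1, 12, 0)])
assert naive.type == pa.timestamp('us')

# With this patch, a single tz-aware value makes the inferred type carry
# tz='UTC', regardless of the zone the input was localized to.
eastern = pytz.timezone('US/Eastern')
aware = eastern.localize(datetime.datetime(2020, 7, 1, 12, 0))
assert pa.array([aware]).type == pa.timestamp('us', tz='UTC')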
" + timezone_ = "UTC"; + } *keep_going = make_unions_; } else if (PyDelta_Check(obj)) { ++duration_count_; @@ -458,14 +462,8 @@ class TypeInferrer { *out = date32(); } else if (time_count_) { *out = time64(TimeUnit::MICRO); - } else if (timestamp_nano_count_) { - *out = timestamp(TimeUnit::NANO); } else if (timestamp_micro_count_) { - *out = timestamp(TimeUnit::MICRO); - } else if (timestamp_milli_count_) { - *out = timestamp(TimeUnit::MILLI); - } else if (timestamp_second_count_) { - *out = timestamp(TimeUnit::SECOND); + *out = timestamp(TimeUnit::MICRO, timezone_); } else if (duration_count_) { *out = duration(TimeUnit::MICRO); } else if (bool_count_) { @@ -597,10 +595,8 @@ class TypeInferrer { int64_t int_count_; int64_t date_count_; int64_t time_count_; - int64_t timestamp_second_count_; - int64_t timestamp_milli_count_; + std::string timezone_; int64_t timestamp_micro_count_; - int64_t timestamp_nano_count_; int64_t duration_count_; int64_t float_count_; int64_t binary_count_; diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc index 66a1e410265..d15cf2dfdcf 100644 --- a/cpp/src/arrow/python/python_to_arrow.cc +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -558,6 +558,8 @@ template class TemporalConverter : public TimeConverter { public: using TimeConverter::TimeConverter; + TemporalConverter(TimeUnit::type unit, PyObject* utc) + : TimeConverter(unit), utc_(utc) {} Status AppendValue(PyObject* obj) override { int64_t value; @@ -569,11 +571,22 @@ class TemporalConverter : public TimeConverter { return this->typed_builder_->AppendNull(); } } else { - // convert builtin python objects - ARROW_ASSIGN_OR_RAISE(value, ValueConverter::FromPython(obj, this->unit_)); + PyObject* target = obj; + OwnedRef target_holder; + if (PyDateTime_Check(obj)) { + OwnedRef tzinfo(PyObject_GetAttrString(obj, "tzinfo")); + if (tzinfo.obj() != nullptr && tzinfo.obj() != Py_None) { + target_holder.reset(PyObject_CallMethod(obj, "astimezone", "O", utc_.obj())); + target = target_holder.obj(); + } + } + ARROW_ASSIGN_OR_RAISE(value, ValueConverter::FromPython(target, this->unit_)); } return this->typed_builder_->Append(value); } + + private: + OwnedRef utc_; }; // ---------------------------------------------------------------------- @@ -1169,9 +1182,11 @@ Status GetConverterFlat(const std::shared_ptr& type, bool strict_conve break; } case Type::TIMESTAMP: { + PyObject* utc; + RETURN_NOT_OK(internal::StringToTzinfo("UTC", &utc)); *out = std::unique_ptr(new TemporalConverter( - checked_cast(*type).unit())); + checked_cast(*type).unit(), utc)); break; } case Type::DURATION: { diff --git a/python/pyarrow/scalar.pxi b/python/pyarrow/scalar.pxi index 248d92628b3..16302311cd0 100644 --- a/python/pyarrow/scalar.pxi +++ b/python/pyarrow/scalar.pxi @@ -331,7 +331,8 @@ cdef class Date64Scalar(Scalar): def as_py(self): """ - Return this value as a Python datetime.datetime instance. + Return this value as a Pandas Timestamp instance (if available), + otherwise as a Python datetime.datetime instance. 
""" cdef CDate64Scalar* sp = self.wrapped.get() diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py index 77fc9b22635..f91d7da7a46 100644 --- a/python/pyarrow/tests/test_array.py +++ b/python/pyarrow/tests/test_array.py @@ -31,6 +31,7 @@ import pickle5 except ImportError: pickle5 = None +import pytz import pyarrow as pa import pyarrow.tests.strategies as past @@ -300,6 +301,8 @@ def test_nulls(ty): def test_array_from_scalar(): today = datetime.date.today() now = datetime.datetime.now() + now_utc = now.replace(tzinfo=pytz.utc) + now_with_tz = now_utc.astimezone(pytz.timezone('US/Eastern')) oneday = datetime.timedelta(days=1) cases = [ @@ -317,6 +320,7 @@ def test_array_from_scalar(): (pa.scalar(True), 11, pa.array([True] * 11)), (today, 2, pa.array([today] * 2)), (now, 10, pa.array([now] * 10)), + (now_with_tz, 10, pa.array([now_utc] * 10)), (now.time(), 9, pa.array([now.time()] * 9)), (oneday, 4, pa.array([oneday] * 4)), (False, 9, pa.array([False] * 9)), @@ -332,6 +336,7 @@ def test_array_from_scalar(): for value, size, expected in cases: arr = pa.repeat(value, size) assert len(arr) == size + assert arr.type.equals(expected.type) assert arr.equals(expected) if expected.type == pa.null(): diff --git a/python/pyarrow/tests/test_pandas.py b/python/pyarrow/tests/test_pandas.py index 979b2a6f72c..50fa539fd48 100644 --- a/python/pyarrow/tests/test_pandas.py +++ b/python/pyarrow/tests/test_pandas.py @@ -3325,6 +3325,21 @@ def test_cast_timestamp_unit(): assert result.equals(expected) +def test_nested_with_timestamp_tz_round_trip(): + ts = pd.Timestamp.now() + ts_dt = ts.to_pydatetime() + arr = pa.array([ts_dt], type=pa.timestamp('us', tz='America/New_York')) + struct = pa.StructArray.from_arrays([arr, arr], ['start', 'stop']) + + result = struct.to_pandas() + # N.B. we test with Panaas because the Arrow types are not + # actually equal. All Timezone aware times are currently normalized + # to "UTC" as the timesetamp type.but since this conversion results + # in object dtypes, and tzinfo is preserrved the comparison should + # result in equality. + pd.testing.assert_series_equal(pa.array(result).to_pandas(), result) + + def test_nested_with_timestamp_tz(): # ARROW-7723 ts = pd.Timestamp.now()