Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ci/docker/conda-python-spark.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ RUN /arrow/ci/scripts/install_spark.sh ${spark} /spark
COPY ci/etc/integration_spark_ARROW-9438.patch /arrow/ci/etc/
RUN patch -d /spark -p1 -i /arrow/ci/etc/integration_spark_ARROW-9438.patch

# patch spark to handle struct timestamps with tzinfo
COPY ci/etc/integration_spark_ARROW-9223.patch /arrow/ci/etc/
RUN patch -d /spark -p1 -i /arrow/ci/etc/integration_spark_ARROW-9223.patch

# build cpp with tests
ENV CC=gcc \
CXX=g++ \
Expand Down
37 changes: 37 additions & 0 deletions ci/etc/integration_spark_ARROW-9223.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index 42562e1fb9..d00b67e99b 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -120,15 +120,30 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):

def arrow_to_pandas(self, arrow_column):
from pyspark.sql.pandas.types import _check_series_localize_timestamps
- import pyarrow
+ import pyarrow as pa

# If the given column is a date type column, creates a series of datetime.date directly
# instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
# datetime64[ns] type handling.
s = arrow_column.to_pandas(date_as_object=True)

- if pyarrow.types.is_timestamp(arrow_column.type):
+ if pa.types.is_timestamp(arrow_column.type):
return _check_series_localize_timestamps(s, self._timezone)
+ elif pa.types.is_struct(arrow_column.type):
+ if isinstance(arrow_column, pa.ChunkedArray):
+ arrow_column = pa.concat_arrays(arrow_column.iterchunks())
+ children = []
+ names = []
+ for f in arrow_column.type:
+ child = arrow_column.field(f.name)
+ if pa.types.is_timestamp(child.type):
+ child_series = child.to_pandas()
+ child_series = _check_series_localize_timestamps(child_series, self._timezone)
+ child = pa.array(child_series, type=pa.timestamp('us'))
+ children.append(child)
+ names.append(f.name)
+ arr = pa.StructArray.from_arrays(children, names)
+ return arr.to_pandas(date_as_object=True)
else:
return s

22 changes: 9 additions & 13 deletions cpp/src/arrow/python/inference.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,7 @@ class TypeInferrer {
int_count_(0),
date_count_(0),
time_count_(0),
timestamp_second_count_(0),
timestamp_milli_count_(0),
timestamp_micro_count_(0),
timestamp_nano_count_(0),
duration_count_(0),
float_count_(0),
binary_count_(0),
Expand Down Expand Up @@ -332,6 +329,13 @@ class TypeInferrer {
++int_count_;
} else if (PyDateTime_Check(obj)) {
++timestamp_micro_count_;
OwnedRef tzinfo(PyObject_GetAttrString(obj, "tzinfo"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If null is returned, it means Python raised an error (for example the attribute doesn't exist, which is unlikely). You want either to return that error, or to ignore it (using PyErr_Clear).

if (tzinfo.obj() != nullptr && tzinfo.obj() != Py_None && timezone_.empty()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timezone_ should be first here.

// From public docs on array construction
// "Localized timestamps will currently be returned as UTC
// representation."
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean Arrow simply stores an erroneous value? We don't do timezone conversion in Arrow, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm actually not sure what it was intended to mean. This was my best guess. Not accounting for timezone info seems like a bug? Would you prefer I try to capture the first time zone encountered as a string? Or is the preference not to have the logic in this PR in arrow in the first place?

Copy link
Member

@pitrou pitrou Jul 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I reuse the nomenclature from the test, this is what I get without this PR (it's 14:21 UTC currently):

>>> now_utc                                                                                                     
datetime.datetime(2020, 7, 20, 14, 21, 42, 96119, tzinfo=<UTC>)
>>> arr = pa.array([now_utc])                                                                                   
>>> arr                                                                                                         
<pyarrow.lib.TimestampArray object at 0x7f44b0bfcc90>
[
  2020-07-20 14:21:42.096119
]
>>> arr.type.tz                                                                                                 
>>> arr.to_pylist()                                                                                             
[datetime.datetime(2020, 7, 20, 14, 21, 42, 96119)]
>>> arr.to_pandas()                                                                                             
0   2020-07-20 14:21:42.096119
dtype: datetime64[ns]
>>> now_with_tz                                                                                                           
datetime.datetime(2020, 7, 20, 10, 21, 42, 96119, tzinfo=<DstTzInfo 'US/Eastern' EDT-1 day, 20:00:00 DST>)
>>> arr = pa.array([now_with_tz])                                                                                         
>>> arr.type.tz                                                                                                           
>>> arr.to_pylist()                                                                                                       
[datetime.datetime(2020, 7, 20, 10, 21, 42, 96119)]
>>> arr.to_pylist()[0].tzinfo                                                                                             
>>> arr.to_pandas()                                                                                                       
0   2020-07-20 10:21:42.096119
dtype: datetime64[ns]

So without the PR, there's the problem that timestamps lose the timezone information from Python. It seems to get worse with this PR because non-UTC timestamps get tagged as UTC without being corrected for the timezone's offset, which is misleading. At least originally you may be alerted by the absence of a timezone on the type (in Python terms, it's a "naive" timestamp).

Copy link
Contributor Author

@emkornfield emkornfield Jul 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to get worse with this PR because non-UTC timestamps get tagged as UTC without being corrected for the timezone's offset, which is misleading.

That is not the intent of the PR; right now everything gets corrected to UTC. As an example:
This correctly keeps the times logically the same. I can make the change to try to keep the original timezones in place and changes US/Eastern to the correct time in UTC.

>>> now_with_tz = datetime.datetime(2020, 7, 20, 10, 21, 42, 96119, tzinfo=pytz.timezone('US/Eastern'))
>>> arr = pa.array([now_with_tz]) 
>>> arr.type.tz  
'UTC'
>>> arr.to_pylist() 
[datetime.datetime(2020, 7, 20, 15, 17, 42, 96119, tzinfo=<UTC>)]
>>> arr.to_pylist()[0].tzinfo
<UTC>
>>> arr.to_pandas()
0   2020-07-20 15:17:42.096119+00:00
dtype: datetime64[ns, UTC]

timezone_ = "UTC";
}
*keep_going = make_unions_;
} else if (PyDelta_Check(obj)) {
++duration_count_;
Expand Down Expand Up @@ -458,14 +462,8 @@ class TypeInferrer {
*out = date32();
} else if (time_count_) {
*out = time64(TimeUnit::MICRO);
} else if (timestamp_nano_count_) {
*out = timestamp(TimeUnit::NANO);
} else if (timestamp_micro_count_) {
*out = timestamp(TimeUnit::MICRO);
} else if (timestamp_milli_count_) {
*out = timestamp(TimeUnit::MILLI);
} else if (timestamp_second_count_) {
*out = timestamp(TimeUnit::SECOND);
*out = timestamp(TimeUnit::MICRO, timezone_);
} else if (duration_count_) {
*out = duration(TimeUnit::MICRO);
} else if (bool_count_) {
Expand Down Expand Up @@ -597,10 +595,8 @@ class TypeInferrer {
int64_t int_count_;
int64_t date_count_;
int64_t time_count_;
int64_t timestamp_second_count_;
int64_t timestamp_milli_count_;
std::string timezone_;
int64_t timestamp_micro_count_;
int64_t timestamp_nano_count_;
int64_t duration_count_;
int64_t float_count_;
int64_t binary_count_;
Expand Down
21 changes: 18 additions & 3 deletions cpp/src/arrow/python/python_to_arrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,8 @@ template <typename Type, NullCoding null_coding>
class TemporalConverter : public TimeConverter<Type, null_coding> {
public:
using TimeConverter<Type, null_coding>::TimeConverter;
TemporalConverter<Type, null_coding>(TimeUnit::type unit, PyObject* utc)
: TimeConverter<Type, null_coding>(unit), utc_(utc) {}

Status AppendValue(PyObject* obj) override {
int64_t value;
Expand All @@ -569,11 +571,22 @@ class TemporalConverter : public TimeConverter<Type, null_coding> {
return this->typed_builder_->AppendNull();
}
} else {
// convert builtin python objects
ARROW_ASSIGN_OR_RAISE(value, ValueConverter<Type>::FromPython(obj, this->unit_));
PyObject* target = obj;
OwnedRef target_holder;
if (PyDateTime_Check(obj)) {
OwnedRef tzinfo(PyObject_GetAttrString(obj, "tzinfo"));
if (tzinfo.obj() != nullptr && tzinfo.obj() != Py_None) {
target_holder.reset(PyObject_CallMethod(obj, "astimezone", "O", utc_.obj()));
target = target_holder.obj();
}
}
ARROW_ASSIGN_OR_RAISE(value, ValueConverter<Type>::FromPython(target, this->unit_));
}
return this->typed_builder_->Append(value);
}

private:
OwnedRef utc_;
};

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -1169,9 +1182,11 @@ Status GetConverterFlat(const std::shared_ptr<DataType>& type, bool strict_conve
break;
}
case Type::TIMESTAMP: {
PyObject* utc;
RETURN_NOT_OK(internal::StringToTzinfo("UTC", &utc));
*out =
std::unique_ptr<SeqConverter>(new TemporalConverter<TimestampType, null_coding>(
checked_cast<const TimestampType&>(*type).unit()));
checked_cast<const TimestampType&>(*type).unit(), utc));
break;
}
case Type::DURATION: {
Expand Down
3 changes: 2 additions & 1 deletion python/pyarrow/scalar.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ cdef class Date64Scalar(Scalar):

def as_py(self):
"""
Return this value as a Python datetime.datetime instance.
Return this value as a Pandas Timestamp instance (if available),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this needs to be reverted.

otherwise as a Python datetime.datetime instance.
"""
cdef CDate64Scalar* sp = <CDate64Scalar*> self.wrapped.get()

Expand Down
5 changes: 5 additions & 0 deletions python/pyarrow/tests/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import pickle5
except ImportError:
pickle5 = None
import pytz

import pyarrow as pa
import pyarrow.tests.strategies as past
Expand Down Expand Up @@ -300,6 +301,8 @@ def test_nulls(ty):
def test_array_from_scalar():
today = datetime.date.today()
now = datetime.datetime.now()
now_utc = now.replace(tzinfo=pytz.utc)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on my experiments, you should write:

now_utc = datetime.datetime.now(tz=pytz.utc)

(simply calling .replace(tzinfo=pytz.utc) doesn't adjust the recorded time for the timezone change, so you get the local time tagged with a UTC timezone)

And, yes, this probably doesn't matter for the test's correctness :-)

now_with_tz = now_utc.astimezone(pytz.timezone('US/Eastern'))
oneday = datetime.timedelta(days=1)

cases = [
Expand All @@ -317,6 +320,7 @@ def test_array_from_scalar():
(pa.scalar(True), 11, pa.array([True] * 11)),
(today, 2, pa.array([today] * 2)),
(now, 10, pa.array([now] * 10)),
(now_with_tz, 10, pa.array([now_utc] * 10)),
(now.time(), 9, pa.array([now.time()] * 9)),
(oneday, 4, pa.array([oneday] * 4)),
(False, 9, pa.array([False] * 9)),
Expand All @@ -332,6 +336,7 @@ def test_array_from_scalar():
for value, size, expected in cases:
arr = pa.repeat(value, size)
assert len(arr) == size
assert arr.type.equals(expected.type)
assert arr.equals(expected)

if expected.type == pa.null():
Expand Down
15 changes: 15 additions & 0 deletions python/pyarrow/tests/test_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -3325,6 +3325,21 @@ def test_cast_timestamp_unit():
assert result.equals(expected)


def test_nested_with_timestamp_tz_round_trip():
ts = pd.Timestamp.now()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What timezone does this timestamp have? Is it a naive timestamp? It would be nice to explain this in a comment.

ts_dt = ts.to_pydatetime()
arr = pa.array([ts_dt], type=pa.timestamp('us', tz='America/New_York'))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this test presume that ts itself was produced in the "America/New_York" timezone? It's not clear to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe so. I'm pretty sure it uses my machine's local time; I'll double-check by setting a different tz.

struct = pa.StructArray.from_arrays([arr, arr], ['start', 'stop'])

result = struct.to_pandas()
# N.B. we test with Pandas because the Arrow types are not
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Pandas" :-)

# actually equal. All Timezone aware times are currently normalized
# to "UTC" as the timestamp type, but since this conversion results
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"timestamp"

# in object dtypes, and tzinfo is preserved, the comparison should
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"preserved"

# result in equality.
pd.testing.assert_series_equal(pa.array(result).to_pandas(), result)


def test_nested_with_timestamp_tz():
# ARROW-7723
ts = pd.Timestamp.now()
Expand Down