diff --git a/ci/docker/conda-python-spark.dockerfile b/ci/docker/conda-python-spark.dockerfile
index a20f1ff3521..4aed6db0fa0 100644
--- a/ci/docker/conda-python-spark.dockerfile
+++ b/ci/docker/conda-python-spark.dockerfile
@@ -40,6 +40,10 @@ RUN /arrow/ci/scripts/install_spark.sh ${spark} /spark
 COPY ci/etc/integration_spark_ARROW-9438.patch /arrow/ci/etc/
 RUN patch -d /spark -p1 -i /arrow/ci/etc/integration_spark_ARROW-9438.patch
 
+# patch spark's arrow_to_pandas to handle timestamps with tzinfo nested in struct columns (ARROW-9223)
+COPY ci/etc/integration_spark_ARROW-9223.patch /arrow/ci/etc/
+RUN patch -d /spark -p1 -i /arrow/ci/etc/integration_spark_ARROW-9223.patch
+
 # build cpp with tests
 ENV CC=gcc \
     CXX=g++ \
diff --git a/ci/etc/integration_spark_ARROW-9223.patch b/ci/etc/integration_spark_ARROW-9223.patch
new file mode 100644
index 00000000000..39bcc0bac3a
--- /dev/null
+++ b/ci/etc/integration_spark_ARROW-9223.patch
@@ -0,0 +1,37 @@
+diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
+index 42562e1fb9..d00b67e99b 100644
+--- a/python/pyspark/sql/pandas/serializers.py
++++ b/python/pyspark/sql/pandas/serializers.py
+@@ -120,15 +120,30 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
+ 
+     def arrow_to_pandas(self, arrow_column):
+         from pyspark.sql.pandas.types import _check_series_localize_timestamps
+-        import pyarrow
++        import pyarrow as pa
+ 
+         # If the given column is a date type column, creates a series of datetime.date directly
+         # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
+         # datetime64[ns] type handling.
+         s = arrow_column.to_pandas(date_as_object=True)
+ 
+-        if pyarrow.types.is_timestamp(arrow_column.type):
++        if pa.types.is_timestamp(arrow_column.type):
+             return _check_series_localize_timestamps(s, self._timezone)
++        elif pa.types.is_struct(arrow_column.type):
++            if isinstance(arrow_column, pa.ChunkedArray):
++                arrow_column = pa.concat_arrays(arrow_column.iterchunks())
++            children = []
++            names = []
++            for f in arrow_column.type:
++                child = arrow_column.field(f.name)
++                if pa.types.is_timestamp(child.type):
++                    child_series = child.to_pandas()
++                    child_series = _check_series_localize_timestamps(child_series, self._timezone)
++                    child = pa.array(child_series, type=pa.timestamp('us'))
++                children.append(child)
++                names.append(f.name)
++            arr = pa.StructArray.from_arrays(children, names)
++            return arr.to_pandas(date_as_object=True)
+         else:
+             return s
+ 