Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions python/pyarrow/error.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import os
import signal
import threading

from pyarrow.util import _break_traceback_cycle_from_frame


class ArrowException(Exception):
    """Exception subclass that adds no behavior of its own to `Exception`."""
    pass
Expand Down Expand Up @@ -171,6 +173,10 @@ def enable_signal_handlers(c_bool enable):

# For internal use

# Whether we need a workaround for https://bugs.python.org/issue42248
# The flag is true on Python versions before 3.8.10, and on 3.9 releases
# before 3.9.5; it is false on every other version.
have_signal_refcycle = (sys.version_info < (3, 8, 10) or
                        (3, 9) <= sys.version_info < (3, 9, 5))

cdef class SignalStopHandler:
cdef:
StopToken _stop_token
Expand All @@ -180,20 +186,24 @@ cdef class SignalStopHandler:
def __cinit__(self):
self._enabled = False

tid = threading.current_thread().ident
if (signal_handlers_enabled and
threading.current_thread() is threading.main_thread()):
self._signals = [
sig for sig in (signal.SIGINT, signal.SIGTERM)
if signal.getsignal(sig) not in (signal.SIG_DFL,
signal.SIG_IGN, None)]
self._init_signals()
if have_signal_refcycle:
_break_traceback_cycle_from_frame(sys._getframe(0))

self._stop_token = StopToken()
if not self._signals.empty():
self._stop_token.init(GetResultValue(
SetSignalStopSource()).token())
self._enabled = True

def _init_signals(self):
if (signal_handlers_enabled and
threading.current_thread() is threading.main_thread()):
self._signals = [
sig for sig in (signal.SIGINT, signal.SIGTERM)
if signal.getsignal(sig) not in (signal.SIG_DFL,
signal.SIG_IGN, None)]

def __enter__(self):
if self._enabled:
check_status(RegisterCancellingSignalHandler(self._signals))
Expand Down
13 changes: 13 additions & 0 deletions python/pyarrow/tests/test_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import threading
import time
import unittest
import weakref

import pytest

Expand Down Expand Up @@ -1617,3 +1618,15 @@ def test_write_read_round_trip():

read_options = ReadOptions(column_names=t.column_names)
assert t == read_csv(buf, read_options=read_options)


def test_read_csv_reference_cycle():
    """
    Check that a table returned by `read_csv` is freed by reference
    counting alone once the last reference to it goes away.

    Regression test for ARROW-13187.
    """
    def read_then_drop():
        # The table is deliberately bound to a local of this frame: when
        # the function returns, that local is the only strong reference.
        source = io.BytesIO(b"a,b,c\n1,2,3\n4,5,6")
        result = read_csv(source)
        return weakref.ref(result)

    # The asserts run while the `util.disabled_gc()` context is active.
    with util.disabled_gc():
        table_ref = read_then_drop()
        assert table_ref() is None
52 changes: 52 additions & 0 deletions python/pyarrow/tests/test_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import gc
import signal
import sys
import weakref

import pytest

from pyarrow import util
from pyarrow.tests.util import disabled_gc


def exhibit_signal_refcycle():
    """
    Return a weak reference to an object held only by this frame's locals.

    If `signal.getsignal` has a bug where it creates a reference cycle
    keeping alive the current execution frames, the object is not
    destroyed immediately when this function returns, and the returned
    weak reference still resolves to it.
    """
    probe = set()
    probe_ref = weakref.ref(probe)
    # The call's result is irrelevant; only its possible side effect on
    # the frame's lifetime matters.
    signal.getsignal(signal.SIGINT)
    return probe_ref


def test_signal_refcycle():
    """
    Test possible workaround for https://bugs.python.org/issue42248

    The test is skipped when the probe object does not survive the
    return of `exhibit_signal_refcycle`.
    """
    # First probe, inside the `disabled_gc()` context: decide whether
    # there is anything to test on this interpreter.
    with disabled_gc():
        probe_ref = exhibit_signal_refcycle()
        bug_present = probe_ref() is not None
        if not bug_present:
            pytest.skip(
                "Python version does not have the bug we're testing for")

    # Run a full collection before probing a second time.
    gc.collect()
    with disabled_gc():
        leaked_ref = exhibit_signal_refcycle()
        assert leaked_ref() is not None
        # The current frame is passed inline rather than stored in a local.
        util._break_traceback_cycle_from_frame(sys._getframe(0))
        assert leaked_ref() is None
9 changes: 9 additions & 0 deletions python/pyarrow/tests/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,15 @@ def change_cwd(path):
os.chdir(curdir)


@contextlib.contextmanager
def disabled_gc():
    """
    Context manager that disables the cyclic garbage collector.

    Automatic collection is switched off for the duration of the ``with``
    body.  On exit (normal or via an exception) the collector is put back
    in the state it had on entry: it is re-enabled only if it was enabled
    beforehand.  This makes the context manager safe to nest and safe to
    use while the collector is already disabled.
    """
    was_enabled = gc.isenabled()
    gc.disable()
    try:
        yield
    finally:
        # Restore the previous state instead of unconditionally enabling,
        # otherwise an inner block would re-enable the GC for its outer
        # block.
        if was_enabled:
            gc.enable()


def _filesystem_uri(path):
# URIs on Windows must follow 'file:///C:...' or 'file:/C:...' patterns.
if os.name == 'nt':
Expand Down
26 changes: 26 additions & 0 deletions python/pyarrow/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

import contextlib
import functools
import gc
import pathlib
import socket
import sys
import types
import warnings


Expand Down Expand Up @@ -150,3 +153,26 @@ def find_free_port():
def guid():
    """Return the hexadecimal string form of a freshly generated UUID4."""
    import uuid
    fresh = uuid.uuid4()
    return fresh.hex


def _break_traceback_cycle_from_frame(frame):
    """
    Clear the locals of the frames found by walking referrers of *frame*.

    Starting from *frame*, repeatedly look through `gc.get_referrers` for
    a frame object other than this function's own frame, clear its locals
    with `frame.clear()`, and continue the search from that frame.  The
    walk stops when a frame has no frame object among its referrers.

    Parameters
    ----------
    frame : types.FrameType
        The frame to start from.

    Returns
    -------
    None
    """
    # Clear local variables in all inner frames, so as to break the
    # reference cycle.
    this_frame = sys._getframe(0)
    refs = gc.get_referrers(frame)
    while refs:
        # Note that the loop variable rebinds the `frame` argument: after
        # a `break` it names the referring frame that was found.
        for frame in refs:
            if frame is not this_frame and isinstance(frame, types.FrameType):
                break
        else:
            # No frame found in referrers (finished?)
            break
        # NOTE(review): presumably dropped so that the referrers list does
        # not hold extra references while the frame is cleared -- confirm.
        refs = None
        # Clear the frame locals, to try and break the cycle (it is
        # somewhere along the chain of execution frames).
        frame.clear()
        # To visit the inner frame, we need to find it among the
        # referrers of this frame (while `frame.f_back` would let
        # us visit the outer frame).
        refs = gc.get_referrers(frame)
    # Drop this function's own references to the frames it looked at.
    refs = frame = this_frame = None