From 1539a2ce2d62189cfaeb2dc5c29b5815bc4b3753 Mon Sep 17 00:00:00 2001
From: Wes McKinney
Date: Wed, 7 Dec 2016 12:19:36 -0500
Subject: [PATCH 1/3] Ensure that HdfsClient does not get closed before an open
 file does when the last user-accessible client reference goes out of scope

Change-Id: If97731c7cd663d89e418b4cdd713eb8f0add49e7
---
 python/pyarrow/io.pyx             | 81 +++++++++++++++++++------------
 python/pyarrow/tests/test_hdfs.py | 23 +++++++++
 2 files changed, 74 insertions(+), 30 deletions(-)

diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 0e6b81e9844..d0eca673bdc 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -504,7 +504,7 @@ cdef class HdfsClient:
 
         out.mode = mode
         out.buffer_size = c_buffer_size
-        out.parent = self
+        out.parent = _HdfsFileNanny(self, out)
         out.is_open = True
         out.own_file = True
 
@@ -516,48 +516,66 @@ cdef class HdfsClient:
         """
         write_queue = Queue(50)
 
-        f = self.open(path, 'wb')
+        with self.open(path, 'wb') as f:
+            done = False
+            exc_info = None
+            def bg_write():
+                try:
+                    while not done or write_queue.qsize() > 0:
+                        try:
+                            buf = write_queue.get(timeout=0.01)
+                        except QueueEmpty:
+                            continue
 
-        done = False
-        exc_info = None
-        def bg_write():
-            try:
-                while not done or write_queue.qsize() > 0:
-                    try:
-                        buf = write_queue.get(timeout=0.01)
-                    except QueueEmpty:
-                        continue
-
-                    f.write(buf)
+                        f.write(buf)
 
-            except Exception as e:
-                exc_info = sys.exc_info()
+                except Exception as e:
+                    exc_info = sys.exc_info()
 
-        writer_thread = threading.Thread(target=bg_write)
-        writer_thread.start()
+            writer_thread = threading.Thread(target=bg_write)
+            writer_thread.start()
 
-        try:
-            while True:
-                buf = stream.read(buffer_size)
-                if not buf:
-                    break
+            try:
+                while True:
+                    buf = stream.read(buffer_size)
+                    if not buf:
+                        break
 
-                write_queue.put_nowait(buf)
-        finally:
-            done = True
+                    write_queue.put_nowait(buf)
+            finally:
+                done = True
 
-        writer_thread.join()
-        if exc_info is not None:
-            raise exc_info[0], exc_info[1], exc_info[2]
+            writer_thread.join()
+            if exc_info is not None:
+                raise exc_info[0], exc_info[1], exc_info[2]
 
     def download(self, path, stream, buffer_size=None):
-        f = self.open(path, 'rb', buffer_size=buffer_size)
-        f.download(stream)
+        with self.open(path, 'rb', buffer_size=buffer_size) as f:
+            f.download(stream)
 
 
 # ----------------------------------------------------------------------
 # Specialization for HDFS
 
+# ARROW-404: Helper class to ensure that files are closed before the
+# client. During deallocation of the extension class, the attributes are
+# decref'd which can cause the client to get closed first if the file has the
+# last remaining reference
+cdef class _HdfsFileNanny:
+    cdef:
+        object client
+        object file_handle
+
+    def __cinit__(self, client, file_handle):
+        self.client = client
+        self.file_handle = file_handle
+
+    def __dealloc__(self):
+        self.file_handle.close()
+        # avoid cyclic GC
+        self.file_handle = None
+        self.client = None
+
 
 cdef class HdfsFile(NativeFile):
     cdef readonly:
@@ -565,6 +583,9 @@ cdef class HdfsFile(NativeFile):
         object mode
         object parent
 
+    def __dealloc__(self):
+        self.parent = None
+
     def read(self, int nbytes):
         """
         Read indicated number of bytes from the file, up to EOF
diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py
index ed8d41994cd..c23543b7f0d 100644
--- a/python/pyarrow/tests/test_hdfs.py
+++ b/python/pyarrow/tests/test_hdfs.py
@@ -98,6 +98,29 @@ def test_hdfs_ls(hdfs):
     assert contents == [dir_path, f1_path]
 
 
+def _make_test_file(hdfs, test_name, test_path, test_data):
+    base_path = pjoin(HDFS_TMP_PATH, test_name)
+    hdfs.mkdir(base_path)
+
+    full_path = pjoin(base_path, test_path)
+
+    f = hdfs.open(full_path, 'wb')
+    f.write(test_data)
+
+    return full_path
+
+
+@libhdfs
+def test_hdfs_orphaned_file():
+    hdfs = hdfs_test_client()
+    file_path = _make_test_file(hdfs, 'orphaned_file_test', 'fname',
+                                'foobarbaz')
+
+    f = hdfs.open(file_path)
+    hdfs = None
+    f = None  # noqa
+
+
 @libhdfs
 def test_hdfs_download_upload(hdfs):
     base_path = pjoin(HDFS_TMP_PATH, 'upload-test')

From 274d0c5e8e87ebcb93d35a46760613b0250515af Mon Sep 17 00:00:00 2001
From: Wes McKinney
Date: Wed, 7 Dec 2016 12:24:20 -0500
Subject: [PATCH 2/3] amend comment

Change-Id: I4cf14f47203be94a8a95f5c7d9b780734a8cec67
---
 python/pyarrow/io.pyx | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index d0eca673bdc..020e6447656 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -572,7 +572,7 @@ cdef class _HdfsFileNanny:
 
     def __dealloc__(self):
         self.file_handle.close()
-        # avoid cyclic GC
+        # try to avoid cyclic GC
         self.file_handle = None
         self.client = None
 

From 3a8e641eded630203ca84a0d41d525ab9fc996ff Mon Sep 17 00:00:00 2001
From: Wes McKinney
Date: Thu, 8 Dec 2016 16:42:01 -0500
Subject: [PATCH 3/3] Use weakref in _HdfsFileNanny to avoid cyclic gc

Change-Id: I9f2b03e3f9e8677c89f144ff890edc3900431a40
---
 python/pyarrow/io.pyx | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 020e6447656..2fa5fb6b878 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -564,16 +564,19 @@ cdef class HdfsClient:
 cdef class _HdfsFileNanny:
     cdef:
         object client
-        object file_handle
+        object file_handle_ref
 
     def __cinit__(self, client, file_handle):
+        import weakref
         self.client = client
-        self.file_handle = file_handle
+        self.file_handle_ref = weakref.ref(file_handle)
 
     def __dealloc__(self):
-        self.file_handle.close()
-        # try to avoid cyclic GC
-        self.file_handle = None
+        fh = self.file_handle_ref()
+        if fh:
+            fh.close()
+        # avoid cyclic GC
+        self.file_handle_ref = None
         self.client = None
 
 
@@ -583,6 +586,8 @@ cdef class HdfsFile(NativeFile):
         object mode
         object parent
 
+    cdef object __weakref__
+
     def __dealloc__(self):
         self.parent = None
 