diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 0e6b81e9844..2fa5fb6b878 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -504,7 +504,7 @@ cdef class HdfsClient:
 
         out.mode = mode
         out.buffer_size = c_buffer_size
-        out.parent = self
+        out.parent = _HdfsFileNanny(self, out)
         out.is_open = True
         out.own_file = True
 
@@ -516,48 +516,69 @@ cdef class HdfsClient:
         """
         write_queue = Queue(50)
 
-        f = self.open(path, 'wb')
+        with self.open(path, 'wb') as f:
+            done = False
+            exc_info = None
+            def bg_write():
+                try:
+                    while not done or write_queue.qsize() > 0:
+                        try:
+                            buf = write_queue.get(timeout=0.01)
+                        except QueueEmpty:
+                            continue
 
-        done = False
-        exc_info = None
-        def bg_write():
-            try:
-                while not done or write_queue.qsize() > 0:
-                    try:
-                        buf = write_queue.get(timeout=0.01)
-                    except QueueEmpty:
-                        continue
+                        f.write(buf)
 
-                    f.write(buf)
+                except Exception as e:
+                    exc_info = sys.exc_info()
 
-            except Exception as e:
-                exc_info = sys.exc_info()
-
-        writer_thread = threading.Thread(target=bg_write)
-        writer_thread.start()
+            writer_thread = threading.Thread(target=bg_write)
+            writer_thread.start()
 
-        try:
-            while True:
-                buf = stream.read(buffer_size)
-                if not buf:
-                    break
+            try:
+                while True:
+                    buf = stream.read(buffer_size)
+                    if not buf:
+                        break
 
-                write_queue.put_nowait(buf)
-        finally:
-            done = True
+                write_queue.put_nowait(buf)
+            finally:
+                done = True
 
-        writer_thread.join()
-        if exc_info is not None:
-            raise exc_info[0], exc_info[1], exc_info[2]
+            writer_thread.join()
+            if exc_info is not None:
+                raise exc_info[0], exc_info[1], exc_info[2]
 
     def download(self, path, stream, buffer_size=None):
-        f = self.open(path, 'rb', buffer_size=buffer_size)
-        f.download(stream)
+        with self.open(path, 'rb', buffer_size=buffer_size) as f:
+            f.download(stream)
 
 
 # ----------------------------------------------------------------------
 # Specialization for HDFS
 
+# ARROW-404: Helper class to ensure that files are closed before the
+# client. During deallocation of the extension class, the attributes are
+# decref'd, which can cause the client to get closed first if the file has
+# the last remaining reference
+cdef class _HdfsFileNanny:
+    cdef:
+        object client
+        object file_handle_ref
+
+    def __cinit__(self, client, file_handle):
+        import weakref
+        self.client = client
+        self.file_handle_ref = weakref.ref(file_handle)
+
+    def __dealloc__(self):
+        fh = self.file_handle_ref()
+        if fh:
+            fh.close()
+        # avoid cyclic GC
+        self.file_handle_ref = None
+        self.client = None
+
 
 cdef class HdfsFile(NativeFile):
     cdef readonly:
@@ -565,6 +586,11 @@ cdef class HdfsFile(NativeFile):
         object mode
         object parent
 
+    cdef object __weakref__
+
+    def __dealloc__(self):
+        self.parent = None
+
     def read(self, int nbytes):
         """
         Read indicated number of bytes from the file, up to EOF
diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py
index ed8d41994cd..c23543b7f0d 100644
--- a/python/pyarrow/tests/test_hdfs.py
+++ b/python/pyarrow/tests/test_hdfs.py
@@ -98,6 +98,29 @@ def test_hdfs_ls(hdfs):
     assert contents == [dir_path, f1_path]
 
 
+def _make_test_file(hdfs, test_name, test_path, test_data):
+    base_path = pjoin(HDFS_TMP_PATH, test_name)
+    hdfs.mkdir(base_path)
+
+    full_path = pjoin(base_path, test_path)
+
+    f = hdfs.open(full_path, 'wb')
+    f.write(test_data)
+
+    return full_path
+
+
+@libhdfs
+def test_hdfs_orphaned_file():
+    hdfs = hdfs_test_client()
+    file_path = _make_test_file(hdfs, 'orphaned_file_test', 'fname',
+                                'foobarbaz')
+
+    f = hdfs.open(file_path)
+    hdfs = None
+    f = None  # noqa
+
+
 @libhdfs
 def test_hdfs_download_upload(hdfs):
     base_path = pjoin(HDFS_TMP_PATH, 'upload-test')
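Note on the `_HdfsFileNanny` pattern: the file's `parent` attribute holds a strong reference to the client but only a weak reference to the file itself, so an open file keeps its client alive without forming a reference cycle, and the nanny closes the file (if it still exists) before dropping its reference to the client. Below is a minimal pure-Python sketch of the same ownership scheme; the names are illustrative rather than pyarrow API, and the assertions assume CPython's reference counting:

    import weakref

    class Client(object):
        closed = False

        def close(self):
            self.closed = True

    class FileNanny(object):
        # Strong reference to the client, weak reference to the file: the
        # file keeps the client alive through its `parent` attribute, but
        # the nanny does not keep the file alive, so no cycle forms.
        def __init__(self, client, file_handle):
            self.client = client
            self.file_handle_ref = weakref.ref(file_handle)

        def __del__(self):
            # If the file still exists, close it before the strong
            # reference to the client is dropped.
            fh = self.file_handle_ref()
            if fh is not None:
                fh.close()

    class File(object):
        closed = False

        def __init__(self, client):
            self.parent = FileNanny(client, self)

        def close(self):
            self.closed = True

    # An "orphaned" file keeps its client alive:
    client = Client()
    f = File(client)
    client_ref = weakref.ref(client)
    client = None
    assert client_ref() is not None   # still owned by f.parent
    f = None
    assert client_ref() is None       # released only after the file

    # Dropping the nanny closes the file while the client is alive:
    client = Client()
    f = File(client)
    f.parent = None
    assert f.closed and not client.closed

In the Cython version, `HdfsFile` must additionally declare `cdef object __weakref__`, since extension types support weak references only when that slot is declared, and its `__dealloc__` drops `parent` explicitly so the nanny is released during file deallocation.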
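The `upload` body that moves under the `with` block is a small producer/consumer pipeline: the calling thread reads chunks from `stream` into a bounded queue, a background thread drains the queue into the HDFS file, and an exception raised in the writer is captured and re-raised after `join()`. Here is a self-contained Python 3 sketch of the same pattern with two stated deviations from the io.pyx code: it keeps `done` and `exc_info` in one-element lists so that assignments made in the worker are visible to the caller (a bare `exc_info = ...` inside a nested function would rebind a local), and it re-raises with `with_traceback()` instead of the Python 2 three-argument `raise`:

    import sys
    import threading
    from io import BytesIO
    from queue import Empty as QueueEmpty, Queue

    def upload(sink, stream, buffer_size=2 ** 16):
        write_queue = Queue(50)   # bounded, like the pyarrow code
        done = [False]            # mutable cells shared with the worker
        exc_info = [None]

        def bg_write():
            try:
                # Drain until the reader is finished and the queue is empty
                while not done[0] or write_queue.qsize() > 0:
                    try:
                        buf = write_queue.get(timeout=0.01)
                    except QueueEmpty:
                        continue
                    sink.write(buf)
            except Exception:
                exc_info[0] = sys.exc_info()

        writer_thread = threading.Thread(target=bg_write)
        writer_thread.start()
        try:
            while True:
                buf = stream.read(buffer_size)
                if not buf:
                    break
                # Like the original put_nowait, this raises queue.Full
                # rather than blocking if the writer falls behind
                write_queue.put_nowait(buf)
        finally:
            done[0] = True   # always let the worker terminate

        writer_thread.join()
        if exc_info[0] is not None:
            _, exc, tb = exc_info[0]
            raise exc.with_traceback(tb)

    sink = BytesIO()
    upload(sink, BytesIO(b'some payload' * 1000))
    assert sink.getvalue() == b'some payload' * 1000

The bounded `Queue(50)` caps buffered data at roughly fifty chunks, so a slow writer cannot cause unbounded memory growth while the reader races ahead.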