From b8f9ef4d06717edad2232189b467a93912646edc Mon Sep 17 00:00:00 2001
From: Wes McKinney
Date: Thu, 27 Jul 2017 21:43:58 -0400
Subject: [PATCH] Delete any incomplete file when attempt to write single Parquet file fails

Change-Id: I15acd3e5d0cb975a156881862addc987eec8b029
---
 python/pyarrow/_parquet.pyx          |  8 ++++----
 python/pyarrow/parquet.py            | 19 ++++++++++++++++---
 python/pyarrow/tests/test_parquet.py | 25 +++++++++++++++++++++++++
 3 files changed, 45 insertions(+), 7 deletions(-)

diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index bbe52033526..20f189a413e 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -542,6 +542,7 @@ cdef ParquetCompression compression_from_name(str name):
 cdef class ParquetWriter:
     cdef:
         unique_ptr[FileWriter] writer
+        shared_ptr[OutputStream] sink
 
     cdef readonly:
         object use_dictionary
@@ -555,14 +556,13 @@ cdef class ParquetWriter:
                   MemoryPool memory_pool=None,
                   use_deprecated_int96_timestamps=False):
         cdef:
             shared_ptr[FileOutputStream] filestream
-            shared_ptr[OutputStream] sink
             shared_ptr[WriterProperties] properties
 
         if isinstance(where, six.string_types):
             check_status(FileOutputStream.Open(tobytes(where), &filestream))
-            sink = filestream
+            self.sink = filestream
         else:
-            get_writer(where, &sink)
+            get_writer(where, &self.sink)
         self.use_dictionary = use_dictionary
         self.compression = compression
@@ -582,7 +582,7 @@ cdef class ParquetWriter:
         check_status(
             FileWriter.Open(deref(schema.schema),
                             maybe_unbox_memory_pool(memory_pool),
-                            sink, properties, arrow_properties,
+                            self.sink, properties, arrow_properties,
                             &self.writer))
 
     cdef void _set_int96_support(self, ArrowWriterProperties.Builder* props):
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index fea73978e3e..34c1d120ce0 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -769,9 +769,22 @@ def write_table(table, where, row_group_size=None, version='1.0',
         compression=compression,
         version=version,
         use_deprecated_int96_timestamps=use_deprecated_int96_timestamps)
-    writer = ParquetWriter(where, table.schema, **options)
-    writer.write_table(table, row_group_size=row_group_size)
-    writer.close()
+
+    writer = None
+    try:
+        writer = ParquetWriter(where, table.schema, **options)
+        writer.write_table(table, row_group_size=row_group_size)
+    except:
+        if writer is not None:
+            writer.close()
+        if isinstance(where, six.string_types):
+            try:
+                os.remove(where)
+            except os.error:
+                pass
+        raise
+    else:
+        writer.close()
 
 
 def write_metadata(schema, where, version='1.0',
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 40e44b352ac..6763fb3e04f 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -1006,3 +1006,28 @@ def test_multiindex_duplicate_values(tmpdir):
     result_df = result_table.to_pandas()
 
     tm.assert_frame_equal(result_df, df)
+
+
+@parquet
+def test_write_error_deletes_incomplete_file(tmpdir):
+    # ARROW-1285
+    df = pd.DataFrame({'a': list('abc'),
+                       'b': list(range(1, 4)),
+                       'c': np.arange(3, 6).astype('u1'),
+                       'd': np.arange(4.0, 7.0, dtype='float64'),
+                       'e': [True, False, True],
+                       'f': pd.Categorical(list('abc')),
+                       'g': pd.date_range('20130101', periods=3),
+                       'h': pd.date_range('20130101', periods=3,
+                                          tz='US/Eastern'),
+                       'i': pd.date_range('20130101', periods=3, freq='ns')})
+
+    pdf = pa.Table.from_pandas(df)
+
+    filename = tmpdir.join('tmp_file').strpath
+    try:
+        _write_table(pdf, filename)
+    except pa.ArrowException:
+        pass
+
+    assert not os.path.exists(filename)
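
A minimal sketch of the behavior this patch adds, for anyone trying it locally. It leans on the same assumption the new test makes: that a nanosecond-resolution timestamp column cannot be written with write_table's default version='1.0' settings, so the write raises. The file path is illustrative.

import os

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# A column the default version='1.0' Parquet settings are assumed to
# reject, mirroring column 'i' in the new test above.
df = pd.DataFrame({'ts': pd.date_range('20130101', periods=3, freq='ns')})
table = pa.Table.from_pandas(df)

try:
    pq.write_table(table, 'tmp_file.parquet')  # illustrative path
except pa.ArrowException:
    pass

# Before this patch, a partial tmp_file.parquet could be left on disk;
# with it, the incomplete file is removed before the error propagates.
assert not os.path.exists('tmp_file.parquet')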