From af174d8c0703cea724352fd3e77b1b0655ae4267 Mon Sep 17 00:00:00 2001 From: zhaochun Date: Thu, 21 Mar 2019 18:32:38 +0800 Subject: [PATCH] Fix bug: stream load ignore last line with no-newline #783 --- be/src/exec/plain_text_line_reader.cpp | 5 +-- .../exec/plain_text_line_reader_gzip_test.cpp | 34 +++++++++++++++++ ...ain_text_line_reader_uncompressed_test.cpp | 35 ++++++++++++++++++ .../plain_text_line_reader/no_newline.csv | 2 + .../plain_text_line_reader/no_newline.csv.gz | Bin 0 -> 44 bytes 5 files changed, 73 insertions(+), 3 deletions(-) create mode 100644 be/test/exec/test_data/plain_text_line_reader/no_newline.csv create mode 100644 be/test/exec/test_data/plain_text_line_reader/no_newline.csv.gz diff --git a/be/src/exec/plain_text_line_reader.cpp b/be/src/exec/plain_text_line_reader.cpp index 7ad004706bdd2b..2eebf0f87d4b7f 100644 --- a/be/src/exec/plain_text_line_reader.cpp +++ b/be/src/exec/plain_text_line_reader.cpp @@ -247,9 +247,8 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e } else { // last loop we meet stream end, // and now we finished reading file, so we are finished - *size = 0; - *eof = true; - return Status::OK; + // break this loop to see if there is data in buffer + break; } } diff --git a/be/test/exec/plain_text_line_reader_gzip_test.cpp b/be/test/exec/plain_text_line_reader_gzip_test.cpp index f914864211aa2a..27ca2cbcb3aa35 100644 --- a/be/test/exec/plain_text_line_reader_gzip_test.cpp +++ b/be/test/exec/plain_text_line_reader_gzip_test.cpp @@ -91,6 +91,40 @@ TEST_F(PlainTextLineReaderTest, gzip_normal_use) { ASSERT_TRUE(eof); } +TEST_F(PlainTextLineReaderTest, uncompressed_no_newline) { + LocalFileReader file_reader("./be/test/exec/test_data/plain_text_line_reader/no_newline.csv.gz", 0); + auto st = file_reader.open(); + ASSERT_TRUE(st.ok()); + + Decompressor* decompressor; + st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor); + ASSERT_TRUE(st.ok()); + + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n'); + const uint8_t* ptr; + size_t size; + bool eof; + + // 1,2,3 + st = line_reader.read_line(&ptr, &size, &eof); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(5, size); + ASSERT_STREQ("1,2,3", std::string((char*)ptr, size).c_str()); + ASSERT_FALSE(eof); + + // 4,5 + st = line_reader.read_line(&ptr, &size, &eof); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(3, size); + ASSERT_STREQ("4,5", std::string((char*)ptr, size).c_str()); + ASSERT_FALSE(eof); + + // Empty + st = line_reader.read_line(&ptr, &size, &eof); + ASSERT_TRUE(st.ok()); + ASSERT_TRUE(eof); +} + TEST_F(PlainTextLineReaderTest, gzip_test_limit) { LocalFileReader file_reader("./be/test/exec/test_data/plain_text_line_reader/limit.csv.gz", 0); auto st = file_reader.open(); diff --git a/be/test/exec/plain_text_line_reader_uncompressed_test.cpp b/be/test/exec/plain_text_line_reader_uncompressed_test.cpp index 03bec722bd0663..eff25b110e0296 100644 --- a/be/test/exec/plain_text_line_reader_uncompressed_test.cpp +++ b/be/test/exec/plain_text_line_reader_uncompressed_test.cpp @@ -91,6 +91,41 @@ TEST_F(PlainTextLineReaderTest, uncompressed_normal_use) { ASSERT_TRUE(eof); } +TEST_F(PlainTextLineReaderTest, uncompressed_no_newline) { + LocalFileReader file_reader("./be/test/exec/test_data/plain_text_line_reader/no_newline.csv", 0); + auto st = file_reader.open(); + ASSERT_TRUE(st.ok()); + + Decompressor* decompressor; + st = Decompressor::create_decompressor(CompressType::UNCOMPRESSED, &decompressor); + ASSERT_TRUE(st.ok()); + ASSERT_TRUE(decompressor == nullptr); + + PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n'); + const uint8_t* ptr; + size_t size; + bool eof; + + // 1,2,3 + st = line_reader.read_line(&ptr, &size, &eof); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(5, size); + ASSERT_STREQ("1,2,3", std::string((char*)ptr, size).c_str()); + ASSERT_FALSE(eof); + + // 4,5 + st = line_reader.read_line(&ptr, &size, &eof); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(3, size); + ASSERT_STREQ("4,5", std::string((char*)ptr, size).c_str()); + ASSERT_FALSE(eof); + + // Empty + st = line_reader.read_line(&ptr, &size, &eof); + ASSERT_TRUE(st.ok()); + ASSERT_TRUE(eof); +} + TEST_F(PlainTextLineReaderTest, uncompressed_test_limit) { LocalFileReader file_reader("./be/test/exec/test_data/plain_text_line_reader/limit.csv", 0); auto st = file_reader.open(); diff --git a/be/test/exec/test_data/plain_text_line_reader/no_newline.csv b/be/test/exec/test_data/plain_text_line_reader/no_newline.csv new file mode 100644 index 00000000000000..d3c27d6ab2f78a --- /dev/null +++ b/be/test/exec/test_data/plain_text_line_reader/no_newline.csv @@ -0,0 +1,2 @@ +1,2,3 +4,5 \ No newline at end of file diff --git a/be/test/exec/test_data/plain_text_line_reader/no_newline.csv.gz b/be/test/exec/test_data/plain_text_line_reader/no_newline.csv.gz new file mode 100644 index 0000000000000000000000000000000000000000..a877f5335913b7818a531aafbf4e54ce3f98cba3 GIT binary patch literal 44 zcmb2|=HS?rI5~!aIWIpxFSR@;GcQ#yxwwqM_=@2r!)Hbp4Otl;vt9Yk$-uw>09+~$ Ag8%>k literal 0 HcmV?d00001