Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 32 additions & 22 deletions cpp/src/arrow/util/bit_stream_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class BitReader {
}

/// Gets the next value from the buffer. Returns true if 'v' could be read or false if
/// there are not enough bytes left. num_bits must be <= 32.
/// there are not enough bytes left.
template <typename T>
bool GetValue(int num_bits, T* v);

Expand All @@ -157,6 +157,10 @@ class BitReader {
template <typename T>
bool GetAligned(int num_bytes, T* v);

/// Advances the stream by a number of bits. Returns true if succeed or false if there
/// are not enough bits left.
bool Advance(int64_t num_bits);

/// Reads a vlq encoded int from the stream. The encoded int must start at
/// the beginning of a byte. Return false if there were not enough bytes in
/// the buffer.
Expand Down Expand Up @@ -255,6 +259,16 @@ inline bool BitWriter::PutAligned(T val, int num_bytes) {

namespace detail {

// Refills `*buffered_values` with the next (up to) 8 bytes of `buffer`
// starting at `byte_offset`, converting from little-endian to native order.
//
// `bytes_remaining` is the number of valid bytes left (max_bytes - byte_offset).
// When fewer than 8 bytes remain, the value is zero-initialized before the
// partial copy so the unread high bytes are never stale or uninitialized
// (keeps MSAN/valgrind clean). Callers must not consume bits beyond the
// valid `bytes_remaining * 8` range.
inline void ResetBufferedValues_(const uint8_t* buffer, int byte_offset,
                                 int bytes_remaining, uint64_t* buffered_values) {
  if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) {
    memcpy(buffered_values, buffer + byte_offset, 8);
  } else {
    // Partial refill at the end of the buffer: clear first so no stale bytes
    // from the previous refill survive in the unread portion.
    *buffered_values = 0;
    memcpy(buffered_values, buffer + byte_offset, bytes_remaining);
  }
  *buffered_values = arrow::BitUtil::FromLittleEndian(*buffered_values);
}

template <typename T>
inline void GetValue_(int num_bits, T* v, int max_bytes, const uint8_t* buffer,
int* bit_offset, int* byte_offset, uint64_t* buffered_values) {
Expand All @@ -272,13 +286,7 @@ inline void GetValue_(int num_bits, T* v, int max_bytes, const uint8_t* buffer,
*byte_offset += 8;
*bit_offset -= 64;

int bytes_remaining = max_bytes - *byte_offset;
if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) {
memcpy(buffered_values, buffer + *byte_offset, 8);
} else {
memcpy(buffered_values, buffer + *byte_offset, bytes_remaining);
}
*buffered_values = arrow::BitUtil::FromLittleEndian(*buffered_values);
ResetBufferedValues_(buffer, *byte_offset, max_bytes - *byte_offset, buffered_values);
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4800 4805)
Expand Down Expand Up @@ -374,13 +382,8 @@ inline int BitReader::GetBatch(int num_bits, T* v, int batch_size) {
}
}

int bytes_remaining = max_bytes - byte_offset;
if (bytes_remaining >= 8) {
memcpy(&buffered_values, buffer + byte_offset, 8);
} else {
memcpy(&buffered_values, buffer + byte_offset, bytes_remaining);
}
buffered_values = arrow::BitUtil::FromLittleEndian(buffered_values);
detail::ResetBufferedValues_(buffer, byte_offset, max_bytes - byte_offset,
&buffered_values);

for (; i < batch_size; ++i) {
detail::GetValue_(num_bits, &v[i], max_bytes, buffer, &bit_offset, &byte_offset,
Expand Down Expand Up @@ -411,15 +414,22 @@ inline bool BitReader::GetAligned(int num_bytes, T* v) {
*v = arrow::BitUtil::FromLittleEndian(*v);
byte_offset_ += num_bytes;

// Reset buffered_values_
bit_offset_ = 0;
int bytes_remaining = max_bytes_ - byte_offset_;
if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) {
memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
} else {
memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
detail::ResetBufferedValues_(buffer_, byte_offset_, max_bytes_ - byte_offset_,
&buffered_values_);
return true;
}

/// Advances the read position by `num_bits` bits.
///
/// Returns false (leaving the reader state untouched) when fewer than
/// `num_bits` bits remain in the buffer; otherwise updates byte_offset_ /
/// bit_offset_, refills buffered_values_ from the new position, and
/// returns true.
inline bool BitReader::Advance(int64_t num_bits) {
  int64_t bits_required = bit_offset_ + num_bits;
  int64_t bytes_required = BitUtil::BytesForBits(bits_required);
  if (ARROW_PREDICT_FALSE(bytes_required > max_bytes_ - byte_offset_)) {
    return false;
  }
  // No byte-swap of buffered_values_ is needed here: it is fully
  // overwritten by ResetBufferedValues_ below (the previous
  // FromLittleEndian round-trip was a dead store).
  byte_offset_ += static_cast<int>(bits_required >> 3);
  bit_offset_ = static_cast<int>(bits_required & 7);
  detail::ResetBufferedValues_(buffer_, byte_offset_, max_bytes_ - byte_offset_,
                               &buffered_values_);
  return true;
}

Expand Down
117 changes: 100 additions & 17 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4158,36 +4158,119 @@ TEST(TestArrowWriteDictionaries, NestedSubfield) {
}

#ifdef ARROW_CSV
TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) {
auto file = test::get_data_file("delta_binary_packed.parquet");
auto expect_file = test::get_data_file("delta_binary_packed_expect.csv");
auto pool = ::arrow::default_memory_pool();
std::unique_ptr<FileReader> parquet_reader;
std::shared_ptr<::arrow::Table> table;
ASSERT_OK(
FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), &parquet_reader));
ASSERT_OK(parquet_reader->ReadTable(&table));

ASSERT_OK_AND_ASSIGN(auto input_file, ::arrow::io::ReadableFile::Open(expect_file));
// Shared fixture for the delta-encoding read tests: helpers to load a
// Parquet data file into a Table and to load the expected results from CSV.
class TestArrowReadDeltaEncoding : public ::testing::Test {
 public:
  // Reads an entire Parquet test-data file into `*out` and fully validates it.
  void ReadTableFromParquetFile(const std::string& file_name,
                                std::shared_ptr<Table>* out) {
    const auto path = test::get_data_file(file_name);
    std::unique_ptr<FileReader> reader;
    ASSERT_OK(FileReader::Make(::arrow::default_memory_pool(),
                               ParquetFileReader::OpenFile(path, false), &reader));
    ASSERT_OK(reader->ReadTable(out));
    ASSERT_OK((*out)->ValidateFull());
  }

  // Reads a CSV test-data file into `*out` using the caller-supplied
  // conversion options (column types, null handling, ...).
  void ReadTableFromCSVFile(const std::string& file_name,
                            const ::arrow::csv::ConvertOptions& convert_options,
                            std::shared_ptr<Table>* out) {
    const auto path = test::get_data_file(file_name);
    ASSERT_OK_AND_ASSIGN(auto input, ::arrow::io::ReadableFile::Open(path));
    ASSERT_OK_AND_ASSIGN(
        auto csv_reader,
        ::arrow::csv::TableReader::Make(::arrow::io::default_io_context(), input,
                                        ::arrow::csv::ReadOptions::Defaults(),
                                        ::arrow::csv::ParseOptions::Defaults(),
                                        convert_options));
    ASSERT_OK_AND_ASSIGN(*out, csv_reader->Read());
  }
};

// Decodes a DELTA_BINARY_PACKED-encoded Parquet file and compares the result
// against the expected values stored in a CSV file.
// (Removed leftover pre-refactor lines that re-built the CSV reader inline
// and referenced the undefined `input_file` — they no longer compile.)
TEST_F(TestArrowReadDeltaEncoding, DeltaBinaryPacked) {
  std::shared_ptr<::arrow::Table> actual_table, expect_table;
  ReadTableFromParquetFile("delta_binary_packed.parquet", &actual_table);

  auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
  // The file carries one int64 column per delta bit width, named
  // "bitwidth0" .. "bitwidth64", plus an int32 "int_value" column.
  for (int i = 0; i <= 64; ++i) {
    std::string column_name = "bitwidth" + std::to_string(i);
    convert_options.column_types[column_name] = ::arrow::int64();
  }
  convert_options.column_types["int_value"] = ::arrow::int32();
  ReadTableFromCSVFile("delta_binary_packed_expect.csv", convert_options, &expect_table);

  ::arrow::AssertTablesEqual(*actual_table, *expect_table);
}

// Decodes a DELTA_BYTE_ARRAY-encoded Parquet file and compares the result
// against the expected values stored in a CSV file.
TEST_F(TestArrowReadDeltaEncoding, DeltaByteArray) {
  std::shared_ptr<::arrow::Table> actual_table, expect_table;
  ReadTableFromParquetFile("delta_byte_array.parquet", &actual_table);

  auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
  std::vector<std::string> column_names = {
      "c_customer_id", "c_salutation", "c_first_name",
      "c_last_name", "c_preferred_cust_flag", "c_birth_country",
      "c_login", "c_email_address", "c_last_review_date"};
  // Read every column as a nullable string.
  // (const& avoids copying each std::string per iteration.)
  for (const auto& name : column_names) {
    convert_options.column_types[name] = ::arrow::utf8();
  }
  convert_options.strings_can_be_null = true;
  ReadTableFromCSVFile("delta_byte_array_expect.csv", convert_options, &expect_table);

  // NOTE(review): third argument relaxes the table comparison (presumably
  // chunk-layout equality) — confirm against AssertTablesEqual's signature.
  ::arrow::AssertTablesEqual(*actual_table, *expect_table, false);
}

// Decodes the DELTA_BYTE_ARRAY file incrementally in small record batches and
// checks each batch against the corresponding slice of the CSV expectation.
// (Removed a leftover pre-refactor line `AssertTablesEqual(*table, ...)` that
// referenced undefined variables and no longer compiles.)
TEST_F(TestArrowReadDeltaEncoding, IncrementalDecodeDeltaByteArray) {
  auto file = test::get_data_file("delta_byte_array.parquet");
  auto pool = ::arrow::default_memory_pool();
  const int64_t batch_size = 100;
  ArrowReaderProperties properties = default_arrow_reader_properties();
  properties.set_batch_size(batch_size);
  std::unique_ptr<FileReader> parquet_reader;
  std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
  ASSERT_OK(FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), properties,
                             &parquet_reader));
  ASSERT_OK(parquet_reader->GetRecordBatchReader(Iota(parquet_reader->num_row_groups()),
                                                 &rb_reader));

  auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
  std::vector<std::string> column_names = {
      "c_customer_id", "c_salutation", "c_first_name",
      "c_last_name", "c_preferred_cust_flag", "c_birth_country",
      "c_login", "c_email_address", "c_last_review_date"};
  // Read every column as a nullable string.
  // (const& avoids copying each std::string per iteration.)
  for (const auto& name : column_names) {
    convert_options.column_types[name] = ::arrow::utf8();
  }
  convert_options.strings_can_be_null = true;
  std::shared_ptr<::arrow::Table> csv_table;
  ReadTableFromCSVFile("delta_byte_array_expect.csv", convert_options, &csv_table);

  // Re-chunk the expected table with the same batch size so batches line up
  // one-to-one with the reader's output.
  ::arrow::TableBatchReader csv_table_reader(*csv_table);
  csv_table_reader.set_chunksize(batch_size);

  std::shared_ptr<::arrow::RecordBatch> actual_batch, expected_batch;
  // NOTE(review): assumes num_rows is an exact multiple of batch_size; a
  // trailing partial batch would never be compared and the final
  // end-of-stream assert below would fail — confirm against the data file.
  for (int i = 0; i < csv_table->num_rows() / batch_size; ++i) {
    ASSERT_OK(rb_reader->ReadNext(&actual_batch));
    ASSERT_OK(actual_batch->ValidateFull());
    ASSERT_OK(csv_table_reader.ReadNext(&expected_batch));
    ASSERT_NO_FATAL_FAILURE(::arrow::AssertBatchesEqual(*expected_batch, *actual_batch));
  }
  // The reader must signal end-of-stream with a null batch.
  ASSERT_OK(rb_reader->ReadNext(&actual_batch));
  ASSERT_EQ(nullptr, actual_batch);
}

#else
// Without ARROW_CSV the expected-results CSV files cannot be loaded, so skip.
TEST(TestArrowReadDeltaEncoding, DeltaBinaryPacked) {
  GTEST_SKIP() << "Test needs CSV reader";
}

// Without ARROW_CSV the expected-results CSV files cannot be loaded, so skip.
TEST(TestArrowReadDeltaEncoding, DeltaByteArray) {
  GTEST_SKIP() << "Test needs CSV reader";
}

// Without ARROW_CSV the expected-results CSV files cannot be loaded, so skip.
TEST(TestArrowReadDeltaEncoding, IncrementalDecodeDeltaByteArray) {
  GTEST_SKIP() << "Test needs CSV reader";
}

#endif

struct NestedFilterTestCase {
Expand Down
7 changes: 6 additions & 1 deletion cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -780,8 +780,13 @@ class ColumnReaderImplBase {
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
case Encoding::DELTA_BYTE_ARRAY: {
auto decoder = MakeTypedDecoder<DType>(Encoding::DELTA_BYTE_ARRAY, descr_);
current_decoder_ = decoder.get();
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
case Encoding::DELTA_LENGTH_BYTE_ARRAY:
case Encoding::DELTA_BYTE_ARRAY:
ParquetException::NYI("Unsupported encoding");

default:
Expand Down
Loading