Skip to content
This repository was archived by the owner on May 10, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/parquet/column/column-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,

TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);

using TestNullValuesWriter = TestPrimitiveWriter<Int32Type>;

TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
this->TestRequiredWithEncoding(Encoding::PLAIN);
}
Expand Down Expand Up @@ -303,5 +305,26 @@ TYPED_TEST(TestPrimitiveWriter, RequiredVeryLargeChunk) {
}
}

// PARQUET-719
// Test case for NULL values
TEST_F(TestNullValuesWriter, OptionalNullValueChunk) {
this->SetUpSchemaOptional();

this->GenerateData(LARGE_SIZE);

std::vector<int16_t> definition_levels(LARGE_SIZE, 0);
std::vector<int16_t> repetition_levels(LARGE_SIZE, 0);

auto writer = this->BuildWriter(LARGE_SIZE);
// All values being written are NULL
writer->WriteBatch(
this->values_.size(), definition_levels.data(), repetition_levels.data(), NULL);
writer->Close();

// Just read the first SMALL_SIZE rows to ensure we could read it back in
this->ReadColumn();
ASSERT_EQ(0, this->values_read_);
}

} // namespace test
} // namespace parquet
12 changes: 5 additions & 7 deletions src/parquet/column/scanner-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,15 @@ class TestFlatScanner : public ::testing::Test {
vector<uint8_t> data_buffer_; // For BA and FLBA
};

typedef TestFlatScanner<FLBAType> TestFlatFLBAScanner;

static int num_levels_per_page = 100;
static int num_pages = 20;
static int batch_size = 32;

typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
ByteArrayType> TestTypes;

typedef TestFlatScanner<BooleanType> TestBooleanFlatScanner;
typedef TestFlatScanner<FLBAType> TestFLBAFlatScanner;
using TestBooleanFlatScanner = TestFlatScanner<BooleanType>;
using TestFLBAFlatScanner = TestFlatScanner<FLBAType>;

TYPED_TEST_CASE(TestFlatScanner, TestTypes);

Expand Down Expand Up @@ -183,7 +181,7 @@ TEST_F(TestFLBAFlatScanner, TestPlainDictScanner) {
}

// PARQUET 502
TEST_F(TestFlatFLBAScanner, TestSmallBatch) {
TEST_F(TestFLBAFlatScanner, TestSmallBatch) {
NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED,
Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
const ColumnDescriptor d(type, 0, 0);
Expand All @@ -194,7 +192,7 @@ TEST_F(TestFlatFLBAScanner, TestSmallBatch) {
CheckResults(1, &d);
}

TEST_F(TestFlatFLBAScanner, TestDescriptorAPI) {
TEST_F(TestFLBAFlatScanner, TestDescriptorAPI) {
NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL,
Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
const ColumnDescriptor d(type, 4, 0);
Expand All @@ -209,7 +207,7 @@ TEST_F(TestFlatFLBAScanner, TestDescriptorAPI) {
ASSERT_EQ(FLBA_LENGTH, scanner->descr()->type_length());
}

TEST_F(TestFlatFLBAScanner, TestFLBAPrinterNext) {
TEST_F(TestFLBAFlatScanner, TestFLBAPrinterNext) {
NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL,
Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
const ColumnDescriptor d(type, 4, 0);
Expand Down
14 changes: 9 additions & 5 deletions src/parquet/column/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
void CheckDictionarySizeLimit() override;

private:
void WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
const int16_t* rep_levels, const T* values);

typedef Encoder<DType> EncoderType;
Expand All @@ -167,7 +167,7 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
};

template <typename DType>
inline void TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
int64_t values_to_write = 0;
// If the field is required and non-repeated, there are no definition levels
Expand Down Expand Up @@ -209,6 +209,8 @@ inline void TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
AddDataPage();
}
if (has_dictionary_ && !fallback_) { CheckDictionarySizeLimit(); }

return values_to_write;
}

template <typename DType>
Expand All @@ -222,15 +224,17 @@ inline void TypedColumnWriter<DType>::WriteBatch(int64_t num_values,
int64_t write_batch_size = properties_->write_batch_size();
int num_batches = num_values / write_batch_size;
int64_t num_remaining = num_values % write_batch_size;
int64_t value_offset = 0;
for (int round = 0; round < num_batches; round++) {
int64_t offset = round * write_batch_size;
WriteMiniBatch(
write_batch_size, &def_levels[offset], &rep_levels[offset], &values[offset]);
int64_t num_values = WriteMiniBatch(write_batch_size, &def_levels[offset],
&rep_levels[offset], &values[value_offset]);
value_offset += num_values;
}
// Write the remaining values
int64_t offset = num_batches * write_batch_size;
WriteMiniBatch(
num_remaining, &def_levels[offset], &rep_levels[offset], &values[offset]);
num_remaining, &def_levels[offset], &rep_levels[offset], &values[value_offset]);
}

template <typename DType>
Expand Down