218 changes: 112 additions & 106 deletions cpp/src/parquet/column_writer.cc
@@ -1209,28 +1209,31 @@ Status ConvertDictionaryToDense(const ::arrow::Array& array, MemoryPool* pool,
return Status::OK();
}

template <typename DType>
class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<DType> {
template <typename ParquetType>
Member Author: Since the implementation uses both parquet and arrow types, I thought it would be better to be explicit. I can restore the previous name though.

Member: This makes sense and is already used by some functions in this file.

Member: > Since the implementation uses both parquet and arrow types, I thought it would be better to be explicit.

+1
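
As context for the rename, a rough sketch of how one Parquet physical type serves several Arrow logical types (pairings taken from the WriteArrowDense dispatch cases below; illustrative comments only):

// parquet::Int32Type <- int8, uint8, int16, uint16, uint32, int32, date32,
//                       date64, time32, decimal128, decimal256
// parquet::Int64Type <- int64, uint32, uint64, time64, duration, timestamp,
//                       decimal128, decimal256
// parquet::FLBAType  <- fixed_size_binary, decimal128, decimal256, half_float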

class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<ParquetType> {
public:
using T = typename DType::c_type;
using T = typename ParquetType::c_type;

TypedColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata,
std::unique_ptr<PageWriter> pager, const bool use_dictionary,
Encoding::type encoding, const WriterProperties* properties)
: ColumnWriterImpl(metadata, std::move(pager), use_dictionary, encoding,
properties) {
current_encoder_ = MakeEncoder(DType::type_num, encoding, use_dictionary, descr_,
properties->memory_pool());
current_encoder_ = MakeEncoder(ParquetType::type_num, encoding, use_dictionary,
descr_, properties->memory_pool());
// We have to dynamic_cast as some compilers don't want to static_cast
// through virtual inheritance.
current_value_encoder_ = dynamic_cast<TypedEncoder<DType>*>(current_encoder_.get());
current_value_encoder_ =
dynamic_cast<TypedEncoder<ParquetType>*>(current_encoder_.get());
// Will be null if not using dictionary, but that's ok
current_dict_encoder_ = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
current_dict_encoder_ =
dynamic_cast<DictEncoder<ParquetType>*>(current_encoder_.get());

if (properties->statistics_enabled(descr_->path()) &&
(SortOrder::UNKNOWN != descr_->sort_order())) {
page_statistics_ = MakeStatistics<DType>(descr_, allocator_);
chunk_statistics_ = MakeStatistics<DType>(descr_, allocator_);
page_statistics_ = MakeStatistics<ParquetType>(descr_, allocator_);
chunk_statistics_ = MakeStatistics<ParquetType>(descr_, allocator_);
}
if (properties->size_statistics_level() == SizeStatisticsLevel::ColumnChunk ||
properties->size_statistics_level() == SizeStatisticsLevel::PageAndColumnChunk) {
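
As a hedged usage sketch of the knobs checked here (the SizeStatisticsLevel enum and the size_statistics_level() getter appear in this hunk; the builder method names are assumptions based on the usual WriterProperties pattern):

auto props = parquet::WriterProperties::Builder()
                 .enable_statistics()  // per-column min/max statistics
                 ->set_size_statistics_level(  // assumed builder counterpart
                     parquet::SizeStatisticsLevel::PageAndColumnChunk)
                 ->build();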
@@ -1364,6 +1367,35 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
int64_t num_levels, const ::arrow::Array& array,
ArrowWriteContext* context, bool maybe_parent_nulls);

template <typename ArrowType>
Status WriteArrowSerialize(const int16_t* def_levels, const int16_t* rep_levels,
Member Author: Consistently using the same argument order as WriteArrow.

int64_t num_levels, const ::arrow::Array& array,
ArrowWriteContext* ctx, bool maybe_parent_nulls);

Status WriteArrowZeroCopy(const int16_t* def_levels, const int16_t* rep_levels,
int64_t num_levels, const ::arrow::Array& array,
ArrowWriteContext* ctx, bool maybe_parent_nulls) {
const auto& data = checked_cast<const ::arrow::PrimitiveArray&>(array);
const T* values = data.data()->GetValues<T>(1);
bool no_nulls =
this->descr()->schema_node()->is_required() || (array.null_count() == 0);

if (!maybe_parent_nulls && no_nulls) {
PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, values));
Member: What if array.null_count() != 0 here?

Member Author: This is the existing logic, I haven't changed it. If there are actually null values in the array, not just a nullable type, then we need to pick the slower path.

Member: > What if array.null_count() != 0 here?

It should not happen that the field is required but has null values: https://github.com/apache/arrow/blob/main/cpp/src/parquet/column_writer.cc#L1324

Member: Ok, thanks. Ideally we would have a DCHECK but since this is just moving code around we'll live without it.

} else {
PARQUET_CATCH_NOT_OK(WriteBatchSpaced(num_levels, def_levels, rep_levels,
data.null_bitmap_data(), data.offset(),
values));
}
return Status::OK();
}
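
A minimal sketch of the DCHECK suggested in the thread above (hypothetical; not part of this change), which would guard the fast path:

// Hypothetical guard: a required field must not actually contain nulls by the
// time it reaches the writer (enforced upstream, see the linked check).
DCHECK(!this->descr()->schema_node()->is_required() || array.null_count() == 0);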

Status WriteArrowTimestamps(const int16_t* def_levels, const int16_t* rep_levels,
int64_t num_levels, const ::arrow::Array& values,
ArrowWriteContext* ctx, bool maybe_parent_nulls) {
return Status::NotImplemented("Timestamps writing is only implemented for Int64Type");
}

void WriteDictionaryPage() override {
DCHECK(current_dict_encoder_);
std::shared_ptr<ResizableBuffer> buffer = AllocateBuffer(
@@ -1449,15 +1481,15 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
}

private:
using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
using TypedStats = TypedStatistics<DType>;
using ValueEncoderType = typename EncodingTraits<ParquetType>::Encoder;
using TypedStats = TypedStatistics<ParquetType>;
std::unique_ptr<Encoder> current_encoder_;
// Downcasted observers of current_encoder_.
// The downcast is performed once as opposed to at every use since
// dynamic_cast is so expensive, and static_cast is not available due
// to virtual inheritance.
ValueEncoderType* current_value_encoder_;
DictEncoder<DType>* current_dict_encoder_;
DictEncoder<ParquetType>* current_dict_encoder_;
std::shared_ptr<TypedStats> page_statistics_;
std::shared_ptr<TypedStats> chunk_statistics_;
std::unique_ptr<SizeStatistics> page_size_statistics_;
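
As background for the downcast comment above, a standalone illustration (not from the PR) of why static_cast is unavailable through virtual inheritance:

#include <cassert>
struct B { virtual ~B() = default; };
struct D : virtual B {};
int main() {
  B* b = new D;
  // auto* bad = static_cast<D*>(b);  // ill-formed: cannot static_cast from a virtual base
  auto* ok = dynamic_cast<D*>(b);     // dynamic_cast resolves virtual bases at runtime
  assert(ok != nullptr);
  delete b;
}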
@@ -1655,8 +1687,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
FlushBufferedDataPages();
fallback_ = true;
// Only PLAIN encoding is supported for fallback in V1
current_encoder_ = MakeEncoder(DType::type_num, Encoding::PLAIN, false, descr_,
properties_->memory_pool());
current_encoder_ = MakeEncoder(ParquetType::type_num, Encoding::PLAIN, false,
descr_, properties_->memory_pool());
current_value_encoder_ = dynamic_cast<ValueEncoderType*>(current_encoder_.get());
current_dict_encoder_ = nullptr; // not using dict
encoding_ = Encoding::PLAIN;
@@ -1720,8 +1752,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
}
};
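
For context on the PLAIN fallback above, a hedged sketch of the knob that triggers it (dictionary_pagesize_limit is the existing builder method; treating 1 MiB as the default is an assumption):

// Dictionary encoding is attempted first; once the accumulated dictionary
// outgrows this limit, buffered pages are flushed and the writer falls back
// to PLAIN encoding (the only fallback supported in V1).
auto props = parquet::WriterProperties::Builder()
                 .enable_dictionary()
                 ->dictionary_pagesize_limit(1024 * 1024)  // assumed default: 1 MiB
                 ->build();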

template <typename DType>
Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
template <typename ParquetType>
Status TypedColumnWriterImpl<ParquetType>::WriteArrowDictionary(
const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
// If this is the first time writing a DictionaryArray, then there's
@@ -1758,7 +1790,7 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
return WriteDense();
}

auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
auto dict_encoder = dynamic_cast<DictEncoder<ParquetType>*>(current_encoder_.get());
const auto& data = checked_cast<const ::arrow::DictionaryArray&>(array);
std::shared_ptr<::arrow::Array> dictionary = data.dictionary();
std::shared_ptr<::arrow::Array> indices = data.indices();
@@ -1870,67 +1902,42 @@ struct SerializeFunctor {
}
};

template <typename ParquetType, typename ArrowType>
Status WriteArrowSerialize(const ::arrow::Array& array, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels,
ArrowWriteContext* ctx, TypedColumnWriter<ParquetType>* writer,
bool maybe_parent_nulls) {
using ParquetCType = typename ParquetType::c_type;
template <typename ParquetType>
template <typename ArrowType>
Status TypedColumnWriterImpl<ParquetType>::WriteArrowSerialize(
const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
using ParquetCType = typename ParquetType::c_type;

ParquetCType* buffer = nullptr;
PARQUET_THROW_NOT_OK(ctx->GetScratchData<ParquetCType>(array.length(), &buffer));

SerializeFunctor<ParquetType, ArrowType> functor;
RETURN_NOT_OK(functor.Serialize(checked_cast<const ArrayType&>(array), ctx, buffer));
bool no_nulls =
writer->descr()->schema_node()->is_required() || (array.null_count() == 0);
this->descr()->schema_node()->is_required() || (array.null_count() == 0);
if (!maybe_parent_nulls && no_nulls) {
PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, buffer));
PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, buffer));
} else {
PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
array.null_bitmap_data(),
array.offset(), buffer));
PARQUET_CATCH_NOT_OK(WriteBatchSpaced(num_levels, def_levels, rep_levels,
array.null_bitmap_data(), array.offset(),
buffer));
}
return Status::OK();
}

template <typename ParquetType>
Status WriteArrowZeroCopy(const ::arrow::Array& array, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels,
ArrowWriteContext* ctx, TypedColumnWriter<ParquetType>* writer,
bool maybe_parent_nulls) {
using T = typename ParquetType::c_type;
const auto& data = static_cast<const ::arrow::PrimitiveArray&>(array);
const T* values = nullptr;
// The values buffer may be null if the array is empty (ARROW-2744)
if (data.values() != nullptr) {
values = reinterpret_cast<const T*>(data.values()->data()) + data.offset();
} else {
DCHECK_EQ(data.length(), 0);
#define WRITE_SERIALIZE_CASE(ArrowEnum) \
case ::arrow::Type::ArrowEnum: { \
using ArrowType = typename ::arrow::TypeIdTraits<::arrow::Type::ArrowEnum>::Type; \
return WriteArrowSerialize<ArrowType>(def_levels, rep_levels, num_levels, array, \
ctx, maybe_parent_nulls); \
}
bool no_nulls =
writer->descr()->schema_node()->is_required() || (array.null_count() == 0);

if (!maybe_parent_nulls && no_nulls) {
PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, values));
} else {
PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
data.null_bitmap_data(), data.offset(),
values));
}
return Status::OK();
}

#define WRITE_SERIALIZE_CASE(ArrowEnum, ArrowType, ParquetType) \
case ::arrow::Type::ArrowEnum: \
return WriteArrowSerialize<ParquetType, ::arrow::ArrowType>( \
array, num_levels, def_levels, rep_levels, ctx, this, maybe_parent_nulls);

#define WRITE_ZERO_COPY_CASE(ArrowEnum, ArrowType, ParquetType) \
case ::arrow::Type::ArrowEnum: \
return WriteArrowZeroCopy<ParquetType>(array, num_levels, def_levels, rep_levels, \
ctx, this, maybe_parent_nulls);
#define WRITE_ZERO_COPY_CASE(ArrowEnum) \
case ::arrow::Type::ArrowEnum: \
return WriteArrowZeroCopy(def_levels, rep_levels, num_levels, array, ctx, \
maybe_parent_nulls);
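
Hand-expanding the new single-argument macro shows how the Arrow type is recovered from the enum (illustrative expansion of WRITE_SERIALIZE_CASE(INT8)):

case ::arrow::Type::INT8: {
  using ArrowType = typename ::arrow::TypeIdTraits<::arrow::Type::INT8>::Type;  // ::arrow::Int8Type
  return WriteArrowSerialize<ArrowType>(def_levels, rep_levels, num_levels, array,
                                        ctx, maybe_parent_nulls);
}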

#define ARROW_UNSUPPORTED() \
std::stringstream ss; \
@@ -1958,8 +1965,8 @@ Status TypedColumnWriterImpl<BooleanType>::WriteArrowDense(
if (array.type_id() != ::arrow::Type::BOOL) {
ARROW_UNSUPPORTED();
}
return WriteArrowSerialize<BooleanType, ::arrow::BooleanType>(
array, num_levels, def_levels, rep_levels, ctx, this, maybe_parent_nulls);
return WriteArrowSerialize<::arrow::BooleanType>(def_levels, rep_levels, num_levels,
array, ctx, maybe_parent_nulls);
}

// ----------------------------------------------------------------------
@@ -2041,17 +2048,17 @@ Status TypedColumnWriterImpl<Int32Type>::WriteArrowDense(
case ::arrow::Type::NA: {
PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, nullptr));
} break;
WRITE_SERIALIZE_CASE(INT8, Int8Type, Int32Type)
WRITE_SERIALIZE_CASE(UINT8, UInt8Type, Int32Type)
WRITE_SERIALIZE_CASE(INT16, Int16Type, Int32Type)
WRITE_SERIALIZE_CASE(UINT16, UInt16Type, Int32Type)
WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int32Type)
WRITE_ZERO_COPY_CASE(INT32, Int32Type, Int32Type)
WRITE_ZERO_COPY_CASE(DATE32, Date32Type, Int32Type)
WRITE_SERIALIZE_CASE(DATE64, Date64Type, Int32Type)
WRITE_SERIALIZE_CASE(TIME32, Time32Type, Int32Type)
WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int32Type)
WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int32Type)
WRITE_SERIALIZE_CASE(INT8)
WRITE_SERIALIZE_CASE(UINT8)
WRITE_SERIALIZE_CASE(INT16)
WRITE_SERIALIZE_CASE(UINT16)
WRITE_SERIALIZE_CASE(UINT32)
WRITE_ZERO_COPY_CASE(INT32)
WRITE_ZERO_COPY_CASE(DATE32)
WRITE_SERIALIZE_CASE(DATE64)
WRITE_SERIALIZE_CASE(TIME32)
WRITE_SERIALIZE_CASE(DECIMAL128)
WRITE_SERIALIZE_CASE(DECIMAL256)
default:
ARROW_UNSUPPORTED()
}
@@ -2158,28 +2165,27 @@ struct SerializeFunctor<Int64Type, ::arrow::TimestampType> {
#undef COERCE_INVALID
#undef COERCE_MULTIPLY

Status WriteTimestamps(const ::arrow::Array& values, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels,
ArrowWriteContext* ctx, TypedColumnWriter<Int64Type>* writer,
bool maybe_parent_nulls) {
template <>
Status TypedColumnWriterImpl<Int64Type>::WriteArrowTimestamps(
const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
const ::arrow::Array& values, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
const auto& source_type = static_cast<const ::arrow::TimestampType&>(*values.type());

auto WriteCoerce = [&](const ArrowWriterProperties* properties) {
ArrowWriteContext temp_ctx = *ctx;
temp_ctx.properties = properties;
return WriteArrowSerialize<Int64Type, ::arrow::TimestampType>(
values, num_levels, def_levels, rep_levels, &temp_ctx, writer,
maybe_parent_nulls);
return WriteArrowSerialize<::arrow::TimestampType>(
def_levels, rep_levels, num_levels, values, &temp_ctx, maybe_parent_nulls);
};

const ParquetVersion::type version = writer->properties()->version();
const ParquetVersion::type version = this->properties()->version();

if (ctx->properties->coerce_timestamps_enabled()) {
// User explicitly requested coercion to specific unit
if (source_type.unit() == ctx->properties->coerce_timestamps_unit()) {
// No data conversion necessary
return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels,
ctx, writer, maybe_parent_nulls);
return WriteArrowZeroCopy(def_levels, rep_levels, num_levels, values, ctx,
maybe_parent_nulls);
} else {
return WriteCoerce(ctx->properties);
}
@@ -2204,8 +2210,8 @@ Status WriteTimestamps(const ::arrow::Array& values, int64_t num_levels,
return WriteCoerce(properties.get());
} else {
// No data conversion necessary
return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels, ctx,
writer, maybe_parent_nulls);
return WriteArrowZeroCopy(def_levels, rep_levels, num_levels, values, ctx,
maybe_parent_nulls);
}
}
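
On the caller side, this coercion branch is driven by ArrowWriterProperties; a short usage sketch using the existing coerce_timestamps builder method:

// Coerce all timestamps to microseconds on write; when the source unit
// already matches, the zero-copy branch above is taken instead.
std::shared_ptr<parquet::ArrowWriterProperties> arrow_props =
    parquet::ArrowWriterProperties::Builder()
        .coerce_timestamps(::arrow::TimeUnit::MICRO)
        ->build();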

@@ -2215,15 +2221,15 @@ Status TypedColumnWriterImpl<Int64Type>::WriteArrowDense(
const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
switch (array.type()->id()) {
case ::arrow::Type::TIMESTAMP:
return WriteTimestamps(array, num_levels, def_levels, rep_levels, ctx, this,
maybe_parent_nulls);
WRITE_ZERO_COPY_CASE(INT64, Int64Type, Int64Type)
WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int64Type)
WRITE_SERIALIZE_CASE(UINT64, UInt64Type, Int64Type)
WRITE_ZERO_COPY_CASE(TIME64, Time64Type, Int64Type)
WRITE_ZERO_COPY_CASE(DURATION, DurationType, Int64Type)
WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int64Type)
WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int64Type)
return WriteArrowTimestamps(def_levels, rep_levels, num_levels, array, ctx,
maybe_parent_nulls);
WRITE_ZERO_COPY_CASE(INT64)
WRITE_SERIALIZE_CASE(UINT32)
WRITE_SERIALIZE_CASE(UINT64)
WRITE_ZERO_COPY_CASE(TIME64)
WRITE_ZERO_COPY_CASE(DURATION)
WRITE_SERIALIZE_CASE(DECIMAL128)
WRITE_SERIALIZE_CASE(DECIMAL256)
default:
ARROW_UNSUPPORTED();
}
@@ -2236,8 +2242,8 @@ Status TypedColumnWriterImpl<Int96Type>::WriteArrowDense(
if (array.type_id() != ::arrow::Type::TIMESTAMP) {
ARROW_UNSUPPORTED();
}
return WriteArrowSerialize<Int96Type, ::arrow::TimestampType>(
array, num_levels, def_levels, rep_levels, ctx, this, maybe_parent_nulls);
return WriteArrowSerialize<::arrow::TimestampType>(def_levels, rep_levels, num_levels,
array, ctx, maybe_parent_nulls);
}

// ----------------------------------------------------------------------
@@ -2250,8 +2256,8 @@ Status TypedColumnWriterImpl<FloatType>::WriteArrowDense(
if (array.type_id() != ::arrow::Type::FLOAT) {
ARROW_UNSUPPORTED();
}
return WriteArrowZeroCopy<FloatType>(array, num_levels, def_levels, rep_levels, ctx,
this, maybe_parent_nulls);
return WriteArrowZeroCopy(def_levels, rep_levels, num_levels, array, ctx,
maybe_parent_nulls);
}

template <>
Expand All @@ -2261,8 +2267,8 @@ Status TypedColumnWriterImpl<DoubleType>::WriteArrowDense(
if (array.type_id() != ::arrow::Type::DOUBLE) {
ARROW_UNSUPPORTED();
}
return WriteArrowZeroCopy<DoubleType>(array, num_levels, def_levels, rep_levels, ctx,
this, maybe_parent_nulls);
return WriteArrowZeroCopy(def_levels, rep_levels, num_levels, array, ctx,
maybe_parent_nulls);
}

// ----------------------------------------------------------------------
@@ -2442,10 +2448,10 @@ Status TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
switch (array.type()->id()) {
WRITE_SERIALIZE_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, FLBAType)
WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, FLBAType)
WRITE_SERIALIZE_CASE(HALF_FLOAT, HalfFloatType, FLBAType)
WRITE_SERIALIZE_CASE(FIXED_SIZE_BINARY)
WRITE_SERIALIZE_CASE(DECIMAL128)
WRITE_SERIALIZE_CASE(DECIMAL256)
WRITE_SERIALIZE_CASE(HALF_FLOAT)
default:
break;
}