diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 393bf8a1623..79dd13ea49f 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -1209,28 +1209,31 @@ Status ConvertDictionaryToDense(const ::arrow::Array& array, MemoryPool* pool,
   return Status::OK();
 }
 
-template <typename DType>
-class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<DType> {
+template <typename ParquetType>
+class TypedColumnWriterImpl : public ColumnWriterImpl,
+                              public TypedColumnWriter<ParquetType> {
  public:
-  using T = typename DType::c_type;
+  using T = typename ParquetType::c_type;
 
   TypedColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata,
                         std::unique_ptr<PageWriter> pager, const bool use_dictionary,
                         Encoding::type encoding, const WriterProperties* properties)
       : ColumnWriterImpl(metadata, std::move(pager), use_dictionary, encoding,
                          properties) {
-    current_encoder_ = MakeEncoder(DType::type_num, encoding, use_dictionary, descr_,
-                                   properties->memory_pool());
+    current_encoder_ = MakeEncoder(ParquetType::type_num, encoding, use_dictionary,
+                                   descr_, properties->memory_pool());
     // We have to dynamic_cast as some compilers don't want to static_cast
     // through virtual inheritance.
-    current_value_encoder_ = dynamic_cast<TypedEncoder<DType>*>(current_encoder_.get());
+    current_value_encoder_ =
+        dynamic_cast<TypedEncoder<ParquetType>*>(current_encoder_.get());
     // Will be null if not using dictionary, but that's ok
-    current_dict_encoder_ = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
+    current_dict_encoder_ =
+        dynamic_cast<DictEncoder<ParquetType>*>(current_encoder_.get());
 
     if (properties->statistics_enabled(descr_->path()) &&
         (SortOrder::UNKNOWN != descr_->sort_order())) {
-      page_statistics_ = MakeStatistics<DType>(descr_, allocator_);
-      chunk_statistics_ = MakeStatistics<DType>(descr_, allocator_);
+      page_statistics_ = MakeStatistics<ParquetType>(descr_, allocator_);
+      chunk_statistics_ = MakeStatistics<ParquetType>(descr_, allocator_);
     }
     if (properties->size_statistics_level() == SizeStatisticsLevel::ColumnChunk ||
         properties->size_statistics_level() == SizeStatisticsLevel::PageAndColumnChunk) {
@@ -1364,6 +1367,35 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<DType> {
                               int64_t num_levels, const ::arrow::Array& array,
                               ArrowWriteContext* context, bool maybe_parent_nulls);
 
+  template <typename ArrowType>
+  Status WriteArrowSerialize(const int16_t* def_levels, const int16_t* rep_levels,
+                             int64_t num_levels, const ::arrow::Array& array,
+                             ArrowWriteContext* ctx, bool maybe_parent_nulls);
+
+  Status WriteArrowZeroCopy(const int16_t* def_levels, const int16_t* rep_levels,
+                            int64_t num_levels, const ::arrow::Array& array,
+                            ArrowWriteContext* ctx, bool maybe_parent_nulls) {
+    const auto& data = checked_cast<const ::arrow::PrimitiveArray&>(array);
+    const T* values = data.data()->GetValues<T>(1);
+    bool no_nulls =
+        this->descr()->schema_node()->is_required() || (array.null_count() == 0);
+
+    if (!maybe_parent_nulls && no_nulls) {
+      PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, values));
+    } else {
+      PARQUET_CATCH_NOT_OK(WriteBatchSpaced(num_levels, def_levels, rep_levels,
+                                            data.null_bitmap_data(), data.offset(),
+                                            values));
+    }
+    return Status::OK();
+  }
+
+  Status WriteArrowTimestamps(const int16_t* def_levels, const int16_t* rep_levels,
+                              int64_t num_levels, const ::arrow::Array& values,
+                              ArrowWriteContext* ctx, bool maybe_parent_nulls) {
+    return Status::NotImplemented("Timestamps writing is only implemented for Int64Type");
+  }
+
   void WriteDictionaryPage() override {
     DCHECK(current_dict_encoder_);
     std::shared_ptr<ResizableBuffer> buffer = AllocateBuffer(
@@ -1449,15 +1481,15 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<DType> {
   }
 
  private:
-  using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
-  using TypedStats = TypedStatistics<DType>;
+  using ValueEncoderType = typename EncodingTraits<ParquetType>::Encoder;
+  using TypedStats = TypedStatistics<ParquetType>;
   std::unique_ptr<Encoder> current_encoder_;
   // Downcasted observers of current_encoder_.
   // The downcast is performed once as opposed to at every use since
   // dynamic_cast is so expensive, and static_cast is not available due
   // to virtual inheritance.
   ValueEncoderType* current_value_encoder_;
-  DictEncoder<DType>* current_dict_encoder_;
+  DictEncoder<ParquetType>* current_dict_encoder_;
   std::shared_ptr<TypedStats> page_statistics_;
   std::shared_ptr<TypedStats> chunk_statistics_;
   std::unique_ptr<SizeStatistics> page_size_statistics_;
@@ -1655,8 +1687,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<DType> {
     FlushBufferedDataPages();
     fallback_ = true;
     // Only PLAIN encoding is supported for fallback in V1
-    current_encoder_ = MakeEncoder(DType::type_num, Encoding::PLAIN, false, descr_,
-                                   properties_->memory_pool());
+    current_encoder_ = MakeEncoder(ParquetType::type_num, Encoding::PLAIN, false,
+                                   descr_, properties_->memory_pool());
     current_value_encoder_ = dynamic_cast<ValueEncoderType*>(current_encoder_.get());
     current_dict_encoder_ = nullptr;  // not using dict
     encoding_ = Encoding::PLAIN;
@@ -1720,8 +1752,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<DType> {
   }
 };
 
-template <typename DType>
-Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
+template <typename ParquetType>
+Status TypedColumnWriterImpl<ParquetType>::WriteArrowDictionary(
     const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
     const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   // If this is the first time writing a DictionaryArray, then there's
@@ -1758,7 +1790,7 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
     return WriteDense();
   }
 
-  auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
+  auto dict_encoder = dynamic_cast<DictEncoder<ParquetType>*>(current_encoder_.get());
   const auto& data = checked_cast<const ::arrow::DictionaryArray&>(array);
   std::shared_ptr<::arrow::Array> dictionary = data.dictionary();
   std::shared_ptr<::arrow::Array> indices = data.indices();
@@ -1870,13 +1902,13 @@ struct SerializeFunctor {
   }
 };
 
-template <typename ParquetType, typename ArrowType>
-Status WriteArrowSerialize(const ::arrow::Array& array, int64_t num_levels,
-                           const int16_t* def_levels, const int16_t* rep_levels,
-                           ArrowWriteContext* ctx, TypedColumnWriter<ParquetType>* writer,
-                           bool maybe_parent_nulls) {
-  using ParquetCType = typename ParquetType::c_type;
+template <typename ParquetType>
+template <typename ArrowType>
+Status TypedColumnWriterImpl<ParquetType>::WriteArrowSerialize(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
+  using ParquetCType = typename ParquetType::c_type;
 
   ParquetCType* buffer = nullptr;
   PARQUET_THROW_NOT_OK(ctx->GetScratchData<ParquetCType>(array.length(), &buffer));
@@ -1884,53 +1916,28 @@ Status WriteArrowSerialize(const ::arrow::Array& array, int64_t num_levels,
   SerializeFunctor<ParquetType, ArrowType> functor;
   RETURN_NOT_OK(functor.Serialize(checked_cast<const ArrayType&>(array), ctx, buffer));
   bool no_nulls =
-      writer->descr()->schema_node()->is_required() || (array.null_count() == 0);
+      this->descr()->schema_node()->is_required() || (array.null_count() == 0);
   if (!maybe_parent_nulls && no_nulls) {
-    PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, buffer));
+    PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, buffer));
   } else {
-    PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
-                                                  array.null_bitmap_data(),
-                                                  array.offset(), buffer));
+    PARQUET_CATCH_NOT_OK(WriteBatchSpaced(num_levels, def_levels, rep_levels,
+                                          array.null_bitmap_data(), array.offset(),
+                                          buffer));
   }
   return Status::OK();
 }
 
-template <typename ParquetType, typename ArrowType>
-Status WriteArrowZeroCopy(const ::arrow::Array& array, int64_t num_levels,
-                          const int16_t* def_levels, const int16_t* rep_levels,
-                          ArrowWriteContext* ctx, TypedColumnWriter<ParquetType>* writer,
-                          bool maybe_parent_nulls) {
-  using T = typename ParquetType::c_type;
-  const auto& data = static_cast<const ::arrow::PrimitiveArray&>(array);
-  const T* values = nullptr;
-  // The values buffer may be null if the array is empty (ARROW-2744)
-  if (data.values() != nullptr) {
-    values = reinterpret_cast<const T*>(data.values()->data()) + data.offset();
-  } else {
-    DCHECK_EQ(data.length(), 0);
+#define WRITE_SERIALIZE_CASE(ArrowEnum)                                               \
+  case ::arrow::Type::ArrowEnum: {                                                    \
+    using ArrowType = typename ::arrow::TypeIdTraits<::arrow::Type::ArrowEnum>::Type; \
+    return WriteArrowSerialize<ArrowType>(def_levels, rep_levels, num_levels, array,  \
+                                          ctx, maybe_parent_nulls);                   \
   }
-  bool no_nulls =
-      writer->descr()->schema_node()->is_required() || (array.null_count() == 0);
-
-  if (!maybe_parent_nulls && no_nulls) {
-    PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, values));
-  } else {
-    PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
-                                                  data.null_bitmap_data(), data.offset(),
-                                                  values));
-  }
-  return Status::OK();
-}
-
-#define WRITE_SERIALIZE_CASE(ArrowEnum, ArrowType, ParquetType)                   \
-  case ::arrow::Type::ArrowEnum:                                                  \
-    return WriteArrowSerialize<ParquetType, ::arrow::ArrowType>(                  \
-        array, num_levels, def_levels, rep_levels, ctx, this, maybe_parent_nulls);
-
-#define WRITE_ZERO_COPY_CASE(ArrowEnum, ArrowType, ParquetType)                       \
-  case ::arrow::Type::ArrowEnum:                                                      \
-    return WriteArrowZeroCopy<ParquetType>(array, num_levels, def_levels, rep_levels, \
-                                           ctx, this, maybe_parent_nulls);
+
+#define WRITE_ZERO_COPY_CASE(ArrowEnum)                                       \
+  case ::arrow::Type::ArrowEnum:                                              \
+    return WriteArrowZeroCopy(def_levels, rep_levels, num_levels, array, ctx, \
+                              maybe_parent_nulls);
 
 #define ARROW_UNSUPPORTED()                                          \
   std::stringstream ss;                                              \
@@ -1958,8 +1965,8 @@ Status TypedColumnWriterImpl<BooleanType>::WriteArrowDense(
   if (array.type_id() != ::arrow::Type::BOOL) {
     ARROW_UNSUPPORTED();
   }
-  return WriteArrowSerialize<BooleanType, ::arrow::BooleanType>(
-      array, num_levels, def_levels, rep_levels, ctx, this, maybe_parent_nulls);
+  return WriteArrowSerialize<::arrow::BooleanType>(def_levels, rep_levels, num_levels,
+                                                   array, ctx, maybe_parent_nulls);
 }
 
 // ----------------------------------------------------------------------
@@ -2041,17 +2048,17 @@ Status TypedColumnWriterImpl<Int32Type>::WriteArrowDense(
     case ::arrow::Type::NA: {
       PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, nullptr));
    } break;
-      WRITE_SERIALIZE_CASE(INT8, Int8Type, Int32Type)
-      WRITE_SERIALIZE_CASE(UINT8, UInt8Type, Int32Type)
-      WRITE_SERIALIZE_CASE(INT16, Int16Type, Int32Type)
-      WRITE_SERIALIZE_CASE(UINT16, UInt16Type, Int32Type)
-      WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int32Type)
-      WRITE_ZERO_COPY_CASE(INT32, Int32Type, Int32Type)
-      WRITE_ZERO_COPY_CASE(DATE32, Date32Type, Int32Type)
-      WRITE_SERIALIZE_CASE(DATE64, Date64Type, Int32Type)
-      WRITE_SERIALIZE_CASE(TIME32, Time32Type, Int32Type)
-      WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int32Type)
-      WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int32Type)
+      WRITE_SERIALIZE_CASE(INT8)
+      WRITE_SERIALIZE_CASE(UINT8)
+      WRITE_SERIALIZE_CASE(INT16)
+      WRITE_SERIALIZE_CASE(UINT16)
+      WRITE_SERIALIZE_CASE(UINT32)
+      WRITE_ZERO_COPY_CASE(INT32)
+      WRITE_ZERO_COPY_CASE(DATE32)
+      WRITE_SERIALIZE_CASE(DATE64)
+      WRITE_SERIALIZE_CASE(TIME32)
+      WRITE_SERIALIZE_CASE(DECIMAL128)
+      WRITE_SERIALIZE_CASE(DECIMAL256)
     default:
       ARROW_UNSUPPORTED()
   }
@@ -2158,28 +2165,27 @@ struct SerializeFunctor<Int64Type, ::arrow::TimestampType> {
 #undef COERCE_INVALID
 #undef COERCE_MULTIPLY
 
-Status WriteTimestamps(const ::arrow::Array& values, int64_t num_levels,
-                       const int16_t* def_levels, const int16_t* rep_levels,
-                       ArrowWriteContext* ctx, TypedColumnWriter<Int64Type>* writer,
-                       bool maybe_parent_nulls) {
+template <>
+Status TypedColumnWriterImpl<Int64Type>::WriteArrowTimestamps(
+    const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
+    const ::arrow::Array& values, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   const auto& source_type = static_cast<const ::arrow::TimestampType&>(*values.type());
 
   auto WriteCoerce = [&](const ArrowWriterProperties* properties) {
     ArrowWriteContext temp_ctx = *ctx;
     temp_ctx.properties = properties;
-    return WriteArrowSerialize<Int64Type, ::arrow::TimestampType>(
-        values, num_levels, def_levels, rep_levels, &temp_ctx, writer,
-        maybe_parent_nulls);
+    return WriteArrowSerialize<::arrow::TimestampType>(
+        def_levels, rep_levels, num_levels, values, &temp_ctx, maybe_parent_nulls);
   };
 
-  const ParquetVersion::type version = writer->properties()->version();
+  const ParquetVersion::type version = this->properties()->version();
 
   if (ctx->properties->coerce_timestamps_enabled()) {
     // User explicitly requested coercion to specific unit
     if (source_type.unit() == ctx->properties->coerce_timestamps_unit()) {
       // No data conversion necessary
-      return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels,
-                                           ctx, writer, maybe_parent_nulls);
+      return WriteArrowZeroCopy(def_levels, rep_levels, num_levels, values, ctx,
+                                maybe_parent_nulls);
     } else {
       return WriteCoerce(ctx->properties);
     }
@@ -2204,8 +2210,8 @@ Status WriteTimestamps(const ::arrow::Array& values, int64_t num_levels,
     return WriteCoerce(properties.get());
   } else {
     // No data conversion necessary
-    return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels, ctx,
-                                         writer, maybe_parent_nulls);
+    return WriteArrowZeroCopy(def_levels, rep_levels, num_levels, values, ctx,
+                              maybe_parent_nulls);
   }
 }
 
@@ -2215,15 +2221,15 @@ Status TypedColumnWriterImpl<Int64Type>::WriteArrowDense(
     const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   switch (array.type()->id()) {
     case ::arrow::Type::TIMESTAMP:
-      return WriteTimestamps(array, num_levels, def_levels, rep_levels, ctx, this,
-                             maybe_parent_nulls);
-      WRITE_ZERO_COPY_CASE(INT64, Int64Type, Int64Type)
-      WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int64Type)
-      WRITE_SERIALIZE_CASE(UINT64, UInt64Type, Int64Type)
-      WRITE_ZERO_COPY_CASE(TIME64, Time64Type, Int64Type)
-      WRITE_ZERO_COPY_CASE(DURATION, DurationType, Int64Type)
-      WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int64Type)
-      WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int64Type)
+      return WriteArrowTimestamps(def_levels, rep_levels, num_levels, array, ctx,
+                                  maybe_parent_nulls);
+      WRITE_ZERO_COPY_CASE(INT64)
+      WRITE_SERIALIZE_CASE(UINT32)
+      WRITE_SERIALIZE_CASE(UINT64)
+      WRITE_ZERO_COPY_CASE(TIME64)
+      WRITE_ZERO_COPY_CASE(DURATION)
+      WRITE_SERIALIZE_CASE(DECIMAL128)
+      WRITE_SERIALIZE_CASE(DECIMAL256)
     default:
      ARROW_UNSUPPORTED();
   }
@@ -2236,8 +2242,8 @@ Status TypedColumnWriterImpl<Int96Type>::WriteArrowDense(
   if (array.type_id() != ::arrow::Type::TIMESTAMP) {
     ARROW_UNSUPPORTED();
   }
-  return WriteArrowSerialize<Int96Type, ::arrow::TimestampType>(
-      array, num_levels, def_levels, rep_levels, ctx, this, maybe_parent_nulls);
+  return WriteArrowSerialize<::arrow::TimestampType>(def_levels, rep_levels, num_levels,
+                                                     array, ctx, maybe_parent_nulls);
 }
 
 // ----------------------------------------------------------------------
@@ -2250,8 +2256,8 @@ Status TypedColumnWriterImpl<FloatType>::WriteArrowDense(
   if (array.type_id() != ::arrow::Type::FLOAT) {
     ARROW_UNSUPPORTED();
   }
-  return WriteArrowZeroCopy<FloatType>(array, num_levels, def_levels, rep_levels, ctx,
-                                       this, maybe_parent_nulls);
+  return WriteArrowZeroCopy(def_levels, rep_levels, num_levels, array, ctx,
+                            maybe_parent_nulls);
 }
 
 template <>
@@ -2261,8 +2267,8 @@ Status TypedColumnWriterImpl<DoubleType>::WriteArrowDense(
   if (array.type_id() != ::arrow::Type::DOUBLE) {
     ARROW_UNSUPPORTED();
   }
-  return WriteArrowZeroCopy<DoubleType>(array, num_levels, def_levels, rep_levels, ctx,
-                                        this, maybe_parent_nulls);
+  return WriteArrowZeroCopy(def_levels, rep_levels, num_levels, array, ctx,
+                            maybe_parent_nulls);
 }
 
 // ----------------------------------------------------------------------
@@ -2442,10 +2448,10 @@ Status TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
     const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
     const ::arrow::Array& array, ArrowWriteContext* ctx, bool maybe_parent_nulls) {
   switch (array.type()->id()) {
-    WRITE_SERIALIZE_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
-    WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, FLBAType)
-    WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, FLBAType)
-    WRITE_SERIALIZE_CASE(HALF_FLOAT, HalfFloatType, FLBAType)
+    WRITE_SERIALIZE_CASE(FIXED_SIZE_BINARY)
+    WRITE_SERIALIZE_CASE(DECIMAL128)
+    WRITE_SERIALIZE_CASE(DECIMAL256)
+    WRITE_SERIALIZE_CASE(HALF_FLOAT)
     default:
       break;
   }