diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index bb39d564616..ee4ddcb8e89 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -44,6 +44,7 @@
 #include "parquet/arrow/schema.h"
 #include "parquet/arrow/test-util.h"
 #include "parquet/arrow/writer.h"
+#include "parquet/column_writer.h"
 #include "parquet/file_writer.h"
 #include "parquet/test-util.h"
@@ -459,7 +460,7 @@ static std::shared_ptr MakeSimpleSchema(const DataType& type,
     case ::arrow::Type::DECIMAL: {
       const auto& decimal_type = static_cast(values_type);
-      byte_width = DecimalSize(decimal_type.precision());
+      byte_width = internal::DecimalSize(decimal_type.precision());
     } break;
     default:
       break;
@@ -470,7 +471,7 @@ static std::shared_ptr MakeSimpleSchema(const DataType& type,
       break;
     case ::arrow::Type::DECIMAL: {
       const auto& decimal_type = static_cast(type);
-      byte_width = DecimalSize(decimal_type.precision());
+      byte_width = internal::DecimalSize(decimal_type.precision());
     } break;
     default:
       break;
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 931fd19a32c..3d1ad76c8f0 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -19,7 +19,6 @@
 #include
 #include
-#include
 #include
 #include
 #include
@@ -33,13 +32,11 @@
 #include "arrow/util/thread-pool.h"

 #include "parquet/arrow/reader_internal.h"
-#include "parquet/arrow/schema.h"
 #include "parquet/column_reader.h"
 #include "parquet/exception.h"
 #include "parquet/file_reader.h"
 #include "parquet/metadata.h"
 #include "parquet/properties.h"
-#include "parquet/schema-internal.h"
 #include "parquet/schema.h"

 using arrow::Array;
@@ -76,8 +73,6 @@ using parquet::internal::RecordReader;
 namespace parquet {
 namespace arrow {

-class ColumnChunkReaderImpl;
-
 class ColumnReaderImpl : public ColumnReader {
  public:
   enum ReaderType { PRIMITIVE, LIST, STRUCT };
@@ -91,11 +86,6 @@ class ColumnReaderImpl : public ColumnReader {
   virtual ReaderType type() const = 0;
 };

-ArrowReaderProperties default_arrow_reader_properties() {
-  static ArrowReaderProperties default_reader_props;
-  return default_reader_props;
-}
-
 // ----------------------------------------------------------------------
 // FileReaderImpl forward declaration
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index d0ce68a57e4..1492e2eaa9f 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -20,9 +20,9 @@
 #include
 #include
-#include
 #include

+#include "parquet/file_reader.h"
 #include "parquet/platform.h"
 #include "parquet/properties.h"
@@ -39,8 +39,6 @@ class Table;
 namespace parquet {

 class FileMetaData;
-class ParquetFileReader;
-class ReaderProperties;
 class SchemaDescriptor;

 namespace arrow {
@@ -49,52 +47,6 @@ class ColumnChunkReader;
 class ColumnReader;
 class RowGroupReader;

-static constexpr bool DEFAULT_USE_THREADS = false;
-
-// Default number of rows to read when using ::arrow::RecordBatchReader
-static constexpr int64_t DEFAULT_BATCH_SIZE = 64 * 1024;
-
-/// EXPERIMENTAL: Properties for configuring FileReader behavior.
-class PARQUET_EXPORT ArrowReaderProperties {
- public:
-  explicit ArrowReaderProperties(bool use_threads = DEFAULT_USE_THREADS)
-      : use_threads_(use_threads),
-        read_dict_indices_(),
-        batch_size_(DEFAULT_BATCH_SIZE) {}
-
-  void set_use_threads(bool use_threads) { use_threads_ = use_threads; }
-
-  bool use_threads() const { return use_threads_; }
-
-  void set_read_dictionary(int column_index, bool read_dict) {
-    if (read_dict) {
-      read_dict_indices_.insert(column_index);
-    } else {
-      read_dict_indices_.erase(column_index);
-    }
-  }
-  bool read_dictionary(int column_index) const {
-    if (read_dict_indices_.find(column_index) != read_dict_indices_.end()) {
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  void set_batch_size(int64_t batch_size) { batch_size_ = batch_size; }
-
-  int64_t batch_size() const { return batch_size_; }
-
- private:
-  bool use_threads_;
-  std::unordered_set read_dict_indices_;
-  int64_t batch_size_;
-};
-
-/// EXPERIMENTAL: Constructs the default ArrowReaderProperties
-PARQUET_EXPORT
-ArrowReaderProperties default_arrow_reader_properties();
-
 // Arrow read adapter class for deserializing Parquet files as Arrow row
 // batches.
 //
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index f41e8753c28..bfc40940e3b 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -24,7 +24,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
@@ -32,8 +31,10 @@
 #include "arrow/array.h"
 #include "arrow/builder.h"
 #include "arrow/compute/kernel.h"
+#include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/type.h"
+#include "arrow/type_traits.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/int-util.h"
 #include "arrow/util/logging.h"
@@ -42,6 +43,7 @@
 #include "parquet/arrow/reader.h"
 #include "parquet/column_reader.h"
 #include "parquet/platform.h"
+#include "parquet/properties.h"
 #include "parquet/schema.h"
 #include "parquet/types.h"
diff --git a/cpp/src/parquet/arrow/reader_internal.h b/cpp/src/parquet/arrow/reader_internal.h
index cbf44ecb5f6..4568e421474 100644
--- a/cpp/src/parquet/arrow/reader_internal.h
+++ b/cpp/src/parquet/arrow/reader_internal.h
@@ -17,17 +17,19 @@
 #pragma once

+#include
 #include
+#include
 #include
 #include
 #include
+#include
 #include

-#include "arrow/status.h"
-
 #include "parquet/column_reader.h"
 #include "parquet/file_reader.h"
 #include "parquet/metadata.h"
+#include "parquet/platform.h"
 #include "parquet/schema.h"

 namespace arrow {
@@ -36,8 +38,6 @@
 class Array;
 class ChunkedArray;
 class DataType;
 class Field;
-class MemoryPool;
-class Schema;

 } // namespace arrow
@@ -45,17 +45,10 @@ using arrow::Status;
 namespace parquet {

-class ColumnDescriptor;
-
-namespace internal {
-
-class RecordReader;
-
-} // namespace internal
+class ArrowReaderProperties;

 namespace arrow {

-class ArrowReaderProperties;
 class ColumnReaderImpl;

 // ----------------------------------------------------------------------
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index 82c85663980..49d0dd98808 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -18,18 +18,13 @@
 #include "parquet/arrow/schema.h"

 #include
-#include
-#include
 #include

 #include "arrow/type.h"
 #include "arrow/util/checked_cast.h"
-#include "arrow/util/logging.h"

-#include "parquet/arrow/writer.h"
 #include "parquet/exception.h"
 #include "parquet/properties.h"
-#include "parquet/schema-internal.h"
 #include "parquet/types.h"

 using arrow::Field;
@@ -262,7 +257,7 @@ Status FieldToNode(const std::shared_ptr& field,
           static_cast(*field->type());
       precision = decimal_type.precision();
       scale = decimal_type.scale();
-      length = DecimalSize(precision);
+      length = internal::DecimalSize(precision);
       PARQUET_CATCH_NOT_OK(logical_type = LogicalType::Decimal(precision, scale));
     } break;
     case ArrowTypeId::DATE32:
@@ -351,77 +346,5 @@ Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
                        out);
 }

-/// \brief Compute the number of bytes required to represent a decimal of a
-/// given precision. Taken from the Apache Impala codebase. The comments next
-/// to the return values are the maximum value that can be represented in 2's
-/// complement with the returned number of bytes.
-int32_t DecimalSize(int32_t precision) {
-  DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got "
-                          << precision;
-  DCHECK_LE(precision, 38) << "decimal precision must be less than or equal to 38, got "
-                          << precision;
-
-  switch (precision) {
-    case 1:
-    case 2:
-      return 1;  // 127
-    case 3:
-    case 4:
-      return 2;  // 32,767
-    case 5:
-    case 6:
-      return 3;  // 8,388,607
-    case 7:
-    case 8:
-    case 9:
-      return 4;  // 2,147,483,427
-    case 10:
-    case 11:
-      return 5;  // 549,755,813,887
-    case 12:
-    case 13:
-    case 14:
-      return 6;  // 140,737,488,355,327
-    case 15:
-    case 16:
-      return 7;  // 36,028,797,018,963,967
-    case 17:
-    case 18:
-      return 8;  // 9,223,372,036,854,775,807
-    case 19:
-    case 20:
-    case 21:
-      return 9;  // 2,361,183,241,434,822,606,847
-    case 22:
-    case 23:
-      return 10;  // 604,462,909,807,314,587,353,087
-    case 24:
-    case 25:
-    case 26:
-      return 11;  // 154,742,504,910,672,534,362,390,527
-    case 27:
-    case 28:
-      return 12;  // 39,614,081,257,132,168,796,771,975,167
-    case 29:
-    case 30:
-    case 31:
-      return 13;  // 10,141,204,801,825,835,211,973,625,643,007
-    case 32:
-    case 33:
-      return 14;  // 2,596,148,429,267,413,814,265,248,164,610,047
-    case 34:
-    case 35:
-      return 15;  // 664,613,997,892,457,936,451,903,530,140,172,287
-    case 36:
-    case 37:
-    case 38:
-      return 16;  // 170,141,183,460,469,231,731,687,303,715,884,105,727
-    default:
-      break;
-  }
-  DCHECK(false);
-  return -1;
-}
-
 } // namespace arrow
 } // namespace parquet
diff --git a/cpp/src/parquet/arrow/schema.h b/cpp/src/parquet/arrow/schema.h
index b3cc66bfe6c..5ec9b0c0b63 100644
--- a/cpp/src/parquet/arrow/schema.h
+++ b/cpp/src/parquet/arrow/schema.h
@@ -18,11 +18,8 @@
 #ifndef PARQUET_ARROW_SCHEMA_H
 #define PARQUET_ARROW_SCHEMA_H

-#include
 #include
-#include

-#include "parquet/metadata.h"
 #include "parquet/platform.h"
 #include "parquet/schema.h"
@@ -35,12 +32,11 @@ class Schema;
 namespace parquet {

+class ArrowWriterProperties;
 class WriterProperties;

 namespace arrow {

-class ArrowWriterProperties;
-
 PARQUET_EXPORT
 ::arrow::Status FieldToNode(const std::shared_ptr<::arrow::Field>& field,
                             const WriterProperties& properties,
@@ -58,11 +54,7 @@ ::arrow::Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
                                 const WriterProperties& properties,
                                 std::shared_ptr* out);

-PARQUET_EXPORT
-int32_t DecimalSize(int32_t precision);
-
 } // namespace arrow
-
 } // namespace parquet

 #endif // PARQUET_ARROW_SCHEMA_H
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index ee3b8807037..fb437f14320 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -18,7 +18,6 @@
 #include "parquet/arrow/writer.h"

 #include
-#include
 #include
 #include
 #include
@@ -28,12 +27,9 @@
 #include "arrow/buffer-builder.h"
 #include
"arrow/compute/api.h" #include "arrow/table.h" -#include "arrow/util/checked_cast.h" +#include "arrow/type.h" #include "arrow/visitor_inline.h" -#include "arrow/util/logging.h" - -#include "parquet/arrow/reader.h" #include "parquet/arrow/reader_internal.h" #include "parquet/arrow/schema.h" #include "parquet/column_writer.h" @@ -70,12 +66,6 @@ using parquet::schema::GroupNode; namespace parquet { namespace arrow { -std::shared_ptr default_arrow_writer_properties() { - static std::shared_ptr default_writer_properties = - ArrowWriterProperties::Builder().build(); - return default_writer_properties; -} - namespace { class LevelBuilder { @@ -293,31 +283,6 @@ Status LevelBuilder::VisitInline(const Array& array) { return VisitArrayInline(array, this); } -struct ColumnWriterContext { - ColumnWriterContext(MemoryPool* memory_pool, ArrowWriterProperties* properties) - : memory_pool(memory_pool), properties(properties) { - this->data_buffer = AllocateBuffer(memory_pool); - this->def_levels_buffer = AllocateBuffer(memory_pool); - } - - template - Status GetScratchData(const int64_t num_values, T** out) { - RETURN_NOT_OK(this->data_buffer->Resize(num_values * sizeof(T), false)); - *out = reinterpret_cast(this->data_buffer->mutable_data()); - return Status::OK(); - } - - MemoryPool* memory_pool; - ArrowWriterProperties* properties; - - // Buffer used for storing the data of an array converted to the physical type - // as expected by parquet-cpp. - std::shared_ptr data_buffer; - - // We use the shared ownership of this buffer - std::shared_ptr def_levels_buffer; -}; - Status GetLeafType(const ::arrow::DataType& type, ::arrow::Type::type* leaf_type) { if (type.id() == ::arrow::Type::LIST || type.id() == ::arrow::Type::STRUCT) { if (type.num_children() != 1) { @@ -332,7 +297,7 @@ Status GetLeafType(const ::arrow::DataType& type, ::arrow::Type::type* leaf_type class ArrowColumnWriter { public: - ArrowColumnWriter(ColumnWriterContext* ctx, ColumnWriter* column_writer, + ArrowColumnWriter(ArrowWriteContext* ctx, ColumnWriter* column_writer, const SchemaField* schema_field, const SchemaManifest* schema_manifest) : ctx_(ctx), @@ -340,7 +305,35 @@ class ArrowColumnWriter { schema_field_(schema_field), schema_manifest_(schema_manifest) {} - Status Write(const Array& data); + Status Write(const Array& data) { + if (data.length() == 0) { + // Write nothing when length is 0 + return Status::OK(); + } + + ::arrow::Type::type values_type; + RETURN_NOT_OK(GetLeafType(*data.type(), &values_type)); + + std::shared_ptr _values_array; + int64_t values_offset = 0; + int64_t num_levels = 0; + int64_t num_values = 0; + LevelBuilder level_builder(ctx_->memory_pool, schema_field_, schema_manifest_); + std::shared_ptr def_levels_buffer, rep_levels_buffer; + RETURN_NOT_OK(level_builder.GenerateLevels( + data, &values_offset, &num_values, &num_levels, ctx_->def_levels_buffer, + &def_levels_buffer, &rep_levels_buffer, &_values_array)); + const int16_t* def_levels = nullptr; + if (def_levels_buffer) { + def_levels = reinterpret_cast(def_levels_buffer->data()); + } + const int16_t* rep_levels = nullptr; + if (rep_levels_buffer) { + rep_levels = reinterpret_cast(rep_levels_buffer->data()); + } + std::shared_ptr values_array = _values_array->Slice(values_offset, num_values); + return writer_->WriteArrow(def_levels, rep_levels, num_levels, *values_array, ctx_); + } Status Write(const ChunkedArray& data, int64_t offset, const int64_t size) { if (data.length() == 0) { @@ -394,646 +387,24 @@ class ArrowColumnWriter { } private: - template 
- Status TypedWriteBatch(const Array& data, int64_t num_levels, const int16_t* def_levels, - const int16_t* rep_levels); - - Status WriteTimestamps(const Array& data, int64_t num_levels, const int16_t* def_levels, - const int16_t* rep_levels); - - Status WriteTimestampsCoerce(const Array& data, int64_t num_levels, - const int16_t* def_levels, const int16_t* rep_levels, - const ArrowWriterProperties& properties); - - template - Status WriteNonNullableBatch(const ArrowType& type, int64_t num_values, - int64_t num_levels, const int16_t* def_levels, - const int16_t* rep_levels, - const typename ArrowType::c_type* values); - - template - Status WriteNullableBatch(const ArrowType& type, int64_t num_values, int64_t num_levels, - const int16_t* def_levels, const int16_t* rep_levels, - const uint8_t* valid_bits, int64_t valid_bits_offset, - const typename ArrowType::c_type* values); - - template - Status WriteBatch(int64_t num_levels, const int16_t* def_levels, - const int16_t* rep_levels, - const typename ParquetType::c_type* values) { - auto typed_writer = - ::arrow::internal::checked_cast*>(writer_); - // WriteBatch was called with type mismatching the writer_'s type. This - // could be a schema conversion problem. - DCHECK(typed_writer); - PARQUET_CATCH_NOT_OK( - typed_writer->WriteBatch(num_levels, def_levels, rep_levels, values)); - return Status::OK(); - } - - template - Status WriteBatchSpaced(int64_t num_levels, const int16_t* def_levels, - const int16_t* rep_levels, const uint8_t* valid_bits, - int64_t valid_bits_offset, - const typename ParquetType::c_type* values) { - auto typed_writer = - ::arrow::internal::checked_cast*>(writer_); - // WriteBatchSpaced was called with type mismatching the writer_'s type. This - // could be a schema conversion problem. 
- DCHECK(typed_writer); - PARQUET_CATCH_NOT_OK(typed_writer->WriteBatchSpaced( - num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, values)); - return Status::OK(); - } - - ColumnWriterContext* ctx_; + ArrowWriteContext* ctx_; ColumnWriter* writer_; const SchemaField* schema_field_; const SchemaManifest* schema_manifest_; }; -template -Status ArrowColumnWriter::TypedWriteBatch(const Array& array, int64_t num_levels, - const int16_t* def_levels, - const int16_t* rep_levels) { - using ArrowCType = typename ArrowType::c_type; - - const auto& data = static_cast(array); - const ArrowCType* values = nullptr; - // The values buffer may be null if the array is empty (ARROW-2744) - if (data.values() != nullptr) { - values = reinterpret_cast(data.values()->data()) + data.offset(); - } else { - DCHECK_EQ(data.length(), 0); - } - - if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) { - // no nulls, just dump the data - RETURN_NOT_OK((WriteNonNullableBatch( - static_cast(*array.type()), array.length(), num_levels, - def_levels, rep_levels, values))); - } else { - const uint8_t* valid_bits = data.null_bitmap_data(); - RETURN_NOT_OK((WriteNullableBatch( - static_cast(*array.type()), data.length(), num_levels, - def_levels, rep_levels, valid_bits, data.offset(), values))); - } - return Status::OK(); -} - -template -Status ArrowColumnWriter::WriteNonNullableBatch( - const ArrowType& type, int64_t num_values, int64_t num_levels, - const int16_t* def_levels, const int16_t* rep_levels, - const typename ArrowType::c_type* values) { - using ParquetCType = typename ParquetType::c_type; - ParquetCType* buffer; - RETURN_NOT_OK(ctx_->GetScratchData(num_values, &buffer)); - - std::copy(values, values + num_values, buffer); - - return WriteBatch(num_levels, def_levels, rep_levels, buffer); -} - -template <> -Status ArrowColumnWriter::WriteNonNullableBatch( - const ::arrow::Date64Type& type, int64_t num_values, int64_t num_levels, - const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) { - int32_t* buffer; - RETURN_NOT_OK(ctx_->GetScratchData(num_levels, &buffer)); - - for (int i = 0; i < num_values; i++) { - buffer[i] = static_cast(values[i] / 86400000); - } - - return WriteBatch(num_levels, def_levels, rep_levels, buffer); -} - -template <> -Status ArrowColumnWriter::WriteNonNullableBatch( - const ::arrow::Time32Type& type, int64_t num_values, int64_t num_levels, - const int16_t* def_levels, const int16_t* rep_levels, const int32_t* values) { - int32_t* buffer; - RETURN_NOT_OK(ctx_->GetScratchData(num_levels, &buffer)); - if (type.unit() == TimeUnit::SECOND) { - for (int i = 0; i < num_values; i++) { - buffer[i] = values[i] * 1000; - } - } else { - std::copy(values, values + num_values, buffer); - } - return WriteBatch(num_levels, def_levels, rep_levels, buffer); -} - -#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \ - template <> \ - Status ArrowColumnWriter::WriteNonNullableBatch( \ - const ArrowType& type, int64_t num_values, int64_t num_levels, \ - const int16_t* def_levels, const int16_t* rep_levels, const CType* buffer) { \ - return WriteBatch(num_levels, def_levels, rep_levels, buffer); \ - } - -NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t) -NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t) -NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t) -NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t) -NONNULLABLE_BATCH_FAST_PATH(FloatType, 
::arrow::FloatType, float) -NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double) - -template -Status ArrowColumnWriter::WriteNullableBatch( - const ArrowType& type, int64_t num_values, int64_t num_levels, - const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, - int64_t valid_bits_offset, const typename ArrowType::c_type* values) { - using ParquetCType = typename ParquetType::c_type; - - ParquetCType* buffer; - RETURN_NOT_OK(ctx_->GetScratchData(num_values, &buffer)); - for (int i = 0; i < num_values; i++) { - buffer[i] = static_cast(values[i]); - } - - return WriteBatchSpaced(num_levels, def_levels, rep_levels, valid_bits, - valid_bits_offset, buffer); -} - -template <> -Status ArrowColumnWriter::WriteNullableBatch( - const ::arrow::Date64Type& type, int64_t num_values, int64_t num_levels, - const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, - int64_t valid_bits_offset, const int64_t* values) { - int32_t* buffer; - RETURN_NOT_OK(ctx_->GetScratchData(num_values, &buffer)); - - for (int i = 0; i < num_values; i++) { - // Convert from milliseconds into days since the epoch - buffer[i] = static_cast(values[i] / 86400000); - } - - return WriteBatchSpaced(num_levels, def_levels, rep_levels, valid_bits, - valid_bits_offset, buffer); -} - -template <> -Status ArrowColumnWriter::WriteNullableBatch( - const ::arrow::Time32Type& type, int64_t num_values, int64_t num_levels, - const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, - int64_t valid_bits_offset, const int32_t* values) { - int32_t* buffer; - RETURN_NOT_OK(ctx_->GetScratchData(num_values, &buffer)); - - if (type.unit() == TimeUnit::SECOND) { - for (int i = 0; i < num_values; i++) { - buffer[i] = values[i] * 1000; - } - } else { - for (int i = 0; i < num_values; i++) { - buffer[i] = values[i]; - } - } - return WriteBatchSpaced(num_levels, def_levels, rep_levels, valid_bits, - valid_bits_offset, buffer); -} - -#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \ - template <> \ - Status ArrowColumnWriter::WriteNullableBatch( \ - const ArrowType& type, int64_t num_values, int64_t num_levels, \ - const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, \ - int64_t valid_bits_offset, const CType* values) { \ - return WriteBatchSpaced(num_levels, def_levels, rep_levels, valid_bits, \ - valid_bits_offset, values); \ - } - -NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t) -NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t) -NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t) -NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t) -NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float) -NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double) -NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t) -NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t) - -#define CONV_CASE_LOOP(ConversionFunction) \ - for (int64_t i = 0; i < num_values; i++) \ - ConversionFunction(arrow_values[i], &output[i]); - -static void ConvertArrowTimestampToParquetInt96(const int64_t* arrow_values, - int64_t num_values, - ::arrow::TimeUnit ::type unit_type, - Int96* output) { - switch (unit_type) { - case TimeUnit::NANO: - CONV_CASE_LOOP(internal::NanosecondsToImpalaTimestamp); - break; - case TimeUnit::MICRO: - CONV_CASE_LOOP(internal::MicrosecondsToImpalaTimestamp); - break; - case TimeUnit::MILLI: - 
CONV_CASE_LOOP(internal::MillisecondsToImpalaTimestamp); - break; - case TimeUnit::SECOND: - CONV_CASE_LOOP(internal::SecondsToImpalaTimestamp); - break; - } -} - -#undef CONV_CASE_LOOP - -template <> -Status ArrowColumnWriter::WriteNullableBatch( - const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels, - const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, - int64_t valid_bits_offset, const int64_t* values) { - Int96* buffer = nullptr; - RETURN_NOT_OK(ctx_->GetScratchData(num_values, &buffer)); - - ConvertArrowTimestampToParquetInt96(values, num_values, type.unit(), buffer); - - return WriteBatchSpaced(num_levels, def_levels, rep_levels, valid_bits, - valid_bits_offset, buffer); -} - -template <> -Status ArrowColumnWriter::WriteNonNullableBatch( - const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels, - const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) { - Int96* buffer = nullptr; - RETURN_NOT_OK(ctx_->GetScratchData(num_values, &buffer)); - - ConvertArrowTimestampToParquetInt96(values, num_values, type.unit(), buffer); - - return WriteBatch(num_levels, def_levels, rep_levels, buffer); -} - -Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t num_levels, - const int16_t* def_levels, - const int16_t* rep_levels) { - const auto& source_type = static_cast(*values.type()); - - if (ctx_->properties->support_deprecated_int96_timestamps()) { - // User explicitly requested Int96 timestamps - return TypedWriteBatch(values, num_levels, - def_levels, rep_levels); - } else if (ctx_->properties->coerce_timestamps_enabled()) { - // User explicitly requested coercion to specific unit - if (source_type.unit() == ctx_->properties->coerce_timestamps_unit()) { - // No data conversion necessary - return TypedWriteBatch(values, num_levels, - def_levels, rep_levels); - } else { - return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels, - *(ctx_->properties)); - } - } else if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0 && - source_type.unit() == TimeUnit::NANO) { - // Absent superseding user instructions, when writing Parquet version 1.0 files, - // timestamps in nanoseconds are coerced to microseconds - std::shared_ptr properties = - (ArrowWriterProperties::Builder()) - .coerce_timestamps(TimeUnit::MICRO) - ->disallow_truncated_timestamps() - ->build(); - return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels, *properties); - } else if (source_type.unit() == TimeUnit::SECOND) { - // Absent superseding user instructions, timestamps in seconds are coerced to - // milliseconds - std::shared_ptr properties = - (ArrowWriterProperties::Builder()).coerce_timestamps(TimeUnit::MILLI)->build(); - return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels, *properties); - } else { - // No data conversion necessary - return TypedWriteBatch(values, num_levels, - def_levels, rep_levels); - } -} - -#define COERCE_DIVIDE -1 -#define COERCE_INVALID 0 -#define COERCE_MULTIPLY +1 - -static std::pair kTimestampCoercionFactors[4][4] = { - // from seconds ... - {{COERCE_INVALID, 0}, // ... to seconds - {COERCE_MULTIPLY, 1000}, // ... to millis - {COERCE_MULTIPLY, 1000000}, // ... to micros - {COERCE_MULTIPLY, INT64_C(1000000000)}}, // ... to nanos - // from millis ... - {{COERCE_INVALID, 0}, - {COERCE_MULTIPLY, 1}, - {COERCE_MULTIPLY, 1000}, - {COERCE_MULTIPLY, 1000000}}, - // from micros ... 
- {{COERCE_INVALID, 0}, - {COERCE_DIVIDE, 1000}, - {COERCE_MULTIPLY, 1}, - {COERCE_MULTIPLY, 1000}}, - // from nanos ... - {{COERCE_INVALID, 0}, - {COERCE_DIVIDE, 1000000}, - {COERCE_DIVIDE, 1000}, - {COERCE_MULTIPLY, 1}}}; - -Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t num_levels, - const int16_t* def_levels, - const int16_t* rep_levels, - const ArrowWriterProperties& properties) { - int64_t* buffer; - RETURN_NOT_OK(ctx_->GetScratchData(num_levels, &buffer)); - - const auto& data = static_cast(array); - auto values = data.raw_values(); - - const auto& source_type = static_cast(*array.type()); - auto source_unit = source_type.unit(); - - TimeUnit::type target_unit = properties.coerce_timestamps_unit(); - auto target_type = ::arrow::timestamp(target_unit); - bool truncation_allowed = properties.truncated_timestamps_allowed(); - - auto DivideBy = [&](const int64_t factor) { - for (int64_t i = 0; i < array.length(); i++) { - if (!truncation_allowed && !data.IsNull(i) && (values[i] % factor != 0)) { - return Status::Invalid("Casting from ", source_type.ToString(), " to ", - target_type->ToString(), " would lose data: ", values[i]); - } - buffer[i] = values[i] / factor; - } - return Status::OK(); - }; - - auto MultiplyBy = [&](const int64_t factor) { - for (int64_t i = 0; i < array.length(); i++) { - buffer[i] = values[i] * factor; - } - return Status::OK(); - }; - - const auto& coercion = kTimestampCoercionFactors[static_cast(source_unit)] - [static_cast(target_unit)]; - // .first -> coercion operation; .second -> scale factor - DCHECK_NE(coercion.first, COERCE_INVALID); - RETURN_NOT_OK(coercion.first == COERCE_DIVIDE ? DivideBy(coercion.second) - : MultiplyBy(coercion.second)); - - if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) { - // no nulls, just dump the data - RETURN_NOT_OK((WriteNonNullableBatch( - static_cast(*target_type), array.length(), - num_levels, def_levels, rep_levels, buffer))); - } else { - const uint8_t* valid_bits = data.null_bitmap_data(); - RETURN_NOT_OK((WriteNullableBatch( - static_cast(*target_type), array.length(), - num_levels, def_levels, rep_levels, valid_bits, data.offset(), buffer))); - } - - return Status::OK(); -} - -#undef COERCE_DIVIDE -#undef COERCE_INVALID -#undef COERCE_MULTIPLY - -// This specialization seems quite similar but it significantly differs in two points: -// * offset is added at the most latest time to the pointer as we have sub-byte access -// * Arrow data is stored bitwise thus we cannot use std::copy to transform from -// ArrowType::c_type to ParquetType::c_type - -template <> -Status ArrowColumnWriter::TypedWriteBatch( - const Array& array, int64_t num_levels, const int16_t* def_levels, - const int16_t* rep_levels) { - bool* buffer = nullptr; - RETURN_NOT_OK(ctx_->GetScratchData(array.length(), &buffer)); - - const auto& data = static_cast(array); - const uint8_t* values = nullptr; - // The values buffer may be null if the array is empty (ARROW-2744) - if (data.values() != nullptr) { - values = reinterpret_cast(data.values()->data()); - } else { - DCHECK_EQ(data.length(), 0); - } - - int buffer_idx = 0; - int64_t offset = array.offset(); - for (int i = 0; i < data.length(); i++) { - if (!data.IsNull(i)) { - buffer[buffer_idx++] = BitUtil::GetBit(values, offset + i); - } - } - - return WriteBatch(num_levels, def_levels, rep_levels, buffer); -} - -template <> -Status ArrowColumnWriter::TypedWriteBatch( - const Array& array, int64_t num_levels, const int16_t* def_levels, - const 
int16_t* rep_levels) { - return WriteBatch(num_levels, def_levels, rep_levels, nullptr); -} - -template <> -Status ArrowColumnWriter::TypedWriteBatch( - const Array& array, int64_t num_levels, const int16_t* def_levels, - const int16_t* rep_levels) { - ByteArray* buffer = nullptr; - RETURN_NOT_OK(ctx_->GetScratchData(num_levels, &buffer)); - - const auto& data = static_cast(array); - - // In the case of an array consisting of only empty strings or all null, - // data.data() points already to a nullptr, thus data.data()->data() will - // segfault. - const uint8_t* values = nullptr; - if (data.value_data()) { - values = reinterpret_cast(data.value_data()->data()); - DCHECK(values != nullptr); - } - - // Slice offset is accounted for in raw_value_offsets - const int32_t* value_offset = data.raw_value_offsets(); - - if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) { - // no nulls, just dump the data - for (int64_t i = 0; i < data.length(); i++) { - buffer[i] = - ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]); - } - } else { - int buffer_idx = 0; - for (int64_t i = 0; i < data.length(); i++) { - if (!data.IsNull(i)) { - buffer[buffer_idx++] = - ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]); - } - } - } - - return WriteBatch(num_levels, def_levels, rep_levels, buffer); -} - -template <> -Status ArrowColumnWriter::TypedWriteBatch( - const Array& array, int64_t num_levels, const int16_t* def_levels, - const int16_t* rep_levels) { - const auto& data = static_cast(array); - const int64_t length = data.length(); - - FLBA* buffer; - RETURN_NOT_OK(ctx_->GetScratchData(num_levels, &buffer)); - - if (writer_->descr()->schema_node()->is_required() || data.null_count() == 0) { - // no nulls, just dump the data - // todo(advancedxy): use a writeBatch to avoid this step - for (int64_t i = 0; i < length; i++) { - buffer[i] = FixedLenByteArray(data.GetValue(i)); - } - } else { - int buffer_idx = 0; - for (int64_t i = 0; i < length; i++) { - if (!data.IsNull(i)) { - buffer[buffer_idx++] = FixedLenByteArray(data.GetValue(i)); - } - } - } - - return WriteBatch(num_levels, def_levels, rep_levels, buffer); -} - -template <> -Status ArrowColumnWriter::TypedWriteBatch( - const Array& array, int64_t num_levels, const int16_t* def_levels, - const int16_t* rep_levels) { - const auto& data = static_cast(array); - const int64_t length = data.length(); - - FLBA* buffer; - RETURN_NOT_OK(ctx_->GetScratchData(num_levels, &buffer)); - - const auto& decimal_type = static_cast(*data.type()); - const int32_t offset = - decimal_type.byte_width() - DecimalSize(decimal_type.precision()); - - const bool does_not_have_nulls = - writer_->descr()->schema_node()->is_required() || data.null_count() == 0; - - const auto valid_value_count = static_cast(length - data.null_count()) * 2; - std::vector big_endian_values(valid_value_count); - - // TODO(phillipc): Look into whether our compilers will perform loop unswitching so we - // don't have to keep writing two loops to handle the case where we know there are no - // nulls - if (does_not_have_nulls) { - // no nulls, just dump the data - // todo(advancedxy): use a writeBatch to avoid this step - for (int64_t i = 0, j = 0; i < length; ++i, j += 2) { - auto unsigned_64_bit = reinterpret_cast(data.GetValue(i)); - big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]); - big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]); - buffer[i] = FixedLenByteArray( - 
reinterpret_cast(&big_endian_values[j]) + offset); - } - } else { - for (int64_t i = 0, buffer_idx = 0, j = 0; i < length; ++i) { - if (!data.IsNull(i)) { - auto unsigned_64_bit = reinterpret_cast(data.GetValue(i)); - big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]); - big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]); - buffer[buffer_idx++] = FixedLenByteArray( - reinterpret_cast(&big_endian_values[j]) + offset); - j += 2; - } - } - } - - return WriteBatch(num_levels, def_levels, rep_levels, buffer); -} - -Status ArrowColumnWriter::Write(const Array& data) { - if (data.length() == 0) { - // Write nothing when length is 0 - return Status::OK(); - } - - ::arrow::Type::type values_type; - RETURN_NOT_OK(GetLeafType(*data.type(), &values_type)); - - std::shared_ptr _values_array; - int64_t values_offset = 0; - int64_t num_levels = 0; - int64_t num_values = 0; - LevelBuilder level_builder(ctx_->memory_pool, schema_field_, schema_manifest_); - std::shared_ptr def_levels_buffer, rep_levels_buffer; - RETURN_NOT_OK(level_builder.GenerateLevels( - data, &values_offset, &num_values, &num_levels, ctx_->def_levels_buffer, - &def_levels_buffer, &rep_levels_buffer, &_values_array)); - const int16_t* def_levels = nullptr; - if (def_levels_buffer) { - def_levels = reinterpret_cast(def_levels_buffer->data()); - } - const int16_t* rep_levels = nullptr; - if (rep_levels_buffer) { - rep_levels = reinterpret_cast(rep_levels_buffer->data()); - } - std::shared_ptr values_array = _values_array->Slice(values_offset, num_values); - -#define WRITE_BATCH_CASE(ArrowEnum, ArrowType, ParquetType) \ - case ::arrow::Type::ArrowEnum: \ - return TypedWriteBatch(*values_array, num_levels, \ - def_levels, rep_levels); - - switch (values_type) { - case ::arrow::Type::UINT32: { - if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) { - // Parquet 1.0 reader cannot read the UINT_32 logical type. Thus we need - // to use the larger Int64Type to store them lossless. 
- return TypedWriteBatch(*values_array, num_levels, - def_levels, rep_levels); - } else { - return TypedWriteBatch(*values_array, num_levels, - def_levels, rep_levels); - } - } - WRITE_BATCH_CASE(NA, NullType, Int32Type) - case ::arrow::Type::TIMESTAMP: - return WriteTimestamps(*values_array, num_levels, def_levels, rep_levels); - WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType) - WRITE_BATCH_CASE(INT8, Int8Type, Int32Type) - WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type) - WRITE_BATCH_CASE(INT16, Int16Type, Int32Type) - WRITE_BATCH_CASE(UINT16, UInt16Type, Int32Type) - WRITE_BATCH_CASE(INT32, Int32Type, Int32Type) - WRITE_BATCH_CASE(INT64, Int64Type, Int64Type) - WRITE_BATCH_CASE(UINT64, UInt64Type, Int64Type) - WRITE_BATCH_CASE(FLOAT, FloatType, FloatType) - WRITE_BATCH_CASE(DOUBLE, DoubleType, DoubleType) - WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType) - WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType) - WRITE_BATCH_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType) - WRITE_BATCH_CASE(DECIMAL, Decimal128Type, FLBAType) - WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type) - WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type) - WRITE_BATCH_CASE(TIME32, Time32Type, Int32Type) - WRITE_BATCH_CASE(TIME64, Time64Type, Int64Type) - default: - break; - } - return Status::NotImplemented("Data type not supported as list value: ", - values_array->type()->ToString()); -} - } // namespace // ---------------------------------------------------------------------- // FileWriter implementation -class FileWriter::Impl { +class FileWriterImpl : public FileWriter { public: - Impl(MemoryPool* pool, std::unique_ptr writer, - const std::shared_ptr& arrow_properties) - : writer_(std::move(writer)), + FileWriterImpl(const std::shared_ptr<::arrow::Schema>& schema, MemoryPool* pool, + std::unique_ptr writer, + const std::shared_ptr& arrow_properties) + : schema_(schema), + writer_(std::move(writer)), row_group_writer_(nullptr), column_write_context_(pool, arrow_properties.get()), arrow_properties_(arrow_properties), @@ -1044,7 +415,7 @@ class FileWriter::Impl { &schema_manifest_); } - Status NewRowGroup(int64_t chunk_size) { + Status NewRowGroup(int64_t chunk_size) override { if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); } @@ -1052,7 +423,7 @@ class FileWriter::Impl { return Status::OK(); } - Status Close() { + Status Close() override { if (!closed_) { // Make idempotent closed_ = true; @@ -1064,7 +435,7 @@ class FileWriter::Impl { return Status::OK(); } - Status WriteColumnChunk(const Array& data) { + Status WriteColumnChunk(const Array& data) override { // A bit awkward here since cannot instantiate ChunkedArray from const Array& ::arrow::ArrayVector chunks = {::arrow::MakeArray(data.data())}; auto chunked_array = std::make_shared<::arrow::ChunkedArray>(chunks); @@ -1072,7 +443,7 @@ class FileWriter::Impl { } Status WriteColumnChunk(const std::shared_ptr& data, int64_t offset, - const int64_t size) { + int64_t size) override { // DictionaryArrays are not yet handled with a fast path. To still support // writing them as a workaround, we convert them back to their non-dictionary // representation. 
@@ -1107,51 +478,70 @@ class FileWriter::Impl { return arrow_writer.Close(); } - const WriterProperties& properties() const { return *writer_->properties(); } + Status WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data) override { + return WriteColumnChunk(data, 0, data->length()); + } + + Status WriteTable(const Table& table, int64_t chunk_size) override { + RETURN_NOT_OK(table.Validate()); + + if (chunk_size <= 0 && table.num_rows() > 0) { + return Status::Invalid("chunk size per row_group must be greater than 0"); + } else if (!table.schema()->Equals(*schema_, false)) { + return Status::Invalid("table schema does not match this writer's. table:'", + table.schema()->ToString(), "' this:'", schema_->ToString(), + "'"); + } else if (chunk_size > this->properties().max_row_group_length()) { + chunk_size = this->properties().max_row_group_length(); + } + + auto WriteRowGroup = [&](int64_t offset, int64_t size) { + RETURN_NOT_OK(NewRowGroup(size)); + for (int i = 0; i < table.num_columns(); i++) { + RETURN_NOT_OK(WriteColumnChunk(table.column(i), offset, size)); + } + return Status::OK(); + }; + + if (table.num_rows() == 0) { + // Append a row group with 0 rows + RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close())); + return Status::OK(); + } + + for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) { + int64_t offset = chunk * chunk_size; + RETURN_NOT_OK_ELSE( + WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)), + PARQUET_IGNORE_NOT_OK(Close())); + } + return Status::OK(); + } - ::arrow::MemoryPool* memory_pool() const { return column_write_context_.memory_pool; } + const WriterProperties& properties() const { return *writer_->properties(); } - virtual ~Impl() {} + ::arrow::MemoryPool* memory_pool() const override { + return column_write_context_.memory_pool; + } - const std::shared_ptr metadata() const { return writer_->metadata(); } + const std::shared_ptr metadata() const override { + return writer_->metadata(); + } private: friend class FileWriter; + std::shared_ptr<::arrow::Schema> schema_; + SchemaManifest schema_manifest_; std::unique_ptr writer_; RowGroupWriter* row_group_writer_; - ColumnWriterContext column_write_context_; + ArrowWriteContext column_write_context_; std::shared_ptr arrow_properties_; bool closed_; }; -Status FileWriter::NewRowGroup(int64_t chunk_size) { - return impl_->NewRowGroup(chunk_size); -} - -Status FileWriter::WriteColumnChunk(const ::arrow::Array& data) { - return impl_->WriteColumnChunk(data); -} - -Status FileWriter::WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data, - const int64_t offset, const int64_t size) { - return impl_->WriteColumnChunk(data, offset, size); -} - -Status FileWriter::WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data) { - return WriteColumnChunk(data, 0, data->length()); -} - -Status FileWriter::Close() { return impl_->Close(); } - -MemoryPool* FileWriter::memory_pool() const { return impl_->memory_pool(); } - -const std::shared_ptr FileWriter::metadata() const { - return impl_->metadata(); -} - FileWriter::~FileWriter() {} Status FileWriter::Make(::arrow::MemoryPool* pool, @@ -1159,16 +549,13 @@ Status FileWriter::Make(::arrow::MemoryPool* pool, const std::shared_ptr<::arrow::Schema>& schema, const std::shared_ptr& arrow_properties, std::unique_ptr* out) { - out->reset(new FileWriter(pool, std::move(writer), schema, arrow_properties)); - return (*out)->impl_->Init(); + std::unique_ptr impl( + new FileWriterImpl(schema, pool, 
std::move(writer), arrow_properties)); + RETURN_NOT_OK(impl->Init()); + *out = std::move(impl); + return Status::OK(); } -FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr writer, - const std::shared_ptr<::arrow::Schema>& schema, - const std::shared_ptr& arrow_properties) - : impl_(new FileWriter::Impl(pool, std::move(writer), arrow_properties)), - schema_(schema) {} - Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink, const std::shared_ptr& properties, @@ -1206,42 +593,6 @@ Status WriteMetaDataFile(const FileMetaData& file_metadata, return Status::OK(); } -Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) { - RETURN_NOT_OK(table.Validate()); - - if (chunk_size <= 0 && table.num_rows() > 0) { - return Status::Invalid("chunk size per row_group must be greater than 0"); - } else if (!table.schema()->Equals(*schema_, false)) { - return Status::Invalid("table schema does not match this writer's. table:'", - table.schema()->ToString(), "' this:'", schema_->ToString(), - "'"); - } else if (chunk_size > impl_->properties().max_row_group_length()) { - chunk_size = impl_->properties().max_row_group_length(); - } - - auto WriteRowGroup = [&](int64_t offset, int64_t size) { - RETURN_NOT_OK(NewRowGroup(size)); - for (int i = 0; i < table.num_columns(); i++) { - RETURN_NOT_OK(WriteColumnChunk(table.column(i), offset, size)); - } - return Status::OK(); - }; - - if (table.num_rows() == 0) { - // Append a row group with 0 rows - RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close())); - return Status::OK(); - } - - for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) { - int64_t offset = chunk * chunk_size; - RETURN_NOT_OK_ELSE( - WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)), - PARQUET_IGNORE_NOT_OK(Close())); - } - return Status::OK(); -} - Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size, const std::shared_ptr& properties, diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h index 8f7d4990e18..b6c0c586c8e 100644 --- a/cpp/src/parquet/arrow/writer.h +++ b/cpp/src/parquet/arrow/writer.h @@ -19,19 +19,16 @@ #define PARQUET_ARROW_WRITER_H #include -#include #include #include "parquet/platform.h" #include "parquet/properties.h" -#include "parquet/types.h" - -#include "arrow/type.h" namespace arrow { class Array; class ChunkedArray; +class Schema; class Table; } // namespace arrow @@ -43,84 +40,6 @@ class ParquetFileWriter; namespace arrow { -class PARQUET_EXPORT ArrowWriterProperties { - public: - class Builder { - public: - Builder() - : write_timestamps_as_int96_(false), - coerce_timestamps_enabled_(false), - coerce_timestamps_unit_(::arrow::TimeUnit::SECOND), - truncated_timestamps_allowed_(false) {} - virtual ~Builder() {} - - Builder* disable_deprecated_int96_timestamps() { - write_timestamps_as_int96_ = false; - return this; - } - - Builder* enable_deprecated_int96_timestamps() { - write_timestamps_as_int96_ = true; - return this; - } - - Builder* coerce_timestamps(::arrow::TimeUnit::type unit) { - coerce_timestamps_enabled_ = true; - coerce_timestamps_unit_ = unit; - return this; - } - - Builder* allow_truncated_timestamps() { - truncated_timestamps_allowed_ = true; - return this; - } - - Builder* disallow_truncated_timestamps() { - truncated_timestamps_allowed_ = false; - return this; - } - - std::shared_ptr 
build() { - return std::shared_ptr(new ArrowWriterProperties( - write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_, - truncated_timestamps_allowed_)); - } - - private: - bool write_timestamps_as_int96_; - - bool coerce_timestamps_enabled_; - ::arrow::TimeUnit::type coerce_timestamps_unit_; - bool truncated_timestamps_allowed_; - }; - - bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; } - - bool coerce_timestamps_enabled() const { return coerce_timestamps_enabled_; } - ::arrow::TimeUnit::type coerce_timestamps_unit() const { - return coerce_timestamps_unit_; - } - - bool truncated_timestamps_allowed() const { return truncated_timestamps_allowed_; } - - private: - explicit ArrowWriterProperties(bool write_nanos_as_int96, - bool coerce_timestamps_enabled, - ::arrow::TimeUnit::type coerce_timestamps_unit, - bool truncated_timestamps_allowed) - : write_timestamps_as_int96_(write_nanos_as_int96), - coerce_timestamps_enabled_(coerce_timestamps_enabled), - coerce_timestamps_unit_(coerce_timestamps_unit), - truncated_timestamps_allowed_(truncated_timestamps_allowed) {} - - const bool write_timestamps_as_int96_; - const bool coerce_timestamps_enabled_; - const ::arrow::TimeUnit::type coerce_timestamps_unit_; - const bool truncated_timestamps_allowed_; -}; - -std::shared_ptr PARQUET_EXPORT default_arrow_writer_properties(); - /** * Iterative API: * Start a new RowGroup/Chunk with NewRowGroup @@ -129,49 +48,41 @@ std::shared_ptr PARQUET_EXPORT default_arrow_writer_prope class PARQUET_EXPORT FileWriter { public: static ::arrow::Status Make( - ::arrow::MemoryPool* pool, std::unique_ptr writer, + MemoryPool* pool, std::unique_ptr writer, const std::shared_ptr<::arrow::Schema>& schema, const std::shared_ptr& arrow_properties, std::unique_ptr* out); - static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool, + static ::arrow::Status Open(const ::arrow::Schema& schema, MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink, const std::shared_ptr& properties, std::unique_ptr* writer); static ::arrow::Status Open( - const ::arrow::Schema& schema, ::arrow::MemoryPool* pool, + const ::arrow::Schema& schema, MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink, const std::shared_ptr& properties, const std::shared_ptr& arrow_properties, std::unique_ptr* writer); /// \brief Write a Table to Parquet. 
- ::arrow::Status WriteTable(const ::arrow::Table& table, int64_t chunk_size); + virtual ::arrow::Status WriteTable(const ::arrow::Table& table, int64_t chunk_size) = 0; - ::arrow::Status NewRowGroup(int64_t chunk_size); - ::arrow::Status WriteColumnChunk(const ::arrow::Array& data); + virtual ::arrow::Status NewRowGroup(int64_t chunk_size) = 0; + virtual ::arrow::Status WriteColumnChunk(const ::arrow::Array& data) = 0; /// \brief Write ColumnChunk in row group using slice of a ChunkedArray - ::arrow::Status WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data, - const int64_t offset, const int64_t size); - ::arrow::Status WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data); - ::arrow::Status Close(); + virtual ::arrow::Status WriteColumnChunk( + const std::shared_ptr<::arrow::ChunkedArray>& data, int64_t offset, + int64_t size) = 0; + virtual ::arrow::Status WriteColumnChunk( + const std::shared_ptr<::arrow::ChunkedArray>& data) = 0; + virtual ::arrow::Status Close() = 0; virtual ~FileWriter(); - ::arrow::MemoryPool* memory_pool() const; - - const std::shared_ptr metadata() const; - - private: - FileWriter(::arrow::MemoryPool* pool, std::unique_ptr writer, - const std::shared_ptr<::arrow::Schema>& schema, - const std::shared_ptr& arrow_properties); - - class PARQUET_NO_EXPORT Impl; - std::unique_ptr impl_; - std::shared_ptr<::arrow::Schema> schema_; + virtual MemoryPool* memory_pool() const = 0; + virtual const std::shared_ptr metadata() const = 0; }; /// \brief Write Parquet file metadata only to indicated Arrow OutputStream @@ -190,66 +101,13 @@ ::arrow::Status WriteMetaDataFile(const FileMetaData& file_metadata, * The table shall only consist of columns of primitive type or of primitive lists. */ ::arrow::Status PARQUET_EXPORT WriteTable( - const ::arrow::Table& table, ::arrow::MemoryPool* pool, + const ::arrow::Table& table, MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size, const std::shared_ptr& properties = default_writer_properties(), const std::shared_ptr& arrow_properties = default_arrow_writer_properties()); -namespace internal { - -/** - * Timestamp conversion constants - */ -constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588); - -template -inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) { - int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays; - (*impala_timestamp).value[2] = (uint32_t)julian_days; - - int64_t last_day_units = time % UnitPerDay; - auto last_day_nanos = last_day_units * NanosecondsPerUnit; - // impala_timestamp will be unaligned every other entry so do memcpy instead - // of assign and reinterpret cast to avoid undefined behavior. 
- std::memcpy(impala_timestamp, &last_day_nanos, sizeof(int64_t)); -} - -constexpr int64_t kSecondsInNanos = INT64_C(1000000000); - -inline void SecondsToImpalaTimestamp(const int64_t seconds, Int96* impala_timestamp) { - ArrowTimestampToImpalaTimestamp(seconds, - impala_timestamp); -} - -constexpr int64_t kMillisecondsInNanos = kSecondsInNanos / INT64_C(1000); - -inline void MillisecondsToImpalaTimestamp(const int64_t milliseconds, - Int96* impala_timestamp) { - ArrowTimestampToImpalaTimestamp( - milliseconds, impala_timestamp); -} - -constexpr int64_t kMicrosecondsInNanos = kMillisecondsInNanos / INT64_C(1000); - -inline void MicrosecondsToImpalaTimestamp(const int64_t microseconds, - Int96* impala_timestamp) { - ArrowTimestampToImpalaTimestamp( - microseconds, impala_timestamp); -} - -constexpr int64_t kNanosecondsInNanos = INT64_C(1); - -inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds, - Int96* impala_timestamp) { - ArrowTimestampToImpalaTimestamp( - nanoseconds, impala_timestamp); -} - -} // namespace internal - } // namespace arrow - } // namespace parquet #endif // PARQUET_ARROW_WRITER_H diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 86e21c2f3a3..fa16234e6ec 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -17,14 +17,17 @@ #include "parquet/column_writer.h" +#include #include #include #include #include #include +#include "arrow/array.h" #include "arrow/buffer-builder.h" -#include "arrow/memory_pool.h" +#include "arrow/type.h" +#include "arrow/type_traits.h" #include "arrow/util/bit-stream-utils.h" #include "arrow/util/checked_cast.h" #include "arrow/util/compression.h" @@ -43,6 +46,7 @@ namespace parquet { +using ::arrow::Status; using ::arrow::internal::checked_cast; using BitWriter = ::arrow::BitUtil::BitWriter; @@ -131,7 +135,7 @@ class SerializedPageWriter : public PageWriter { public: SerializedPageWriter(const std::shared_ptr& sink, Compression::type codec, ColumnChunkMetaDataBuilder* metadata, - ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + MemoryPool* pool = ::arrow::default_memory_pool()) : sink_(sink), metadata_(metadata), pool_(pool), @@ -268,7 +272,7 @@ class SerializedPageWriter : public PageWriter { private: std::shared_ptr sink_; ColumnChunkMetaDataBuilder* metadata_; - ::arrow::MemoryPool* pool_; + MemoryPool* pool_; int64_t num_values_; int64_t dictionary_page_offset_; int64_t data_page_offset_; @@ -286,7 +290,7 @@ class BufferedPageWriter : public PageWriter { public: BufferedPageWriter(const std::shared_ptr& sink, Compression::type codec, ColumnChunkMetaDataBuilder* metadata, - ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + MemoryPool* pool = ::arrow::default_memory_pool()) : final_sink_(sink), metadata_(metadata) { in_memory_sink_ = CreateOutputStream(pool); pager_ = std::unique_ptr( @@ -334,8 +338,7 @@ class BufferedPageWriter : public PageWriter { std::unique_ptr PageWriter::Open( const std::shared_ptr& sink, Compression::type codec, - ColumnChunkMetaDataBuilder* metadata, ::arrow::MemoryPool* pool, - bool buffered_row_group) { + ColumnChunkMetaDataBuilder* metadata, MemoryPool* pool, bool buffered_row_group) { if (buffered_row_group) { return std::unique_ptr( new BufferedPageWriter(sink, codec, metadata, pool)); @@ -411,7 +414,9 @@ class ColumnWriterImpl { void AddDataPage(); // Serializes Data Pages - void WriteDataPage(const CompressedDataPage& page); + void WriteDataPage(const CompressedDataPage& page) { + total_bytes_written_ += 
pager_->WriteDataPage(page); + } // Write multiple definition levels void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) { @@ -445,7 +450,7 @@ class ColumnWriterImpl { LevelEncoder level_encoder_; - ::arrow::MemoryPool* allocator_; + MemoryPool* allocator_; // The total number of values stored in the data page. This is the maximum of // the number of encoded definition levels or encoded values. For @@ -586,10 +591,6 @@ void ColumnWriterImpl::AddDataPage() { num_buffered_encoded_values_ = 0; } -void ColumnWriterImpl::WriteDataPage(const CompressedDataPage& page) { - total_bytes_written_ += pager_->WriteDataPage(page); -} - int64_t ColumnWriterImpl::Close() { if (!closed_) { closed_ = true; @@ -658,6 +659,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values) override; + Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels, + int64_t num_levels, const ::arrow::Array& array, + ArrowWriteContext* context) override; + int64_t EstimatedBufferedValueBytes() const override { return current_encoder_->EstimatedDataEncodedSize(); } @@ -666,7 +671,20 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< std::shared_ptr GetValuesBuffer() override { return current_encoder_->FlushValues(); } - void WriteDictionaryPage() override; + + void WriteDictionaryPage() override { + // We have to dynamic cast here because of TypedEncoder as + // some compilers don't want to cast through virtual inheritance + auto dict_encoder = dynamic_cast*>(current_encoder_.get()); + DCHECK(dict_encoder); + std::shared_ptr buffer = + AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size()); + dict_encoder->WriteDict(buffer->mutable_data()); + + DictionaryPage page(buffer, dict_encoder->num_entries(), + properties_->dictionary_page_encoding()); + total_bytes_written_ += pager_->WriteDictionaryPage(page); + } // Checks if the Dictionary Page size limit is reached // If the limit is reached, the Dictionary and Data Pages are serialized @@ -685,7 +703,12 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< return result; } - void ResetPageStatistics() override; + void ResetPageStatistics() override { + if (chunk_statistics_ != nullptr) { + chunk_statistics_->Merge(*page_statistics_); + page_statistics_->Reset(); + } + } Type::type type() const override { return descr_->physical_type(); } @@ -700,6 +723,12 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< const WriterProperties* properties() override { return properties_; } private: + using ValueEncoderType = typename EncodingTraits::Encoder; + using TypedStats = TypedStatistics; + std::unique_ptr current_encoder_; + std::shared_ptr page_statistics_; + std::shared_ptr chunk_statistics_; + inline int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels, const int16_t* rep_levels, const T* values); @@ -710,16 +739,16 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< int64_t* num_spaced_written); // Write values to a temporary buffer before they are encoded into pages - void WriteValues(int64_t num_values, const T* values); - void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits, - int64_t valid_bits_offset, const T* values); - - using ValueEncoderType = typename EncodingTraits::Encoder; - std::unique_ptr current_encoder_; + void WriteValues(int64_t 
num_values, const T* values) { + dynamic_cast(current_encoder_.get()) + ->Put(values, static_cast(num_values)); + } - using TypedStats = TypedStatistics; - std::shared_ptr page_statistics_; - std::shared_ptr chunk_statistics_; + void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits, + int64_t valid_bits_offset, const T* values) { + dynamic_cast(current_encoder_.get()) + ->PutSpaced(values, static_cast(num_values), valid_bits, valid_bits_offset); + } }; // Only one Dictionary Page is written. @@ -741,29 +770,6 @@ void TypedColumnWriterImpl::CheckDictionarySizeLimit() { } } -template -void TypedColumnWriterImpl::WriteDictionaryPage() { - // We have to dynamic cast here because TypedEncoder as some compilers - // don't want to cast through virtual inheritance - auto dict_encoder = dynamic_cast*>(current_encoder_.get()); - DCHECK(dict_encoder); - std::shared_ptr buffer = - AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size()); - dict_encoder->WriteDict(buffer->mutable_data()); - - DictionaryPage page(buffer, dict_encoder->num_entries(), - properties_->dictionary_page_encoding()); - total_bytes_written_ += pager_->WriteDictionaryPage(page); -} - -template -void TypedColumnWriterImpl::ResetPageStatistics() { - if (chunk_statistics_ != nullptr) { - chunk_statistics_->Merge(*page_statistics_); - page_statistics_->Reset(); - } -} - // ---------------------------------------------------------------------- // Instantiate templated classes @@ -952,19 +958,534 @@ void TypedColumnWriterImpl::WriteBatchSpaced( values + values_offset, &num_spaced_written); } -template -void TypedColumnWriterImpl::WriteValues(int64_t num_values, const T* values) { - dynamic_cast(current_encoder_.get()) - ->Put(values, static_cast(num_values)); +// ---------------------------------------------------------------------- +// Direct Arrow write path + +template +struct SerializeFunctor { + using ArrowCType = typename ArrowType::c_type; + using ArrayType = typename ::arrow::TypeTraits::ArrayType; + using ParquetCType = typename ParquetType::c_type; + Status Serialize(const ArrayType& array, ArrowWriteContext*, ParquetCType* out) { + const ArrowCType* input = array.raw_values(); + if (array.null_count() > 0) { + for (int i = 0; i < array.length(); i++) { + out[i] = static_cast(input[i]); + } + } else { + std::copy(input, input + array.length(), out); + } + return Status::OK(); + } +}; + +template +inline Status SerializeData(const ::arrow::Array& array, ArrowWriteContext* ctx, + typename ParquetType::c_type* out) { + using ArrayType = typename ::arrow::TypeTraits::ArrayType; + SerializeFunctor functor; + return functor.Serialize(checked_cast(array), ctx, out); } -template -void TypedColumnWriterImpl::WriteValuesSpaced(int64_t num_values, - const uint8_t* valid_bits, - int64_t valid_bits_offset, - const T* values) { - dynamic_cast(current_encoder_.get()) - ->PutSpaced(values, static_cast(num_values), valid_bits, valid_bits_offset); +template +Status WriteArrowSerialize(const ::arrow::Array& array, int64_t num_levels, + const int16_t* def_levels, const int16_t* rep_levels, + ArrowWriteContext* ctx, + TypedColumnWriter* writer) { + using ParquetCType = typename ParquetType::c_type; + + ParquetCType* buffer; + PARQUET_THROW_NOT_OK(ctx->GetScratchData(array.length(), &buffer)); + + bool no_nulls = + writer->descr()->schema_node()->is_required() || (array.null_count() == 0); + + Status s = SerializeData(array, ctx, buffer); + RETURN_NOT_OK(s); + if (no_nulls) { + 
PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, buffer)); + } else { + PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels, + array.null_bitmap_data(), + array.offset(), buffer)); + } + return Status::OK(); +} + +template +Status WriteArrowZeroCopy(const ::arrow::Array& array, int64_t num_levels, + const int16_t* def_levels, const int16_t* rep_levels, + ArrowWriteContext* ctx, + TypedColumnWriter* writer) { + using T = typename ParquetType::c_type; + const auto& data = static_cast(array); + const T* values = nullptr; + // The values buffer may be null if the array is empty (ARROW-2744) + if (data.values() != nullptr) { + values = reinterpret_cast(data.values()->data()) + data.offset(); + } else { + DCHECK_EQ(data.length(), 0); + } + if (writer->descr()->schema_node()->is_required() || (data.null_count() == 0)) { + PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, values)); + } else { + PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels, + data.null_bitmap_data(), data.offset(), + values)); + } + return Status::OK(); +} + +#define WRITE_SERIALIZE_CASE(ArrowEnum, ArrowType, ParquetType) \ + case ::arrow::Type::ArrowEnum: \ + return WriteArrowSerialize( \ + array, num_levels, def_levels, rep_levels, ctx, this); + +#define WRITE_ZERO_COPY_CASE(ArrowEnum, ArrowType, ParquetType) \ + case ::arrow::Type::ArrowEnum: \ + return WriteArrowZeroCopy(array, num_levels, def_levels, rep_levels, \ + ctx, this); + +#define ARROW_UNSUPPORTED() \ + std::stringstream ss; \ + ss << "Arrow type " << array.type()->ToString() \ + << " cannot be written to Parquet type " << descr_->ToString(); \ + return Status::Invalid(ss.str()); + +// ---------------------------------------------------------------------- +// Write Arrow to BooleanType + +template <> +Status TypedColumnWriterImpl::WriteArrow(const int16_t* def_levels, + const int16_t* rep_levels, + int64_t num_levels, + const ::arrow::Array& array, + ArrowWriteContext* ctx) { + if (array.type_id() != ::arrow::Type::BOOL) { + ARROW_UNSUPPORTED(); + } + bool* buffer = nullptr; + RETURN_NOT_OK(ctx->GetScratchData(array.length(), &buffer)); + + const auto& data = static_cast(array); + const uint8_t* values = nullptr; + // The values buffer may be null if the array is empty (ARROW-2744) + if (data.values() != nullptr) { + values = reinterpret_cast(data.values()->data()); + } else { + DCHECK_EQ(data.length(), 0); + } + + int buffer_idx = 0; + int64_t offset = array.offset(); + for (int i = 0; i < data.length(); i++) { + if (data.IsValid(i)) { + buffer[buffer_idx++] = BitUtil::GetBit(values, offset + i); + } + } + PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, buffer)); + return Status::OK(); +} + +// ---------------------------------------------------------------------- +// Write Arrow types to INT32 + +template <> +struct SerializeFunctor { + Status Serialize(const ::arrow::Date64Array& array, ArrowWriteContext*, int32_t* out) { + const int64_t* input = array.raw_values(); + for (int i = 0; i < array.length(); i++) { + *out++ = static_cast(*input++ / 86400000); + } + return Status::OK(); + } +}; + +template <> +struct SerializeFunctor { + Status Serialize(const ::arrow::Time32Array& array, ArrowWriteContext*, int32_t* out) { + const int32_t* input = array.raw_values(); + const auto& type = static_cast(*array.type()); + if (type.unit() == ::arrow::TimeUnit::SECOND) { + for (int i = 0; i < array.length(); i++) { + out[i] = input[i] * 
1000; + } + } else { + std::copy(input, input + array.length(), out); + } + return Status::OK(); + } +}; + +template <> +Status TypedColumnWriterImpl::WriteArrow(const int16_t* def_levels, + const int16_t* rep_levels, + int64_t num_levels, + const ::arrow::Array& array, + ArrowWriteContext* ctx) { + switch (array.type()->id()) { + case ::arrow::Type::NA: { + PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, nullptr)); + } break; + WRITE_SERIALIZE_CASE(INT8, Int8Type, Int32Type) + WRITE_SERIALIZE_CASE(UINT8, UInt8Type, Int32Type) + WRITE_SERIALIZE_CASE(INT16, Int16Type, Int32Type) + WRITE_SERIALIZE_CASE(UINT16, UInt16Type, Int32Type) + WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int32Type) + WRITE_ZERO_COPY_CASE(INT32, Int32Type, Int32Type) + WRITE_ZERO_COPY_CASE(DATE32, Date32Type, Int32Type) + WRITE_SERIALIZE_CASE(DATE64, Date64Type, Int32Type) + WRITE_SERIALIZE_CASE(TIME32, Time32Type, Int32Type) + default: + ARROW_UNSUPPORTED() + } + return Status::OK(); +} + +// ---------------------------------------------------------------------- +// Write Arrow to Int64 and Int96 + +#define INT96_CONVERT_LOOP(ConversionFunction) \ + for (int64_t i = 0; i < array.length(); i++) ConversionFunction(input[i], &out[i]); + +template <> +struct SerializeFunctor { + Status Serialize(const ::arrow::TimestampArray& array, ArrowWriteContext*, Int96* out) { + const int64_t* input = array.raw_values(); + const auto& type = static_cast(*array.type()); + switch (type.unit()) { + case ::arrow::TimeUnit::NANO: + INT96_CONVERT_LOOP(internal::NanosecondsToImpalaTimestamp); + break; + case ::arrow::TimeUnit::MICRO: + INT96_CONVERT_LOOP(internal::MicrosecondsToImpalaTimestamp); + break; + case ::arrow::TimeUnit::MILLI: + INT96_CONVERT_LOOP(internal::MillisecondsToImpalaTimestamp); + break; + case ::arrow::TimeUnit::SECOND: + INT96_CONVERT_LOOP(internal::SecondsToImpalaTimestamp); + break; + } + return Status::OK(); + } +}; + +#define COERCE_DIVIDE -1 +#define COERCE_INVALID 0 +#define COERCE_MULTIPLY +1 + +static std::pair kTimestampCoercionFactors[4][4] = { + // from seconds ... + {{COERCE_INVALID, 0}, // ... to seconds + {COERCE_MULTIPLY, 1000}, // ... to millis + {COERCE_MULTIPLY, 1000000}, // ... to micros + {COERCE_MULTIPLY, INT64_C(1000000000)}}, // ... to nanos + // from millis ... + {{COERCE_INVALID, 0}, + {COERCE_MULTIPLY, 1}, + {COERCE_MULTIPLY, 1000}, + {COERCE_MULTIPLY, 1000000}}, + // from micros ... + {{COERCE_INVALID, 0}, + {COERCE_DIVIDE, 1000}, + {COERCE_MULTIPLY, 1}, + {COERCE_MULTIPLY, 1000}}, + // from nanos ... 
+ {{COERCE_INVALID, 0}, + {COERCE_DIVIDE, 1000000}, + {COERCE_DIVIDE, 1000}, + {COERCE_MULTIPLY, 1}}}; + +template <> +struct SerializeFunctor { + Status Serialize(const ::arrow::TimestampArray& array, ArrowWriteContext* ctx, + int64_t* out) { + const auto& source_type = static_cast(*array.type()); + auto source_unit = source_type.unit(); + const int64_t* values = array.raw_values(); + + ::arrow::TimeUnit::type target_unit = ctx->properties->coerce_timestamps_unit(); + auto target_type = ::arrow::timestamp(target_unit); + bool truncation_allowed = ctx->properties->truncated_timestamps_allowed(); + + auto DivideBy = [&](const int64_t factor) { + for (int64_t i = 0; i < array.length(); i++) { + if (!truncation_allowed && array.IsValid(i) && (values[i] % factor != 0)) { + return Status::Invalid("Casting from ", source_type.ToString(), " to ", + target_type->ToString(), + " would lose data: ", values[i]); + } + out[i] = values[i] / factor; + } + return Status::OK(); + }; + + auto MultiplyBy = [&](const int64_t factor) { + for (int64_t i = 0; i < array.length(); i++) { + out[i] = values[i] * factor; + } + return Status::OK(); + }; + + const auto& coercion = kTimestampCoercionFactors[static_cast(source_unit)] + [static_cast(target_unit)]; + + // .first -> coercion operation; .second -> scale factor + DCHECK_NE(coercion.first, COERCE_INVALID); + return coercion.first == COERCE_DIVIDE ? DivideBy(coercion.second) + : MultiplyBy(coercion.second); + } +}; + +#undef COERCE_DIVIDE +#undef COERCE_INVALID +#undef COERCE_MULTIPLY + +Status WriteTimestamps(const ::arrow::Array& values, int64_t num_levels, + const int16_t* def_levels, const int16_t* rep_levels, + ArrowWriteContext* ctx, TypedColumnWriter* writer) { + const auto& source_type = static_cast(*values.type()); + + auto WriteCoerce = [&](const ArrowWriterProperties* properties) { + ArrowWriteContext temp_ctx = *ctx; + temp_ctx.properties = properties; + return WriteArrowSerialize( + values, num_levels, def_levels, rep_levels, &temp_ctx, writer); + }; + + if (ctx->properties->coerce_timestamps_enabled()) { + // User explicitly requested coercion to specific unit + if (source_type.unit() == ctx->properties->coerce_timestamps_unit()) { + // No data conversion necessary + return WriteArrowZeroCopy(values, num_levels, def_levels, rep_levels, + ctx, writer); + } else { + return WriteCoerce(ctx->properties); + } + } else if (writer->properties()->version() == ParquetVersion::PARQUET_1_0 && + source_type.unit() == ::arrow::TimeUnit::NANO) { + // Absent superseding user instructions, when writing Parquet version 1.0 files, + // timestamps in nanoseconds are coerced to microseconds + std::shared_ptr properties = + (ArrowWriterProperties::Builder()) + .coerce_timestamps(::arrow::TimeUnit::MICRO) + ->disallow_truncated_timestamps() + ->build(); + return WriteCoerce(properties.get()); + } else if (source_type.unit() == ::arrow::TimeUnit::SECOND) { + // Absent superseding user instructions, timestamps in seconds are coerced to + // milliseconds + std::shared_ptr properties = + (ArrowWriterProperties::Builder()) + .coerce_timestamps(::arrow::TimeUnit::MILLI) + ->build(); + return WriteCoerce(properties.get()); + } else { + // No data conversion necessary + return WriteArrowZeroCopy(values, num_levels, def_levels, rep_levels, ctx, + writer); + } +} + +template <> +Status TypedColumnWriterImpl::WriteArrow(const int16_t* def_levels, + const int16_t* rep_levels, + int64_t num_levels, + const ::arrow::Array& array, + ArrowWriteContext* ctx) { + switch 
(array.type()->id()) { + case ::arrow::Type::TIMESTAMP: + return WriteTimestamps(array, num_levels, def_levels, rep_levels, ctx, this); + WRITE_ZERO_COPY_CASE(INT64, Int64Type, Int64Type) + WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int64Type) + WRITE_SERIALIZE_CASE(UINT64, UInt64Type, Int64Type) + WRITE_ZERO_COPY_CASE(TIME64, Time64Type, Int64Type) + default: + ARROW_UNSUPPORTED(); + } +} + +template <> +Status TypedColumnWriterImpl::WriteArrow(const int16_t* def_levels, + const int16_t* rep_levels, + int64_t num_levels, + const ::arrow::Array& array, + ArrowWriteContext* ctx) { + if (array.type_id() != ::arrow::Type::TIMESTAMP) { + ARROW_UNSUPPORTED(); + } + return WriteArrowSerialize( + array, num_levels, def_levels, rep_levels, ctx, this); +} + +// ---------------------------------------------------------------------- +// Floating point types + +template <> +Status TypedColumnWriterImpl::WriteArrow(const int16_t* def_levels, + const int16_t* rep_levels, + int64_t num_levels, + const ::arrow::Array& array, + ArrowWriteContext* ctx) { + if (array.type_id() != ::arrow::Type::FLOAT) { + ARROW_UNSUPPORTED(); + } + return WriteArrowZeroCopy(array, num_levels, def_levels, rep_levels, ctx, + this); +} + +template <> +Status TypedColumnWriterImpl::WriteArrow(const int16_t* def_levels, + const int16_t* rep_levels, + int64_t num_levels, + const ::arrow::Array& array, + ArrowWriteContext* ctx) { + if (array.type_id() != ::arrow::Type::DOUBLE) { + ARROW_UNSUPPORTED(); + } + return WriteArrowZeroCopy(array, num_levels, def_levels, rep_levels, ctx, + this); +} + +// ---------------------------------------------------------------------- +// Write Arrow to BYTE_ARRAY + +template +struct SerializeFunctor> { + Status Serialize(const ::arrow::BinaryArray& array, ArrowWriteContext*, + ByteArray* out) { + // In the case of an array consisting of only empty strings or all null, + // array.data() points already to a nullptr, thus array.data()->data() will + // segfault. 
+ const uint8_t* values = nullptr; + if (array.value_data()) { + values = reinterpret_cast(array.value_data()->data()); + DCHECK(values != nullptr); + } + + // Slice offset is accounted for in raw_value_offsets + const int32_t* value_offset = array.raw_value_offsets(); + if (array.null_count() == 0) { + // no nulls, just dump the data + for (int64_t i = 0; i < array.length(); i++) { + out[i] = + ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]); + } + } else { + for (int64_t i = 0; i < array.length(); i++) { + if (array.IsValid(i)) { + out[i] = + ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]); + } + } + } + return Status::OK(); + } +}; + +template <> +Status TypedColumnWriterImpl::WriteArrow(const int16_t* def_levels, + const int16_t* rep_levels, + int64_t num_levels, + const ::arrow::Array& array, + ArrowWriteContext* ctx) { + switch (array.type()->id()) { + WRITE_SERIALIZE_CASE(BINARY, BinaryType, ByteArrayType) + WRITE_SERIALIZE_CASE(STRING, BinaryType, ByteArrayType) + default: + ARROW_UNSUPPORTED(); + } +} + +// ---------------------------------------------------------------------- +// Write Arrow to FIXED_LEN_BYTE_ARRAY + +template +struct SerializeFunctor> { + Status Serialize(const ::arrow::FixedSizeBinaryArray& array, ArrowWriteContext*, + FLBA* out) { + if (array.null_count() == 0) { + // no nulls, just dump the data + // todo(advancedxy): use a writeBatch to avoid this step + for (int64_t i = 0; i < array.length(); i++) { + out[i] = FixedLenByteArray(array.GetValue(i)); + } + } else { + for (int64_t i = 0; i < array.length(); i++) { + if (array.IsValid(i)) { + out[i] = FixedLenByteArray(array.GetValue(i)); + } + } + } + return Status::OK(); + } +}; + +template <> +Status WriteArrowSerialize( + const ::arrow::Array& array, int64_t num_levels, const int16_t* def_levels, + const int16_t* rep_levels, ArrowWriteContext* ctx, + TypedColumnWriter* writer) { + const auto& data = static_cast(array); + const int64_t length = data.length(); + + FLBA* buffer; + RETURN_NOT_OK(ctx->GetScratchData(num_levels, &buffer)); + + const auto& decimal_type = static_cast(*data.type()); + const int32_t offset = + decimal_type.byte_width() - internal::DecimalSize(decimal_type.precision()); + + const bool does_not_have_nulls = + writer->descr()->schema_node()->is_required() || data.null_count() == 0; + + const auto valid_value_count = static_cast(length - data.null_count()) * 2; + std::vector big_endian_values(valid_value_count); + + // TODO(phillipc): Look into whether our compilers will perform loop unswitching so we + // don't have to keep writing two loops to handle the case where we know there are no + // nulls + if (does_not_have_nulls) { + // no nulls, just dump the data + // todo(advancedxy): use a writeBatch to avoid this step + for (int64_t i = 0, j = 0; i < length; ++i, j += 2) { + auto unsigned_64_bit = reinterpret_cast(data.GetValue(i)); + big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]); + big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]); + buffer[i] = FixedLenByteArray( + reinterpret_cast(&big_endian_values[j]) + offset); + } + } else { + for (int64_t i = 0, buffer_idx = 0, j = 0; i < length; ++i) { + if (data.IsValid(i)) { + auto unsigned_64_bit = reinterpret_cast(data.GetValue(i)); + big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]); + big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]); + buffer[buffer_idx++] = FixedLenByteArray( + 
reinterpret_cast(&big_endian_values[j]) + offset); + j += 2; + } + } + } + PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, buffer)); + return Status::OK(); +} + +template <> +Status TypedColumnWriterImpl::WriteArrow(const int16_t* def_levels, + const int16_t* rep_levels, + int64_t num_levels, + const ::arrow::Array& array, + ArrowWriteContext* ctx) { + switch (array.type()->id()) { + WRITE_SERIALIZE_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType) + WRITE_SERIALIZE_CASE(DECIMAL, Decimal128Type, FLBAType) + default: + break; + } + return Status::OK(); } // ---------------------------------------------------------------------- diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h index 0a67a86b63c..27ca400eb46 100644 --- a/cpp/src/parquet/column_writer.h +++ b/cpp/src/parquet/column_writer.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include "parquet/exception.h" @@ -26,6 +27,8 @@ namespace arrow { +class Array; + namespace BitUtil { class BitWriter; } // namespace BitUtil @@ -38,6 +41,7 @@ class RleEncoder; namespace parquet { +struct ArrowWriteContext; class ColumnDescriptor; class CompressedDataPage; class DictionaryPage; @@ -130,6 +134,13 @@ class PARQUET_EXPORT ColumnWriter { /// \brief The file-level writer properties virtual const WriterProperties* properties() = 0; + + /// \brief Write Apache Arrow columnar data directly to ColumnWriter. Returns + /// error status if the array data type is not compatible with the concrete + /// writer type + virtual ::arrow::Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels, + int64_t num_levels, const ::arrow::Array& array, + ArrowWriteContext* ctx) = 0; }; // API to write values to a single column. This is the main client facing API. @@ -186,4 +197,55 @@ using DoubleWriter = TypedColumnWriter; using ByteArrayWriter = TypedColumnWriter; using FixedLenByteArrayWriter = TypedColumnWriter; +namespace internal { + +/** + * Timestamp conversion constants + */ +constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588); + +template +inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) { + int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays; + (*impala_timestamp).value[2] = (uint32_t)julian_days; + + int64_t last_day_units = time % UnitPerDay; + auto last_day_nanos = last_day_units * NanosecondsPerUnit; + // impala_timestamp will be unaligned every other entry so do memcpy instead + // of assign and reinterpret cast to avoid undefined behavior. 
+ std::memcpy(impala_timestamp, &last_day_nanos, sizeof(int64_t)); +} + +constexpr int64_t kSecondsInNanos = INT64_C(1000000000); + +inline void SecondsToImpalaTimestamp(const int64_t seconds, Int96* impala_timestamp) { + ArrowTimestampToImpalaTimestamp(seconds, + impala_timestamp); +} + +constexpr int64_t kMillisecondsInNanos = kSecondsInNanos / INT64_C(1000); + +inline void MillisecondsToImpalaTimestamp(const int64_t milliseconds, + Int96* impala_timestamp) { + ArrowTimestampToImpalaTimestamp( + milliseconds, impala_timestamp); +} + +constexpr int64_t kMicrosecondsInNanos = kMillisecondsInNanos / INT64_C(1000); + +inline void MicrosecondsToImpalaTimestamp(const int64_t microseconds, + Int96* impala_timestamp) { + ArrowTimestampToImpalaTimestamp( + microseconds, impala_timestamp); +} + +constexpr int64_t kNanosecondsInNanos = INT64_C(1); + +inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds, + Int96* impala_timestamp) { + ArrowTimestampToImpalaTimestamp( + nanoseconds, impala_timestamp); +} + +} // namespace internal } // namespace parquet diff --git a/cpp/src/parquet/properties.cc b/cpp/src/parquet/properties.cc index 4d3c5d677dc..e8ffad431e8 100644 --- a/cpp/src/parquet/properties.cc +++ b/cpp/src/parquet/properties.cc @@ -38,4 +38,15 @@ std::shared_ptr ReaderProperties::GetStream( } } +ArrowReaderProperties default_arrow_reader_properties() { + static ArrowReaderProperties default_reader_props; + return default_reader_props; +} + +std::shared_ptr default_arrow_writer_properties() { + static std::shared_ptr default_writer_properties = + ArrowWriterProperties::Builder().build(); + return default_writer_properties; +} + } // namespace parquet diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index d08d7b0c8fe..40a358c109d 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -21,6 +21,9 @@ #include #include #include +#include + +#include "arrow/type.h" #include "parquet/exception.h" #include "parquet/parquet_version.h" @@ -39,13 +42,13 @@ static bool DEFAULT_USE_BUFFERED_STREAM = false; class PARQUET_EXPORT ReaderProperties { public: - explicit ReaderProperties(::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + explicit ReaderProperties(MemoryPool* pool = ::arrow::default_memory_pool()) : pool_(pool) { buffered_stream_enabled_ = DEFAULT_USE_BUFFERED_STREAM; buffer_size_ = DEFAULT_BUFFER_SIZE; } - ::arrow::MemoryPool* memory_pool() const { return pool_; } + MemoryPool* memory_pool() const { return pool_; } std::shared_ptr GetStream(std::shared_ptr source, int64_t start, int64_t num_bytes); @@ -61,7 +64,7 @@ class PARQUET_EXPORT ReaderProperties { int64_t buffer_size() const { return buffer_size_; } private: - ::arrow::MemoryPool* pool_; + MemoryPool* pool_; int64_t buffer_size_; bool buffered_stream_enabled_; }; @@ -142,7 +145,7 @@ class PARQUET_EXPORT WriterProperties { created_by_(DEFAULT_CREATED_BY) {} virtual ~Builder() {} - Builder* memory_pool(::arrow::MemoryPool* pool) { + Builder* memory_pool(MemoryPool* pool) { pool_ = pool; return this; } @@ -320,7 +323,7 @@ class PARQUET_EXPORT WriterProperties { } private: - ::arrow::MemoryPool* pool_; + MemoryPool* pool_; int64_t dictionary_pagesize_limit_; int64_t write_batch_size_; int64_t max_row_group_length_; @@ -336,7 +339,7 @@ class PARQUET_EXPORT WriterProperties { std::unordered_map statistics_enabled_; }; - inline ::arrow::MemoryPool* memory_pool() const { return pool_; } + inline MemoryPool* memory_pool() const { return pool_; } inline int64_t 
dictionary_pagesize_limit() const { return dictionary_pagesize_limit_; } @@ -395,10 +398,9 @@ class PARQUET_EXPORT WriterProperties { private: explicit WriterProperties( - ::arrow::MemoryPool* pool, int64_t dictionary_pagesize_limit, - int64_t write_batch_size, int64_t max_row_group_length, int64_t pagesize, - ParquetVersion::type version, const std::string& created_by, - const ColumnProperties& default_column_properties, + MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size, + int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type version, + const std::string& created_by, const ColumnProperties& default_column_properties, const std::unordered_map& column_properties) : pool_(pool), dictionary_pagesize_limit_(dictionary_pagesize_limit), @@ -410,7 +412,7 @@ class PARQUET_EXPORT WriterProperties { default_column_properties_(default_column_properties), column_properties_(column_properties) {} - ::arrow::MemoryPool* pool_; + MemoryPool* pool_; int64_t dictionary_pagesize_limit_; int64_t write_batch_size_; int64_t max_row_group_length_; @@ -423,6 +425,161 @@ class PARQUET_EXPORT WriterProperties { std::shared_ptr PARQUET_EXPORT default_writer_properties(); +// ---------------------------------------------------------------------- +// Properties specific to Apache Arrow columnar read and write + +static constexpr bool kArrowDefaultUseThreads = false; + +// Default number of rows to read when using ::arrow::RecordBatchReader +static constexpr int64_t kArrowDefaultBatchSize = 64 * 1024; + +/// EXPERIMENTAL: Properties for configuring FileReader behavior. +class PARQUET_EXPORT ArrowReaderProperties { + public: + explicit ArrowReaderProperties(bool use_threads = kArrowDefaultUseThreads) + : use_threads_(use_threads), + read_dict_indices_(), + batch_size_(kArrowDefaultBatchSize) {} + + void set_use_threads(bool use_threads) { use_threads_ = use_threads; } + + bool use_threads() const { return use_threads_; } + + void set_read_dictionary(int column_index, bool read_dict) { + if (read_dict) { + read_dict_indices_.insert(column_index); + } else { + read_dict_indices_.erase(column_index); + } + } + bool read_dictionary(int column_index) const { + if (read_dict_indices_.find(column_index) != read_dict_indices_.end()) { + return true; + } else { + return false; + } + } + + void set_batch_size(int64_t batch_size) { batch_size_ = batch_size; } + + int64_t batch_size() const { return batch_size_; } + + private: + bool use_threads_; + std::unordered_set read_dict_indices_; + int64_t batch_size_; +}; + +/// EXPERIMENTAL: Constructs the default ArrowReaderProperties +PARQUET_EXPORT +ArrowReaderProperties default_arrow_reader_properties(); + +class PARQUET_EXPORT ArrowWriterProperties { + public: + class Builder { + public: + Builder() + : write_timestamps_as_int96_(false), + coerce_timestamps_enabled_(false), + coerce_timestamps_unit_(::arrow::TimeUnit::SECOND), + truncated_timestamps_allowed_(false) {} + virtual ~Builder() {} + + Builder* disable_deprecated_int96_timestamps() { + write_timestamps_as_int96_ = false; + return this; + } + + Builder* enable_deprecated_int96_timestamps() { + write_timestamps_as_int96_ = true; + return this; + } + + Builder* coerce_timestamps(::arrow::TimeUnit::type unit) { + coerce_timestamps_enabled_ = true; + coerce_timestamps_unit_ = unit; + return this; + } + + Builder* allow_truncated_timestamps() { + truncated_timestamps_allowed_ = true; + return this; + } + + Builder* disallow_truncated_timestamps() { + truncated_timestamps_allowed_ = 
false; + return this; + } + + std::shared_ptr build() { + return std::shared_ptr(new ArrowWriterProperties( + write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_, + truncated_timestamps_allowed_)); + } + + private: + bool write_timestamps_as_int96_; + + bool coerce_timestamps_enabled_; + ::arrow::TimeUnit::type coerce_timestamps_unit_; + bool truncated_timestamps_allowed_; + }; + + bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; } + + bool coerce_timestamps_enabled() const { return coerce_timestamps_enabled_; } + ::arrow::TimeUnit::type coerce_timestamps_unit() const { + return coerce_timestamps_unit_; + } + + bool truncated_timestamps_allowed() const { return truncated_timestamps_allowed_; } + + private: + explicit ArrowWriterProperties(bool write_nanos_as_int96, + bool coerce_timestamps_enabled, + ::arrow::TimeUnit::type coerce_timestamps_unit, + bool truncated_timestamps_allowed) + : write_timestamps_as_int96_(write_nanos_as_int96), + coerce_timestamps_enabled_(coerce_timestamps_enabled), + coerce_timestamps_unit_(coerce_timestamps_unit), + truncated_timestamps_allowed_(truncated_timestamps_allowed) {} + + const bool write_timestamps_as_int96_; + const bool coerce_timestamps_enabled_; + const ::arrow::TimeUnit::type coerce_timestamps_unit_; + const bool truncated_timestamps_allowed_; +}; + +/// \brief State object used for writing Arrow data directly to a Parquet +/// column chunk. API possibly not stable +struct ArrowWriteContext { + ArrowWriteContext(MemoryPool* memory_pool, ArrowWriterProperties* properties) + : memory_pool(memory_pool), + properties(properties), + data_buffer(AllocateBuffer(memory_pool)), + def_levels_buffer(AllocateBuffer(memory_pool)) {} + + template + ::arrow::Status GetScratchData(const int64_t num_values, T** out) { + ARROW_RETURN_NOT_OK(this->data_buffer->Resize(num_values * sizeof(T), false)); + *out = reinterpret_cast(this->data_buffer->mutable_data()); + return ::arrow::Status::OK(); + } + + MemoryPool* memory_pool; + const ArrowWriterProperties* properties; + + // Buffer used for storing the data of an array converted to the physical type + // as expected by parquet-cpp. + std::shared_ptr data_buffer; + + // We use the shared ownership of this buffer + std::shared_ptr def_levels_buffer; +}; + +PARQUET_EXPORT +std::shared_ptr default_arrow_writer_properties(); + } // namespace parquet #endif // PARQUET_COLUMN_PROPERTIES_H diff --git a/cpp/src/parquet/schema.cc b/cpp/src/parquet/schema.cc index e961127bc1c..1bada629556 100644 --- a/cpp/src/parquet/schema.cc +++ b/cpp/src/parquet/schema.cc @@ -25,7 +25,6 @@ #include #include "arrow/util/logging.h" -#include "arrow/util/macros.h" #include "parquet/exception.h" #include "parquet/schema-internal.h" diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc index 822afcd872b..f7e0cf3e4c0 100644 --- a/cpp/src/parquet/types.cc +++ b/cpp/src/parquet/types.cc @@ -1608,4 +1608,80 @@ class LogicalType::Impl::Unknown final : public LogicalType::Impl::SimpleCompati GENERATE_MAKE(Unknown) +namespace internal { + +/// \brief Compute the number of bytes required to represent a decimal of a +/// given precision. Taken from the Apache Impala codebase. The comments next +/// to the return values are the maximum value that can be represented in 2's +/// complement with the returned number of bytes. 
+int32_t DecimalSize(int32_t precision) { + DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got " + << precision; + DCHECK_LE(precision, 38) << "decimal precision must be less than or equal to 38, got " + << precision; + + switch (precision) { + case 1: + case 2: + return 1; // 127 + case 3: + case 4: + return 2; // 32,767 + case 5: + case 6: + return 3; // 8,388,607 + case 7: + case 8: + case 9: + return 4; // 2,147,483,647 + case 10: + case 11: + return 5; // 549,755,813,887 + case 12: + case 13: + case 14: + return 6; // 140,737,488,355,327 + case 15: + case 16: + return 7; // 36,028,797,018,963,967 + case 17: + case 18: + return 8; // 9,223,372,036,854,775,807 + case 19: + case 20: + case 21: + return 9; // 2,361,183,241,434,822,606,847 + case 22: + case 23: + return 10; // 604,462,909,807,314,587,353,087 + case 24: + case 25: + case 26: + return 11; // 154,742,504,910,672,534,362,390,527 + case 27: + case 28: + return 12; // 39,614,081,257,132,168,796,771,975,167 + case 29: + case 30: + case 31: + return 13; // 10,141,204,801,825,835,211,973,625,643,007 + case 32: + case 33: + return 14; // 2,596,148,429,267,413,814,265,248,164,610,047 + case 34: + case 35: + return 15; // 664,613,997,892,457,936,451,903,530,140,172,287 + case 36: + case 37: + case 38: + return 16; // 170,141,183,460,469,231,731,687,303,715,884,105,727 + default: + break; + } + DCHECK(false); + return -1; +} + +} // namespace internal + } // namespace parquet diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index b317784ba0e..bc456ea24a8 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -678,6 +678,12 @@ PARQUET_EXPORT SortOrder::type GetSortOrder(ConvertedType::type converted, PARQUET_EXPORT SortOrder::type GetSortOrder( const std::shared_ptr& logical_type, Type::type primitive); +namespace internal { + +PARQUET_EXPORT +int32_t DecimalSize(int32_t precision); + +} // namespace internal } // namespace parquet #endif // PARQUET_TYPES_H diff --git a/dev/lint/run_iwyu.sh b/dev/lint/run_iwyu.sh index edae841fa51..46517cfa91c 100755 --- a/dev/lint/run_iwyu.sh +++ b/dev/lint/run_iwyu.sh @@ -22,6 +22,7 @@ mkdir -p /build/lint pushd /build/lint cmake -GNinja \ + -DCMAKE_BUILD_TYPE=debug \ -DARROW_FLIGHT=ON \ -DARROW_GANDIVA=ON \ -DARROW_PARQUET=ON \ diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd index 32ee1ad1a0e..82ca9fbb33e 100644 --- a/python/pyarrow/_parquet.pxd +++ b/python/pyarrow/_parquet.pxd @@ -327,6 +327,15 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil: ReaderProperties default_reader_properties() + cdef cppclass ArrowReaderProperties: + ArrowReaderProperties() + void set_read_dictionary(int column_index, c_bool read_dict) + c_bool read_dictionary() + void set_batch_size() + int64_t batch_size() + + ArrowReaderProperties default_arrow_reader_properties() + cdef cppclass ParquetFileReader: shared_ptr[CFileMetaData] metadata() @@ -348,17 +357,19 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil: Builder* write_batch_size(int64_t batch_size) shared_ptr[WriterProperties] build() + cdef cppclass ArrowWriterProperties: + cppclass Builder: + Builder() + Builder* disable_deprecated_int96_timestamps() + Builder* enable_deprecated_int96_timestamps() + Builder* coerce_timestamps(TimeUnit unit) + Builder* allow_truncated_timestamps() + Builder* disallow_truncated_timestamps() + shared_ptr[ArrowWriterProperties] build() + c_bool support_deprecated_int96_timestamps() -cdef extern from 
"parquet/arrow/reader.h" namespace "parquet::arrow" nogil: - cdef cppclass ArrowReaderProperties: - ArrowReaderProperties() - void set_read_dictionary(int column_index, c_bool read_dict) - c_bool read_dictionary() - void set_batch_size() - int64_t batch_size() - - ArrowReaderProperties default_arrow_reader_properties() +cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil: cdef cppclass FileReader: FileReader(CMemoryPool* pool, unique_ptr[ParquetFileReader] reader) CStatus ReadColumn(int i, shared_ptr[CChunkedArray]* out) @@ -422,17 +433,6 @@ cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil: const shared_ptr[CFileMetaData] metadata() const - cdef cppclass ArrowWriterProperties: - cppclass Builder: - Builder() - Builder* disable_deprecated_int96_timestamps() - Builder* enable_deprecated_int96_timestamps() - Builder* coerce_timestamps(TimeUnit unit) - Builder* allow_truncated_timestamps() - Builder* disallow_truncated_timestamps() - shared_ptr[ArrowWriterProperties] build() - c_bool support_deprecated_int96_timestamps() - CStatus WriteMetaDataFile( const CFileMetaData& file_metadata, const OutputStream* sink) diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 8cfaa7714ea..5af51c59677 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -2754,7 +2754,7 @@ RcppExport SEXP _arrow_ipc___ReadMessage(SEXP stream_sexp){ // parquet.cpp #if defined(ARROW_R_WITH_ARROW) -std::shared_ptr parquet___arrow___ArrowReaderProperties__Make(bool use_threads); +std::shared_ptr parquet___arrow___ArrowReaderProperties__Make(bool use_threads); RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__Make(SEXP use_threads_sexp){ BEGIN_RCPP Rcpp::traits::input_parameter::type use_threads(use_threads_sexp); @@ -2769,10 +2769,10 @@ RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__Make(SEXP use_th // parquet.cpp #if defined(ARROW_R_WITH_ARROW) -void parquet___arrow___ArrowReaderProperties__set_use_threads(const std::shared_ptr& properties, bool use_threads); +void parquet___arrow___ArrowReaderProperties__set_use_threads(const std::shared_ptr& properties, bool use_threads); RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__set_use_threads(SEXP properties_sexp, SEXP use_threads_sexp){ BEGIN_RCPP - Rcpp::traits::input_parameter&>::type properties(properties_sexp); + Rcpp::traits::input_parameter&>::type properties(properties_sexp); Rcpp::traits::input_parameter::type use_threads(use_threads_sexp); parquet___arrow___ArrowReaderProperties__set_use_threads(properties, use_threads); return R_NilValue; @@ -2786,10 +2786,10 @@ RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__set_use_threads( // parquet.cpp #if defined(ARROW_R_WITH_ARROW) -bool parquet___arrow___ArrowReaderProperties__get_use_threads(const std::shared_ptr& properties, bool use_threads); +bool parquet___arrow___ArrowReaderProperties__get_use_threads(const std::shared_ptr& properties, bool use_threads); RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__get_use_threads(SEXP properties_sexp, SEXP use_threads_sexp){ BEGIN_RCPP - Rcpp::traits::input_parameter&>::type properties(properties_sexp); + Rcpp::traits::input_parameter&>::type properties(properties_sexp); Rcpp::traits::input_parameter::type use_threads(use_threads_sexp); return Rcpp::wrap(parquet___arrow___ArrowReaderProperties__get_use_threads(properties, use_threads)); END_RCPP @@ -2802,10 +2802,10 @@ RcppExport SEXP 
_arrow_parquet___arrow___ArrowReaderProperties__get_use_threads( // parquet.cpp #if defined(ARROW_R_WITH_ARROW) -bool parquet___arrow___ArrowReaderProperties__get_read_dictionary(const std::shared_ptr& properties, int column_index); +bool parquet___arrow___ArrowReaderProperties__get_read_dictionary(const std::shared_ptr& properties, int column_index); RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__get_read_dictionary(SEXP properties_sexp, SEXP column_index_sexp){ BEGIN_RCPP - Rcpp::traits::input_parameter&>::type properties(properties_sexp); + Rcpp::traits::input_parameter&>::type properties(properties_sexp); Rcpp::traits::input_parameter::type column_index(column_index_sexp); return Rcpp::wrap(parquet___arrow___ArrowReaderProperties__get_read_dictionary(properties, column_index)); END_RCPP @@ -2818,10 +2818,10 @@ RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__get_read_diction // parquet.cpp #if defined(ARROW_R_WITH_ARROW) -void parquet___arrow___ArrowReaderProperties__set_read_dictionary(const std::shared_ptr& properties, int column_index, bool read_dict); +void parquet___arrow___ArrowReaderProperties__set_read_dictionary(const std::shared_ptr& properties, int column_index, bool read_dict); RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__set_read_dictionary(SEXP properties_sexp, SEXP column_index_sexp, SEXP read_dict_sexp){ BEGIN_RCPP - Rcpp::traits::input_parameter&>::type properties(properties_sexp); + Rcpp::traits::input_parameter&>::type properties(properties_sexp); Rcpp::traits::input_parameter::type column_index(column_index_sexp); Rcpp::traits::input_parameter::type read_dict(read_dict_sexp); parquet___arrow___ArrowReaderProperties__set_read_dictionary(properties, column_index, read_dict); @@ -2836,11 +2836,11 @@ RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__set_read_diction // parquet.cpp #if defined(ARROW_R_WITH_ARROW) -std::unique_ptr parquet___arrow___FileReader__OpenFile(const std::shared_ptr& file, const std::shared_ptr& props); +std::unique_ptr parquet___arrow___FileReader__OpenFile(const std::shared_ptr& file, const std::shared_ptr& props); RcppExport SEXP _arrow_parquet___arrow___FileReader__OpenFile(SEXP file_sexp, SEXP props_sexp){ BEGIN_RCPP Rcpp::traits::input_parameter&>::type file(file_sexp); - Rcpp::traits::input_parameter&>::type props(props_sexp); + Rcpp::traits::input_parameter&>::type props(props_sexp); return Rcpp::wrap(parquet___arrow___FileReader__OpenFile(file, props)); END_RCPP } diff --git a/r/src/parquet.cpp b/r/src/parquet.cpp index 5124e9e32cc..692916bb963 100644 --- a/r/src/parquet.cpp +++ b/r/src/parquet.cpp @@ -24,35 +24,35 @@ #include // [[arrow::export]] -std::shared_ptr +std::shared_ptr parquet___arrow___ArrowReaderProperties__Make(bool use_threads) { - return std::make_shared(use_threads); + return std::make_shared(use_threads); } // [[arrow::export]] void parquet___arrow___ArrowReaderProperties__set_use_threads( - const std::shared_ptr& properties, + const std::shared_ptr& properties, bool use_threads) { properties->set_use_threads(use_threads); } // [[arrow::export]] bool parquet___arrow___ArrowReaderProperties__get_use_threads( - const std::shared_ptr& properties, + const std::shared_ptr& properties, bool use_threads) { return properties->use_threads(); } // [[arrow::export]] bool parquet___arrow___ArrowReaderProperties__get_read_dictionary( - const std::shared_ptr& properties, + const std::shared_ptr& properties, int column_index) { return 
properties->read_dictionary(column_index); } // [[arrow::export]] void parquet___arrow___ArrowReaderProperties__set_read_dictionary( - const std::shared_ptr& properties, + const std::shared_ptr& properties, int column_index, bool read_dict) { properties->set_read_dictionary(column_index, read_dict); } @@ -60,10 +60,11 @@ void parquet___arrow___ArrowReaderProperties__set_read_dictionary( // [[arrow::export]] std::unique_ptr parquet___arrow___FileReader__OpenFile( const std::shared_ptr& file, - const std::shared_ptr& props) { + const std::shared_ptr& props) { std::unique_ptr reader; - PARQUET_THROW_NOT_OK( - parquet::arrow::OpenFile(file, arrow::default_memory_pool(), *props, &reader)); + parquet::arrow::FileReaderBuilder builder; + PARQUET_THROW_NOT_OK(builder.Open(file)); + PARQUET_THROW_NOT_OK(builder.properties(*props)->Build(&reader)); return reader; }
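Editor's note, writer side: a minimal sketch (not part of the patch) of how the ArrowWriterProperties builder and the internal helpers added above could be exercised. It assumes the headers touched by this patch (parquet/properties.h, parquet/column_writer.h, parquet/types.h); the main() scaffolding and printed values are illustrative only.

// Minimal sketch: exercising APIs introduced or relocated in this change.
#include <iostream>
#include <memory>

#include "parquet/column_writer.h"  // internal::*ToImpalaTimestamp helpers
#include "parquet/properties.h"     // ArrowWriterProperties
#include "parquet/types.h"          // internal::DecimalSize, Int96

int main() {
  // Arrow-specific writer properties that coerce timestamps to microseconds
  // and reject lossy truncation (same builder chain used in WriteTimestamps).
  std::shared_ptr<parquet::ArrowWriterProperties> props =
      parquet::ArrowWriterProperties::Builder()
          .coerce_timestamps(::arrow::TimeUnit::MICRO)
          ->disallow_truncated_timestamps()
          ->build();
  std::cout << props->coerce_timestamps_enabled() << std::endl;  // 1

  // A decimal of precision 18 fits in 8 bytes of two's complement.
  std::cout << parquet::internal::DecimalSize(18) << std::endl;  // 8

  // Convert an Arrow timestamp (0 seconds since the Unix epoch) to an
  // Impala-style Int96: the third word holds the Julian day for 1970-01-01.
  parquet::Int96 ts;
  parquet::internal::SecondsToImpalaTimestamp(0, &ts);
  std::cout << ts.value[2] << std::endl;  // 2440588
  return 0;
}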
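Editor's note, reader side: a hedged sketch (not part of the patch) of the FileReaderBuilder path used by the updated R bindings, with ArrowReaderProperties now coming from parquet/properties.h. The ::arrow::io::ReadableFile::Open signature and the file name are assumptions of the surrounding Arrow API, not something this patch introduces.

// Minimal sketch: opening a Parquet file with explicit Arrow reader properties.
#include <memory>

#include "arrow/io/file.h"
#include "arrow/status.h"
#include "parquet/arrow/reader.h"
#include "parquet/properties.h"

::arrow::Status OpenWithProperties(std::unique_ptr<parquet::arrow::FileReader>* reader) {
  // Assumed pre-existing Arrow I/O API; the path is illustrative.
  std::shared_ptr<::arrow::io::ReadableFile> file;
  ARROW_RETURN_NOT_OK(::arrow::io::ReadableFile::Open("example.parquet", &file));

  // Start from the defaults added to properties.cc, then customize.
  parquet::ArrowReaderProperties props = parquet::default_arrow_reader_properties();
  props.set_use_threads(true);
  props.set_read_dictionary(/*column_index=*/0, /*read_dict=*/true);
  props.set_batch_size(16 * 1024);

  // Same builder sequence the R bindings use above.
  parquet::arrow::FileReaderBuilder builder;
  ARROW_RETURN_NOT_OK(builder.Open(file));
  return builder.properties(props)->Build(reader);
}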