Skip to content
Open
3 changes: 3 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ This product includes code from Apache Iceberg C++.
* .devcontainer/devcontainer.json.template
* CI utilities:
* .pre-commit-config.yaml
* Avro direct decoder/encoder:
* src/paimon/format/avro/avro_direct_decoder.cpp
* src/paimon/format/avro/avro_direct_decoder.h

Copyright: 2024-2025 The Apache Software Foundation.
Home page: https://iceberg.apache.org/
Expand Down
1 change: 1 addition & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ if(PAIMON_BUILD_TESTS)
common/types/data_type_json_parser_test.cpp
common/types/row_kind_test.cpp
common/types/data_type_test.cpp
common/utils/arrow/arrow_utils_test.cpp
common/utils/arrow/mem_utils_test.cpp
common/utils/arrow/status_utils_test.cpp
common/utils/concurrent_hash_map_test.cpp
Expand Down
41 changes: 34 additions & 7 deletions src/paimon/common/utils/arrow/arrow_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,41 @@

#pragma once

#include "arrow/c/helpers.h"
#include <vector>

#include "arrow/api.h"
#include "fmt/format.h"
#include "paimon/result.h"

namespace paimon {
// Initializes a caller-owned ArrowArray struct by forwarding it to
// ArrowArrayMarkReleased.
// NOTE(review): presumably this puts the struct into the "released" state so
// that releasing it before it has been populated is harmless - confirm
// against arrow/c/helpers.h.
//
// @param array struct to initialize; passed through unchanged to
//              ArrowArrayMarkReleased.
inline void ArrowArrayInit(struct ArrowArray* array) {
ArrowArrayMarkReleased(array);
}

// Initializes a caller-owned ArrowSchema struct by forwarding it to
// ArrowSchemaMarkReleased.
// NOTE(review): presumably this puts the struct into the "released" state so
// that releasing it before it has been populated is harmless - confirm
// against arrow/c/helpers.h.
//
// @param schema struct to initialize; passed through unchanged to
//               ArrowSchemaMarkReleased.
inline void ArrowSchemaInit(struct ArrowSchema* schema) {
ArrowSchemaMarkReleased(schema);
}
/// Static-only collection of helpers for Arrow schemas and data types.
/// The class cannot be instantiated.
class ArrowUtils {
 public:
    ArrowUtils() = delete;
    ~ArrowUtils() = delete;

    /// Builds a schema whose top-level fields are the children of a struct type.
    ///
    /// @param data_type The data type to convert; must be a non-null struct type.
    /// @return A schema made of the struct's fields, or Status::Invalid when
    ///         `data_type` is null or is not a struct type.
    static Result<std::shared_ptr<arrow::Schema>> DataTypeToSchema(
        const std::shared_ptr<::arrow::DataType>& data_type) {
        // Reject a null pointer up front instead of dereferencing it below.
        if (data_type == nullptr) {
            return Status::Invalid(
                fmt::format("Expected struct data type, actual data type: {}", "null"));
        }
        if (data_type->id() != arrow::Type::STRUCT) {
            return Status::Invalid(fmt::format("Expected struct data type, actual data type: {}",
                                               data_type->ToString()));
        }
        // The id() check above makes the downcast safe.
        const auto struct_type = std::static_pointer_cast<arrow::StructType>(data_type);
        return std::make_shared<arrow::Schema>(struct_type->fields());
    }

    /// Computes, for every field in `read_fields` (in order), the index of the
    /// field with the same name in `file_schema`.
    ///
    /// Every read field is expected to resolve to a non-negative index in
    /// `file_schema`. This is only enforced by an assert, so in builds with
    /// NDEBUG an unresolved name yields whatever negative index
    /// Schema::GetFieldIndex returned instead of an error.
    ///
    /// @param file_schema Schema to look the names up in; must not be null.
    /// @param read_fields Fields to locate, matched by name only.
    /// @return A vector with one entry per read field: its index in `file_schema`.
    static std::vector<int32_t> CreateProjection(
        const std::shared_ptr<::arrow::Schema>& file_schema,
        const arrow::FieldVector& read_fields) {
        std::vector<int32_t> target_to_src_mapping;
        target_to_src_mapping.reserve(read_fields.size());
        for (const auto& field : read_fields) {
            const int src_field_idx = file_schema->GetFieldIndex(field->name());
            assert(src_field_idx >= 0 && "read field not found in file schema");
            target_to_src_mapping.push_back(src_field_idx);
        }
        return target_to_src_mapping;
    }
};

} // namespace paimon
47 changes: 47 additions & 0 deletions src/paimon/common/utils/arrow/arrow_utils_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "paimon/common/utils/arrow/arrow_utils.h"

#include "arrow/api.h"
#include "gtest/gtest.h"
#include "paimon/common/types/data_field.h"

namespace paimon::test {

// Checks that CreateProjection maps each requested field to its position in
// the file schema when the file holds extra fields before and after them.
TEST(ArrowUtilsTest, TestCreateProjection) {
    // Fields stored in the file: the requested ones sit at positions 1..5,
    // surrounded by "k0" (front) and "s0" (back), which are not requested.
    std::vector<DataField> fields_in_file = {
        DataField(0, arrow::field("k0", arrow::int32())),
        DataField(1, arrow::field("k1", arrow::int32())),
        DataField(3, arrow::field("p1", arrow::int32())),
        DataField(5, arrow::field("s1", arrow::utf8())),
        DataField(6, arrow::field("v0", arrow::float64())),
        DataField(7, arrow::field("v1", arrow::boolean())),
        DataField(4, arrow::field("s0", arrow::utf8()))};
    auto schema_of_file = DataField::ConvertDataFieldsToArrowSchema(fields_in_file);

    // Fields the reader asks for, in the order it wants them.
    std::vector<DataField> fields_to_read = {
        DataField(1, arrow::field("k1", arrow::int32())),
        DataField(3, arrow::field("p1", arrow::int32())),
        DataField(5, arrow::field("s1", arrow::utf8())),
        DataField(6, arrow::field("v0", arrow::float64())),
        DataField(7, arrow::field("v1", arrow::boolean()))};
    auto schema_to_read = DataField::ConvertDataFieldsToArrowSchema(fields_to_read);

    std::vector<int32_t> wanted_mapping = {1, 2, 3, 4, 5};
    auto actual_mapping = ArrowUtils::CreateProjection(schema_of_file, schema_to_read->fields());
    ASSERT_EQ(actual_mapping, wanted_mapping);
}

} // namespace paimon::test
2 changes: 1 addition & 1 deletion src/paimon/common/utils/status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void Status::AddContextLine(const char* filename, int line, const char* function
const char* expr) {
assert(!ok() && "Cannot add context line to ok status");
std::stringstream ss;
ss << "\nIn " << filename << ", line " << line << ", function: " << function_name
ss << "\nIn " << filename << ":" << line << ", function: " << function_name
<< ", code: " << expr;
state_->msg += ss.str();
}
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/core/io/row_to_arrow_array_converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ Status RowToArrowArrayConverter<T, R>::Reserve(arrow::ArrayBuilder* array_builde
PAIMON_ASSIGN_OR_RAISE(auto* struct_builder,
CastToTypedBuilder<arrow::StructBuilder>(array_builder));
for (int32_t i = 0; i < struct_builder->num_fields(); i++) {
// reserve item builder in map
// reserve item builder in struct
PAIMON_RETURN_NOT_OK(Reserve(struct_builder->field_builder(i), idx));
}
break;
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/core/io/single_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ Status SingleFileWriter<T, R>::Write(T record) {
}
} else {
ArrowArray array;
ArrowArrayInit(&array);
ArrowArrayMarkReleased(&array); // reset array
ScopeGuard inner_guard([&array]() { ArrowArrayRelease(&array); });
PAIMON_RETURN_NOT_OK(converter_(std::move(record), &array));
record_count = array.length;
Expand Down
4 changes: 3 additions & 1 deletion src/paimon/core/operation/abstract_split_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ Result<std::unique_ptr<FileBatchReader>> AbstractSplitRead::CreateFileBatchReade
// lance does not support building a reader from an input stream
return reader_builder->Build(data_file_path);
}
if (context_->EnablePrefetch() && file_format_identifier != "blob") {
// TODO(zhanyu.fyh): orc format support prefetch
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This TODO is already resolved.

if (context_->EnablePrefetch() && file_format_identifier != "blob" &&
file_format_identifier != "avro") {
PAIMON_ASSIGN_OR_RAISE(
std::unique_ptr<PrefetchFileBatchReaderImpl> prefetch_reader,
PrefetchFileBatchReaderImpl::Create(
Expand Down
17 changes: 3 additions & 14 deletions src/paimon/core/operation/merge_file_split_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "paimon/common/reader/concat_batch_reader.h"
#include "paimon/common/table/special_fields.h"
#include "paimon/common/types/data_field.h"
#include "paimon/common/utils/arrow/arrow_utils.h"
#include "paimon/common/utils/arrow/status_utils.h"
#include "paimon/core/core_options.h"
#include "paimon/core/deletionvectors/apply_deletion_vector_batch_reader.h"
Expand Down Expand Up @@ -105,7 +106,8 @@ Result<std::unique_ptr<MergeFileSplitRead>> MergeFileSplitRead::Create(
int32_t key_arity = trimmed_primary_key.size();

// projection is the mapping from value_schema in KeyValue object to raw_read_schema
std::vector<int32_t> projection = CreateProjection(context->GetReadSchema(), value_schema);
std::vector<int32_t> projection =
ArrowUtils::CreateProjection(value_schema, context->GetReadSchema()->fields());

return std::unique_ptr<MergeFileSplitRead>(new MergeFileSplitRead(
path_factory, context,
Expand Down Expand Up @@ -368,19 +370,6 @@ Result<std::shared_ptr<Predicate>> MergeFileSplitRead::GenerateKeyPredicates(
return PredicateUtils::ExcludePredicateWithFields(predicate, non_primary_keys);
}

// Returns, for each field of raw_read_schema (in schema order), the index of
// the field with the same name in value_schema. Every name is expected to
// resolve to a non-negative index; this is checked by an assert only.
std::vector<int32_t> MergeFileSplitRead::CreateProjection(
    const std::shared_ptr<arrow::Schema>& raw_read_schema,
    const std::shared_ptr<arrow::Schema>& value_schema) {
    const int read_field_count = raw_read_schema->num_fields();
    std::vector<int32_t> index_in_value_schema;
    index_in_value_schema.reserve(read_field_count);
    for (int pos = 0; pos < read_field_count; ++pos) {
        // Fields are matched purely by name.
        const int located = value_schema->GetFieldIndex(raw_read_schema->field(pos)->name());
        assert(located >= 0);
        index_in_value_schema.push_back(located);
    }
    return index_in_value_schema;
}

Result<std::unique_ptr<BatchReader>> MergeFileSplitRead::CreateReaderForSection(
const std::vector<SortedRun>& section, const std::string& bucket_path,
const BinaryRow& partition,
Expand Down
4 changes: 0 additions & 4 deletions src/paimon/core/operation/merge_file_split_read.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,6 @@ class MergeFileSplitRead : public AbstractSplitRead {
static Result<std::shared_ptr<Predicate>> GenerateKeyPredicates(
const std::shared_ptr<Predicate>& predicate, const TableSchema& table_schema);

static std::vector<int32_t> CreateProjection(
const std::shared_ptr<arrow::Schema>& raw_read_schema,
const std::shared_ptr<arrow::Schema>& value_schema);

private:
int32_t key_arity_;
// schema of value member in KeyValue object
Expand Down
22 changes: 0 additions & 22 deletions src/paimon/core/operation/merge_file_split_read_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -553,28 +553,6 @@ TEST_F(MergeFileSplitReadTest, TestGenerateKeyPredicates2) {
ASSERT_FALSE(result);
}

// Checks that CreateProjection maps every raw read field to its position in
// the value schema, which holds additional fields the read does not request.
TEST_F(MergeFileSplitReadTest, TestCreateProjection) {
    // Value schema: the requested fields sit at positions 1..5, with "k0" in
    // front and "s0" at the back.
    std::vector<DataField> fields_of_value = {
        DataField(0, arrow::field("k0", arrow::int32())),
        DataField(1, arrow::field("k1", arrow::int32())),
        DataField(3, arrow::field("p1", arrow::int32())),
        DataField(5, arrow::field("s1", arrow::utf8())),
        DataField(6, arrow::field("v0", arrow::float64())),
        DataField(7, arrow::field("v1", arrow::boolean())),
        DataField(4, arrow::field("s0", arrow::utf8()))};
    auto schema_of_value = DataField::ConvertDataFieldsToArrowSchema(fields_of_value);

    // Raw read schema: the subset of fields that is actually read.
    std::vector<DataField> fields_of_raw_read = {
        DataField(1, arrow::field("k1", arrow::int32())),
        DataField(3, arrow::field("p1", arrow::int32())),
        DataField(5, arrow::field("s1", arrow::utf8())),
        DataField(6, arrow::field("v0", arrow::float64())),
        DataField(7, arrow::field("v1", arrow::boolean()))};
    auto schema_of_raw_read = DataField::ConvertDataFieldsToArrowSchema(fields_of_raw_read);

    std::vector<int32_t> wanted_mapping = {1, 2, 3, 4, 5};
    auto actual_mapping = MergeFileSplitRead::CreateProjection(schema_of_raw_read, schema_of_value);
    ASSERT_EQ(actual_mapping, wanted_mapping);
}

TEST_P(MergeFileSplitReadTest, TestSimple) {
std::string path =
paimon::test::GetDataDir() + "/parquet/pk_table_with_mor.db/pk_table_with_mor";
Expand Down
6 changes: 1 addition & 5 deletions src/paimon/format/avro/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@ if(PAIMON_ENABLE_AVRO)

set(PAIMON_AVRO_FILE_FORMAT
avro_adaptor.cpp
avro_array_data_getter.cpp
avro_direct_decoder.cpp
avro_file_batch_reader.cpp
avro_file_format.cpp
avro_file_format_factory.cpp
avro_format_writer.cpp
avro_input_stream_impl.cpp
avro_output_stream_impl.cpp
avro_record_converter.cpp
avro_record_data_getter.cpp
avro_schema_converter.cpp)

add_paimon_lib(paimon_avro_file_format
Expand Down Expand Up @@ -55,10 +53,8 @@ if(PAIMON_ENABLE_AVRO)
avro_file_batch_reader_test.cpp
avro_file_format_test.cpp
avro_input_stream_impl_test.cpp
avro_record_converter_test.cpp
avro_schema_converter_test.cpp
avro_writer_builder_test.cpp
avro_array_data_getter_test.cpp
EXTRA_INCLUDES
${AVRO_INCLUDE_DIR}
STATIC_LINK_LIBS
Expand Down
12 changes: 0 additions & 12 deletions src/paimon/format/avro/avro_adaptor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include "gtest/gtest.h"
#include "paimon/common/utils/arrow/mem_utils.h"
#include "paimon/core/utils/manifest_meta_reader.h"
#include "paimon/format/avro/avro_record_converter.h"
#include "paimon/format/avro/avro_schema_converter.h"
#include "paimon/memory/memory_pool.h"
#include "paimon/status.h"
Expand Down Expand Up @@ -62,17 +61,6 @@ TEST(AvroAdaptorTest, Simple) {
ASSERT_OK_AND_ASSIGN(std::vector<::avro::GenericDatum> datums,
adaptor.ConvertArrayToGenericDatums(array, avro_schema));
ASSERT_EQ(4, datums.size());
ASSERT_OK_AND_ASSIGN(auto record_converter,
AvroRecordConverter::Create(data_type, GetDefaultPool()));
auto read_batch_result = record_converter->NextBatch(datums);
ASSERT_OK(read_batch_result);
auto [c_array, c_schema] = std::move(read_batch_result).value();

auto arrow_array = arrow::ImportArray(c_array.get(), c_schema.get()).ValueOrDie();
auto arrow_pool = GetArrowPool(GetDefaultPool());
ASSERT_OK_AND_ASSIGN(arrow_array, ManifestMetaReader::AlignArrayWithSchema(
arrow_array, data_type, arrow_pool.get()));
ASSERT_TRUE(array->Equals(arrow_array));
}

} // namespace paimon::avro::test
91 changes: 0 additions & 91 deletions src/paimon/format/avro/avro_array_data_getter.cpp

This file was deleted.

Loading
Loading