Skip to content
This repository was archived by the owner on May 10, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions cmake_modules/FindClangTools.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,33 @@
# This module defines
# CLANG_TIDY_BIN, The path to the clang tidy binary
# CLANG_TIDY_FOUND, Whether clang tidy was found
# CLANG_FORMAT_BIN, The path to the clang format binary
# CLANG_FORMAT_BIN, The path to the clang format binary
# CLANG_TIDY_FOUND, Whether clang format was found

find_program(CLANG_TIDY_BIN
NAMES clang-tidy-3.8 clang-tidy-3.7 clang-tidy-3.6 clang-tidy
PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin
find_program(CLANG_TIDY_BIN
NAMES clang-tidy-3.9 clang-tidy-3.8 clang-tidy-3.7 clang-tidy-3.6 clang-tidy
PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin
NO_DEFAULT_PATH
)

if ( "${CLANG_TIDY_BIN}" STREQUAL "CLANG_TIDY_BIN-NOTFOUND" )
if ( "${CLANG_TIDY_BIN}" STREQUAL "CLANG_TIDY_BIN-NOTFOUND" )
set(CLANG_TIDY_FOUND 0)
message("clang-tidy not found")
else()
set(CLANG_TIDY_FOUND 1)
message("clang-tidy found at ${CLANG_TIDY_BIN}")
endif()

find_program(CLANG_FORMAT_BIN
NAMES clang-format-3.8 clang-format-3.7 clang-format-3.6 clang-format
PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin
find_program(CLANG_FORMAT_BIN
NAMES clang-format-3.9 clang-format-3.8 clang-format-3.7 clang-format-3.6 clang-format
PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin
NO_DEFAULT_PATH
)

if ( "${CLANG_FORMAT_BIN}" STREQUAL "CLANG_FORMAT_BIN-NOTFOUND" )
if ( "${CLANG_FORMAT_BIN}" STREQUAL "CLANG_FORMAT_BIN-NOTFOUND" )
set(CLANG_FORMAT_FOUND 0)
message("clang-format not found")
else()
set(CLANG_FORMAT_FOUND 1)
message("clang-format found at ${CLANG_FORMAT_BIN}")
endif()

146 changes: 92 additions & 54 deletions src/parquet/arrow/arrow-reader-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,36 @@ using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
template <typename T>
using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;

void WriteTableToBuffer(const std::shared_ptr<Table>& table, int num_threads,
int64_t row_group_size, std::shared_ptr<Buffer>* out) {
auto sink = std::make_shared<InMemoryOutputStream>();

ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink,
row_group_size, default_writer_properties()));
*out = sink->GetBuffer();
}

void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
int64_t row_group_size, const std::vector<int>& column_subset,
std::shared_ptr<Table>* out) {
std::shared_ptr<Buffer> buffer;
WriteTableToBuffer(table, num_threads, row_group_size, &buffer);

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(
OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
::parquet::default_reader_properties(), nullptr, &reader));

reader->set_num_threads(num_threads);

if (column_subset.size() > 0) {
ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out));
} else {
// Read everything
ASSERT_OK_NO_THROW(reader->ReadTable(out));
}
}

template <typename TestType>
class TestParquetIO : public ::testing::Test {
public:
Expand Down Expand Up @@ -248,19 +278,6 @@ class TestParquetIO : public ::testing::Test {
ASSERT_NE(nullptr, out->get());
}

void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) {
std::shared_ptr<::arrow::Table> out;
std::unique_ptr<FileReader> reader;
ReaderFromSink(&reader);
ReadTableFromFile(std::move(reader), &out);
ASSERT_EQ(1, out->num_columns());
ASSERT_EQ(values->length(), out->num_rows());

std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
ASSERT_EQ(1, chunked_array->num_chunks());
ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
}

void PrepareListTable(int64_t size, bool nullable_lists, bool nullable_elements,
int64_t null_count, std::shared_ptr<Table>* out) {
std::shared_ptr<Array> values;
Expand Down Expand Up @@ -289,13 +306,23 @@ class TestParquetIO : public ::testing::Test {
*out = MakeSimpleTable(parent_lists, nullable_parent_lists);
}

void WriteReadAndCheckSingleColumnTable(const std::shared_ptr<Table>& table) {
std::shared_ptr<Array> values = table->column(0)->data()->chunk(0);
this->sink_ = std::make_shared<InMemoryOutputStream>();
ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
values->length(), default_writer_properties()));
void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) {
std::shared_ptr<::arrow::Table> out;
std::unique_ptr<FileReader> reader;
ReaderFromSink(&reader);
ReadTableFromFile(std::move(reader), &out);
ASSERT_EQ(1, out->num_columns());
ASSERT_EQ(values->length(), out->num_rows());

std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
ASSERT_EQ(1, chunked_array->num_chunks());
ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
}

this->ReadAndCheckSingleColumnTable(values);
void CheckRoundTrip(const std::shared_ptr<Table>& table) {
std::shared_ptr<Table> result;
DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
ASSERT_TRUE(table->Equals(*result));
}

template <typename ArrayType>
Expand Down Expand Up @@ -401,37 +428,37 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {

ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
std::shared_ptr<Table> table = MakeSimpleTable(values, true);
this->WriteReadAndCheckSingleColumnTable(table);
this->CheckRoundTrip(table);
}

TYPED_TEST(TestParquetIO, SingleNullableListNullableColumnReadWrite) {
std::shared_ptr<Table> table;
this->PrepareListTable(SMALL_SIZE, true, true, 10, &table);
this->WriteReadAndCheckSingleColumnTable(table);
this->CheckRoundTrip(table);
}

TYPED_TEST(TestParquetIO, SingleRequiredListNullableColumnReadWrite) {
std::shared_ptr<Table> table;
this->PrepareListTable(SMALL_SIZE, false, true, 10, &table);
this->WriteReadAndCheckSingleColumnTable(table);
this->CheckRoundTrip(table);
}

TYPED_TEST(TestParquetIO, SingleNullableListRequiredColumnReadWrite) {
std::shared_ptr<Table> table;
this->PrepareListTable(SMALL_SIZE, true, false, 10, &table);
this->WriteReadAndCheckSingleColumnTable(table);
this->CheckRoundTrip(table);
}

TYPED_TEST(TestParquetIO, SingleRequiredListRequiredColumnReadWrite) {
std::shared_ptr<Table> table;
this->PrepareListTable(SMALL_SIZE, false, false, 0, &table);
this->WriteReadAndCheckSingleColumnTable(table);
this->CheckRoundTrip(table);
}

TYPED_TEST(TestParquetIO, SingleNullableListRequiredListRequiredColumnReadWrite) {
std::shared_ptr<Table> table;
this->PrepareListOfListTable(SMALL_SIZE, true, false, false, 0, &table);
this->WriteReadAndCheckSingleColumnTable(table);
this->CheckRoundTrip(table);
}

TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
Expand Down Expand Up @@ -756,18 +783,24 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
this->CheckSingleColumnRequiredTableRead(4);
}

void MakeDoubleTable(int num_columns, int num_rows, std::shared_ptr<Table>* out) {
void MakeDoubleTable(
int num_columns, int num_rows, int nchunks, std::shared_ptr<Table>* out) {
std::shared_ptr<::arrow::Column> column;
std::vector<std::shared_ptr<::arrow::Column>> columns(num_columns);
std::vector<std::shared_ptr<::arrow::Field>> fields(num_columns);

std::shared_ptr<Array> values;
for (int i = 0; i < num_columns; ++i) {
std::vector<std::shared_ptr<Array>> arrays;
std::shared_ptr<Array> values;
ASSERT_OK(NullableArray<::arrow::DoubleType>(
num_rows, num_rows / 10, static_cast<uint32_t>(i), &values));
std::stringstream ss;
ss << "col" << i;
column = MakeColumn(ss.str(), values, true);

for (int j = 0; j < nchunks; ++j) {
arrays.push_back(values);
}
column = MakeColumn(ss.str(), arrays, true);

columns[i] = column;
fields[i] = column->field();
Expand All @@ -776,41 +809,46 @@ void MakeDoubleTable(int num_columns, int num_rows, std::shared_ptr<Table>* out)
*out = std::make_shared<Table>(schema, columns);
}

void DoTableRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
const std::vector<int>& column_subset, std::shared_ptr<Table>* out) {
auto sink = std::make_shared<InMemoryOutputStream>();

ASSERT_OK_NO_THROW(WriteTable(
*table, ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2));
TEST(TestArrowReadWrite, MultithreadedRead) {
const int num_columns = 20;
const int num_rows = 1000;
const int num_threads = 4;

std::shared_ptr<Buffer> buffer = sink->GetBuffer();
std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(
OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
::parquet::default_reader_properties(), nullptr, &reader));
std::shared_ptr<Table> table;
MakeDoubleTable(num_columns, num_rows, 1, &table);

reader->set_num_threads(num_threads);
std::shared_ptr<Table> result;
DoSimpleRoundtrip(table, num_threads, table->num_rows(), {}, &result);

if (column_subset.size() > 0) {
ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out));
} else {
// Read everything
ASSERT_OK_NO_THROW(reader->ReadTable(out));
}
ASSERT_TRUE(table->Equals(*result));
}

TEST(TestArrowReadWrite, MultithreadedRead) {
TEST(TestArrowReadWrite, ReadSingleRowGroup) {
const int num_columns = 20;
const int num_rows = 1000;
const int num_threads = 4;

std::shared_ptr<Table> table;
MakeDoubleTable(num_columns, num_rows, &table);
MakeDoubleTable(num_columns, num_rows, 1, &table);

std::shared_ptr<Table> result;
DoTableRoundtrip(table, num_threads, {}, &result);
std::shared_ptr<Buffer> buffer;
WriteTableToBuffer(table, 1, num_rows / 2, &buffer);

ASSERT_TRUE(table->Equals(*result));
std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(
OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
::parquet::default_reader_properties(), nullptr, &reader));

ASSERT_EQ(2, reader->num_row_groups());

std::shared_ptr<Table> r1, r2;
// Read everything
ASSERT_OK_NO_THROW(reader->ReadRowGroup(0, &r1));
ASSERT_OK_NO_THROW(reader->ReadRowGroup(1, &r2));

std::shared_ptr<Table> concatenated;
ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated));

ASSERT_TRUE(table->Equals(*concatenated));
}

TEST(TestArrowReadWrite, ReadColumnSubset) {
Expand All @@ -819,11 +857,11 @@ TEST(TestArrowReadWrite, ReadColumnSubset) {
const int num_threads = 4;

std::shared_ptr<Table> table;
MakeDoubleTable(num_columns, num_rows, &table);
MakeDoubleTable(num_columns, num_rows, 1, &table);

std::shared_ptr<Table> result;
std::vector<int> column_subset = {0, 4, 8, 10};
DoTableRoundtrip(table, num_threads, column_subset, &result);
DoSimpleRoundtrip(table, num_threads, table->num_rows(), column_subset, &result);

std::vector<std::shared_ptr<::arrow::Column>> ex_columns;
std::vector<std::shared_ptr<::arrow::Field>> ex_fields;
Expand Down
Loading