Skip to content
This repository was archived by the owner on May 10, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -618,8 +618,6 @@ set(LIBPARQUET_SRCS

src/parquet/parquet_constants.cpp
src/parquet/parquet_types.cpp

src/parquet/util/cpu-info.cc
src/parquet/util/memory.cc
)

Expand Down
6 changes: 3 additions & 3 deletions benchmarks/decode_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ class DeltaBitPackEncoder {

uint8_t* Encode(int* encoded_len) {
uint8_t* result = new uint8_t[10 * 1024 * 1024];
int num_mini_blocks = parquet::BitUtil::Ceil(num_values() - 1, mini_block_size_);
int num_mini_blocks = arrow::BitUtil::Ceil(num_values() - 1, mini_block_size_);
uint8_t* mini_block_widths = NULL;

parquet::BitWriter writer(result, 10 * 1024 * 1024);
arrow::BitWriter writer(result, 10 * 1024 * 1024);

// Write the size of each block. We only use 1 block currently.
writer.PutVlqInt(num_mini_blocks * mini_block_size_);
Expand Down Expand Up @@ -83,7 +83,7 @@ class DeltaBitPackEncoder {

// The bit width for this block is the number of bits needed to store
// (max_delta - min_delta).
int bit_width = parquet::BitUtil::NumRequiredBits(max_delta - min_delta);
int bit_width = arrow::BitUtil::NumRequiredBits(max_delta - min_delta);
mini_block_widths[i] = bit_width;

// Encode this mini block using min_delta and bit_width
Expand Down
53 changes: 24 additions & 29 deletions cmake_modules/ThirdpartyToolchain.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -109,41 +109,36 @@ set(LIBS ${LIBS} ${Boost_LIBRARIES})
# ----------------------------------------------------------------------
# ZLIB

set(ZLIB_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/zlib_ep/src/zlib_ep-install")
set(ZLIB_HOME "${ZLIB_PREFIX}")
set(ZLIB_INCLUDE_DIR "${ZLIB_PREFIX}/include")
if (MSVC)
if (${UPPERCASE_BUILD_TYPE} STREQUAL "DEBUG")
set(ZLIB_STATIC_LIB_NAME zlibstaticd.lib)
else()
set(ZLIB_STATIC_LIB_NAME zlibstatic.lib)
endif()
else()
set(ZLIB_STATIC_LIB_NAME libz.a)
endif()
set(ZLIB_STATIC_LIB "${ZLIB_PREFIX}/lib/${ZLIB_STATIC_LIB_NAME}")
set(ZLIB_CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
-DCMAKE_INSTALL_PREFIX=${ZLIB_PREFIX}
-DCMAKE_C_FLAGS=${EP_C_FLAGS}
-DBUILD_SHARED_LIBS=OFF)
ExternalProject_Add(zlib_ep
URL "http://zlib.net/fossils/zlib-1.2.8.tar.gz"
BUILD_BYPRODUCTS "${ZLIB_STATIC_LIB}"
${ZLIB_BUILD_BYPRODUCTS}
CMAKE_ARGS ${ZLIB_CMAKE_ARGS})

include_directories(SYSTEM ${ZLIB_INCLUDE_DIR})
add_library(zlibstatic STATIC IMPORTED)
set_target_properties(zlibstatic PROPERTIES IMPORTED_LOCATION ${ZLIB_STATIC_LIB})
add_dependencies(zlibstatic zlib_ep)

# ----------------------------------------------------------------------
# Thrift

# find thrift headers and libs
find_package(Thrift)

if (NOT THRIFT_FOUND)
set(ZLIB_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/zlib_ep/src/zlib_ep-install")
set(ZLIB_HOME "${ZLIB_PREFIX}")
set(ZLIB_INCLUDE_DIR "${ZLIB_PREFIX}/include")
if (MSVC)
if (${UPPERCASE_BUILD_TYPE} STREQUAL "DEBUG")
set(ZLIB_STATIC_LIB_NAME zlibstaticd.lib)
else()
set(ZLIB_STATIC_LIB_NAME zlibstatic.lib)
endif()
else()
set(ZLIB_STATIC_LIB_NAME libz.a)
endif()
set(ZLIB_STATIC_LIB "${ZLIB_PREFIX}/lib/${ZLIB_STATIC_LIB_NAME}")
set(ZLIB_CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
-DCMAKE_INSTALL_PREFIX=${ZLIB_PREFIX}
-DCMAKE_C_FLAGS=${EP_C_FLAGS}
-DBUILD_SHARED_LIBS=OFF)
ExternalProject_Add(zlib_ep
URL "http://zlib.net/fossils/zlib-1.2.8.tar.gz"
BUILD_BYPRODUCTS "${ZLIB_STATIC_LIB}"
${ZLIB_BUILD_BYPRODUCTS}
CMAKE_ARGS ${ZLIB_CMAKE_ARGS})

set(THRIFT_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/thrift_ep/src/thrift_ep-install")
set(THRIFT_HOME "${THRIFT_PREFIX}")
set(THRIFT_INCLUDE_DIR "${THRIFT_PREFIX}/include")
Expand Down Expand Up @@ -341,7 +336,7 @@ if (NOT ARROW_FOUND)
-DARROW_BUILD_TESTS=OFF)

if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "")
set(ARROW_VERSION "98f7cac6e162d9775d615d07b9867c1ec0030f82")
set(ARROW_VERSION "a58893882ac8acd1ac4a5036685cbf09a9a09673")
else()
set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}")
endif()
Expand Down
6 changes: 3 additions & 3 deletions src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
#include <thread>
#include <vector>

#include "arrow/api.h"
#include "arrow/util/bit-util.h"

#include "parquet/arrow/schema.h"
#include "parquet/util/bit-util.h"
#include "parquet/util/schema-util.h"

#include "arrow/api.h"

using arrow::Array;
using arrow::BooleanArray;
using arrow::Column;
Expand Down
9 changes: 4 additions & 5 deletions src/parquet/arrow/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@
#include <string>
#include <vector>

#include "parquet/util/bit-util.h"
#include "parquet/util/logging.h"

#include "parquet/arrow/schema.h"

#include "arrow/api.h"
#include "arrow/util/bit-util.h"
#include "arrow/visitor_inline.h"

#include "parquet/arrow/schema.h"
#include "parquet/util/logging.h"

using arrow::Array;
using arrow::BinaryArray;
using arrow::FixedSizeBinaryArray;
Expand Down
7 changes: 4 additions & 3 deletions src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
#include <cstdint>
#include <memory>

#include "arrow/util/rle-encoding.h"

#include "parquet/column_page.h"
#include "parquet/encoding-internal.h"
#include "parquet/properties.h"
#include "parquet/util/rle-encoding.h"

using arrow::MemoryPool;

Expand All @@ -45,7 +46,7 @@ int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
num_bytes = *reinterpret_cast<const int32_t*>(data);
const uint8_t* decoder_data = data + sizeof(int32_t);
if (!rle_decoder_) {
rle_decoder_.reset(new RleDecoder(decoder_data, num_bytes, bit_width_));
rle_decoder_.reset(new ::arrow::RleDecoder(decoder_data, num_bytes, bit_width_));
} else {
rle_decoder_->Reset(decoder_data, num_bytes, bit_width_);
}
Expand All @@ -55,7 +56,7 @@ int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
num_bytes =
static_cast<int32_t>(BitUtil::Ceil(num_buffered_values * bit_width_, 8));
if (!bit_packed_decoder_) {
bit_packed_decoder_.reset(new BitReader(data, num_bytes));
bit_packed_decoder_.reset(new ::arrow::BitReader(data, num_bytes));
} else {
bit_packed_decoder_->Reset(data, num_bytes);
}
Expand Down
10 changes: 7 additions & 3 deletions src/parquet/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@
#include "parquet/util/memory.h"
#include "parquet/util/visibility.h"

namespace parquet {
namespace arrow {

class BitReader;
class RleDecoder;

} // namespace arrow

namespace parquet {

class PARQUET_EXPORT LevelDecoder {
public:
LevelDecoder();
Expand All @@ -58,8 +62,8 @@ class PARQUET_EXPORT LevelDecoder {
int bit_width_;
int num_values_remaining_;
Encoding::type encoding_;
std::unique_ptr<RleDecoder> rle_decoder_;
std::unique_ptr<BitReader> bit_packed_decoder_;
std::unique_ptr<::arrow::RleDecoder> rle_decoder_;
std::unique_ptr<::arrow::BitReader> bit_packed_decoder_;
};

class PARQUET_EXPORT ColumnReader {
Expand Down
7 changes: 6 additions & 1 deletion src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@

#include "parquet/column_writer.h"

#include "arrow/util/bit-util.h"
#include "arrow/util/rle-encoding.h"

#include "parquet/encoding-internal.h"
#include "parquet/properties.h"
#include "parquet/statistics.h"
#include "parquet/util/logging.h"
#include "parquet/util/memory.h"
#include "parquet/util/rle-encoding.h"

namespace parquet {

using BitWriter = ::arrow::BitWriter;
using RleEncoder = ::arrow::RleEncoder;

LevelEncoder::LevelEncoder() {}
LevelEncoder::~LevelEncoder() {}

Expand Down
10 changes: 7 additions & 3 deletions src/parquet/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@
#include "parquet/util/memory.h"
#include "parquet/util/visibility.h"

namespace parquet {
namespace arrow {

class BitWriter;
class RleEncoder;

} // namespace arrow

namespace parquet {

class PARQUET_EXPORT LevelEncoder {
public:
LevelEncoder();
Expand All @@ -61,8 +65,8 @@ class PARQUET_EXPORT LevelEncoder {
int bit_width_;
int rle_length_;
Encoding::type encoding_;
std::unique_ptr<RleEncoder> rle_encoder_;
std::unique_ptr<BitWriter> bit_packed_encoder_;
std::unique_ptr<::arrow::RleEncoder> rle_encoder_;
std::unique_ptr<::arrow::BitWriter> bit_packed_encoder_;
};

static constexpr int WRITE_BATCH_SIZE = 1000;
Expand Down
45 changes: 24 additions & 21 deletions src/parquet/encoding-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,23 @@
#include <memory>
#include <vector>

#include <arrow/util/bit-util.h>
#include "arrow/util/bit-stream-utils.h"
#include "arrow/util/bit-util.h"
#include "arrow/util/cpu-info.h"
#include "arrow/util/hash-util.h"
#include "arrow/util/rle-encoding.h"

#include "parquet/encoding.h"
#include "parquet/exception.h"
#include "parquet/schema.h"
#include "parquet/types.h"
#include "parquet/util/bit-stream-utils.inline.h"
#include "parquet/util/bit-util.h"
#include "parquet/util/cpu-info.h"
#include "parquet/util/hash-util.h"
#include "parquet/util/memory.h"
#include "parquet/util/rle-encoding.h"

namespace parquet {

namespace BitUtil = ::arrow::BitUtil;
using HashUtil = ::arrow::HashUtil;

class ColumnDescriptor;

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -136,7 +138,7 @@ class PlainDecoder<BooleanType> : public Decoder<BooleanType> {

virtual void SetData(int num_values, const uint8_t* data, int len) {
num_values_ = num_values;
bit_reader_ = BitReader(data, len);
bit_reader_ = ::arrow::BitReader(data, len);
}

// Two flavors of bool decoding
Expand All @@ -161,7 +163,7 @@ class PlainDecoder<BooleanType> : public Decoder<BooleanType> {
}

private:
BitReader bit_reader_;
::arrow::BitReader bit_reader_;
};

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -196,7 +198,7 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
bits_available_(kInMemoryDefaultCapacity * 8),
bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
values_sink_(new InMemoryOutputStream(pool)) {
bit_writer_.reset(new BitWriter(
bit_writer_.reset(new ::arrow::BitWriter(
bits_buffer_->mutable_data(), static_cast<int>(bits_buffer_->size())));
}

Expand Down Expand Up @@ -260,7 +262,7 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {

protected:
int bits_available_;
std::unique_ptr<BitWriter> bit_writer_;
std::unique_ptr<::arrow::BitWriter> bit_writer_;
std::shared_ptr<PoolBuffer> bits_buffer_;
std::unique_ptr<InMemoryOutputStream> values_sink_;
};
Expand Down Expand Up @@ -325,21 +327,22 @@ class DictionaryDecoder : public Decoder<Type> {
uint8_t bit_width = *data;
++data;
--len;
idx_decoder_ = RleDecoder(data, len, bit_width);
idx_decoder_ = ::arrow::RleDecoder(data, len, bit_width);
}

int Decode(T* buffer, int max_values) override {
max_values = std::min(max_values, num_values_);
int decoded_values = idx_decoder_.GetBatchWithDict(dictionary_, buffer, max_values);
int decoded_values =
idx_decoder_.GetBatchWithDict(dictionary_.data(), buffer, max_values);
if (decoded_values != max_values) { ParquetException::EofException(); }
num_values_ -= max_values;
return max_values;
}

int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset) override {
int decoded_values = idx_decoder_.GetBatchWithDictSpaced(
dictionary_, buffer, num_values, null_count, valid_bits, valid_bits_offset);
int decoded_values = idx_decoder_.GetBatchWithDictSpaced(dictionary_.data(), buffer,
num_values, null_count, valid_bits, valid_bits_offset);
if (decoded_values != num_values) { ParquetException::EofException(); }
return decoded_values;
}
Expand All @@ -354,7 +357,7 @@ class DictionaryDecoder : public Decoder<Type> {
// pointers).
std::shared_ptr<PoolBuffer> byte_array_data_;

RleDecoder idx_decoder_;
::arrow::RleDecoder idx_decoder_;
};

template <typename Type>
Expand Down Expand Up @@ -446,7 +449,7 @@ class DictEncoder : public Encoder<DType> {
dict_encoded_size_(0),
type_length_(desc->type_length()) {
hash_slots_.Assign(hash_table_size_, HASH_SLOT_EMPTY);
if (!CpuInfo::initialized()) { CpuInfo::Init(); }
if (!::arrow::CpuInfo::initialized()) { ::arrow::CpuInfo::Init(); }
}

virtual ~DictEncoder() { DCHECK(buffered_indices_.empty()); }
Expand All @@ -464,9 +467,9 @@ class DictEncoder : public Encoder<DType> {
// reserve
// an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used
// but not reserving them would cause the encoder to fail.
return 1 + RleEncoder::MaxBufferSize(
return 1 + ::arrow::RleEncoder::MaxBufferSize(
bit_width(), static_cast<int>(buffered_indices_.size())) +
RleEncoder::MinBufferSize(bit_width());
::arrow::RleEncoder::MinBufferSize(bit_width());
}

/// The minimum bit width required to encode the currently buffered indices.
Expand Down Expand Up @@ -727,7 +730,7 @@ inline int DictEncoder<DType>::WriteIndices(uint8_t* buffer, int buffer_len) {
++buffer;
--buffer_len;

RleEncoder encoder(buffer, buffer_len, bit_width());
::arrow::RleEncoder encoder(buffer, buffer_len, bit_width());
for (int index : buffered_indices_) {
if (!encoder.Put(index)) return -1;
}
Expand Down Expand Up @@ -756,7 +759,7 @@ class DeltaBitPackDecoder : public Decoder<DType> {

virtual void SetData(int num_values, const uint8_t* data, int len) {
num_values_ = num_values;
decoder_ = BitReader(data, len);
decoder_ = ::arrow::BitReader(data, len);
values_current_block_ = 0;
values_current_mini_block_ = 0;
}
Expand Down Expand Up @@ -819,7 +822,7 @@ class DeltaBitPackDecoder : public Decoder<DType> {
return max_values;
}

BitReader decoder_;
::arrow::BitReader decoder_;
int32_t values_current_block_;
int32_t num_mini_blocks_;
uint64_t values_per_mini_block_;
Expand Down
Loading