From 44cc43d4978deb49bbaafcefe14dbf5fe45b1e57 Mon Sep 17 00:00:00 2001 From: GongXun Date: Thu, 28 Aug 2025 14:38:02 +0800 Subject: [PATCH 1/2] feat: use ColumnEncoding_Kind_DIRECT_DELTA as default in offset stream Optimize performance of variable-length column offsets by switching from Zstd to delta encoding. This approach better compresses incremental integer sequences, cutting disk space by more than half while maintaining performance. The following is a comparison of file sizes for different encoding methods on TPC-DS 20G: Name PAX(ZSTD) AOCS_SIZE PAX(Delta) PAX SIZE / AOCS * 100% call_center 12 kB 231 kB 10185 bytes 4.31% catalog_page 499 kB 653 kB 393 kB 60.18% catalog_returns 240 MB 171 MB 178 MB 104.09% catalog_sales 3033 MB 1837 MB 1977 MB 107.63% customer 16 MB 12 MB 12 MB 100.00% customer_address 7008 kB 3161 kB 3115 kB 98.54% customer_demographics 28 MB 8164 kB 9292 kB 113.82% date_dim 3193 kB 1406 kB 1249 kB 88.85% household_demographics 42 kB 248 kB 28 kB 11.29% income_band 1239 bytes 225 kB 1239 bytes 0.54% inventory 36 MB 71 MB 36 MB 50.70% item 3084 kB 2479 kB 2227 kB 89.84% promotion 27 kB 239 kB 18 kB 7.53% reason 2730 bytes 226 kB 2280 bytes 0.99% ship_mode 3894 bytes 227 kB 3315 bytes 1.43% store 23 kB 239 kB 18 kB 7.53% store_returns 400 MB 265 MB 277 MB 104.53% store_sales 4173 MB 2384 MB 2554 MB 107.12% time_dim 1702 kB 819 kB 627 kB 76.56% warehouse 5394 bytes 227 kB 4698 bytes 2.02% web_page 21 kB 236 kB 14 kB 5.93% web_returns 116 MB 83 MB 85 MB 102.41% web_sales 1513 MB 908 MB 982 MB 108.15% --- contrib/pax_storage/.gitignore | 1 + contrib/pax_storage/src/cpp/cmake/pax.cmake | 1 + .../src/cpp/cmake/pax_format.cmake | 1 + contrib/pax_storage/src/cpp/pax_gbench.cc | 302 +++++++++- contrib/pax_storage/src/cpp/pax_gbench.h | 72 +++ .../cpp/storage/columns/pax_compress_bench.cc | 421 ++++++++++++++ .../src/cpp/storage/columns/pax_decoding.cc | 3 +- .../cpp/storage/columns/pax_delta_encoding.cc | 520 ++++++++++++++++++ .../cpp/storage/columns/pax_delta_encoding.h | 135 +++++ .../cpp/storage/columns/pax_dict_encoding.h | 15 +- .../src/cpp/storage/columns/pax_encoding.cc | 4 +- .../src/cpp/storage/columns/pax_encoding.h | 2 + .../columns/pax_encoding_non_fixed_column.cc | 88 ++- .../columns/pax_encoding_non_fixed_column.h | 3 + .../cpp/storage/columns/pax_encoding_test.cc | 92 ++++ .../cpp/storage/columns/pax_rlev2_encoding.h | 4 + .../columns/pax_vec_encoding_column.cc | 2 +- .../storage/columns/pax_vec_encoding_column.h | 3 + .../src/cpp/storage/micro_partition.h | 1 - .../src/cpp/storage/orc/orc_writer.cc | 18 +- contrib/pax_storage/src/cpp/storage/pax.cc | 22 +- .../pax_storage/src/cpp/storage/pax_defined.h | 2 +- .../src/test/regress/expected/gp_toolkit.out | 2 +- 23 files changed, 1653 insertions(+), 61 deletions(-) create mode 100644 contrib/pax_storage/src/cpp/pax_gbench.h create mode 100644 contrib/pax_storage/src/cpp/storage/columns/pax_compress_bench.cc create mode 100644 contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc create mode 100644 contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.h diff --git a/contrib/pax_storage/.gitignore b/contrib/pax_storage/.gitignore index 51a328f84e0..87aa2a4a742 100644 --- a/contrib/pax_storage/.gitignore +++ b/contrib/pax_storage/.gitignore @@ -12,6 +12,7 @@ Thumbs.db # Temp files dir +bench_data .tmp/** build*/** results/** diff --git a/contrib/pax_storage/src/cpp/cmake/pax.cmake b/contrib/pax_storage/src/cpp/cmake/pax.cmake index 71775bac2dd..099a66f30d8 100644 --- a/contrib/pax_storage/src/cpp/cmake/pax.cmake +++ b/contrib/pax_storage/src/cpp/cmake/pax.cmake @@ -51,6 +51,7 @@ set(pax_storage_src storage/columns/pax_dict_encoding.cc storage/columns/pax_decoding.cc storage/columns/pax_encoding.cc + storage/columns/pax_delta_encoding.cc storage/columns/pax_rlev2_decoding.cc storage/columns/pax_rlev2_encoding.cc storage/columns/pax_vec_bitpacked_column.cc diff --git a/contrib/pax_storage/src/cpp/cmake/pax_format.cmake b/contrib/pax_storage/src/cpp/cmake/pax_format.cmake index 4bdc25671f9..5a12185a0e6 100644 --- a/contrib/pax_storage/src/cpp/cmake/pax_format.cmake +++ b/contrib/pax_storage/src/cpp/cmake/pax_format.cmake @@ -41,6 +41,7 @@ set(pax_storage_src storage/columns/pax_dict_encoding.cc storage/columns/pax_decoding.cc storage/columns/pax_encoding.cc + storage/columns/pax_delta_encoding.cc storage/columns/pax_rlev2_decoding.cc storage/columns/pax_rlev2_encoding.cc storage/columns/pax_vec_column.cc diff --git a/contrib/pax_storage/src/cpp/pax_gbench.cc b/contrib/pax_storage/src/cpp/pax_gbench.cc index 82dbaaa7bb2..b6a0ecb0c76 100644 --- a/contrib/pax_storage/src/cpp/pax_gbench.cc +++ b/contrib/pax_storage/src/cpp/pax_gbench.cc @@ -25,12 +25,310 @@ *------------------------------------------------------------------------- */ +#include "pax_gbench.h" + +#include "comm/cbdb_api.h" + #include -static void example_benchmark(benchmark::State &state) { +#include +#include +#include + +#include "access/paxc_rel_options.h" +#include "comm/cbdb_wrappers.h" +#include "cpp-stub/src/stub.h" +#include "storage/micro_partition_iterator.h" +#include "storage/pax.h" +#include "storage/strategy.h" + +namespace pax::bench { + +// Create memory context for benchmark +void CreateMemoryContext() { + MemoryContext test_memory_context = AllocSetContextCreate( + (MemoryContext)NULL, "TestMemoryContext", 80 * 1024 * 1024, + 80 * 1024 * 1024, 80 * 1024 * 1024); + MemoryContextSwitchTo(test_memory_context); +} + +// Global registry +class BenchmarkRegistry { + private: + std::vector init_functions_; + std::vector cleanup_functions_; + bool initialized_ = false; + + public: + void RegisterInitFunction(InitFunction func) { + init_functions_.push_back(func); + } + + void RegisterCleanupFunction(CleanupFunction func) { + cleanup_functions_.push_back(func); + } + + void RunAllInitFunctions() { + if (initialized_) return; + + printf("Running PAX Benchmark Suite...\n"); + printf("Initializing all benchmark modules...\n\n"); + + for (const auto &func : init_functions_) { + func(); + } + initialized_ = true; + } + + void RunAllCleanupFunctions() { + if (!initialized_) return; + + printf("\nCleaning up all benchmark modules...\n"); + + // Cleanup functions executed in reverse order + for (auto it = cleanup_functions_.rbegin(); it != cleanup_functions_.rend(); + ++it) { + (*it)(); + } + initialized_ = false; + } +}; + +// Global registry access function +BenchmarkRegistry &GetBenchmarkRegistry() { + static BenchmarkRegistry instance; + return instance; +} + +// Registration functions +void RegisterBenchmarkInit(InitFunction func) { + GetBenchmarkRegistry().RegisterInitFunction(func); +} + +void RegisterBenchmarkCleanup(CleanupFunction func) { + GetBenchmarkRegistry().RegisterCleanupFunction(func); +} + +// Global Mock functions for benchmark framework +bool MockMinMaxGetStrategyProcinfo(Oid, Oid, Oid *, FmgrInfo *, + StrategyNumber) { + return false; +} + +int32 MockGetFastSequences(Oid) { + static int32 mock_id = 0; + return mock_id++; +} + +void MockInsertMicroPartitionPlaceHolder(Oid, int) {} +void MockDeleteMicroPartitionEntry(Oid, Snapshot, int) {} +void MockExecStoreVirtualTuple(TupleTableSlot *) {} + +std::string MockBuildPaxDirectoryPath(RelFileNode rnode, BackendId backend_id) { + // Create a simple file path for benchmarks + return std::string("./bench_data"); +} + +std::vector MockGetMinMaxColumnIndexes(Relation) { + return std::vector(); +} + +std::vector MockBloomFilterColumnIndexes(Relation) { + return std::vector(); +} + +std::vector> MockGetRelEncodingOptions( + Relation relation) { + std::vector> encoding_opts; + + // Get number of columns from relation + int num_columns = 10; // default for benchmark + if (relation && relation->rd_att) { + num_columns = relation->rd_att->natts; + } + + // Create encoding options for each column (NO_ENCODED, 0) + for (int i = 0; i < num_columns; i++) { + encoding_opts.emplace_back( + std::make_tuple(ColumnEncoding_Kind_NO_ENCODED, 0)); + } + + return encoding_opts; +} + +// Mock TupleDescInitEntry that doesn't rely on SYSCACHE +void MockTupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, + const char *attributeName, Oid oidtypeid, + int32 typmod, int attdim) { + // Basic validation + if (attributeNumber < 1 || attributeNumber > desc->natts) { + return; + } + + Form_pg_attribute att = TupleDescAttr(desc, attributeNumber - 1); + + // Set basic attribute properties + namestrcpy(&(att->attname), attributeName); + att->atttypid = oidtypeid; + att->atttypmod = typmod; + att->attndims = attdim; + att->attnum = attributeNumber; + att->attnotnull = false; + att->atthasdef = false; + att->attidentity = '\0'; + att->attgenerated = '\0'; + att->attisdropped = false; + att->attislocal = true; + att->attinhcount = 0; + att->attcollation = InvalidOid; + + // Set type-specific properties based on OID (hardcoded for common types) + switch (oidtypeid) { + case INT2OID: // smallint + att->attlen = 2; + att->attalign = 's'; + att->attstorage = 'p'; + att->attbyval = true; + break; + case INT4OID: // integer + att->attlen = 4; + att->attalign = 'i'; + att->attstorage = TYPSTORAGE_PLAIN; + att->attbyval = true; + break; + case INT8OID: // bigint + att->attlen = 8; + att->attalign = 'd'; + att->attstorage = TYPSTORAGE_PLAIN; + att->attbyval = FLOAT8PASSBYVAL; + break; + case FLOAT8OID: // double precision + att->attlen = 8; + att->attalign = 'd'; + att->attstorage = 'p'; + att->attbyval = FLOAT8PASSBYVAL; + break; + case BOOLOID: // boolean + att->attlen = 1; + att->attalign = 'c'; + att->attstorage = 'p'; + att->attbyval = true; + break; + case TEXTOID: // text + att->attlen = -1; + att->attalign = 'i'; + att->attstorage = TYPSTORAGE_PLAIN; + att->attbyval = false; + att->attcollation = DEFAULT_COLLATION_OID; + break; + case NUMERICOID: // numeric + att->attlen = -1; + att->attalign = TYPALIGN_INT; + att->attstorage = TYPSTORAGE_PLAIN; + att->attbyval = false; + break; + case TIMESTAMPOID: // timestamp + att->attlen = 8; + att->attalign = 'd'; + att->attstorage = TYPSTORAGE_PLAIN; + att->attbyval = FLOAT8PASSBYVAL; + break; + default: + // Default values for unknown types + att->attlen = -1; + att->attalign = 'i'; + att->attstorage = 'p'; + att->attbyval = false; + break; + } +} + +// Global initialization function for general benchmark framework +void GlobalBenchmarkInit() { + static bool global_initialized = false; + if (global_initialized) return; + + printf("Initializing PAX benchmark framework...\n"); + + // Initialize memory context + MemoryContextInit(); + + // Setup global Mock functions + static std::unique_ptr stub_global = std::make_unique(); + + stub_global->set(MinMaxGetPgStrategyProcinfo, MockMinMaxGetStrategyProcinfo); + stub_global->set(CPaxGetFastSequences, MockGetFastSequences); + stub_global->set(cbdb::BuildPaxDirectoryPath, MockBuildPaxDirectoryPath); + stub_global->set(cbdb::InsertMicroPartitionPlaceHolder, + MockInsertMicroPartitionPlaceHolder); + stub_global->set(cbdb::DeleteMicroPartitionEntry, + MockDeleteMicroPartitionEntry); + stub_global->set(cbdb::GetMinMaxColumnIndexes, MockGetMinMaxColumnIndexes); + stub_global->set(cbdb::GetBloomFilterColumnIndexes, + MockBloomFilterColumnIndexes); + stub_global->set(cbdb::GetRelEncodingOptions, MockGetRelEncodingOptions); + stub_global->set(ExecStoreVirtualTuple, MockExecStoreVirtualTuple); + stub_global->set(TupleDescInitEntry, MockTupleDescInitEntry); + + // Create basic test directory + system("mkdir -p ./bench_data"); + + global_initialized = true; + printf("PAX benchmark framework initialized.\n"); +} + +// Global cleanup function for general benchmark framework +void GlobalBenchmarkCleanup() { + printf("Cleaning up PAX benchmark framework...\n"); + + // Clean up test directory + // system("rm -rf ./bench_data"); + + // Reset memory context + if (TopMemoryContext) { + MemoryContextReset(TopMemoryContext); + } + + printf("PAX benchmark framework cleaned up.\n"); +} + +// Example benchmark test +static void example_benchmark(::benchmark::State &state) { for (auto _ : state) { + // Empty example test } } BENCHMARK(example_benchmark); -BENCHMARK_MAIN(); \ No newline at end of file +} // namespace pax::benchmark + +// Global cleanup function (C-style for atexit) +static void cleanup_all() { + pax::bench::GetBenchmarkRegistry().RunAllCleanupFunctions(); + pax::bench::GlobalBenchmarkCleanup(); +} + +// Main entry function +int main(int argc, char **argv) { + // Register global cleanup function + std::atexit(cleanup_all); + + // Global initialization + pax::bench::GlobalBenchmarkInit(); + + // Run all registered initialization functions + pax::bench::GetBenchmarkRegistry().RunAllInitFunctions(); + + // Initialize benchmark framework + ::benchmark::Initialize(&argc, argv); + if (::benchmark::ReportUnrecognizedArguments(argc, argv)) return 1; + + printf("\n=== Starting PAX Benchmark Suite ===\n"); + printf("Use --benchmark_filter= to run specific tests\n"); + printf("Use --benchmark_list_tests to see all available tests\n\n"); + + // Run benchmark + ::benchmark::RunSpecifiedBenchmarks(); + + return 0; +} \ No newline at end of file diff --git a/contrib/pax_storage/src/cpp/pax_gbench.h b/contrib/pax_storage/src/cpp/pax_gbench.h new file mode 100644 index 00000000000..44376022693 --- /dev/null +++ b/contrib/pax_storage/src/cpp/pax_gbench.h @@ -0,0 +1,72 @@ +/*------------------------------------------------------------------------- + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * pax_gbench.h + * + * IDENTIFICATION + * contrib/pax_storage/src/cpp/pax_gbench.h + * + *------------------------------------------------------------------------- + */ + +#pragma once + +#include +#include + +namespace pax { + +namespace bench { + +// Generic initialization and cleanup function types +using InitFunction = std::function; +using CleanupFunction = std::function; + +// Create memory context for benchmark +extern void CreateMemoryContext(); + +// Forward declaration +class BenchmarkRegistry; + +// Global registry access function +BenchmarkRegistry &GetBenchmarkRegistry(); + +// Global initialization and cleanup functions +void GlobalBenchmarkInit(); +void GlobalBenchmarkCleanup(); + +// Registration functions (implemented in pax_gbench.cc) +void RegisterBenchmarkInit(InitFunction func); +void RegisterBenchmarkCleanup(CleanupFunction func); + +} // namespace benchmark +} // namespace pax + +// Convenient registration macros +#define REGISTER_BENCHMARK_INIT(func) \ + static bool BENCHMARK_INIT_##__COUNTER__ = []() { \ + pax::bench::RegisterBenchmarkInit(func); \ + return true; \ + }() + +#define REGISTER_BENCHMARK_CLEANUP(func) \ + static bool BENCHMARK_CLEANUP_##__COUNTER__ = []() { \ + pax::bench::RegisterBenchmarkCleanup(func); \ + return true; \ + }() diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_compress_bench.cc b/contrib/pax_storage/src/cpp/storage/columns/pax_compress_bench.cc new file mode 100644 index 00000000000..0a792601e99 --- /dev/null +++ b/contrib/pax_storage/src/cpp/storage/columns/pax_compress_bench.cc @@ -0,0 +1,421 @@ +/*------------------------------------------------------------------------- + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * pax_compress_bench.cc + * + * IDENTIFICATION + * contrib/pax_storage/src/cpp/storage/columns/pax_compress_bench.cc + * + *------------------------------------------------------------------------- + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "comm/cbdb_wrappers.h" +#include "comm/pax_memory.h" +#include "pax_gbench.h" +#include "storage/columns/pax_compress.h" +#include "storage/columns/pax_decoding.h" +#include "storage/columns/pax_delta_encoding.h" +#include "storage/columns/pax_rlev2_encoding.h" +#include "storage/pax_buffer.h" + +namespace pax::bench { + +namespace { + +// Test data and prebuilt buffers for decode/decompress benchmarks +static const size_t kCount = 1024 * 1024; +static std::vector g_offsets; +static std::unique_ptr g_raw_bytes; +static size_t g_raw_len = 0; + +static std::vector g_rle_encoded; +static size_t g_rle_len = 0; + +static std::vector g_delta_encoded; +static size_t g_delta_len = 0; + +static std::unique_ptr g_zstd_compressed; +static size_t g_zstd_len = 0; + +static std::shared_ptr g_zstd; + +// Simple helpers for bench data persistence +static void EnsureDirExists(const char *dir_path) { + if (mkdir(dir_path, 0755) != 0) { + if (errno != EEXIST) { + std::cerr << "Failed to create directory: " << dir_path << std::endl; + std::abort(); + } + } +} + +static bool ReadWholeFile(const char *path, std::vector &out) { + std::ifstream in(path, std::ios::binary); + if (!in.is_open()) return false; + in.seekg(0, std::ios::end); + std::streampos size = in.tellg(); + if (size <= 0) return false; + out.resize(static_cast(size)); + in.seekg(0, std::ios::beg); + in.read(out.data(), size); + return static_cast(in); +} + +static bool ReadWholeFile(const char *path, std::unique_ptr &out, + size_t &out_len) { + std::ifstream in(path, std::ios::binary); + if (!in.is_open()) return false; + in.seekg(0, std::ios::end); + std::streampos size = in.tellg(); + if (size <= 0) return false; + out_len = static_cast(size); + out = std::make_unique(out_len); + in.seekg(0, std::ios::beg); + in.read(out.get(), size); + return static_cast(in); +} + +static void WriteWholeFile(const char *path, const char *data, size_t len) { + std::ofstream out(path, std::ios::binary | std::ios::trunc); + if (!out.is_open()) { + std::cerr << "Failed to open file for write: " << path << std::endl; + std::abort(); + } + out.write(data, static_cast(len)); + if (!out) { + std::cerr << "Failed to write file: " << path << std::endl; + std::abort(); + } +} + +static const char *kBenchDataDir = "bench_data"; +static const char *kRLEV2Path = "bench_data/rle_v2_u32.bin"; +static const char *kDeltaPath = "bench_data/delta_u32.bin"; +static const char *kZSTDPath = "bench_data/zstd_u32.bin"; +static const char *kRawPath = "bench_data/raw_u32.bin"; + +static std::vector GenerateMonotonicOffsets(size_t n, uint32_t seed) { + std::vector offsets; + offsets.resize(n); + offsets[0] = 0; + std::mt19937 rng(seed); + std::uniform_int_distribution step_dist(1, 256); + for (size_t i = 1; i < n; ++i) { + offsets[i] = offsets[i - 1] + static_cast(step_dist(rng)); + } + return offsets; +} + +// Lazily ensure raw bytes are available (prefer loading from disk) +static void EnsureRawData() { + if (g_raw_len != 0 && g_raw_bytes) return; + EnsureDirExists(kBenchDataDir); + std::vector raw_from_file; + if (ReadWholeFile(kRawPath, raw_from_file)) { + g_raw_len = raw_from_file.size(); + g_raw_bytes = std::make_unique(g_raw_len); + std::memcpy(g_raw_bytes.get(), raw_from_file.data(), g_raw_len); + return; + } + // Fallback: generate and persist + g_offsets = GenerateMonotonicOffsets(kCount, /*seed=*/12345); + g_raw_len = g_offsets.size() * sizeof(uint32_t); + g_raw_bytes = std::make_unique(g_raw_len); + std::memcpy(g_raw_bytes.get(), g_offsets.data(), g_raw_len); + WriteWholeFile(kRawPath, g_raw_bytes.get(), g_raw_len); +} + +// Lazily ensure RLEv2 encoded buffer exists (load or build from raw) +static void EnsureRleEncoded() { + if (g_rle_len != 0 && !g_rle_encoded.empty()) return; + EnsureDirExists(kBenchDataDir); + if (ReadWholeFile(kRLEV2Path, g_rle_encoded)) { + g_rle_len = g_rle_encoded.size(); + return; + } + EnsureRawData(); + PaxEncoder::EncodingOption enc_opt; + enc_opt.column_encode_type = ColumnEncoding_Kind_RLE_V2; + enc_opt.is_sign = false; + + PaxOrcEncoder rle_encoder(enc_opt); + auto rle_out = std::make_shared>(g_raw_len); + rle_encoder.SetDataBuffer(rle_out); + // encode directly from raw bytes to avoid depending on g_offsets + size_t count = g_raw_len / sizeof(uint32_t); + const uint32_t *vals = reinterpret_cast(g_raw_bytes.get()); + for (size_t i = 0; i < count; ++i) { + uint32_t v = vals[i]; + rle_encoder.Append(reinterpret_cast(&v), sizeof(uint32_t)); + } + rle_encoder.Flush(); + + g_rle_len = rle_encoder.GetBufferSize(); + g_rle_encoded.assign(rle_encoder.GetBuffer(), + rle_encoder.GetBuffer() + g_rle_len); + WriteWholeFile(kRLEV2Path, g_rle_encoded.data(), g_rle_len); +} + +// Lazily ensure Delta encoded buffer exists (load or build from raw) +static void EnsureDeltaEncoded() { + if (g_delta_len != 0 && !g_delta_encoded.empty()) return; + EnsureDirExists(kBenchDataDir); + if (ReadWholeFile(kDeltaPath, g_delta_encoded)) { + g_delta_len = g_delta_encoded.size(); + return; + } + EnsureRawData(); + PaxEncoder::EncodingOption enc_opt; + enc_opt.is_sign = false; + // type not used by PaxDeltaEncoder + PaxDeltaEncoder delta_encoder(enc_opt); + auto delta_out = std::make_shared>(g_raw_len); + delta_encoder.SetDataBuffer(delta_out); + // Encode whole array in one shot + delta_encoder.Append(g_raw_bytes.get(), g_raw_len); + delta_encoder.Flush(); + + g_delta_len = delta_encoder.GetBufferSize(); + g_delta_encoded.assign(delta_encoder.GetBuffer(), + delta_encoder.GetBuffer() + g_delta_len); + WriteWholeFile(kDeltaPath, g_delta_encoded.data(), g_delta_len); +} + +// Lazily ensure ZSTD compressed buffer exists (load or build from raw) +static void EnsureZstdCompressed() { + EnsureDirExists(kBenchDataDir); + if (!g_zstd) { + g_zstd = + PaxCompressor::CreateBlockCompressor(ColumnEncoding_Kind_COMPRESS_ZSTD); + if (!g_zstd) { + std::cerr << "Failed to create ZSTD compressor" << std::endl; + std::abort(); + } + } + if (g_zstd_len != 0 && g_zstd_compressed) return; + if (ReadWholeFile(kZSTDPath, g_zstd_compressed, g_zstd_len)) { + return; + } + EnsureRawData(); + size_t bound = g_zstd->GetCompressBound(g_raw_len); + g_zstd_compressed = std::make_unique(bound); + g_zstd_len = g_zstd->Compress(g_zstd_compressed.get(), bound, + g_raw_bytes.get(), g_raw_len, /*lvl=*/5); + if (g_zstd->IsError(g_zstd_len) || g_zstd_len == 0) { + std::cerr << "ZSTD one-time compress failed" << std::endl; + std::abort(); + } + WriteWholeFile(kZSTDPath, g_zstd_compressed.get(), g_zstd_len); +} + +static void PrepareOnce() { + pax::bench::CreateMemoryContext(); + EnsureDirExists(kBenchDataDir); +} + +static void CleanupBenchData() { + const char *files[] = {kRLEV2Path, kDeltaPath, kZSTDPath, kRawPath}; + for (const char *p : files) { + std::remove(p); + } + + rmdir(kBenchDataDir); +} + +} // namespace + +// Register module init with gbench framework +REGISTER_BENCHMARK_INIT(PrepareOnce); +REGISTER_BENCHMARK_CLEANUP(CleanupBenchData); + +// RLEv2 encode benchmark +static void BM_RLEV2_Encode(::benchmark::State &state) { + // Prepare raw data only; no encoded buffers are created here + EnsureRawData(); + for (auto _ : state) { + PaxEncoder::EncodingOption enc_opt; + enc_opt.column_encode_type = ColumnEncoding_Kind_RLE_V2; + enc_opt.is_sign = false; + + PaxOrcEncoder encoder(enc_opt); + auto out = std::make_shared>(g_raw_len); + encoder.SetDataBuffer(out); + + size_t count = g_raw_len / sizeof(uint32_t); + const uint32_t *vals = + reinterpret_cast(g_raw_bytes.get()); + for (size_t i = 0; i < count; ++i) { + uint32_t v = vals[i]; + encoder.Append(reinterpret_cast(&v), sizeof(uint32_t)); + } + encoder.Flush(); + g_rle_len = encoder.GetBufferSize(); + benchmark::DoNotOptimize(encoder.GetBuffer()); + benchmark::ClobberMemory(); + } + state.SetBytesProcessed(static_cast(state.iterations()) * + static_cast(g_raw_len)); + state.counters["raw_kb"] = + benchmark::Counter(static_cast(g_raw_len) / (1024.0)); + state.counters["rle_kb"] = + benchmark::Counter(static_cast(g_rle_len) / (1024.0)); +} +BENCHMARK(BM_RLEV2_Encode); + +// RLEv2 decode benchmark +static void BM_RLEV2_Decode(::benchmark::State &state) { + // Ensure we have raw size and encoded buffer ready (prefer from disk) + EnsureRawData(); + EnsureRleEncoded(); + for (auto _ : state) { + PaxDecoder::DecodingOption dec_opt; + dec_opt.column_encode_type = ColumnEncoding_Kind_RLE_V2; + dec_opt.is_sign = false; + + auto decoder = PaxDecoder::CreateDecoder(dec_opt); + auto out = std::make_shared>(g_raw_len); + decoder->SetSrcBuffer(g_rle_encoded.data(), g_rle_len); + decoder->SetDataBuffer(out); + size_t n = decoder->Decoding(); + benchmark::DoNotOptimize(n); + benchmark::ClobberMemory(); + } + state.SetBytesProcessed(static_cast(state.iterations()) * + static_cast(g_raw_len)); +} +BENCHMARK(BM_RLEV2_Decode); + +// Delta encode benchmark +static void BM_Delta_Encode(::benchmark::State &state) { + EnsureRawData(); + for (auto _ : state) { + PaxEncoder::EncodingOption enc_opt; + enc_opt.is_sign = false; + PaxDeltaEncoder encoder(enc_opt); + auto out = std::make_shared>(g_raw_len); + encoder.SetDataBuffer(out); + encoder.Append(g_raw_bytes.get(), g_raw_len); + encoder.Flush(); + g_delta_len = encoder.GetBufferSize(); + benchmark::DoNotOptimize(encoder.GetBuffer()); + benchmark::ClobberMemory(); + } + state.SetBytesProcessed(static_cast(state.iterations()) * + static_cast(g_raw_len)); + state.counters["delta_kb"] = + benchmark::Counter(static_cast(g_delta_len) / (1024.0)); +} +BENCHMARK(BM_Delta_Encode); + +// Delta decode benchmark +static void BM_Delta_Decode(::benchmark::State &state) { + EnsureRawData(); + EnsureDeltaEncoded(); + for (auto _ : state) { + PaxDecoder::DecodingOption dec_opt; + dec_opt.is_sign = false; + dec_opt.column_encode_type = ColumnEncoding_Kind_DIRECT_DELTA; + PaxDeltaDecoder decoder(dec_opt); + auto out = std::make_shared>(g_raw_len); + decoder.SetSrcBuffer(g_delta_encoded.data(), g_delta_len); + decoder.SetDataBuffer(out); + size_t n = decoder.Decoding(); + if (n != g_raw_len / sizeof(uint32_t) && out->Used() != g_raw_len) { + std::cerr << "Delta decode failed, n: " << n + << ", g_raw_len: " << g_raw_len + << ", g_delta_len: " << g_delta_len + << ", out: Used: " << out->Used() << std::endl; + std::abort(); + } + + if (memcmp(out->GetBuffer(), g_raw_bytes.get(), g_raw_len) != 0) { + std::cerr << "Delta decode failed, out: " << out->GetBuffer() + << ", g_raw_bytes: " << g_raw_bytes.get() << std::endl; + std::abort(); + } + + benchmark::DoNotOptimize(n); + benchmark::ClobberMemory(); + } + state.SetBytesProcessed(static_cast(state.iterations()) * + static_cast(g_raw_len)); +} +BENCHMARK(BM_Delta_Decode); + +// ZSTD compress benchmark +static void BM_ZSTD_Compress(::benchmark::State &state) { + EnsureRawData(); + if (!g_zstd) { + g_zstd = + PaxCompressor::CreateBlockCompressor(ColumnEncoding_Kind_COMPRESS_ZSTD); + if (!g_zstd) { + std::cerr << "Failed to create ZSTD compressor" << std::endl; + std::abort(); + } + } + size_t bound = g_zstd->GetCompressBound(g_raw_len); + std::unique_ptr dst(new char[bound]); + for (auto _ : state) { + size_t n = g_zstd->Compress(dst.get(), bound, g_raw_bytes.get(), g_raw_len, + /*lvl=*/5); + g_zstd_len = n; + benchmark::DoNotOptimize(n); + benchmark::ClobberMemory(); + } + state.SetBytesProcessed(static_cast(state.iterations()) * + static_cast(g_raw_len)); + state.counters["zstd_kb"] = + benchmark::Counter(static_cast(g_zstd_len) / (1024.0)); +} +BENCHMARK(BM_ZSTD_Compress); + +// ZSTD decompress benchmark +static void BM_ZSTD_Decompress(::benchmark::State &state) { + EnsureRawData(); + EnsureZstdCompressed(); + std::unique_ptr dst(new char[g_raw_len]); + for (auto _ : state) { + size_t n = g_zstd->Decompress(dst.get(), g_raw_len, g_zstd_compressed.get(), + g_zstd_len); + benchmark::DoNotOptimize(n); + benchmark::ClobberMemory(); + } + state.SetBytesProcessed(static_cast(state.iterations()) * + static_cast(g_raw_len)); +} +BENCHMARK(BM_ZSTD_Decompress); + +} // namespace pax::bench diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_decoding.cc b/contrib/pax_storage/src/cpp/storage/columns/pax_decoding.cc index 7ba0fcd6768..0e15ec52088 100644 --- a/contrib/pax_storage/src/cpp/storage/columns/pax_decoding.cc +++ b/contrib/pax_storage/src/cpp/storage/columns/pax_decoding.cc @@ -31,6 +31,7 @@ #include "comm/pax_memory.h" #include "storage/columns/pax_dict_encoding.h" #include "storage/columns/pax_rlev2_decoding.h" +#include "storage/columns/pax_delta_encoding.h" namespace pax { @@ -47,7 +48,7 @@ std::shared_ptr PaxDecoder::CreateDecoder(const DecodingOption &deco break; } case ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA: { - /// TODO(jiaqizho) support it + decoder = std::make_shared>(decoder_options); break; } case ColumnEncoding_Kind::ColumnEncoding_Kind_DICTIONARY: { diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc new file mode 100644 index 00000000000..700c8d777c7 --- /dev/null +++ b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc @@ -0,0 +1,520 @@ +/*------------------------------------------------------------------------- + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * pax_delta_encoding.cc + * + * IDENTIFICATION + * contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc + * + *------------------------------------------------------------------------- + */ +#include "storage/columns/pax_delta_encoding.h" + +#include +#include +#include + +namespace pax { + +// delta bitpack encoder +template +PaxDeltaEncoder::PaxDeltaEncoder(const EncodingOption &encoder_options) + : PaxEncoder(encoder_options) {} + +template +void PaxDeltaEncoder::Append(char *data, size_t size) { + CBDB_CHECK(!has_append_, cbdb::CException::kExTypeAbort, + fmt("PaxDeltaEncoder::Append only support Append Once")); + has_append_ = true; + + auto T_data = reinterpret_cast(data); + auto T_data_len = size / sizeof(T); + Encode(T_data, T_data_len); +} + +inline uint8_t NumBitsAllowZero(uint32_t value) { + if (value == 0) return 0; + uint8_t bits = 0; + while (value) { + bits++; + value >>= 1; + } + return bits; +} + +// Fast bit width calculation (0 -> 0) +inline uint8_t FastNumBits(uint32_t v) { +#if defined(__GNUC__) || defined(__clang__) + return v == 0 ? 0 : static_cast(32 - __builtin_clz(v)); +#else + uint8_t bits = 0; + while (v) { + ++bits; + v >>= 1; + } + return bits; +#endif +} + +// 64-bit bit writer based on raw pointer (writes to reserved DataBuffer range) +struct BitWriter64Ptr { + uint8_t *out; + size_t index; + uint64_t bit_buffer; + uint32_t bit_count; + + BitWriter64Ptr(uint8_t *p) : out(p), index(0), bit_buffer(0), bit_count(0) {} + + inline void Append(uint32_t value, uint8_t width) { + if (width == 0) return; + bit_buffer |= (static_cast(value) << bit_count); + bit_count += width; + while (bit_count >= 8) { + out[index++] = static_cast(bit_buffer & 0xFF); + bit_buffer >>= 8; + bit_count -= 8; + } + } + + inline void FlushToByte() { + if (bit_count > 0) { + out[index++] = static_cast(bit_buffer & 0xFF); + bit_buffer = 0; + bit_count = 0; + } + } +}; + +// 64-bit bit reader based on raw pointer (limited to specified payload bytes) +struct BitReader64Ptr { + const uint8_t *in; + size_t size; + size_t index; + uint64_t bit_buffer; + uint32_t bit_count; + + BitReader64Ptr(const uint8_t *p, size_t len) + : in(p), size(len), index(0), bit_buffer(0), bit_count(0) {} + + inline void Ensure(uint32_t need_bits) { + while (bit_count < need_bits && index < size) { + bit_buffer |= (static_cast(in[index]) << bit_count); + ++index; + bit_count += 8; + } + } + + inline uint32_t Read(uint8_t width) { + if (width == 0) return 0; + Ensure(width); + uint32_t result; + if (width == 32) + result = static_cast(bit_buffer & 0xFFFFFFFFull); + else + result = static_cast(bit_buffer & ((1ull << width) - 1)); + bit_buffer >>= width; + bit_count -= width; + return result; + } + + inline void AlignToByte() { + uint32_t drop = bit_count % 8; + if (drop) { + bit_buffer >>= drop; + bit_count -= drop; + } + } +}; + +/* +Overall layout: + DeltaBlockHeader (struct, fixed-size) + - uint32 value_per_block + - uint32 values_per_mini_block + - uint32 total_count + T first_value + [Repeated Block until total_count is exhausted] + - uint32 min_delta + - uint8 bit_widths[ mini_blocks_per_block ] + - uint32 payload_size + - uint8 payload[payload_size] + // bit-packed adjusted deltas, mini-block by mini-block + // within a block: bits are written LSB-first, end aligned to byte +*/ + +template +size_t PaxDeltaEncoder::GetBoundSize(size_t src_len) const { + size_t value_count = src_len / sizeof(T); + size_t block_count = (value_count + value_per_block_ - 1) / value_per_block_; + /* header + first_value + block_count * (min_delta + bit_widths + payload_size + * + payload) */ + return sizeof(DeltaBlockHeader) + sizeof(T) + + block_count * (sizeof(uint32) + mini_blocks_per_block_ + + sizeof(uint32) + sizeof(uint32)); +} + +template +void PaxDeltaEncoder::Encode(T *data, size_t count) { + // Estimate allocation: by element byte count, sufficient to accommodate + // header and bit stream + if (result_buffer_->Capacity() < + count * sizeof(T) + sizeof(DeltaBlockHeader) + sizeof(T)) { + result_buffer_->ReSize(count * sizeof(T) + sizeof(DeltaBlockHeader) + + sizeof(T)); + } + + DeltaBlockHeader header; + header.value_per_block = value_per_block_; + header.values_per_mini_block = values_per_mini_block_; + header.total_count = count; + // add delta block header + result_buffer_->Write(reinterpret_cast(&header), sizeof(header)); + result_buffer_->Brush(sizeof(header)); + // add base value + result_buffer_->Write(reinterpret_cast(&data[0]), sizeof(data[0])); + result_buffer_->Brush(sizeof(data[0])); + + size_t values_emitted = 1; + T previous_value = data[0]; + + while (values_emitted < count) { + uint32_t values_in_block = std::min( + value_per_block_, static_cast(count - values_emitted)); + + if (deltas_scratch_.size() < values_in_block) { + deltas_scratch_.resize(values_in_block); + } + uint32_t *deltas = deltas_scratch_.data(); + uint32_t min_delta = UINT32_MAX; + uint32_t mini_max[mini_blocks_per_block_] = {0}; + + for (uint32_t i = 0; i < values_in_block; ++i) { + T current = data[values_emitted + i]; + uint32_t delta = static_cast(current - previous_value); + deltas[i] = delta; + previous_value = current; + if (delta < min_delta) min_delta = delta; + uint32_t mini_index = i / values_per_mini_block_; + if (delta > mini_max[mini_index]) mini_max[mini_index] = delta; + } + + // write block header: min_delta later + uint8_t bit_widths[mini_blocks_per_block_] = {0}; + uint64_t total_bits = 0; + for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) { + uint32_t start = i * values_per_mini_block_; + if (start >= values_in_block) { + bit_widths[i] = 0; + continue; + } + uint32_t adjusted_max = mini_max[i] - min_delta; + uint8_t w = FastNumBits(adjusted_max); + bit_widths[i] = w; + uint32_t end = std::min(start + values_per_mini_block_, values_in_block); + total_bits += static_cast(w) * (end - start); + } + uint32_t payload_bytes = static_cast((total_bits + 7) / 8); + + size_t need_size = payload_bytes + mini_blocks_per_block_ + + sizeof(payload_bytes) + sizeof(min_delta); + + // Grows the buffer to be at least need_size bytes. To avoid frequent + // resizing, the new capacity is calculated as the maximum of (current + // capacity * 1.5) or (current capacity + need_size). + if (result_buffer_->Available() < need_size) { + size_t inc_size = need_size > (result_buffer_->Capacity() * 0.5) + ? need_size + : result_buffer_->Capacity() * 0.5; + result_buffer_->ReSize(result_buffer_->Capacity() + inc_size); + } + + // write block header: min_delta + result_buffer_->Write(reinterpret_cast(&min_delta), + sizeof(min_delta)); + result_buffer_->Brush(sizeof(min_delta)); + + // write bit_widths and payload_size + result_buffer_->Write(reinterpret_cast(bit_widths), + mini_blocks_per_block_); + result_buffer_->Brush(mini_blocks_per_block_); + + result_buffer_->Write(reinterpret_cast(&payload_bytes), + sizeof(payload_bytes)); + result_buffer_->Brush(sizeof(payload_bytes)); + + uint8_t *payload_ptr = + reinterpret_cast(result_buffer_->GetAvailableBuffer()); + BitWriter64Ptr bw(payload_ptr); + for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) { + uint32_t start = i * values_per_mini_block_; + if (start >= values_in_block) break; + uint32_t end = std::min(start + values_per_mini_block_, values_in_block); + uint8_t w = bit_widths[i]; + if (w == 0) continue; + for (uint32_t j = start; j < end; ++j) { + uint32_t adjusted = deltas[j] - min_delta; + bw.Append(adjusted, w); + } + } + bw.FlushToByte(); + result_buffer_->Brush(payload_bytes); + + values_emitted += values_in_block; + } +} + +template +bool PaxDeltaEncoder::SupportAppendNull() const { + return false; +} + +template +void PaxDeltaEncoder::Flush() { + // do nothing +} + +// Specialized reading of one mini-block and batch writing results +// (BitReader64Ptr) +template +inline void ReadMiniBlockSpecializedPtr(BitReader64Ptr &br, T *out_values, + T ¤t_value, uint32_t count_in_mb, + uint32_t min_delta, uint8_t w) { + switch (w) { + case 0: { + for (uint32_t j = 0; j < count_in_mb; ++j) { + current_value = + static_cast(static_cast(current_value) + min_delta); + out_values[j] = current_value; + } + return; + } + case 8: { + for (uint32_t j = 0; j < count_in_mb; ++j) { + uint32_t adjusted = br.Read(8); + current_value = static_cast(static_cast(current_value) + + adjusted + min_delta); + out_values[j] = current_value; + } + return; + } + case 16: { + for (uint32_t j = 0; j < count_in_mb; ++j) { + uint32_t adjusted = br.Read(16); + current_value = static_cast(static_cast(current_value) + + adjusted + min_delta); + out_values[j] = current_value; + } + return; + } + case 32: { + for (uint32_t j = 0; j < count_in_mb; ++j) { + uint32_t adjusted = br.Read(32); + current_value = static_cast(static_cast(current_value) + + adjusted + min_delta); + out_values[j] = current_value; + } + return; + } + default: { + uint32_t j = 0; + const uint32_t n4 = count_in_mb & ~3u; + for (; j < n4; j += 4) { + uint32_t a0 = br.Read(w); + uint32_t a1 = br.Read(w); + uint32_t a2 = br.Read(w); + uint32_t a3 = br.Read(w); + current_value = static_cast(static_cast(current_value) + + a0 + min_delta); + out_values[j] = current_value; + current_value = static_cast(static_cast(current_value) + + a1 + min_delta); + out_values[j + 1] = current_value; + current_value = static_cast(static_cast(current_value) + + a2 + min_delta); + out_values[j + 2] = current_value; + current_value = static_cast(static_cast(current_value) + + a3 + min_delta); + out_values[j + 3] = current_value; + } + for (; j < count_in_mb; ++j) { + uint32_t a = br.Read(w); + current_value = static_cast(static_cast(current_value) + + a + min_delta); + out_values[j] = current_value; + } + return; + } + } +} + +// Specialized reading of one mini-block and batch writing results +template +PaxDeltaDecoder::PaxDeltaDecoder( + const PaxDecoder::DecodingOption &encoder_options) + : PaxDecoder(encoder_options), + data_buffer_(nullptr), + result_buffer_(nullptr) { + CBDB_CHECK(encoder_options.column_encode_type == + ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA, + cbdb::CException::kExTypeAbort, + fmt("PaxDeltaDecoder only support DIRECT_DELTA encoding")); + // TODO: if sign is true, should use zigzag encoding, now use delta encoding + // for offsets in non-fixed columns + CBDB_CHECK(encoder_options.is_sign == false, + cbdb::CException::kExTypeUnImplements, + fmt("PaxDeltaDecoder is not supported for signed data, " + "will support zigzag later")); +} + +template +PaxDecoder *PaxDeltaDecoder::SetSrcBuffer(char *data, size_t data_len) { + if (data) { + data_buffer_ = + std::make_shared>(data, data_len, false, false); + data_buffer_->Brush(data_len); + } + return this; +} + +template +PaxDecoder *PaxDeltaDecoder::SetDataBuffer( + std::shared_ptr> result_buffer) { + result_buffer_ = result_buffer; + return this; +} + +template +const char *PaxDeltaDecoder::GetBuffer() const { + return result_buffer_ ? result_buffer_->GetBuffer() : nullptr; +} + +template +size_t PaxDeltaDecoder::GetBufferSize() const { + return result_buffer_ ? result_buffer_->Used() : 0; +} + +template +size_t PaxDeltaDecoder::Next(const char * /*not_null*/) { + CBDB_RAISE(cbdb::CException::kExTypeUnImplements); +} + +template +size_t PaxDeltaDecoder::Decoding() { + if (!data_buffer_) return 0; + Assert(result_buffer_); + + const uint8_t *p = + reinterpret_cast(data_buffer_->GetBuffer()); + uint32_t remaining = static_cast(data_buffer_->Used()); + + // read header: values_per_block, values_per_mini_block_, total_count, + // first_value + DeltaBlockHeader header; + std::memcpy(&header, p, sizeof(header)); + p += sizeof(header); + remaining -= sizeof(header); + uint32_t values_per_block = header.value_per_block; + uint32_t values_per_mini_block_ = header.values_per_mini_block; + uint32_t total_count = header.total_count; + + T first_value; + std::memcpy(&first_value, p, sizeof(T)); + p += sizeof(T); + remaining -= sizeof(T); + + // reserve output buffer + if (result_buffer_->Capacity() < total_count * sizeof(T)) { + result_buffer_->ReSize(total_count * sizeof(T)); + } + + // write first value + T current_value = static_cast(first_value); + result_buffer_->Write(reinterpret_cast(¤t_value), sizeof(T)); + result_buffer_->Brush(sizeof(T)); + uint32_t decoded = 1; + + const uint32_t mini_blocks_per_block_ = + values_per_block / values_per_mini_block_; + + while (decoded < total_count && remaining > 0) { + uint32_t min_delta; + std::memcpy(&min_delta, p, sizeof(min_delta)); + p += sizeof(min_delta); + remaining -= sizeof(min_delta); + + if (remaining < mini_blocks_per_block_) break; + + uint8_t bit_widths[mini_blocks_per_block_] = {0}; + for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) { + bit_widths[i] = *p++; + --remaining; + } + + uint32_t payload_size; + std::memcpy(&payload_size, p, sizeof(payload_size)); + p += sizeof(payload_size); + remaining -= sizeof(payload_size); + + uint32_t values_in_block = + std::min(values_per_block, total_count - decoded); + + // read payload within bounded size + BitReader64Ptr br(p, payload_size); + + for (uint32_t i = 0; i < mini_blocks_per_block_ && decoded < total_count; + ++i) { + uint32_t start = i * values_per_mini_block_; + if (start >= values_in_block) break; + uint32_t end = std::min(start + values_per_mini_block_, values_in_block); + uint32_t cnt = end - start; + uint8_t w = bit_widths[i]; + + T *out_base = reinterpret_cast(result_buffer_->GetAvailableBuffer()); + ReadMiniBlockSpecializedPtr(br, out_base, current_value, cnt, + min_delta, w); + result_buffer_->Brush(cnt * sizeof(T)); + decoded += cnt; + } + + br.AlignToByte(); + + p += payload_size; + remaining -= payload_size; + } + + Assert(result_buffer_->Used() == total_count * sizeof(T)); + + return result_buffer_->Used(); +} + +template +size_t PaxDeltaDecoder::Decoding(const char * /*not_null*/, + size_t /*not_null_len*/) { + CBDB_RAISE(cbdb::CException::kExTypeUnImplements); +} + +template class PaxDeltaEncoder; +template class PaxDeltaDecoder; +// Add explicit instantiations for signed integral types used by CreateDecoder +template class PaxDeltaDecoder; +template class PaxDeltaDecoder; +template class PaxDeltaDecoder; +template class PaxDeltaDecoder; + +} // namespace pax \ No newline at end of file diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.h b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.h new file mode 100644 index 00000000000..7f2251201bf --- /dev/null +++ b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.h @@ -0,0 +1,135 @@ +/*------------------------------------------------------------------------- + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * pax_delta_encoding.h + * + * IDENTIFICATION + * contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.h + * + *------------------------------------------------------------------------- + */ +#pragma once + +#include "storage/columns/pax_encoding.h" +#include "storage/columns/pax_decoding.h" +#include + +namespace pax { + +struct BitReader64 { + const uint8_t*& p; + uint32_t& remaining; + uint64_t bit_buffer = 0; + uint32_t bit_count = 0; + + BitReader64(const uint8_t*& ptr, uint32_t& size) : p(ptr), remaining(size) {} + + inline void Ensure(uint32_t need_bits) { + while (bit_count < need_bits && remaining > 0) { + bit_buffer |= (static_cast(*p) << bit_count); + ++p; + --remaining; + bit_count += 8; + } + } + + inline uint32_t Read(uint8_t width) { + if (width == 0) return 0; + Ensure(width); + uint32_t result; + if (width == 32) { + result = static_cast(bit_buffer & 0xFFFFFFFFull); + } else { + result = static_cast(bit_buffer & ((1ull << width) - 1)); + } + bit_buffer >>= width; + bit_count -= width; + return result; + } + + inline void AlignToByte() { + uint32_t drop = bit_count % 8; + if (drop) { + bit_buffer >>= drop; + bit_count -= drop; + } + } +}; + +struct DeltaBlockHeader { + uint32_t value_per_block; + uint32_t values_per_mini_block; + uint32_t total_count; +}; + +template +class PaxDeltaEncoder : public PaxEncoder { + public: + explicit PaxDeltaEncoder(const EncodingOption &encoder_options); + + virtual void Append(char *data, size_t size) override; + + virtual bool SupportAppendNull() const override; + + virtual void Flush() override; + + virtual size_t GetBoundSize(size_t src_len) const override; + + private: + + void Encode(T *data, size_t size); + + private: + static constexpr uint32_t value_per_block_ = 128; + static constexpr uint32_t mini_blocks_per_block_ = 4; + static constexpr uint32_t values_per_mini_block_ = + value_per_block_ / mini_blocks_per_block_; + + private: + bool has_append_ = false; + // Reusable working buffer to avoid per-block allocations during encoding + std::vector deltas_scratch_; +}; + +template +class PaxDeltaDecoder : public PaxDecoder { + public: + explicit PaxDeltaDecoder(const PaxDecoder::DecodingOption &encoder_options); + + virtual PaxDecoder *SetSrcBuffer(char *data, size_t data_len) override; + + virtual PaxDecoder *SetDataBuffer( + std::shared_ptr> result_buffer) override; + + virtual size_t Next(const char *not_null) override; + + virtual size_t Decoding() override; + + virtual size_t Decoding(const char *not_null, size_t not_null_len) override; + + virtual const char *GetBuffer() const override; + + virtual size_t GetBufferSize() const override; + + private: + std::shared_ptr> data_buffer_; + std::shared_ptr> result_buffer_; +}; + +} // namespace pax \ No newline at end of file diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_dict_encoding.h b/contrib/pax_storage/src/cpp/storage/columns/pax_dict_encoding.h index e552fa7a55a..38f3ba217db 100644 --- a/contrib/pax_storage/src/cpp/storage/columns/pax_dict_encoding.h +++ b/contrib/pax_storage/src/cpp/storage/columns/pax_dict_encoding.h @@ -53,6 +53,10 @@ class PaxDictEncoder final : public PaxEncoder { void Flush() override; + size_t GetBoundSize(size_t src_len) const override { + CBDB_RAISE(cbdb::CException::kExTypeUnImplements); + } + private: size_t AppendInternal(char *data, size_t len); @@ -89,7 +93,8 @@ class PaxDictDecoder final : public PaxDecoder { PaxDecoder *SetSrcBuffer(char *data, size_t data_len) override; - PaxDecoder *SetDataBuffer(std::shared_ptr> result_buffer) override; + PaxDecoder *SetDataBuffer( + std::shared_ptr> result_buffer) override; const char *GetBuffer() const override; @@ -121,8 +126,8 @@ class PaxDictDecoder final : public PaxDecoder { buffer = src_buff->GetBuffer(); - index_buffer = - std::make_shared>((int32 *)buffer, head.indexsz, false, false); + index_buffer = std::make_shared>( + (int32 *)buffer, head.indexsz, false, false); index_buffer->BrushAll(); desc_buffer = std::make_shared>( @@ -130,8 +135,8 @@ class PaxDictDecoder final : public PaxDecoder { false); desc_buffer->BrushAll(); - entry_buffer = std::make_shared>(buffer + head.indexsz, head.dictsz, - false, false); + entry_buffer = std::make_shared>( + buffer + head.indexsz, head.dictsz, false, false); entry_buffer->BrushAll(); return std::make_tuple(index_buffer, entry_buffer, desc_buffer); diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.cc b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.cc index 3a354ceec8d..b11b2b7b6bd 100644 --- a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.cc +++ b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.cc @@ -33,6 +33,7 @@ #include "comm/pax_memory.h" #include "storage/columns/pax_dict_encoding.h" #include "storage/columns/pax_rlev2_encoding.h" +#include "storage/columns/pax_delta_encoding.h" namespace pax { @@ -56,8 +57,7 @@ std::shared_ptr PaxEncoder::CreateStreamingEncoder( break; } case ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA: { - // TODO(jiaqizho): support direct delta encoding - // not support yet, then direct return a nullptr(means no encoding) + encoder = std::make_shared>(encoder_options); break; } case ColumnEncoding_Kind::ColumnEncoding_Kind_DEF_ENCODED: { diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.h b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.h index 362e68caa13..465c7bf0600 100644 --- a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.h +++ b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding.h @@ -75,6 +75,8 @@ class PaxEncoder { virtual size_t GetBufferSize() const; + virtual size_t GetBoundSize(size_t src_len) const = 0; + /** * steaming encoder * diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.cc b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.cc index 25b6d2f1d6d..90060050236 100644 --- a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.cc +++ b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.cc @@ -59,21 +59,37 @@ void PaxNonFixedEncodingColumn::InitEncoder() { } void PaxNonFixedEncodingColumn::InitOffsetStreamCompressor() { - Assert(encoder_options_.offsets_encode_type != - ColumnEncoding_Kind::ColumnEncoding_Kind_DEF_ENCODED); - offsets_compressor_ = PaxCompressor::CreateBlockCompressor( - encoder_options_.offsets_encode_type); + Assert(encoder_options_.offsets_encode_type == + ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA); + SetOffsetsEncodeType(encoder_options_.offsets_encode_type); SetOffsetsCompressLevel(encoder_options_.offsets_compress_level); + + PaxEncoder::EncodingOption opt = encoder_options_; + opt.column_encode_type = + ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA; + opt.is_sign = false; + // offsets are fixed-width, do not enable non_fixed streaming restriction + offsets_encoder_ = PaxEncoder::CreateStreamingEncoder(opt, false); } void PaxNonFixedEncodingColumn::InitOffsetStreamDecompressor() { Assert(decoder_options_.offsets_encode_type != ColumnEncoding_Kind::ColumnEncoding_Kind_DEF_ENCODED); - offsets_compressor_ = PaxCompressor::CreateBlockCompressor( - decoder_options_.offsets_encode_type); SetOffsetsEncodeType(decoder_options_.offsets_encode_type); SetOffsetsCompressLevel(decoder_options_.offsets_compress_level); + + if (decoder_options_.offsets_encode_type == + ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA) { + PaxDecoder::DecodingOption temp_opt = decoder_options_; + temp_opt.column_encode_type = + ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA; + temp_opt.is_sign = false; + offsets_decoder_ = PaxDecoder::CreateDecoder(temp_opt); + } else { + offsets_compressor_ = PaxCompressor::CreateBlockCompressor( + decoder_options_.offsets_encode_type); + } } void PaxNonFixedEncodingColumn::InitDecoder() { @@ -169,9 +185,13 @@ void PaxNonFixedEncodingColumn::Set(std::shared_ptr> data, auto offsets_decompress = [&]() { Assert(!compress_route_); - Assert(offsets_compressor_); + Assert(offsets_compressor_ || offsets_decoder_); + + if (offsets->Used() == 0) { + return; + } - if (offsets->Used() != 0) { + if (offsets_compressor_) { auto d_size = offsets_compressor_->Decompress( PaxNonFixedColumn::offsets_->Start(), PaxNonFixedColumn::offsets_->Capacity(), offsets->Start(), @@ -182,22 +202,36 @@ void PaxNonFixedEncodingColumn::Set(std::shared_ptr> data, fmt("Decompress failed, %s", compressor_->ErrorName(d_size))); } PaxNonFixedColumn::offsets_->Brush(d_size); + return; + } + + if (offsets_decoder_) { + // Decode offsets using encoder for int32 stream + shared_offsets_data_ = std::make_shared>( + PaxNonFixedColumn::offsets_->Start(), + PaxNonFixedColumn::offsets_->Capacity(), false, false); + offsets_decoder_->SetDataBuffer(shared_offsets_data_); + offsets_decoder_->SetSrcBuffer(offsets->Start(), offsets->Used()); + offsets_decoder_->Decoding(); + PaxNonFixedColumn::offsets_->Brush(shared_offsets_data_->Used()); + return; } }; exist_decoder = compressor_ || decoder_; + bool has_offsets_processor = offsets_compressor_ || offsets_decoder_; - if (exist_decoder && offsets_compressor_) { + if (exist_decoder && has_offsets_processor) { data_decompress(); offsets_decompress(); PaxNonFixedColumn::estimated_size_ = total_size; PaxNonFixedColumn::next_offsets_ = -1; - } else if (exist_decoder && !offsets_compressor_) { + } else if (exist_decoder && !has_offsets_processor) { data_decompress(); PaxNonFixedColumn::offsets_ = offsets; PaxNonFixedColumn::estimated_size_ = total_size; PaxNonFixedColumn::next_offsets_ = -1; - } else if (!exist_decoder && offsets_compressor_) { + } else if (!exist_decoder && has_offsets_processor) { PaxNonFixedColumn::data_ = data; offsets_decompress(); PaxNonFixedColumn::estimated_size_ = total_size; @@ -278,17 +312,17 @@ std::pair PaxNonFixedEncodingColumn::GetOffsetBuffer( AppendLastOffset(); } - if (offsets_compressor_ && compress_route_) { - if (shared_offsets_data_) { - return std::make_pair(shared_offsets_data_->Start(), - shared_offsets_data_->Used()); - } + if (shared_offsets_data_) { + return std::make_pair(shared_offsets_data_->Start(), + shared_offsets_data_->Used()); + } - if (PaxNonFixedColumn::offsets_->Used() == 0) { - // should never append last offset again - return PaxNonFixedColumn::GetOffsetBuffer(false); - } + if (PaxNonFixedColumn::offsets_->Used() == 0) { + // should never append last offset again + return PaxNonFixedColumn::GetOffsetBuffer(false); + } + if (offsets_compressor_ && compress_route_) { size_t bound_size = offsets_compressor_->GetCompressBound( PaxNonFixedColumn::offsets_->Used()); shared_offsets_data_ = std::make_shared>(bound_size); @@ -308,6 +342,20 @@ std::pair PaxNonFixedEncodingColumn::GetOffsetBuffer( shared_offsets_data_->Used()); } + if (offsets_encoder_ && compress_route_) { + // For delta encoder, allocate a buffer sized by raw bytes for safety + size_t bound_size = offsets_encoder_->GetBoundSize(offsets_->Used()); + shared_offsets_data_ = std::make_shared>(bound_size); + offsets_encoder_->SetDataBuffer(shared_offsets_data_); + + // Encode entire offsets buffer as a single stream + offsets_encoder_->Append(offsets_->Start(), offsets_->Used()); + offsets_encoder_->Flush(); + + return std::make_pair(shared_offsets_data_->Start(), + shared_offsets_data_->Used()); + } + // no compress or uncompressed // should never append last offset again return PaxNonFixedColumn::GetOffsetBuffer(false); diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.h b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.h index b4e956cfe4a..06b60d02ac2 100644 --- a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.h +++ b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.h @@ -83,6 +83,9 @@ class PaxNonFixedEncodingColumn : public PaxNonFixedColumn { std::shared_ptr> shared_data_; std::shared_ptr offsets_compressor_; + // Optional encoder/decoder for offsets stream (alternative to compression) + std::shared_ptr offsets_encoder_; + std::shared_ptr offsets_decoder_; std::shared_ptr> shared_offsets_data_; }; diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_test.cc b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_test.cc index 5fa7fb7153c..b3a7ec59458 100644 --- a/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_test.cc +++ b/contrib/pax_storage/src/cpp/storage/columns/pax_encoding_test.cc @@ -1361,4 +1361,96 @@ TEST_F(PaxEncodingTest, TestEncodingWithAllNULL) { ASSERT_EQ(n_read, shared_dst_data->Used()); } +TEST_F(PaxEncodingTest, TestPaxDeltaEncodingBasic) { + std::vector data_vec{100, 101, 102, 105, 106, 110, 120, 121}; + auto shared_data = std::make_shared>(1024); + auto shared_dst_data = std::make_shared>(1024); + + PaxEncoder::EncodingOption encoder_options; + encoder_options.column_encode_type = + ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA; + encoder_options.is_sign = false; + auto encoder = PaxEncoder::CreateStreamingEncoder(encoder_options); + + ASSERT_TRUE(encoder); + encoder->SetDataBuffer(shared_data); + encoder->Append(reinterpret_cast(data_vec.data()), data_vec.size() * sizeof(uint32_t)); + encoder->Flush(); + + ASSERT_NE(encoder->GetBuffer(), nullptr); + ASSERT_GT(encoder->GetBufferSize(), 0UL); + + PaxDecoder::DecodingOption decoder_options; + decoder_options.column_encode_type = + ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA; + decoder_options.is_sign = false; + + auto decoder = PaxDecoder::CreateDecoder(decoder_options); + ASSERT_TRUE(decoder); + decoder->SetSrcBuffer(shared_data->GetBuffer(), shared_data->Used()); + + decoder->SetDataBuffer(shared_dst_data); + decoder->Decoding(); + + ASSERT_EQ(shared_dst_data->Used(), data_vec.size() * sizeof(int32)); + + auto result_dst_data = std::make_shared>( + reinterpret_cast(shared_dst_data->Start()), + shared_dst_data->Used(), false, false); + + for (size_t i = 0; i < data_vec.size(); ++i) { + ASSERT_EQ((*result_dst_data)[i], static_cast(data_vec[i])); + } +} + +TEST_F(PaxEncodingTest, TestPaxDeltaEncodingRoundTripRandom) { + const size_t n = 1000; + std::vector data_vec(n); + std::mt19937 rng(12345); + std::uniform_int_distribution base_dist(0, 100); + std::uniform_int_distribution step_dist(0, 5); + + data_vec[0] = base_dist(rng); + for (size_t i = 1; i < n; ++i) { + data_vec[i] = data_vec[i - 1] + step_dist(rng); + } + + auto shared_data = std::make_shared>(n * sizeof(uint32_t)); + auto shared_dst_data = std::make_shared>(n * sizeof(uint32_t)); + + PaxEncoder::EncodingOption encoder_options; + encoder_options.column_encode_type = + ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA; + encoder_options.is_sign = false; + auto encoder = PaxEncoder::CreateStreamingEncoder(encoder_options); + + ASSERT_TRUE(encoder); + encoder->SetDataBuffer(shared_data); + + encoder->Append(reinterpret_cast(data_vec.data()), data_vec.size() * sizeof(uint32_t)); + encoder->Flush(); + + PaxDecoder::DecodingOption decoder_options; + decoder_options.column_encode_type = + ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA; + decoder_options.is_sign = false; + + auto decoder = PaxDecoder::CreateDecoder(decoder_options); + ASSERT_TRUE(decoder); + decoder->SetSrcBuffer(shared_data->GetBuffer(), shared_data->Used()); + + decoder->SetDataBuffer(shared_dst_data); + decoder->Decoding(); + + ASSERT_EQ(shared_dst_data->Used(), data_vec.size() * sizeof(int32)); + + auto result_dst_data = std::make_shared>( + reinterpret_cast(shared_dst_data->Start()), + shared_dst_data->Used(), false, false); + + for (size_t i = 0; i < data_vec.size(); ++i) { + ASSERT_EQ((*result_dst_data)[i], static_cast(data_vec[i])); + } +} + } // namespace pax::tests diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_rlev2_encoding.h b/contrib/pax_storage/src/cpp/storage/columns/pax_rlev2_encoding.h index 7d021a1f1cf..f2197258b69 100644 --- a/contrib/pax_storage/src/cpp/storage/columns/pax_rlev2_encoding.h +++ b/contrib/pax_storage/src/cpp/storage/columns/pax_rlev2_encoding.h @@ -49,6 +49,10 @@ class PaxOrcEncoder final : public PaxEncoder { void Flush() override; + size_t GetBoundSize(size_t src_len) const override { + CBDB_RAISE(cbdb::CException::kExTypeUnImplements); + } + private: struct EncoderContext { bool is_sign; diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.cc b/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.cc index aaf514f5926..8f3aafae2c4 100644 --- a/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.cc +++ b/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.cc @@ -348,7 +348,7 @@ void PaxVecNonFixedEncodingColumn::Set( PaxVecNonFixedColumn::estimated_size_ = total_size; PaxVecNonFixedColumn::next_offsets_ = -1; } else { // (!compressor_ && !offsets_compressor_) - PaxVecNonFixedColumn::Set(data, offsets_, total_size, non_null_rows); + PaxVecNonFixedColumn::Set(data, offsets, total_size, non_null_rows); } } diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.h b/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.h index 4362312a5a9..524ddca261a 100644 --- a/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.h +++ b/contrib/pax_storage/src/cpp/storage/columns/pax_vec_encoding_column.h @@ -112,6 +112,9 @@ class PaxVecNonFixedEncodingColumn : public PaxVecNonFixedColumn { std::shared_ptr> shared_data_; std::shared_ptr offsets_compressor_; + // Optional encoder/decoder for offsets stream (alternative to compression) + std::shared_ptr offsets_encoder_; + std::shared_ptr offsets_decoder_; std::shared_ptr> shared_offsets_data_; }; diff --git a/contrib/pax_storage/src/cpp/storage/micro_partition.h b/contrib/pax_storage/src/cpp/storage/micro_partition.h index 77d61462ad4..56d85b46a74 100644 --- a/contrib/pax_storage/src/cpp/storage/micro_partition.h +++ b/contrib/pax_storage/src/cpp/storage/micro_partition.h @@ -58,7 +58,6 @@ class MicroPartitionWriter { RelFileNode node; bool need_wal = false; std::vector> encoding_opts; - std::pair offsets_encoding_opts; std::vector enable_min_max_col_idxs; std::vector enable_bf_col_idxs; diff --git a/contrib/pax_storage/src/cpp/storage/orc/orc_writer.cc b/contrib/pax_storage/src/cpp/storage/orc/orc_writer.cc index 63413a2239d..6c8d49502e5 100644 --- a/contrib/pax_storage/src/cpp/storage/orc/orc_writer.cc +++ b/contrib/pax_storage/src/cpp/storage/orc/orc_writer.cc @@ -104,7 +104,6 @@ static std::unique_ptr BuildColumns( const std::vector &types, const TupleDesc desc, const std::vector> &column_encoding_types, - const std::pair &offsets_encoding_types, const PaxStorageFormat &storage_format) { std::unique_ptr columns; bool is_vec; @@ -125,14 +124,7 @@ static std::unique_ptr BuildColumns( encoding_option.is_sign = true; encoding_option.compress_level = std::get<1>(column_encoding_types[i]); - if (offsets_encoding_types.first == ColumnEncoding_Kind_DEF_ENCODED) { - // default value of offsets_stream is zstd - encoding_option.offsets_encode_type = ColumnEncoding_Kind_COMPRESS_ZSTD; - encoding_option.offsets_compress_level = 5; - } else { - encoding_option.offsets_encode_type = offsets_encoding_types.first; - encoding_option.offsets_compress_level = offsets_encoding_types.second; - } + encoding_option.offsets_encode_type = ColumnEncoding_Kind_DIRECT_DELTA; switch (type) { case (pax::porc::proto::Type_Kind::Type_Kind_STRING): { @@ -241,10 +233,9 @@ OrcWriter::OrcWriter( Assert(writer_options.rel_tuple_desc->natts == static_cast(column_types.size())); - pax_columns_ = BuildColumns(column_types_, writer_options.rel_tuple_desc, - writer_options.encoding_opts, - writer_options.offsets_encoding_opts, - writer_options.storage_format); + pax_columns_ = + BuildColumns(column_types_, writer_options.rel_tuple_desc, + writer_options.encoding_opts, writer_options.storage_format); summary_.rel_oid = writer_options.rel_oid; summary_.block_id = writer_options.block_id; @@ -300,7 +291,6 @@ void OrcWriter::Flush() { new_columns = BuildColumns(column_types_, writer_options_.rel_tuple_desc, writer_options_.encoding_opts, - writer_options_.offsets_encoding_opts, writer_options_.storage_format); for (size_t i = 0; i < column_types_.size(); ++i) { diff --git a/contrib/pax_storage/src/cpp/storage/pax.cc b/contrib/pax_storage/src/cpp/storage/pax.cc index ab10387c76c..c8d29bbb6ce 100644 --- a/contrib/pax_storage/src/cpp/storage/pax.cc +++ b/contrib/pax_storage/src/cpp/storage/pax.cc @@ -200,8 +200,6 @@ std::unique_ptr TableWriter::CreateMicroPartitionWriter( options.file_name = std::move(file_path); options.encoding_opts = GetRelEncodingOptions(); options.storage_format = GetStorageFormat(); - options.offsets_encoding_opts = std::make_pair( - PAX_OFFSETS_DEFAULT_COMPRESSTYPE, PAX_OFFSETS_DEFAULT_COMPRESSLEVEL); options.enable_min_max_col_idxs = GetMinMaxColumnIndexes(); options.enable_bf_col_idxs = GetBloomFilterColumnIndexes(); @@ -261,8 +259,8 @@ void TableWriter::InitOptionsCaches() { } void TableWriter::Open() { - rel_path_ = cbdb::BuildPaxDirectoryPath( - relation_->rd_node, relation_->rd_backend); + rel_path_ = + cbdb::BuildPaxDirectoryPath(relation_->rd_node, relation_->rd_backend); InitOptionsCaches(); @@ -509,8 +507,8 @@ void TableReader::OpenFile() { if (it.GetExistToast()) { // must exist the file in disk - toast_file = file_system_->Open(it.GetFileName() + TOAST_FILE_SUFFIX, - fs::kReadMode); + toast_file = + file_system_->Open(it.GetFileName() + TOAST_FILE_SUFFIX, fs::kReadMode); } reader_ = MicroPartitionFileFactory::CreateMicroPartitionReader( @@ -588,8 +586,7 @@ void TableDeleter::DeleteWithVisibilityMap( std::unique_ptr visi_bitmap; auto catalog_update = pax::PaxCatalogUpdater::Begin(rel_); - auto rel_path = cbdb::BuildPaxDirectoryPath( - rel_->rd_node, rel_->rd_backend); + auto rel_path = cbdb::BuildPaxDirectoryPath(rel_->rd_node, rel_->rd_backend); min_max_col_idxs = cbdb::GetMinMaxColumnIndexes(rel_); stats_updater_projection->SetColumnProjection(min_max_col_idxs, @@ -662,11 +659,10 @@ void TableDeleter::DeleteWithVisibilityMap( // TODO: update stats and visimap all in one catalog update // Update the stats in pax aux table // Notice that: PAX won't update the stats in group - UpdateStatsInAuxTable(catalog_update, micro_partition_metadata, - std::make_shared(visi_bitmap->Raw()), - min_max_col_idxs, - cbdb::GetBloomFilterColumnIndexes(rel_), - stats_updater_projection); + UpdateStatsInAuxTable( + catalog_update, micro_partition_metadata, + std::make_shared(visi_bitmap->Raw()), min_max_col_idxs, + cbdb::GetBloomFilterColumnIndexes(rel_), stats_updater_projection); // write pg_pax_blocks_oid catalog_update.UpdateVisimap(block_id, visimap_file_name); diff --git a/contrib/pax_storage/src/cpp/storage/pax_defined.h b/contrib/pax_storage/src/cpp/storage/pax_defined.h index b4ce1115af8..5315797ea3a 100644 --- a/contrib/pax_storage/src/cpp/storage/pax_defined.h +++ b/contrib/pax_storage/src/cpp/storage/pax_defined.h @@ -39,7 +39,7 @@ namespace pax { #define BITS_TO_BYTES(bits) (((bits) + 7) / 8) #define PAX_OFFSETS_DEFAULT_COMPRESSTYPE \ - ColumnEncoding_Kind::ColumnEncoding_Kind_COMPRESS_ZSTD + ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA #define PAX_OFFSETS_DEFAULT_COMPRESSLEVEL 5 #define COLUMN_STORAGE_FORMAT_IS_VEC(column) \ diff --git a/contrib/pax_storage/src/test/regress/expected/gp_toolkit.out b/contrib/pax_storage/src/test/regress/expected/gp_toolkit.out index 336354081af..745db42283a 100644 --- a/contrib/pax_storage/src/test/regress/expected/gp_toolkit.out +++ b/contrib/pax_storage/src/test/regress/expected/gp_toolkit.out @@ -304,7 +304,7 @@ update pg_statistic set stawidth=2034567890 where starelid = 'wide_width_test':: select btdrelpages, btdexppages from gp_toolkit.gp_bloat_expected_pages where btdrelid='wide_width_test'::regclass; btdrelpages | btdexppages -------------+------------- - 4 | 3104504228 + 1 | 3104504228 (1 row) select * from gp_toolkit.gp_bloat_diag WHERE bdinspname <> 'pg_catalog'; From 03747623699b48747d1dc048efcc172400a461c9 Mon Sep 17 00:00:00 2001 From: GongXun Date: Tue, 2 Sep 2025 01:22:08 +0800 Subject: [PATCH 2/2] performance: remove payload_size from delta encoding Stop recording payload_size to reduce offset disk usage by at least 5%. Add tests for delta encoding --- .../cpp/storage/columns/pax_column_test.cc | 15 +- .../cpp/storage/columns/pax_delta_encoding.cc | 39 +- .../columns/pax_delta_encoding_test.cc | 339 ++++++++++++++++++ 3 files changed, 360 insertions(+), 33 deletions(-) create mode 100644 contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding_test.cc diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_column_test.cc b/contrib/pax_storage/src/cpp/storage/columns/pax_column_test.cc index dfd346ef615..b26fdff65bf 100644 --- a/contrib/pax_storage/src/cpp/storage/columns/pax_column_test.cc +++ b/contrib/pax_storage/src/cpp/storage/columns/pax_column_test.cc @@ -697,7 +697,6 @@ TEST_P(PaxNonFixedColumnCompressTest, auto number = ::testing::get<0>(GetParam()); auto kind = ::testing::get<1>(GetParam()); auto verify_range = ::testing::get<2>(GetParam()); - auto enable_offsets_encoding = ::testing::get<2>(GetParam()); const size_t number_of_rows = 1024; PaxEncoder::EncodingOption encoding_option; @@ -705,10 +704,9 @@ TEST_P(PaxNonFixedColumnCompressTest, encoding_option.compress_level = 5; encoding_option.is_sign = true; - if (enable_offsets_encoding) { - encoding_option.offsets_encode_type = kind; - encoding_option.offsets_compress_level = 5; - } + encoding_option.offsets_encode_type = + ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA; + encoding_option.offsets_compress_level = 5; non_fixed_column = new PaxNonFixedEncodingColumn( number_of_rows, number_of_rows, std::move(encoding_option)); @@ -744,10 +742,9 @@ TEST_P(PaxNonFixedColumnCompressTest, decoding_option.is_sign = true; decoding_option.compress_level = 5; - if (enable_offsets_encoding) { - decoding_option.offsets_encode_type = kind; - decoding_option.offsets_compress_level = 5; - } + decoding_option.offsets_encode_type = + ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA; + decoding_option.offsets_compress_level = 5; auto non_fixed_column_for_read = new PaxNonFixedEncodingColumn( number_of_rows * number, sizeof(int32) * number_of_rows, diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc index 700c8d777c7..3f4b5341c4a 100644 --- a/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc +++ b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc @@ -59,7 +59,7 @@ inline uint8_t NumBitsAllowZero(uint32_t value) { } // Fast bit width calculation (0 -> 0) -inline uint8_t FastNumBits(uint32_t v) { +inline static uint8_t FastNumBits(uint32_t v) { #if defined(__GNUC__) || defined(__clang__) return v == 0 ? 0 : static_cast(32 - __builtin_clz(v)); #else @@ -152,21 +152,19 @@ Overall layout: [Repeated Block until total_count is exhausted] - uint32 min_delta - uint8 bit_widths[ mini_blocks_per_block ] - - uint32 payload_size - - uint8 payload[payload_size] + - uint8 payload[computed from bit_widths] // bit-packed adjusted deltas, mini-block by mini-block - // within a block: bits are written LSB-first, end aligned to byte + // within a block: bits are written MSB-first, end aligned to byte */ template size_t PaxDeltaEncoder::GetBoundSize(size_t src_len) const { size_t value_count = src_len / sizeof(T); size_t block_count = (value_count + value_per_block_ - 1) / value_per_block_; - /* header + first_value + block_count * (min_delta + bit_widths + payload_size - * + payload) */ + /* header + first_value + block_count * (min_delta + bit_widths ) + * + payload was eliminated to value_count*/ return sizeof(DeltaBlockHeader) + sizeof(T) + - block_count * (sizeof(uint32) + mini_blocks_per_block_ + - sizeof(uint32) + sizeof(uint32)); + block_count * (sizeof(uint32) + mini_blocks_per_block_) + value_count; } template @@ -231,8 +229,8 @@ void PaxDeltaEncoder::Encode(T *data, size_t count) { } uint32_t payload_bytes = static_cast((total_bits + 7) / 8); - size_t need_size = payload_bytes + mini_blocks_per_block_ + - sizeof(payload_bytes) + sizeof(min_delta); + size_t need_size = + payload_bytes + mini_blocks_per_block_ + sizeof(min_delta); // Grows the buffer to be at least need_size bytes. To avoid frequent // resizing, the new capacity is calculated as the maximum of (current @@ -249,15 +247,11 @@ void PaxDeltaEncoder::Encode(T *data, size_t count) { sizeof(min_delta)); result_buffer_->Brush(sizeof(min_delta)); - // write bit_widths and payload_size + // write bit_widths result_buffer_->Write(reinterpret_cast(bit_widths), mini_blocks_per_block_); result_buffer_->Brush(mini_blocks_per_block_); - result_buffer_->Write(reinterpret_cast(&payload_bytes), - sizeof(payload_bytes)); - result_buffer_->Brush(sizeof(payload_bytes)); - uint8_t *payload_ptr = reinterpret_cast(result_buffer_->GetAvailableBuffer()); BitWriter64Ptr bw(payload_ptr); @@ -466,16 +460,12 @@ size_t PaxDeltaDecoder::Decoding() { --remaining; } - uint32_t payload_size; - std::memcpy(&payload_size, p, sizeof(payload_size)); - p += sizeof(payload_size); - remaining -= sizeof(payload_size); - uint32_t values_in_block = std::min(values_per_block, total_count - decoded); - // read payload within bounded size - BitReader64Ptr br(p, payload_size); + // read payload: initialize reader with remaining bytes; we'll compute + // consumed + BitReader64Ptr br(p, remaining); for (uint32_t i = 0; i < mini_blocks_per_block_ && decoded < total_count; ++i) { @@ -494,8 +484,9 @@ size_t PaxDeltaDecoder::Decoding() { br.AlignToByte(); - p += payload_size; - remaining -= payload_size; + size_t consumed = br.index; + p += consumed; + remaining -= consumed; } Assert(result_buffer_->Used() == total_count * sizeof(T)); diff --git a/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding_test.cc b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding_test.cc new file mode 100644 index 00000000000..031563381ee --- /dev/null +++ b/contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding_test.cc @@ -0,0 +1,339 @@ +/*------------------------------------------------------------------------- + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * pax_delta_encoding_test.cc + * + * IDENTIFICATION + * contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding_test.cc + * + *------------------------------------------------------------------------- + */ + +#include "storage/columns/pax_delta_encoding.h" + +#include +#include + +#include "comm/gtest_wrappers.h" +#include "pax_gtest_helper.h" + +namespace pax { + +class PaxDeltaEncodingTest : public ::testing::Test { + protected: + void SetUp() override { + // Create encoding options + encoding_options_.column_encode_type = + ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA; + encoding_options_.is_sign = false; + + // Create decoding options + decoding_options_.column_encode_type = + ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA; + decoding_options_.is_sign = false; + } + + void TearDown() override {} + + // Fast bit width calculation (0 -> 0) + inline uint8_t FastNumBits(uint32_t v) { +#if defined(__GNUC__) || defined(__clang__) + return v == 0 ? 0 : static_cast(32 - __builtin_clz(v)); +#else + uint8_t bits = 0; + while (v) { + ++bits; + v >>= 1; + } + return bits; +#endif + } + + // Helper function to encode and decode data + template + std::vector EncodeAndDecode(const std::vector &input) { + // Create encoder + PaxDeltaEncoder encoder(encoding_options_); + + size_t bound_size = encoder.GetBoundSize(input.size() * sizeof(T)); + + encoder.SetDataBuffer(std::make_shared>(bound_size)); + + // Encode data + encoder.Append(reinterpret_cast(const_cast(input.data())), + input.size() * sizeof(T)); + + // Get encoded buffer + const char *encoded_data = encoder.GetBuffer(); + size_t encoded_size = encoder.GetBufferSize(); + + // Create decoder + PaxDeltaDecoder decoder(decoding_options_); + + // Set source buffer + decoder.SetSrcBuffer(const_cast(encoded_data), encoded_size); + + // Create result buffer + auto result_buffer = + std::make_shared>(input.size() * sizeof(T)); + decoder.SetDataBuffer(result_buffer); + + // Decode + size_t decoded_size = decoder.Decoding(); + + // Convert result back to vector + const T *decoded_data = reinterpret_cast(decoder.GetBuffer()); + size_t count = decoded_size / sizeof(T); + + return std::vector(decoded_data, decoded_data + count); + } + + PaxEncoder::EncodingOption encoding_options_; + PaxDecoder::DecodingOption decoding_options_; +}; + +// Test basic functionality +TEST_F(PaxDeltaEncodingTest, BasicEncodeDecode) { + std::vector input = {1, 2, 3, 4, 5}; + auto output = EncodeAndDecode(input); + EXPECT_EQ(input, output); +} + +// Test example from documentation - consecutive sequence +TEST_F(PaxDeltaEncodingTest, ConsecutiveSequence) { + std::vector input = {1, 2, 3, 4, 5}; + auto output = EncodeAndDecode(input); + EXPECT_EQ(input, output); + + // Verify deltas would be [1, 1, 1, 1] with min_delta = 1 + // and adjusted deltas [0, 0, 0, 0] with bit_width = 0 +} + +// Test example from documentation - sequence with variation +TEST_F(PaxDeltaEncodingTest, SequenceWithVariation) { + std::vector input = {7, 5, 3, 1, 2, 3, 4, 5}; + auto output = EncodeAndDecode(input); + EXPECT_EQ(input, output); + + // Verify deltas would be [-2, -2, -2, 1, 1, 1, 1] with min_delta = -2 + // Since we cast to uint32, -2 becomes a large positive number + // adjusted deltas would be [0, 0, 0, 3, 3, 3, 3] with bit_width = 2 +} + +// Test single value +TEST_F(PaxDeltaEncodingTest, SingleValue) { + std::vector input = {42}; + auto output = EncodeAndDecode(input); + EXPECT_EQ(input, output); +} + +// Test two values +TEST_F(PaxDeltaEncodingTest, TwoValues) { + std::vector input = {10, 15}; + auto output = EncodeAndDecode(input); + EXPECT_EQ(input, output); +} + +// Test large values +TEST_F(PaxDeltaEncodingTest, LargeValues) { + std::vector input = {1000000, 1000001, 1000002, 1000003}; + auto output = EncodeAndDecode(input); + EXPECT_EQ(input, output); +} + +// Test values with large deltas +TEST_F(PaxDeltaEncodingTest, LargeDeltas) { + std::vector input = {1, 1000, 2000, 3000}; + auto output = EncodeAndDecode(input); + EXPECT_EQ(input, output); +} + +// Test full block (128 values) +TEST_F(PaxDeltaEncodingTest, FullBlock) { + std::vector input; + for (uint32_t i = 0; i < 128; ++i) { + input.push_back(i); + } + auto output = EncodeAndDecode(input); + EXPECT_EQ(input, output); +} + +// Test multiple blocks +TEST_F(PaxDeltaEncodingTest, MultipleBlocks) { + std::vector input; + for (uint32_t i = 0; i < 250; ++i) { + input.push_back(i); + } + auto output = EncodeAndDecode(input); + EXPECT_EQ(input, output); +} + +// Test random data +TEST_F(PaxDeltaEncodingTest, RandomData) { + std::mt19937 gen(12345); + std::uniform_int_distribution dis(0, 1000000); + + std::vector input; + for (int i = 0; i < 100; ++i) { + input.push_back(dis(gen)); + } + + auto output = EncodeAndDecode(input); + EXPECT_EQ(input, output); +} + +// Test payload size calculation +TEST_F(PaxDeltaEncodingTest, PayloadSizeCalculation) { + std::vector input = { + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, + 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 56, 63, 89}; + // Test the specific example: deltas [0,0,0,0,0,0,0,0,...,0,22,6,25] with + // bit_width 0,5,0,0 + + PaxDeltaEncoder encoder(encoding_options_); + size_t bound_size = encoder.GetBoundSize(input.size() * sizeof(uint32_t)); + encoder.SetDataBuffer(std::make_shared>(bound_size)); + encoder.Append(reinterpret_cast(input.data()), + input.size() * sizeof(uint32_t)); + + // Verify the encoded data structure manually + const char *encoded_data = encoder.GetBuffer(); + size_t encoded_size = encoder.GetBufferSize(); + + EXPECT_GT(encoded_size, 0); + + // Parse the encoded data + const uint8_t *p = reinterpret_cast(encoded_data); + + // Read header + DeltaBlockHeader header; + std::memcpy(&header, p, sizeof(header)); + p += sizeof(header); + + EXPECT_EQ(header.value_per_block, 128); + EXPECT_EQ(header.values_per_mini_block, 32); + EXPECT_EQ(header.total_count, input.size()); + + // Read first value + uint32_t first_value; + std::memcpy(&first_value, p, sizeof(first_value)); + p += sizeof(first_value); + EXPECT_EQ(first_value, 1); + + // Read block data + uint32_t min_delta; + std::memcpy(&min_delta, p, sizeof(min_delta)); + p += sizeof(min_delta); + + // Read allbit widths + uint8_t bit_widths[4]; + for (int i = 0; i < 4; ++i) { + bit_widths[i] = *p++; + } + + // bit_widths should be [0, 6, 0, 0] + ASSERT_EQ(bit_widths[0], 0); + ASSERT_EQ(bit_widths[1], 5); + ASSERT_EQ(bit_widths[2], 0); + ASSERT_EQ(bit_widths[3], 0); + + // Compute payload size from bit_widths and counts + uint32_t values_in_block = + input.size() - 1; // we constructed input with 35 deltas in first block + uint64_t total_bits = 0; + for (uint32_t i = 0; i < 4; ++i) { + uint32_t start = i * 32; + if (start >= values_in_block) break; + uint32_t end = std::min(start + 32u, values_in_block); + uint8_t w = bit_widths[i]; + total_bits += static_cast(w) * (end - start); + } + uint32_t payload_size = static_cast((total_bits + 7) / 8); + + // For this example, we expect payload_size = 2 bytes + EXPECT_EQ(payload_size, 2); + + // Assert payload bitmap is correct + uint8_t payload[4]; + std::memcpy(payload, p, 4); + p += 4; + + // payload should be LSB-Last, value is(22,6,25) + // [0b10110, 0b00110, 0b11001] + EXPECT_EQ(payload[0], 0b11010110); + EXPECT_EQ(payload[1], 0b01100100); +} + +// Test bit width calculation helper +TEST_F(PaxDeltaEncodingTest, BitWidthCalculation) { + EXPECT_EQ(FastNumBits(0), 0); + EXPECT_EQ(FastNumBits(1), 1); + EXPECT_EQ(FastNumBits(2), 2); + EXPECT_EQ(FastNumBits(3), 2); + EXPECT_EQ(FastNumBits(4), 3); + EXPECT_EQ(FastNumBits(7), 3); + EXPECT_EQ(FastNumBits(8), 4); + EXPECT_EQ(FastNumBits(15), 4); + EXPECT_EQ(FastNumBits(16), 5); + EXPECT_EQ(FastNumBits(255), 8); + EXPECT_EQ(FastNumBits(256), 9); +} + +// Test zero deltas (all same values) +TEST_F(PaxDeltaEncodingTest, ZeroDeltas) { + std::vector input = {42, 42, 42, 42, 42}; + auto output = EncodeAndDecode(input); + EXPECT_EQ(input, output); +} + +// Test decreasing sequence (negative deltas) +TEST_F(PaxDeltaEncodingTest, DecreasingSequence) { + std::vector input = {100, 90, 80, 70, 60}; + auto output = EncodeAndDecode(input); + EXPECT_EQ(input, output); +} + +// Test mixed pattern +TEST_F(PaxDeltaEncodingTest, MixedPattern) { + std::vector input = {10, 20, 15, 25, 5, 30, 1, 35}; + auto output = EncodeAndDecode(input); + EXPECT_EQ(input, output); +} + +// Test empty input (edge case) +TEST_F(PaxDeltaEncodingTest, EmptyInput) { + std::vector input = {}; + // This should handle gracefully or throw expected exception + // For now, let's skip this test until we clarify expected behavior +} + +// Test different data types +TEST_F(PaxDeltaEncodingTest, DifferentTypes) { + // Test int32_t (with non-negative values) + std::vector input32 = {1, 2, 3, 4, 5}; + auto output32 = EncodeAndDecode(input32); + EXPECT_EQ(input32, output32); +} + +} // namespace pax + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}