GH-45750: [C++][Python][Parquet] Implement Content-Defined Chunking for the Parquet writer #45360
Changes from all commits
@@ -0,0 +1,144 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
#include <memory>
#include <vector>

#include "arrow/array.h"
#include "parquet/level_conversion.h"

namespace parquet::internal {

// Represents a chunk of data with level offsets and value offsets due to the
// record shredding for nested data.
struct Chunk {
  // The start offset of this chunk inside the given levels
  int64_t level_offset;
  // The start offset of this chunk inside the given values array
  int64_t value_offset;
  // The length of the chunk in levels
  int64_t levels_to_write;
};

/// CDC (Content-Defined Chunking) is a technique that divides data into variable-sized
/// chunks based on the content of the data itself, rather than using fixed-size
/// boundaries.
///
/// For example, given this sequence of values in a column:
///
/// File1: [1,2,3, 4,5,6, 7,8,9]
///        chunk1 chunk2 chunk3
///
/// Assume there is an inserted value between 3 and 4:
///
/// File2: [1,2,3,0,  4,5,6, 7,8,9]
///        new-chunk chunk2 chunk3
///
/// The chunking process adjusts to maintain stable boundaries across data
/// modifications. Each chunk defines a new parquet data page which is contiguously
/// written out to the file. Since each page is compressed independently, the files'
/// contents would look like the following with unique page identifiers:
///
/// File1: [Page1][Page2][Page3]...
/// File2: [Page4][Page2][Page3]...
///
/// The parquet file is then uploaded to a content-addressable storage (CAS) system,
/// which splits the byte stream into content-defined blobs. The CAS system calculates
/// a unique identifier for each blob, then stores the blob in a key-value store.
/// If the same blob is encountered again, the system can refer to the hash instead of
/// physically storing the blob again. In the example above, the CAS system would store
/// Page1, Page2, Page3, and Page4 only once, along with the metadata required to
/// reassemble the files.
/// While the deduplication is performed by the CAS system, the parquet chunker makes it
/// possible to efficiently deduplicate the data by consistently dividing the data into
/// chunks.
///
/// Implementation details:
///
/// Only the parquet writer needs to be aware of the content defined chunking; the
/// reader doesn't need to know about it. Each parquet column writer holds a
/// ContentDefinedChunker instance configured from the writer's properties. The
/// chunker's state is maintained across the entire column without being reset between
/// pages and row groups.
///
/// The chunker receives the record shredded column data (def_levels, rep_levels,
/// values) and goes over the (def_level, rep_level, value) triplets one by one while
/// adjusting the column-global rolling hash based on the triplet. Whenever the rolling
/// hash matches a predefined mask, the chunker creates a new chunk. The chunker returns
/// a vector of Chunk objects that represent the boundaries of the chunks.
/// Note that the boundaries are calculated deterministically and exclusively from the
/// data itself, so the same data will always produce the same chunks, given the same
/// chunker configuration.
///
/// References:
/// - FastCDC: a Fast and Efficient Content-Defined Chunking Approach for Data
///   Deduplication
///   https://www.usenix.org/system/files/conference/atc16/atc16-paper-xia.pdf
/// - Git is for Data (chunk size normalization used here is described in section 6.2.1):
///   https://www.cidrdb.org/cidr2023/papers/p43-low.pdf
class PARQUET_EXPORT ContentDefinedChunker {
 public:
  /// Create a new ContentDefinedChunker instance
  ///
  /// @param level_info Information about definition and repetition levels
  /// @param min_chunk_size Minimum chunk size in bytes
  ///    The rolling hash will not be updated until this size is reached for each chunk.
  ///    Note that all data sent through the hash function is counted towards the chunk
  ///    size, including definition and repetition levels if present.
  /// @param max_chunk_size Maximum chunk size in bytes
  ///    The chunker creates a new chunk whenever the chunk size exceeds this value. The
  ///    chunk size distribution approximates a normal distribution between
  ///    min_chunk_size and max_chunk_size. Note that the parquet writer has a related
  ///    `data_pagesize` property that controls the maximum size of a parquet data page
  ///    after encoding. While setting `data_pagesize` to a smaller value than
  ///    `max_chunk_size` doesn't affect the chunking effectiveness, it results in a
  ///    larger number of small parquet data pages.
  /// @param norm_level Normalization level to center the chunk size around the average
  ///    size more aggressively, default 0.
  ///    Increasing the normalization level increases the probability of finding a chunk
  ///    boundary, improving the deduplication ratio, but also increases the number of
  ///    small chunks, resulting in many small parquet data pages. The default value
  ///    provides a good balance between deduplication ratio and fragmentation.
  ///    Use norm_level=1 or norm_level=2 to reach a higher deduplication ratio at the
  ///    expense of fragmentation.
  ContentDefinedChunker(const LevelInfo& level_info, int64_t min_chunk_size,
                        int64_t max_chunk_size, int norm_level = 0);
  ~ContentDefinedChunker();

  /// Get the chunk boundaries for the given column data
  ///
  /// @param def_levels Definition levels
  /// @param rep_levels Repetition levels
  /// @param num_levels Number of levels
  /// @param values Column values as an Arrow array
  /// @return Vector of Chunk objects representing the chunk boundaries
  std::vector<Chunk> GetChunks(const int16_t* def_levels, const int16_t* rep_levels,
                               int64_t num_levels, const ::arrow::Array& values);

 private:
  /// @brief Get the rolling hash mask used to determine chunk boundaries, used for
  /// testing the mask calculation.
  uint64_t GetRollingHashMask() const;

  class Impl;
  std::unique_ptr<Impl> impl_;

  friend class TestCDC;
};

} // namespace parquet::internal
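
To make the intended call pattern concrete, here is a minimal usage sketch based only on the API declared above. The wrapper function, the include path, the chunk-size values, and the flat-column level handling are illustrative assumptions, not code from this pull request.

// Hypothetical usage sketch, not part of this PR's diff.
#include <cstdint>
#include <vector>

#include "arrow/array.h"
#include "parquet/chunker_internal.h"  // assumed path of the header shown above

std::vector<parquet::internal::Chunk> ChunkFlatColumn(const ::arrow::Array& values) {
  // A default-constructed LevelInfo describes a flat, required column.
  parquet::internal::LevelInfo level_info;
  // Illustrative sizes: aim for chunks between 256 KiB and 1 MiB before encoding.
  parquet::internal::ContentDefinedChunker chunker(level_info,
                                                   /*min_chunk_size=*/256 * 1024,
                                                   /*max_chunk_size=*/1024 * 1024);
  // For a flat, required column every level is zero and the number of levels
  // equals the number of values.
  std::vector<int16_t> def_levels(values.length(), 0);
  std::vector<int16_t> rep_levels(values.length(), 0);
  return chunker.GetChunks(def_levels.data(), rep_levels.data(),
                           /*num_levels=*/values.length(), values);
}

Each returned Chunk would then back one data page, sliced from the values array at chunk.value_offset for chunk.levels_to_write entries.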
@@ -0,0 +1,125 @@
#!/usr/bin/env python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

| """ | ||
| Produce the given number gearhash tables for rolling hash calculations. | ||
|
|
||
| Each table consists of 256 64-bit integer values and by default 8 tables are | ||
| produced. The tables are written to a header file that can be included in the | ||
| C++ code. | ||
|
|
||
| The generated numbers are deterministic "random" numbers created by MD5 hashing | ||
| a fixed seed and the table index. This ensures that the tables are the same | ||
| across different runs and platforms. The function of generating the numbers is | ||
| less important as long as they have sufficiently uniform distribution. | ||
|
|
||
| Reference implementations: | ||
| - https://github.com/Borelset/destor/blob/master/src/chunking/fascdc_chunking.c | ||
| - https://github.com/nlfiedler/fastcdc-rs/blob/master/examples/table64.rs | ||
|
|
||
| Usage: | ||
| python chunker_internal_codegen.py [ntables] | ||
|
|
||
| ntables: Number of gearhash tables to generate (default 8), the | ||
| the C++ implementation expects 8 tables so this should not be | ||
| changed unless the C++ code is also updated. | ||
|
|
||
| The generated header file is written to ./chunker_internal_generated.h | ||
| """ | ||

import hashlib
import pathlib
import sys
from io import StringIO


template = """\
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>

namespace parquet::internal {{

constexpr int64_t kNumGearhashTables = {ntables};

constexpr uint64_t kGearhashTable[{ntables}][256] = {{
{content}}};

}} // namespace parquet::internal
"""


def generate_hash(n: int, seed: int):
    """Produce predictable hash values for a given seed and n using MD5.

    The value can be arbitrary as long as it is deterministic and has a uniform
    distribution. MD5 is used to produce a 16-character hexadecimal string,
    which is then emitted as a 64-bit integer literal in the generated header.
    """
    value = bytes([seed] * 64 + [n] * 64)
    hasher = hashlib.md5(value)
    return hasher.hexdigest()[:16]


def generate_hashtable(seed: int, length=256):
    """Generate and render a single gearhash table."""
    table = [generate_hash(n, seed=seed) for n in range(length)]

    out = StringIO()
    out.write(f" {{// seed = {seed}\n")
    for i in range(0, length, 4):
        values = [f"0x{value}" for value in table[i : i + 4]]
        values = ", ".join(values)
        out.write(f" {values}")
        if i < length - 4:
            out.write(",\n")
    out.write("}")

    return out.getvalue()


def generate_header(ntables=8, relative_path="chunker_internal_generated.h"):
    """Generate a header file with multiple gearhash tables."""
    path = pathlib.Path(__file__).parent / relative_path
    tables = [generate_hashtable(seed) for seed in range(ntables)]
    content = ",\n".join(tables)
    text = template.format(ntables=ntables, content=content)
    path.write_text(text)


if __name__ == "__main__":
    ntables = int(sys.argv[1]) if len(sys.argv) > 1 else 8
    generate_header(ntables)
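
For context on how such tables are typically consumed, the sketch below shows the classic gear-style rolling hash update from the FastCDC paper referenced in the header above: each input byte shifts the hash left by one bit and mixes in a precomputed table entry, and a chunk boundary is declared when the masked hash bits are all zero. This illustrates the general technique only and is not the chunker's actual implementation; the function name is made up for the example.

// Illustrative FastCDC-style gear update, not the chunker's actual code.
#include <cstdint>

#include "chunker_internal_generated.h"  // provides parquet::internal::kGearhashTable

// Mix one byte into the rolling hash and report whether a chunk boundary was hit.
// A wider `mask` (more bits set) makes boundaries rarer and chunks larger on average.
inline bool RollOneByte(uint64_t* hash, uint8_t byte, uint64_t mask) {
  *hash = (*hash << 1) + parquet::internal::kGearhashTable[0][byte];
  return (*hash & mask) == 0;
}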