diff --git a/CMakeLists.txt b/CMakeLists.txt index 698f6d76..44e4eb75 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -537,6 +537,15 @@ else() endif() endif() +############################################################# +# OpenSSL linkage +find_package(OpenSSL REQUIRED) +message(STATUS "OPENSSL_FOUND ${OPENSSL_FOUND}") +if(OPENSSL_FOUND) + set(OPENSSL_USE_STATIC_LIBS TRUE) + include_directories(${OPENSSL_INCLUDE_DIR}) +endif() + ############################################################# # Apache Arrow linkage @@ -691,6 +700,8 @@ set(LIBPARQUET_SRCS src/parquet/types.cc src/parquet/util/comparison.cc src/parquet/util/memory.cc + src/parquet/util/crypto.cc + src/parquet/encryption.cc ) # # Ensure that thrift compilation is done before using its generated headers @@ -712,6 +723,10 @@ if (NOT PARQUET_MINIMAL_DEPENDENCY) # Although we don't link parquet_objlib against anything, we need it to depend # on these libs as we may generate their headers via ExternalProject_Add set(PARQUET_DEPENDENCIES ${PARQUET_DEPENDENCIES} ${LIBPARQUET_INTERFACE_LINK_LIBS}) + set(LIBPARQUET_INTERFACE_LINK_LIBS + ${OPENSSL_CRYPTO_LIBRARY} + ${LIBPARQUET_INTERFACE_LINK_LIBS} + ) endif() if(NOT APPLE AND NOT MSVC) diff --git a/examples/low-level-api/CMakeLists.txt b/examples/low-level-api/CMakeLists.txt index 64ba110e..10082ad2 100644 --- a/examples/low-level-api/CMakeLists.txt +++ b/examples/low-level-api/CMakeLists.txt @@ -18,8 +18,12 @@ if (PARQUET_BUILD_EXECUTABLES) add_executable(reader-writer reader-writer.cc) add_executable(reader-writer2 reader-writer2.cc) + add_executable(encryption-reader-writer encryption-reader-writer.cc) target_include_directories(reader-writer PRIVATE .) target_include_directories(reader-writer2 PRIVATE .) + target_include_directories(encryption-reader-writer PRIVATE .) 
target_link_libraries(reader-writer parquet_static) target_link_libraries(reader-writer2 parquet_static) + + target_link_libraries(encryption-reader-writer parquet_static) endif() diff --git a/examples/low-level-api/encryption-reader-writer.cc b/examples/low-level-api/encryption-reader-writer.cc new file mode 100644 index 00000000..27dd9ab9 --- /dev/null +++ b/examples/low-level-api/encryption-reader-writer.cc @@ -0,0 +1,436 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include + +#include + +/* + * This example describes writing and reading Parquet Files in C++ and serves as a + * reference to the API. + * The file contains all the physical data types supported by Parquet. + * This example uses the RowGroupWriter API that supports writing RowGroups optimized for + *memory consumption + **/ + +/* Parquet is a structured columnar file format + * Parquet File = "Parquet data" + "Parquet Metadata" + * "Parquet data" is simply a vector of RowGroups. 
Each RowGroup is a batch of rows in a + * columnar layout + * "Parquet Metadata" contains the "file schema" and attributes of the RowGroups and their + * Columns + * "file schema" is a tree where each node is either a primitive type (leaf nodes) or a + * complex (nested) type (internal nodes) + * For specific details, please refer the format here: + * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md + **/ + +constexpr int NUM_ROWS_PER_ROW_GROUP = 500; +const char PARQUET_FILENAME[] = "parquet_cpp_example.parquet.encrypted"; +const std::string encryptionKey = "0123456789012345"; // 16 bytes +const std::string encryptionKeyCol = "1234567890123450"; // 16 bytes + +int main(int argc, char** argv) { + /********************************************************************************** + PARQUET WRITER EXAMPLE + **********************************************************************************/ + // parquet::REQUIRED fields do not need definition and repetition level values + // parquet::OPTIONAL fields require only definition level values + // parquet::REPEATED fields require both definition and repetition level values + try { + // Create a local file output stream instance. 
+ using FileClass = ::arrow::io::FileOutputStream; + std::shared_ptr out_file; + PARQUET_THROW_NOT_OK(FileClass::Open(PARQUET_FILENAME, &out_file)); + + // Setup the parquet schema + std::shared_ptr schema = SetupSchema(); + + // Add writer properties + parquet::WriterProperties::Builder builder; + builder.compression(parquet::Compression::SNAPPY); + + // uniform encryption + parquet::FileEncryptionProperties::Builder file_encryption_builder; + file_encryption_builder.footer_key(encryptionKey); + + // non-uniform with column keys: column_0 gets its own key (encryptionKeyCol) + std::map> encryption_cols; + parquet::ColumnEncryptionProperties::Builder encryption_col_builder0("column_0", true); + encryption_col_builder0.key(encryptionKeyCol); + auto encryption_col0 = encryption_col_builder0.build(); + + encryption_cols[encryption_col0->path()] = encryption_col0; + + file_encryption_builder.column_properties(encryption_cols, true); + + builder.encryption(file_encryption_builder.build()); + + std::shared_ptr props = builder.build(); + + // Create a ParquetFileWriter instance + std::shared_ptr file_writer = + parquet::ParquetFileWriter::Open(out_file, schema, props); + + // Append a RowGroup with a specific number of rows. + parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup(); + + // Write the Bool column + parquet::BoolWriter* bool_writer = + static_cast(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + bool value = ((i % 2) == 0) ? true : false; + bool_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Write the Int32 column + parquet::Int32Writer* int32_writer = + static_cast(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + int32_t value = i; + int32_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Write the Int64 column. Each row has two repeated values.
+ parquet::Int64Writer* int64_writer = + static_cast(rg_writer->NextColumn()); + for (int i = 0; i < 2 * NUM_ROWS_PER_ROW_GROUP; i++) { + int64_t value = i * 1000 * 1000; + value *= 1000 * 1000; + int16_t definition_level = 1; + int16_t repetition_level = 0; + if ((i % 2) == 0) { + repetition_level = 1; // start of a new record + } + int64_writer->WriteBatch(1, &definition_level, &repetition_level, &value); + } + + // Write the INT96 column. + parquet::Int96Writer* int96_writer = + static_cast(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + parquet::Int96 value; + value.value[0] = i; + value.value[1] = i + 1; + value.value[2] = i + 2; + int96_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Write the Float column + parquet::FloatWriter* float_writer = + static_cast(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + float value = static_cast(i) * 1.1f; + float_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Write the Double column + parquet::DoubleWriter* double_writer = + static_cast(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + double value = i * 1.1111111; + double_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Write the ByteArray column. 
Make every alternate values NULL + parquet::ByteArrayWriter* ba_writer = + static_cast(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + parquet::ByteArray value; + char hello[FIXED_LENGTH] = "parquet"; + hello[7] = static_cast(static_cast('0') + i / 100); + hello[8] = static_cast(static_cast('0') + (i / 10) % 10); + hello[9] = static_cast(static_cast('0') + i % 10); + if (i % 2 == 0) { + int16_t definition_level = 1; + value.ptr = reinterpret_cast(&hello[0]); + value.len = FIXED_LENGTH; + ba_writer->WriteBatch(1, &definition_level, nullptr, &value); + } else { + int16_t definition_level = 0; + ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr); + } + } + + // Write the FixedLengthByteArray column + parquet::FixedLenByteArrayWriter* flba_writer = + static_cast(rg_writer->NextColumn()); + for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) { + parquet::FixedLenByteArray value; + char v = static_cast(i); + char flba[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v}; + value.ptr = reinterpret_cast(&flba[0]); + + flba_writer->WriteBatch(1, nullptr, nullptr, &value); + } + + // Close the ParquetFileWriter + file_writer->Close(); + + // Write the bytes to file + DCHECK(out_file->Close().ok()); + } catch (const std::exception& e) { + std::cerr << "Parquet write error: " << e.what() << std::endl; + return -1; + } + + /********************************************************************************** + PARQUET READER EXAMPLE + **********************************************************************************/ + + try { + // decryption properties + std::shared_ptr decryption_properties = + std::make_shared(encryptionKey); + decryption_properties->SetColumnKey("column_0", encryptionKeyCol); + + parquet::ReaderProperties reader_properties = parquet::default_reader_properties(); + reader_properties.file_decryption(decryption_properties); + + // Create a ParquetReader instance + std::unique_ptr parquet_reader = + 
parquet::ParquetFileReader::OpenFile(PARQUET_FILENAME, false, reader_properties); + + // Get the File MetaData + std::shared_ptr file_metadata = parquet_reader->metadata(); + + // Get the number of RowGroups + int num_row_groups = file_metadata->num_row_groups(); + assert(num_row_groups == 1); + + // Get the number of Columns + int num_columns = file_metadata->num_columns(); + assert(num_columns == 8); + + // Iterate over all the RowGroups in the file + for (int r = 0; r < num_row_groups; ++r) { + // Get the RowGroup Reader + std::shared_ptr row_group_reader = + parquet_reader->RowGroup(r); + + int64_t values_read = 0; + int64_t rows_read = 0; + int16_t definition_level; + int16_t repetition_level; + int i; + std::shared_ptr column_reader; + + // Get the Column Reader for the boolean column + column_reader = row_group_reader->Column(0); + parquet::BoolReader* bool_reader = + static_cast(column_reader.get()); + + // Read all the rows in the column + i = 0; + while (bool_reader->HasNext()) { + bool value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = bool_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + bool expected_value = ((i % 2) == 0) ? true : false; + assert(value == expected_value); + i++; + } + + // Get the Column Reader for the Int32 column + column_reader = row_group_reader->Column(1); + parquet::Int32Reader* int32_reader = + static_cast(column_reader.get()); + // Read all the rows in the column + i = 0; + while (int32_reader->HasNext()) { + int32_t value; + // Read one value at a time. The number of rows read is returned. 
values_read + // contains the number of non-null rows + rows_read = int32_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + assert(value == i); + i++; + } + + // Get the Column Reader for the Int64 column + column_reader = row_group_reader->Column(2); + parquet::Int64Reader* int64_reader = + static_cast(column_reader.get()); + // Read all the rows in the column + i = 0; + while (int64_reader->HasNext()) { + int64_t value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = int64_reader->ReadBatch(1, &definition_level, &repetition_level, + &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + int64_t expected_value = i * 1000 * 1000; + expected_value *= 1000 * 1000; + assert(value == expected_value); + if ((i % 2) == 0) { + assert(repetition_level == 1); + } else { + assert(repetition_level == 0); + } + i++; + } + + // Get the Column Reader for the Int96 column + column_reader = row_group_reader->Column(3); + parquet::Int96Reader* int96_reader = + static_cast(column_reader.get()); + // Read all the rows in the column + i = 0; + while (int96_reader->HasNext()) { + parquet::Int96 value; + // Read one value at a time. The number of rows read is returned. 
values_read + // contains the number of non-null rows + rows_read = int96_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + parquet::Int96 expected_value; + expected_value.value[0] = i; + expected_value.value[1] = i + 1; + expected_value.value[2] = i + 2; + for (int j = 0; j < 3; j++) { + assert(value.value[j] == expected_value.value[j]); + } + i++; + } + + // Get the Column Reader for the Float column + column_reader = row_group_reader->Column(4); + parquet::FloatReader* float_reader = + static_cast(column_reader.get()); + // Read all the rows in the column + i = 0; + while (float_reader->HasNext()) { + float value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = float_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + float expected_value = static_cast(i) * 1.1f; + assert(value == expected_value); + i++; + } + + // Get the Column Reader for the Double column + column_reader = row_group_reader->Column(5); + parquet::DoubleReader* double_reader = + static_cast(column_reader.get()); + // Read all the rows in the column + i = 0; + while (double_reader->HasNext()) { + double value; + // Read one value at a time. The number of rows read is returned. 
values_read + // contains the number of non-null rows + rows_read = double_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + double expected_value = i * 1.1111111; + assert(value == expected_value); + i++; + } + + // Get the Column Reader for the ByteArray column + column_reader = row_group_reader->Column(6); + parquet::ByteArrayReader* ba_reader = + static_cast(column_reader.get()); + // Read all the rows in the column + i = 0; + while (ba_reader->HasNext()) { + parquet::ByteArray value; + // Read one value at a time. The number of rows read is returned. values_read + // contains the number of non-null rows + rows_read = + ba_reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // Verify the value written + char expected_value[FIXED_LENGTH] = "parquet"; + expected_value[7] = static_cast('0' + i / 100); + expected_value[8] = static_cast('0' + (i / 10) % 10); + expected_value[9] = static_cast('0' + i % 10); + if (i % 2 == 0) { // only alternate values exist + // There are no NULL values in the rows written + assert(values_read == 1); + assert(value.len == FIXED_LENGTH); + assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0); + assert(definition_level == 1); + } else { + // There are NULL values in the rows written + assert(values_read == 0); + assert(definition_level == 0); + } + i++; + } + + // Get the Column Reader for the FixedLengthByteArray column + column_reader = row_group_reader->Column(7); + parquet::FixedLenByteArrayReader* flba_reader = + static_cast(column_reader.get()); + // Read all the rows in the column + i = 0; + while (flba_reader->HasNext()) { + parquet::FixedLenByteArray value; + // Read one value at a time. The number of rows read is returned. 
values_read + // contains the number of non-null rows + rows_read = flba_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read); + // Ensure only one value is read + assert(rows_read == 1); + // There are no NULL values in the rows written + assert(values_read == 1); + // Verify the value written + char v = static_cast(i); + char expected_value[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v}; + assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0); + i++; + } + } + } catch (const std::exception& e) { + std::cerr << "Parquet read error: " << e.what() << std::endl; + return -1; + } + + std::cout << "Parquet Writing and Reading Complete" << std::endl; + + return 0; +} diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc index 28d0dcb6..f1da9515 100644 --- a/src/parquet/column_reader.cc +++ b/src/parquet/column_reader.cc @@ -33,6 +33,8 @@ #include "parquet/properties.h" #include "parquet/thrift.h" +#include "parquet/util/crypto.h" + using arrow::MemoryPool; namespace parquet { @@ -102,11 +104,15 @@ ReaderProperties default_reader_properties() { class SerializedPageReader : public PageReader { public: SerializedPageReader(std::unique_ptr stream, int64_t total_num_rows, - Compression::type codec, ::arrow::MemoryPool* pool) + Compression::type codec, + std::shared_ptr encryption, + ::arrow::MemoryPool* pool) : stream_(std::move(stream)), decompression_buffer_(AllocateBuffer(pool, 0)), seen_num_rows_(0), - total_num_rows_(total_num_rows) { + total_num_rows_(total_num_rows), + encryption_(encryption), + decryption_buffer_(AllocateBuffer(pool, 0)) { max_page_header_size_ = kDefaultMaxPageHeaderSize; decompressor_ = GetCodecFromArrow(codec); } @@ -134,6 +140,10 @@ class SerializedPageReader : public PageReader { // Number of rows in all the data pages int64_t total_num_rows_; + + // Encryption + std::shared_ptr encryption_; + std::shared_ptr decryption_buffer_; }; std::shared_ptr SerializedPageReader::NextPage() { @@ -158,7 +168,8 @@ 
std::shared_ptr SerializedPageReader::NextPage() { // This gets used, then set by DeserializeThriftMsg header_size = static_cast(bytes_available); try { - DeserializeThriftMsg(buffer, &header_size, &current_page_header_); + DeserializeThriftMsg(buffer, &header_size, &current_page_header_, + encryption_.get()); break; } catch (std::exception& e) { // Failed to deserialize. Double the allowed page header size and try again @@ -186,6 +197,15 @@ std::shared_ptr SerializedPageReader::NextPage() { ParquetException::EofException(ss.str()); } + // Decrypt it if we need to (pages are decrypted first, then decompressed) + if (encryption_ != nullptr) { + PARQUET_THROW_NOT_OK(decryption_buffer_->Resize( + encryption_->CalculatePlainSize(compressed_len), false)); + compressed_len = parquet_encryption::Decrypt( + encryption_, false, buffer, compressed_len, decryption_buffer_->mutable_data()); + buffer = decryption_buffer_->data(); + } + // Uncompress it if we need to if (decompressor_ != nullptr) { // Grow the uncompressed buffer if we need to. @@ -254,12 +274,11 @@ std::shared_ptr SerializedPageReader::NextPage() { return std::shared_ptr(nullptr); } -std::unique_ptr PageReader::Open(std::unique_ptr stream, - int64_t total_num_rows, - Compression::type codec, - ::arrow::MemoryPool* pool) { - return std::unique_ptr( - new SerializedPageReader(std::move(stream), total_num_rows, codec, pool)); +std::unique_ptr PageReader::Open( + std::unique_ptr stream, int64_t total_num_rows, Compression::type codec, + std::shared_ptr encryption, ::arrow::MemoryPool* pool) { + return std::unique_ptr(new SerializedPageReader( + std::move(stream), total_num_rows, codec, encryption, pool)); } // ---------------------------------------------------------------------- diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h index 71346320..089e4214 100644 --- a/src/parquet/column_reader.h +++ b/src/parquet/column_reader.h @@ -86,7 +86,7 @@ class PARQUET_EXPORT PageReader { static std::unique_ptr Open( std::unique_ptr stream, int64_t total_num_rows, - Compression::type
codec, + Compression::type codec, std::shared_ptr encryption = nullptr, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); // @returns: shared_ptr(nullptr) on EOS, std::shared_ptr diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc index e87d549b..97475eec 100644 --- a/src/parquet/column_writer-test.cc +++ b/src/parquet/column_writer-test.cc @@ -91,8 +91,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { metadata_ = ColumnChunkMetaDataBuilder::Make( writer_properties_, this->descr_, reinterpret_cast(&thrift_metadata_)); - std::unique_ptr pager = - PageWriter::Open(sink_.get(), column_properties.compression(), metadata_.get()); + std::unique_ptr pager = PageWriter::Open( + sink_.get(), column_properties.compression(), nullptr, metadata_.get()); std::shared_ptr writer = ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get()); return std::static_pointer_cast>(writer); diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc index a65bda85..4d7a708d 100644 --- a/src/parquet/column_writer.cc +++ b/src/parquet/column_writer.cc @@ -28,6 +28,7 @@ #include "parquet/properties.h" #include "parquet/statistics.h" #include "parquet/thrift.h" +#include "parquet/util/crypto.h" #include "parquet/util/logging.h" #include "parquet/util/memory.h" @@ -129,6 +130,7 @@ static format::Statistics ToThrift(const EncodedStatistics& row_group_statistics class SerializedPageWriter : public PageWriter { public: SerializedPageWriter(OutputStream* sink, Compression::type codec, + const std::shared_ptr& encryption, ColumnChunkMetaDataBuilder* metadata, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) : sink_(sink), @@ -138,7 +140,8 @@ class SerializedPageWriter : public PageWriter { dictionary_page_offset_(0), data_page_offset_(0), total_uncompressed_size_(0), - total_compressed_size_(0) { + total_compressed_size_(0), + encryption_(encryption) { compressor_ = GetCodecFromArrow(codec); } @@ 
-159,10 +162,22 @@ class SerializedPageWriter : public PageWriter { dict_page_header.__set_encoding(ToThrift(page.encoding())); dict_page_header.__set_is_sorted(page.is_sorted()); + const uint8_t* output_data_buffer = compressed_data->data(); + int32_t output_data_len = static_cast(compressed_data->size()); + std::shared_ptr encrypted_data_buffer = AllocateBuffer(pool_, 0); + if (encryption_.get()) { // encrypt the compressed page bytes + PARQUET_THROW_NOT_OK(encrypted_data_buffer->Resize( + encryption_->CalculateCipherSize(output_data_len))); + output_data_len = parquet_encryption::Encrypt( + encryption_, false, compressed_data->data(), output_data_len, + encrypted_data_buffer->mutable_data()); + output_data_buffer = encrypted_data_buffer->data(); + } + format::PageHeader page_header; page_header.__set_type(format::PageType::DICTIONARY_PAGE); page_header.__set_uncompressed_page_size(static_cast(uncompressed_size)); - page_header.__set_compressed_page_size(static_cast(compressed_data->size())); + page_header.__set_compressed_page_size(static_cast(output_data_len)); page_header.__set_dictionary_page_header(dict_page_header); // TODO(PARQUET-594) crc checksum @@ -170,12 +185,13 @@ class SerializedPageWriter : public PageWriter { if (dictionary_page_offset_ == 0) { dictionary_page_offset_ = start_pos; } - int64_t header_size = - SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_); - sink_->Write(compressed_data->data(), compressed_data->size()); + int64_t header_size = SerializeThriftMsg(&page_header, sizeof(format::PageHeader), + sink_, encryption_.get()); + + sink_->Write(output_data_buffer, output_data_len); total_uncompressed_size_ += uncompressed_size + header_size; - total_compressed_size_ += compressed_data->size() + header_size; + total_compressed_size_ += output_data_len + header_size; return sink_->Tell() - start_pos; } @@ -224,10 +240,22 @@ class SerializedPageWriter : public PageWriter { ToThrift(page.repetition_level_encoding())); data_page_header.__set_statistics(ToThrift(page.statistics())); +
const uint8_t* output_data_buffer = compressed_data->data(); + int32_t output_data_len = static_cast(compressed_data->size()); + std::shared_ptr encrypted_data_buffer = AllocateBuffer(pool_, 0); + if (encryption_.get()) { // encrypt the compressed page bytes + PARQUET_THROW_NOT_OK(encrypted_data_buffer->Resize( + encryption_->CalculateCipherSize(output_data_len))); + output_data_len = parquet_encryption::Encrypt( + encryption_, false, compressed_data->data(), output_data_len, + encrypted_data_buffer->mutable_data()); + output_data_buffer = encrypted_data_buffer->data(); + } + format::PageHeader page_header; page_header.__set_type(format::PageType::DATA_PAGE); page_header.__set_uncompressed_page_size(static_cast(uncompressed_size)); - page_header.__set_compressed_page_size(static_cast(compressed_data->size())); + page_header.__set_compressed_page_size(static_cast(output_data_len)); page_header.__set_data_page_header(data_page_header); // TODO(PARQUET-594) crc checksum @@ -236,12 +264,13 @@ class SerializedPageWriter : public PageWriter { data_page_offset_ = start_pos; } - int64_t header_size = - SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_); - sink_->Write(compressed_data->data(), compressed_data->size()); + int64_t header_size = SerializeThriftMsg(&page_header, sizeof(format::PageHeader), + sink_, encryption_.get()); + + sink_->Write(output_data_buffer, output_data_len); total_uncompressed_size_ += uncompressed_size + header_size; - total_compressed_size_ += compressed_data->size() + header_size; + total_compressed_size_ += output_data_len + header_size; num_values_ += page.num_values(); return sink_->Tell() - start_pos; @@ -271,18 +300,23 @@ class SerializedPageWriter : public PageWriter { // Compression codec to use. std::unique_ptr<::arrow::Codec> compressor_; + + std::shared_ptr encryption_; }; // This implementation of the PageWriter writes to the final sink on Close .
class BufferedPageWriter : public PageWriter { public: BufferedPageWriter(OutputStream* sink, Compression::type codec, + const std::shared_ptr& encryption, ColumnChunkMetaDataBuilder* metadata, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) : final_sink_(sink), metadata_(metadata), in_memory_sink_(new InMemoryOutputStream(pool)), - pager_(new SerializedPageWriter(in_memory_sink_.get(), codec, metadata, pool)) {} + pager_(new SerializedPageWriter(in_memory_sink_.get(), codec, encryption, + metadata, pool)) { + } // TODO: nullptr for EncryptionProperties int64_t WriteDictionaryPage(const DictionaryPage& page) override { return pager_->WriteDictionaryPage(page); @@ -320,16 +354,17 @@ class BufferedPageWriter : public PageWriter { std::unique_ptr pager_; }; -std::unique_ptr PageWriter::Open(OutputStream* sink, Compression::type codec, - ColumnChunkMetaDataBuilder* metadata, - ::arrow::MemoryPool* pool, - bool buffered_row_group) { +std::unique_ptr PageWriter::Open( + OutputStream* sink, Compression::type codec, + const std::shared_ptr& encryption, + ColumnChunkMetaDataBuilder* metadata, ::arrow::MemoryPool* pool, + bool buffered_row_group) { if (buffered_row_group) { return std::unique_ptr( - new BufferedPageWriter(sink, codec, metadata, pool)); + new BufferedPageWriter(sink, codec, encryption, metadata, pool)); } else { return std::unique_ptr( - new SerializedPageWriter(sink, codec, metadata, pool)); + new SerializedPageWriter(sink, codec, encryption, metadata, pool)); } } diff --git a/src/parquet/column_writer.h b/src/parquet/column_writer.h index 1ba428a9..6fe18e07 100644 --- a/src/parquet/column_writer.h +++ b/src/parquet/column_writer.h @@ -74,7 +74,9 @@ class PageWriter { virtual ~PageWriter() {} static std::unique_ptr Open( - OutputStream* sink, Compression::type codec, ColumnChunkMetaDataBuilder* metadata, + OutputStream* sink, Compression::type codec, + const std::shared_ptr& encryption, + ColumnChunkMetaDataBuilder* metadata, ::arrow::MemoryPool* 
pool = ::arrow::default_memory_pool(), bool buffered_row_group = false); diff --git a/src/parquet/encryption.cc b/src/parquet/encryption.cc new file mode 100644 index 00000000..0a2d9ef9 --- /dev/null +++ b/src/parquet/encryption.cc @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "encryption.h" + +#include + +namespace parquet { + +// integer key retriever +void IntegerKeyIdRetriever::PutKey(uint32_t key_id, const std::string& key) { + key_map_.insert(std::make_pair(key_id, key)); +} + +const std::string& IntegerKeyIdRetriever::GetKey(const std::string& key_metadata) { + uint32_t key_id = 0; + // copy() is bounded by key_metadata.size(), so short metadata cannot be overread + key_metadata.copy(reinterpret_cast<char*>(&key_id), sizeof(key_id)); + return key_map_[key_id]; +} + +// string key retriever +void StringKeyIdRetriever::PutKey(const std::string& key_id, const std::string& key) { + key_map_.insert(std::make_pair(key_id, key)); +} + +const std::string& StringKeyIdRetriever::GetKey(const std::string& key_id) { + return key_map_[key_id]; +} + +} // namespace parquet diff --git a/src/parquet/encryption.h b/src/parquet/encryption.h new file mode 100644 index 00000000..1dbf0d20 --- /dev/null +++ b/src/parquet/encryption.h @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#ifndef PARQUET_ENCRYPTION_H +#define PARQUET_ENCRYPTION_H + +#include +#include +#include + +namespace parquet { + +class PARQUET_EXPORT DecryptionKeyRetriever { + public: + virtual const std::string& GetKey(const std::string& key_metadata) = 0; + virtual ~DecryptionKeyRetriever() {} +}; + +// Simple integer key retriever: the leading 4 bytes of key_metadata are the key id +class PARQUET_EXPORT IntegerKeyIdRetriever : public DecryptionKeyRetriever { + public: + void PutKey(uint32_t key_id, const std::string& key); + const std::string& GetKey(const std::string& key_metadata) override; + + private: + std::map key_map_; +}; + +// Simple string key retriever: key_metadata itself is the key id +class PARQUET_EXPORT StringKeyIdRetriever : public DecryptionKeyRetriever { + public: + void PutKey(const std::string& key_id, const std::string& key); + const std::string& GetKey(const std::string& key_metadata) override; + + private: + std::map key_map_; +}; + +} // namespace parquet + +#endif // PARQUET_ENCRYPTION_H diff --git a/src/parquet/file_reader.cc b/src/parquet/file_reader.cc index c5a0f342..596180c1 100644 --- a/src/parquet/file_reader.cc +++ b/src/parquet/file_reader.cc @@ -53,6 +53,7 @@ namespace parquet { static constexpr int64_t DEFAULT_FOOTER_READ_SIZE = 64 * 1024; static constexpr uint32_t FOOTER_SIZE = 8; static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'}; +static constexpr uint8_t PARQUET_EMAGIC[4] = {'P', 'A', 'R', 'E'}; // For PARQUET-816 static constexpr int64_t kMaxDictHeaderSize = 100; @@ -89,8 +90,12 @@ const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->met class SerializedRowGroup : public RowGroupReader::Contents { public: SerializedRowGroup(RandomAccessSource* source, FileMetaData* file_metadata, - int row_group_number, const ReaderProperties& props) - : source_(source), file_metadata_(file_metadata), properties_(props) { + FileCryptoMetaData* file_crypto_metadata, int row_group_number, + const ReaderProperties& props) + : source_(source), + file_metadata_(file_metadata), +
file_crypto_metadata_(file_crypto_metadata), + properties_(props) { row_group_metadata_ = file_metadata->RowGroup(row_group_number); } @@ -122,14 +127,65 @@ class SerializedRowGroup : public RowGroupReader::Contents { } stream = properties_.GetStream(source_, col_start, col_length); + std::unique_ptr crypto_meta_data = col->crypto_meta_data(); + + bool encrypted = true; + + // file is unencrypted + // or file is encrypted but column is unencrypted + if (!file_crypto_metadata_ || !crypto_meta_data) { + encrypted = false; + } + + if (!encrypted) { + return PageReader::Open(std::move(stream), col->num_values(), col->compression(), + nullptr, properties_.memory_pool()); + } + + // the column is encrypted + + auto file_decryption = properties_.file_decryption(); + + // the column is encrypted with footer key + if (crypto_meta_data->encrypted_with_footer_key()) { + std::string footer_key_metadata = file_crypto_metadata_->footer_key_metadata(); + std::string footer_key = file_decryption->GetFooterKey(footer_key_metadata); + + if (footer_key.empty()) { + throw ParquetException("column is encrypted with null footer key"); + } + + auto footer_encryption = std::make_shared( + file_crypto_metadata_->encryption_algorithm().algorithm, footer_key, + file_decryption->GetAad()); + + return PageReader::Open(std::move(stream), col->num_values(), col->compression(), + footer_encryption, properties_.memory_pool()); + } + + // file is non-uniform encrypted and the column is encrypted with its own key + + std::string column_key_metadata = crypto_meta_data->column_key_metadata(); + // encrypted with column key + std::string column_key = + file_decryption->GetColumnKey(col->path_in_schema(), column_key_metadata); + + if (column_key.empty()) { + throw ParquetException("column is encrypted with null key, path=" + + col->path_in_schema()->ToDotString()); + } + auto column_encryption = std::make_shared( + file_crypto_metadata_->encryption_algorithm().algorithm, column_key, + 
file_decryption->GetAad()); return PageReader::Open(std::move(stream), col->num_values(), col->compression(), - properties_.memory_pool()); + column_encryption, properties_.memory_pool()); } private: RandomAccessSource* source_; FileMetaData* file_metadata_; + FileCryptoMetaData* file_crypto_metadata_; std::unique_ptr row_group_metadata_; ReaderProperties properties_; }; @@ -157,7 +213,8 @@ class SerializedFile : public ParquetFileReader::Contents { std::shared_ptr GetRowGroup(int i) override { std::unique_ptr contents( - new SerializedRowGroup(source_.get(), file_metadata_.get(), i, properties_)); + new SerializedRowGroup(source_.get(), file_metadata_.get(), + file_crypto_metadata_.get(), i, properties_)); return std::make_shared(std::move(contents)); } @@ -180,42 +237,104 @@ class SerializedFile : public ParquetFileReader::Contents { source_->ReadAt(file_size - footer_read_size, footer_read_size, footer_buffer); // Check if all bytes are read. Check if last 4 bytes read have the magic bits - if (bytes_read != footer_read_size || - memcmp(footer_buffer + footer_read_size - 4, PARQUET_MAGIC, 4) != 0) { - throw ParquetException("Invalid parquet file. Corrupt footer."); - } + // no encryption + if (bytes_read == footer_read_size && + memcmp(footer_buffer + footer_read_size - 4, PARQUET_MAGIC, 4) == 0) { + uint32_t metadata_len = + *reinterpret_cast(footer_buffer + footer_read_size - FOOTER_SIZE); + int64_t metadata_start = file_size - FOOTER_SIZE - metadata_len; + if (FOOTER_SIZE + metadata_len > file_size) { + throw ParquetException( + "Invalid parquet file. 
File is less than " + "file metadata size."); + } + + std::shared_ptr metadata_buffer = + AllocateBuffer(properties_.memory_pool(), metadata_len); + + // Check if the footer_buffer contains the entire metadata + if (footer_read_size >= (metadata_len + FOOTER_SIZE)) { + memcpy(metadata_buffer->mutable_data(), + footer_buffer + (footer_read_size - metadata_len - FOOTER_SIZE), + metadata_len); + } else { + bytes_read = source_->ReadAt(metadata_start, metadata_len, + metadata_buffer->mutable_data()); + if (bytes_read != metadata_len) { + throw ParquetException("Invalid parquet file. Could not read metadata bytes."); + } + } - uint32_t metadata_len = - *reinterpret_cast(footer_buffer + footer_read_size - FOOTER_SIZE); - int64_t metadata_start = file_size - FOOTER_SIZE - metadata_len; - if (FOOTER_SIZE + metadata_len > file_size) { - throw ParquetException( - "Invalid parquet file. File is less than " - "file metadata size."); + file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &metadata_len); } + // encryption + else if (bytes_read == footer_read_size && + memcmp(footer_buffer + footer_read_size - 4, PARQUET_EMAGIC, 4) == 0) { + // read crypto metadata + uint32_t crypto_metadata_len = + *reinterpret_cast(footer_buffer + footer_read_size - FOOTER_SIZE); + int64_t crypto_metadata_start = file_size - FOOTER_SIZE - crypto_metadata_len; + + if (FOOTER_SIZE + crypto_metadata_len > file_size) { + throw ParquetException( + "Invalid parquet file. 
File is less than " + "file metadata size."); + } + + std::shared_ptr crypto_metadata_buffer = + AllocateBuffer(properties_.memory_pool(), crypto_metadata_len); + + // Check if the footer_buffer contains the entire metadata + if (footer_read_size >= (crypto_metadata_len + FOOTER_SIZE)) { + memcpy(crypto_metadata_buffer->mutable_data(), + footer_buffer + (footer_read_size - crypto_metadata_len - FOOTER_SIZE), + crypto_metadata_len); + } else { + bytes_read = source_->ReadAt(crypto_metadata_start, crypto_metadata_len, + crypto_metadata_buffer->mutable_data()); + if (bytes_read != crypto_metadata_len) { + throw ParquetException("Invalid parquet file. Could not read metadata bytes."); + } + } - std::shared_ptr metadata_buffer = - AllocateBuffer(properties_.memory_pool(), metadata_len); + file_crypto_metadata_ = + FileCryptoMetaData::Make(crypto_metadata_buffer->data(), &crypto_metadata_len); - // Check if the footer_buffer contains the entire metadata - if (footer_read_size >= (metadata_len + FOOTER_SIZE)) { - memcpy(metadata_buffer->mutable_data(), - footer_buffer + (footer_read_size - metadata_len - FOOTER_SIZE), - metadata_len); - } else { + int64_t footer_offset = file_crypto_metadata_->footer_offset(); + uint32_t footer_read_size = (uint32_t)(crypto_metadata_start - footer_offset); + + std::shared_ptr footer_buffer = + AllocateBuffer(properties_.memory_pool(), footer_read_size); bytes_read = - source_->ReadAt(metadata_start, metadata_len, metadata_buffer->mutable_data()); - if (bytes_read != metadata_len) { - throw ParquetException("Invalid parquet file. 
Could not read metadata bytes."); + source_->ReadAt(footer_offset, footer_read_size, footer_buffer->mutable_data()); + + if (file_crypto_metadata_->encrypted_footer()) { + // get footer key metadata + std::string footer_key_metadata = file_crypto_metadata_->footer_key_metadata(); + + auto file_decryption = properties_.file_decryption(); + std::string footer_key = file_decryption->GetFooterKey(footer_key_metadata); + + auto footer_encryption = std::make_shared( + file_crypto_metadata_->encryption_algorithm().algorithm, footer_key, + file_decryption->GetAad()); + + file_metadata_ = FileMetaData::Make(footer_buffer->data(), &footer_read_size, + footer_encryption); + } else { + file_metadata_ = FileMetaData::Make(footer_buffer->data(), &footer_read_size); } } - - file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &metadata_len); + // error + else { + throw ParquetException("Invalid parquet file. Corrupt footer."); + } } private: std::unique_ptr source_; std::shared_ptr file_metadata_; + std::shared_ptr file_crypto_metadata_; ReaderProperties properties_; }; diff --git a/src/parquet/file_writer.cc b/src/parquet/file_writer.cc index 30673c59..c6447938 100644 --- a/src/parquet/file_writer.cc +++ b/src/parquet/file_writer.cc @@ -34,6 +34,7 @@ namespace parquet { // FIXME: copied from reader-internal.cc static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'}; +static constexpr uint8_t PARQUET_EMAGIC[4] = {'P', 'A', 'R', 'E'}; // ---------------------------------------------------------------------- // RowGroupWriter public API @@ -123,7 +124,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents { const ColumnDescriptor* column_descr = col_meta->descr(); std::unique_ptr pager = - PageWriter::Open(sink_, properties_->compression(column_descr->path()), col_meta, + PageWriter::Open(sink_, properties_->compression(column_descr->path()), + properties_->encryption(column_descr->path()), col_meta, // TODO properties_->memory_pool()); column_writers_[0] 
= ColumnWriter::Make(col_meta, std::move(pager), properties_); return column_writers_[0].get(); @@ -221,7 +223,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents { const ColumnDescriptor* column_descr = col_meta->descr(); std::unique_ptr pager = PageWriter::Open(sink_, properties_->compression(column_descr->path()), - col_meta, properties_->memory_pool(), buffered_row_group_); + properties_->encryption(column_descr->path()), col_meta, + properties_->memory_pool(), buffered_row_group_); column_writers_.push_back( ColumnWriter::Make(col_meta, std::move(pager), properties_)); } @@ -258,7 +261,20 @@ class FileSerializer : public ParquetFileWriter::Contents { // Write magic bytes and metadata auto metadata = metadata_->Finish(); - WriteFileMetaData(*metadata, sink_.get()); + + auto file_encryption = properties_->file_encryption(); + if (file_encryption == nullptr) { + WriteFileMetaData(*metadata, sink_.get()); + } else { + uint64_t metadata_start = static_cast(sink_->Tell()); + + std::shared_ptr footer_encryption = + file_encryption->GetFooterEncryptionProperties(); + WriteFileMetaData(*metadata, sink_.get(), footer_encryption.get()); + + auto crypto_metadata = metadata_->GetCryptoMetaData(metadata_start); + WriteFileCryptoMetaData(*crypto_metadata, sink_.get()); + } sink_->Close(); is_open_ = false; @@ -323,8 +339,12 @@ class FileSerializer : public ParquetFileWriter::Contents { std::unique_ptr row_group_writer_; void StartFile() { - // Parquet files always start with PAR1 - sink_->Write(PARQUET_MAGIC, 4); + if (properties_->file_encryption() == nullptr) { + // Parquet files always start with PAR1 + sink_->Write(PARQUET_MAGIC, 4); + } else { + sink_->Write(PARQUET_EMAGIC, 4); + } } }; @@ -360,16 +380,35 @@ std::unique_ptr ParquetFileWriter::Open( return result; } -void WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink) { - // Write MetaData - uint32_t metadata_len = static_cast(sink->Tell()); +void WriteFileMetaData(const 
FileMetaData& file_metadata, OutputStream* sink, + EncryptionProperties* footer_encryption) { + if (footer_encryption == nullptr) { + // Write MetaData + uint32_t metadata_len = static_cast(sink->Tell()); + + file_metadata.WriteTo(sink); + metadata_len = static_cast(sink->Tell()) - metadata_len; + + // Write Footer + sink->Write(reinterpret_cast(&metadata_len), 4); + sink->Write(PARQUET_MAGIC, 4); + } else { + // encrypt and write to sink + file_metadata.WriteTo(sink, footer_encryption); + } +} + +void WriteFileCryptoMetaData(const FileCryptoMetaData& crypto_metadata, + OutputStream* sink) { + uint64_t crypto_offset = static_cast(sink->Tell()); + + // Get a FileCryptoMetaData + crypto_metadata.WriteTo(sink); - file_metadata.WriteTo(sink); - metadata_len = static_cast(sink->Tell()) - metadata_len; + auto crypto_len = static_cast(sink->Tell()) - crypto_offset; + sink->Write(reinterpret_cast(&crypto_len), 4); - // Write Footer - sink->Write(reinterpret_cast(&metadata_len), 4); - sink->Write(PARQUET_MAGIC, 4); + sink->Write(PARQUET_EMAGIC, 4); } const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); } diff --git a/src/parquet/file_writer.h b/src/parquet/file_writer.h index cdfe06cd..e3f9e9e7 100644 --- a/src/parquet/file_writer.h +++ b/src/parquet/file_writer.h @@ -101,7 +101,10 @@ class PARQUET_EXPORT RowGroupWriter { }; PARQUET_EXPORT -void WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink); +void WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink, + EncryptionProperties* encryption_properties = nullptr); +void WriteFileCryptoMetaData(const FileCryptoMetaData& crypto_metadata, + OutputStream* sink); class PARQUET_EXPORT ParquetFileWriter { public: diff --git a/src/parquet/metadata.cc b/src/parquet/metadata.cc index 1cab51f0..9a894daf 100644 --- a/src/parquet/metadata.cc +++ b/src/parquet/metadata.cc @@ -88,6 +88,52 @@ std::shared_ptr MakeColumnStats( } // MetaData Accessor +// 
ColumnCryptoMetaData +class ColumnCryptoMetaData::ColumnCryptoMetaDataImpl { + public: + explicit ColumnCryptoMetaDataImpl(const format::ColumnCryptoMetaData* crypto_metadata) + : crypto_metadata_(crypto_metadata) {} + + ~ColumnCryptoMetaDataImpl() {} + + bool encrypted_with_footer_key() const { + return crypto_metadata_->__isset.ENCRYPTION_WITH_FOOTER_KEY; + } + bool encrypted_with_column_key() const { + return crypto_metadata_->__isset.ENCRYPTION_WITH_COLUMN_KEY; + } + const std::vector& path_in_schema() const { + return crypto_metadata_->ENCRYPTION_WITH_COLUMN_KEY.path_in_schema; + } + const std::string& column_key_metadata() const { + return crypto_metadata_->ENCRYPTION_WITH_COLUMN_KEY.column_key_metadata; + } + + private: + const format::ColumnCryptoMetaData* crypto_metadata_; +}; + +std::unique_ptr ColumnCryptoMetaData::Make( + const uint8_t* metadata) { + return std::unique_ptr(new ColumnCryptoMetaData(metadata)); +} + +ColumnCryptoMetaData::ColumnCryptoMetaData(const uint8_t* metadata) + : impl_(new ColumnCryptoMetaDataImpl( + reinterpret_cast(metadata))) {} + +ColumnCryptoMetaData::~ColumnCryptoMetaData() {} + +const std::vector& ColumnCryptoMetaData::path_in_schema() const { + return impl_->path_in_schema(); +} +bool ColumnCryptoMetaData::encrypted_with_footer_key() const { + return impl_->encrypted_with_footer_key(); +} +const std::string& ColumnCryptoMetaData::column_key_metadata() const { + return impl_->column_key_metadata(); +} + // ColumnChunk metadata class ColumnChunkMetaData::ColumnChunkMetaDataImpl { public: @@ -167,6 +213,15 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { return column_->meta_data.total_uncompressed_size; } + inline std::unique_ptr crypto_meta_data() const { + if (column_->__isset.crypto_meta_data) { + return ColumnCryptoMetaData::Make( + reinterpret_cast(&column_->crypto_meta_data)); + } else { + return nullptr; + } + } + private: mutable std::shared_ptr stats_; std::vector encodings_; @@ -244,6 +299,10 @@ int64_t 
ColumnChunkMetaData::total_compressed_size() const { return impl_->total_compressed_size(); } +std::unique_ptr ColumnChunkMetaData::crypto_meta_data() const { + return impl_->crypto_meta_data(); +} + // row-group metadata class RowGroupMetaData::RowGroupMetaDataImpl { public: @@ -311,10 +370,11 @@ class FileMetaData::FileMetaDataImpl { public: FileMetaDataImpl() : metadata_len_(0) {} - explicit FileMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len) + explicit FileMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len, + std::shared_ptr encryption = nullptr) : metadata_len_(0) { metadata_.reset(new format::FileMetaData); - DeserializeThriftMsg(metadata, metadata_len, metadata_.get()); + DeserializeThriftMsg(metadata, metadata_len, metadata_.get(), encryption.get()); metadata_len_ = *metadata_len; if (metadata_->__isset.created_by) { @@ -343,8 +403,8 @@ class FileMetaData::FileMetaDataImpl { const ApplicationVersion& writer_version() const { return writer_version_; } - void WriteTo(OutputStream* dst) const { - SerializeThriftMsg(metadata_.get(), 1024, dst); + void WriteTo(OutputStream* dst, EncryptionProperties* encryption) const { + SerializeThriftMsg(metadata_.get(), 1024, dst, encryption); } std::unique_ptr RowGroup(int i) { @@ -408,15 +468,18 @@ class FileMetaData::FileMetaDataImpl { std::shared_ptr key_value_metadata_; }; -std::shared_ptr FileMetaData::Make(const uint8_t* metadata, - uint32_t* metadata_len) { +std::shared_ptr FileMetaData::Make( + const uint8_t* metadata, uint32_t* metadata_len, + std::shared_ptr encryption) { // This FileMetaData ctor is private, not compatible with std::make_shared - return std::shared_ptr(new FileMetaData(metadata, metadata_len)); + return std::shared_ptr( + new FileMetaData(metadata, metadata_len, encryption)); } -FileMetaData::FileMetaData(const uint8_t* metadata, uint32_t* metadata_len) +FileMetaData::FileMetaData(const uint8_t* metadata, uint32_t* metadata_len, + std::shared_ptr encryption) : 
impl_{std::unique_ptr( - new FileMetaDataImpl(metadata, metadata_len))} {} + new FileMetaDataImpl(metadata, metadata_len, encryption))} {} FileMetaData::FileMetaData() : impl_{std::unique_ptr(new FileMetaDataImpl())} {} @@ -462,7 +525,69 @@ std::shared_ptr FileMetaData::key_value_metadata() const return impl_->key_value_metadata(); } -void FileMetaData::WriteTo(OutputStream* dst) const { return impl_->WriteTo(dst); } +void FileMetaData::WriteTo(OutputStream* dst, EncryptionProperties* encryption) const { + return impl_->WriteTo(dst, encryption); +} + +class FileCryptoMetaData::FileCryptoMetaDataImpl { + public: + FileCryptoMetaDataImpl() {} + + explicit FileCryptoMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len) { + metadata_.reset(new format::FileCryptoMetaData); + DeserializeThriftMsg(metadata, metadata_len, metadata_.get()); + metadata_len_ = *metadata_len; + } + + ~FileCryptoMetaDataImpl() {} + + EncryptionAlgorithm encryption_algorithm() { + return FromThrift(metadata_->encryption_algorithm); + } + + bool encrypted_footer() { return metadata_->encrypted_footer; } + + const std::string& footer_key_metadata() { return metadata_->footer_key_metadata; } + + uint64_t footer_offset() { return metadata_->footer_offset; } + + const std::string& iv_prefix() { return metadata_->iv_prefix; } + + void WriteTo(OutputStream* dst) const { + SerializeThriftMsg(metadata_.get(), 1024, dst); + } + + private: + friend FileMetaDataBuilder; + std::unique_ptr metadata_; + uint32_t metadata_len_; +}; + +EncryptionAlgorithm FileCryptoMetaData::encryption_algorithm() { + return impl_->encryption_algorithm(); +} +bool FileCryptoMetaData::encrypted_footer() { return impl_->encrypted_footer(); } +const std::string& FileCryptoMetaData::footer_key_metadata() { + return impl_->footer_key_metadata(); +} +uint64_t FileCryptoMetaData::footer_offset() { return impl_->footer_offset(); } +const std::string& FileCryptoMetaData::iv_prefix() { return impl_->iv_prefix(); } + 
+std::shared_ptr FileCryptoMetaData::Make( + const uint8_t* serialized_metadata, uint32_t* metadata_len) { + return std::shared_ptr( + new FileCryptoMetaData(serialized_metadata, metadata_len)); +} + +FileCryptoMetaData::FileCryptoMetaData(const uint8_t* serialized_metadata, + uint32_t* metadata_len) + : impl_(new FileCryptoMetaDataImpl(serialized_metadata, metadata_len)) {} + +FileCryptoMetaData::FileCryptoMetaData() : impl_(new FileCryptoMetaDataImpl()) {} + +FileCryptoMetaData::~FileCryptoMetaData() {} + +void FileCryptoMetaData::WriteTo(OutputStream* dst) const { impl_->WriteTo(dst); } ApplicationVersion::ApplicationVersion(const std::string& application, int major, int minor, int patch) @@ -570,10 +695,10 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { uint8_t* contents) : properties_(props), column_(column) { column_chunk_ = reinterpret_cast(contents); - column_chunk_->meta_data.__set_type(ToThrift(column->physical_type())); - column_chunk_->meta_data.__set_path_in_schema(column->path()->ToDotVector()); - column_chunk_->meta_data.__set_codec( - ToThrift(properties_->compression(column->path()))); + column_metadata_ = column_chunk_->meta_data; + column_metadata_.__set_type(ToThrift(column->physical_type())); + column_metadata_.__set_path_in_schema(column->path()->ToDotVector()); + column_metadata_.__set_codec(ToThrift(properties_->compression(column->path()))); } ~ColumnChunkMetaDataBuilderImpl() {} @@ -600,7 +725,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { stats.__isset.max = val.has_max; } - column_chunk_->meta_data.__set_statistics(stats); + column_metadata_.__set_statistics(stats); } void Finish(int64_t num_values, int64_t dictionary_page_offset, @@ -608,19 +733,20 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary, bool dictionary_fallback) { if (dictionary_page_offset > 0) { - 
column_chunk_->meta_data.__set_dictionary_page_offset(dictionary_page_offset); + column_metadata_.__set_dictionary_page_offset(dictionary_page_offset); column_chunk_->__set_file_offset(dictionary_page_offset + compressed_size); } else { column_chunk_->__set_file_offset(data_page_offset + compressed_size); } - column_chunk_->__isset.meta_data = true; - column_chunk_->meta_data.__set_num_values(num_values); + + column_metadata_.__set_num_values(num_values); if (index_page_offset >= 0) { - column_chunk_->meta_data.__set_index_page_offset(index_page_offset); + column_metadata_.__set_index_page_offset(index_page_offset); } - column_chunk_->meta_data.__set_data_page_offset(data_page_offset); - column_chunk_->meta_data.__set_total_uncompressed_size(uncompressed_size); - column_chunk_->meta_data.__set_total_compressed_size(compressed_size); + column_metadata_.__set_data_page_offset(data_page_offset); + column_metadata_.__set_total_uncompressed_size(uncompressed_size); + column_metadata_.__set_total_compressed_size(compressed_size); + std::vector thrift_encodings; if (has_dictionary) { thrift_encodings.push_back(ToThrift(properties_->dictionary_index_encoding())); @@ -638,17 +764,68 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { if (dictionary_fallback) { thrift_encodings.push_back(ToThrift(Encoding::PLAIN)); } - column_chunk_->meta_data.__set_encodings(thrift_encodings); + column_metadata_.__set_encodings(thrift_encodings); } void WriteTo(OutputStream* sink) { - SerializeThriftMsg(column_chunk_, sizeof(format::ColumnChunk), sink); + const auto& encrypt_md = properties_->column_encryption_props(column_->path()); + + // column is unencrypted + if (!encrypt_md || !encrypt_md->encrypted()) { + column_chunk_->__isset.meta_data = true; + column_chunk_->__set_meta_data(column_metadata_); + SerializeThriftMsg(column_chunk_, sizeof(format::ColumnChunk), sink); + } else { // column is encrypted + column_chunk_->__isset.crypto_meta_data = true; + + // encrypted 
with footer key + format::ColumnCryptoMetaData ccmd; + if (encrypt_md->encrypted_with_footer_key()) { + ccmd.__isset.ENCRYPTION_WITH_FOOTER_KEY = true; + ccmd.__set_ENCRYPTION_WITH_FOOTER_KEY(format::EncryptionWithFooterKey()); + } else { // encrypted with column key + format::EncryptionWithColumnKey eck; + eck.__set_column_key_metadata(encrypt_md->key_metadata()); + eck.__set_path_in_schema(column_->path()->ToDotVector()); + ccmd.__isset.ENCRYPTION_WITH_COLUMN_KEY = true; + ccmd.__set_ENCRYPTION_WITH_COLUMN_KEY(eck); + } + column_chunk_->__set_crypto_meta_data(ccmd); + + auto footer_encryption = properties_->footer_encryption(); + + // non-uniform: footer is unencrypted, or column is encrypted with a column-specific + // key + if ((footer_encryption == nullptr && encrypt_md->encrypted()) || + !encrypt_md->encrypted_with_footer_key()) { + // don't set meta_data + column_chunk_->__isset.meta_data = false; + + // Thrift-serialize the ColumnMetaData structure, + // encrypt it with the column key, and write the result to the output stream + // (first length, then buffer) + auto encrypt_props = properties_->encryption(column_->path()); + uint64_t metadata_start = sink->Tell(); + + SerializeThriftMsg(&column_metadata_, sizeof(format::ColumnMetaData), sink, + encrypt_props.get()); + + // Set the ColumnMetaData offset at the “file_offset” field in the ColumnChunk. 
+ column_chunk_->__set_file_offset(metadata_start); + } else { + column_chunk_->__isset.meta_data = true; + column_chunk_->__set_meta_data(column_metadata_); + } + + SerializeThriftMsg(column_chunk_, sizeof(format::ColumnChunk), sink); + } } const ColumnDescriptor* descr() const { return column_; } private: format::ColumnChunk* column_chunk_; + format::ColumnMetaData column_metadata_; const std::shared_ptr properties_; const ColumnDescriptor* column_; }; @@ -728,20 +905,22 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl { << " columns are initialized"; throw ParquetException(ss.str()); } - int64_t total_byte_size = 0; - - for (int i = 0; i < schema_->num_columns(); i++) { - if (!(row_group_->columns[i].file_offset >= 0)) { - std::stringstream ss; - ss << "Column " << i << " is not complete."; - throw ParquetException(ss.str()); - } - total_byte_size += row_group_->columns[i].meta_data.total_compressed_size; - } - DCHECK(total_bytes_written == total_byte_size) - << "Total bytes in this RowGroup does not match with compressed sizes of columns"; - - row_group_->__set_total_byte_size(total_byte_size); + // int64_t total_byte_size = 0; + + // for (int i = 0; i < schema_->num_columns(); i++) { + // if (!(row_group_->columns[i].file_offset >= 0)) { + // std::stringstream ss; + // ss << "Column " << i << " is not complete."; + // throw ParquetException(ss.str()); + // } + // total_byte_size += row_group_->columns[i].meta_data.total_compressed_size; + // } + // DCHECK(total_bytes_written == total_byte_size) + // << "Total bytes in this RowGroup does not match with compressed sizes of + // columns"; + + // row_group_->__set_total_byte_size(total_byte_size); + row_group_->__set_total_byte_size(total_bytes_written); } void set_num_rows(int64_t num_rows) { row_group_->num_rows = num_rows; } @@ -802,6 +981,9 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl { const std::shared_ptr& key_value_metadata) : properties_(props), schema_(schema), 
key_value_metadata_(key_value_metadata) { metadata_.reset(new format::FileMetaData()); + if (props->footer_encryption() != nullptr) { + crypto_metadata_.reset(new format::FileCryptoMetaData()); + } } ~FileMetaDataBuilderImpl() {} @@ -875,8 +1057,39 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl { return file_meta_data; } + std::unique_ptr BuildFileCryptoMetaData(uint64_t footerOffset) { + if (crypto_metadata_ == nullptr) { + return nullptr; + } + + auto file_encryption = properties_->file_encryption(); + auto footer_encryption = properties_->footer_encryption(); + + // build format::FileCryptoMetaData + EncryptionAlgorithm encryption_algorithm; + encryption_algorithm.algorithm = footer_encryption->algorithm(); + encryption_algorithm.aad_metadata = file_encryption->aad_metadata(); + crypto_metadata_->__set_encryption_algorithm(ToThrift(encryption_algorithm)); + crypto_metadata_->__set_encrypted_footer(!footer_encryption->key().empty()); + + std::string footer_key_metadata = file_encryption->footer_key_metadata(); + if (!footer_key_metadata.empty()) { + crypto_metadata_->__set_footer_key_metadata(footer_key_metadata); + } + crypto_metadata_->__set_footer_offset(footerOffset); + + // TODO set iv_prefix??? 
+ + // return as FileCryptoMetaData + std::unique_ptr file_crypto_meta_data = + std::unique_ptr(new FileCryptoMetaData()); + file_crypto_meta_data->impl_->metadata_ = std::move(crypto_metadata_); + return file_crypto_meta_data; + } + protected: std::unique_ptr metadata_; + std::unique_ptr crypto_metadata_; private: const std::shared_ptr properties_; @@ -907,4 +1120,9 @@ RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup() { std::unique_ptr FileMetaDataBuilder::Finish() { return impl_->Finish(); } +std::unique_ptr FileMetaDataBuilder::GetCryptoMetaData( + uint64_t footerOffset) { + return impl_->BuildFileCryptoMetaData(footerOffset); +} + } // namespace parquet diff --git a/src/parquet/metadata.h b/src/parquet/metadata.h index 5d51e3d2..2ed4f2e2 100644 --- a/src/parquet/metadata.h +++ b/src/parquet/metadata.h @@ -87,6 +87,22 @@ class ApplicationVersion { SortOrder::type sort_order = SortOrder::SIGNED) const; }; +class PARQUET_EXPORT ColumnCryptoMetaData { + public: + static std::unique_ptr Make(const uint8_t* metadata); + ~ColumnCryptoMetaData(); + + const std::vector& path_in_schema() const; + bool encrypted_with_footer_key() const; + const std::string& column_key_metadata() const; + + private: + explicit ColumnCryptoMetaData(const uint8_t* metadata); + + class ColumnCryptoMetaDataImpl; + std::unique_ptr impl_; +}; + class PARQUET_EXPORT ColumnChunkMetaData { public: // API convenience to get a MetaData accessor @@ -115,6 +131,7 @@ class PARQUET_EXPORT ColumnChunkMetaData { int64_t index_page_offset() const; int64_t total_compressed_size() const; int64_t total_uncompressed_size() const; + std::unique_ptr crypto_meta_data() const; private: explicit ColumnChunkMetaData(const uint8_t* metadata, const ColumnDescriptor* descr, @@ -154,8 +171,9 @@ class FileMetaDataBuilder; class PARQUET_EXPORT FileMetaData { public: // API convenience to get a MetaData accessor - static std::shared_ptr Make(const uint8_t* serialized_metadata, - uint32_t* metadata_len); + 
static std::shared_ptr Make( + const uint8_t* serialized_metadata, uint32_t* metadata_len, + std::shared_ptr encryption = nullptr); ~FileMetaData(); @@ -171,7 +189,7 @@ class PARQUET_EXPORT FileMetaData { const ApplicationVersion& writer_version() const; - void WriteTo(OutputStream* dst) const; + void WriteTo(OutputStream* dst, EncryptionProperties* encryption = nullptr) const; // Return const-pointer to make it clear that this object is not to be copied const SchemaDescriptor* schema() const; @@ -180,7 +198,8 @@ class PARQUET_EXPORT FileMetaData { private: friend FileMetaDataBuilder; - explicit FileMetaData(const uint8_t* serialized_metadata, uint32_t* metadata_len); + explicit FileMetaData(const uint8_t* serialized_metadata, uint32_t* metadata_len, + std::shared_ptr encryption = nullptr); // PIMPL Idiom FileMetaData(); @@ -188,6 +207,31 @@ class PARQUET_EXPORT FileMetaData { std::unique_ptr impl_; }; +class PARQUET_EXPORT FileCryptoMetaData { + public: + // API convenience to get a MetaData accessor + static std::shared_ptr Make(const uint8_t* serialized_metadata, + uint32_t* metadata_len); + ~FileCryptoMetaData(); + + EncryptionAlgorithm encryption_algorithm(); + bool encrypted_footer(); + const std::string& footer_key_metadata(); + uint64_t footer_offset(); + const std::string& iv_prefix(); + + void WriteTo(OutputStream* dst) const; + + private: + friend FileMetaDataBuilder; + FileCryptoMetaData(const uint8_t* serialized_metadata, uint32_t* metadata_len); + + // PIMPL Idiom + FileCryptoMetaData(); + class FileCryptoMetaDataImpl; + std::unique_ptr impl_; +}; + // Builder API class PARQUET_EXPORT ColumnChunkMetaDataBuilder { public: @@ -263,6 +307,9 @@ class PARQUET_EXPORT FileMetaDataBuilder { // commit the metadata std::unique_ptr Finish(); + // crypto metadata + std::unique_ptr GetCryptoMetaData(uint64_t footerOffset); + private: explicit FileMetaDataBuilder( const SchemaDescriptor* schema, const std::shared_ptr& props, diff --git a/src/parquet/properties.h 
b/src/parquet/properties.h index 83dc2057..1ac61b1e 100644 --- a/src/parquet/properties.h +++ b/src/parquet/properties.h @@ -22,10 +22,12 @@ #include #include +#include "parquet/encryption.h" #include "parquet/exception.h" #include "parquet/parquet_version.h" #include "parquet/schema.h" #include "parquet/types.h" +#include "parquet/util/logging.h" #include "parquet/util/memory.h" #include "parquet/util/visibility.h" @@ -38,6 +40,123 @@ struct ParquetVersion { static int64_t DEFAULT_BUFFER_SIZE = 0; static bool DEFAULT_USE_BUFFERED_STREAM = false; +class PARQUET_EXPORT ColumnEncryptionProperties { + public: + class Builder { + public: + Builder(const std::string& path, bool encrypt) + : path_(path), encrypt_(encrypt), encrypted_with_footer_key_(encrypt) {} + + Builder* key(const std::string& key) { + DCHECK(key.length() == 16 || key.length() == 24 || key.length() == 32); + DCHECK(encrypt_); + + key_ = key; + return this; + } + Builder* key_metadata(const std::string& key_id) { + DCHECK(!key_id.empty()); + key_metadata_ = key_id; + return this; + } + + Builder* key_id(uint32_t key_id) { + std::string key_metadata = std::string(reinterpret_cast(&key_id), 4); + this->key_metadata(key_metadata); + return this; + } + + std::shared_ptr build() { + return std::make_shared(path_, encrypt_, encrypted_with_footer_key_, + key_, key_metadata_); + } + + private: + std::string path_; + bool encrypt_; + bool encrypted_with_footer_key_; + std::string key_; + std::string key_metadata_; + }; + + ColumnEncryptionProperties() = default; + ColumnEncryptionProperties(const ColumnEncryptionProperties& other) = default; + ColumnEncryptionProperties(ColumnEncryptionProperties&& other) = default; + + ColumnEncryptionProperties(const std::string& path, bool encrypt, bool encrypted_with_footer_key, + const std::string& key, const std::string& key_metadata) + : path_(path), encrypt_(encrypt), encrypted_with_footer_key_(encrypted_with_footer_key), + key_(key), key_metadata_(key_metadata) {} + + 
const std::string& path() const { return path_; } + bool encrypted() const { return encrypt_; } + bool encrypted_with_footer_key() const { return encrypted_with_footer_key_; } + const std::string& key() const { return key_; } + const std::string& key_metadata() const { return key_metadata_; } + + private: + std::string path_; + bool encrypt_; + bool encrypted_with_footer_key_; + std::string key_; + std::string key_metadata_; +}; + +class PARQUET_EXPORT FileDecryptionProperties { + public: + FileDecryptionProperties(const std::string& footer_key) : footer_key_(footer_key) { + DCHECK(footer_key_.length() == 16 || footer_key_.length() == 24 || + footer_key_.length() == 32); + } + + FileDecryptionProperties(const std::shared_ptr& key_retriever) + : key_retriever_(key_retriever) {} + + void SetAad(const std::string& aad) { aad_ = aad; } + + void SetColumnKey(const std::string& name, const std::string& key) { + SetColumnKey(std::vector({name}), key); + } + + void SetColumnKey(const std::vector& paths, const std::string& key) { + DCHECK(key.length() == 16 || key.length() == 24 || key.length() == 32); + + schema::ColumnPath columnPath(paths); + + column_keys_[columnPath.ToDotString()] = key; + } + + const std::string& GetColumnKey(const std::shared_ptr& columnPath, + const std::string& key_metadata = "") { + if (key_metadata.empty()) { + return column_keys_.at(columnPath->ToDotString()); + } + if (key_retriever_ == nullptr) { + throw ParquetException("no key retriever is provided for column key metadata"); + } + return key_retriever_->GetKey(key_metadata); + } + + const std::string& GetFooterKey(const std::string& footer_key_metadata = "") { + if (footer_key_metadata.empty()) { + return footer_key_; + } + if (key_retriever_ == nullptr) { + throw ParquetException("no key retriever is provided for footer key metadata"); + } + return key_retriever_->GetKey(footer_key_metadata); + } + const std::string& GetAad() { return aad_; } + + private: + std::string footer_key_; + 
std::string aad_; + + std::map column_keys_; + + std::shared_ptr key_retriever_; +}; + class PARQUET_EXPORT ReaderProperties { public: explicit ReaderProperties(::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) @@ -70,10 +189,17 @@ class PARQUET_EXPORT ReaderProperties { int64_t buffer_size() const { return buffer_size_; } + void file_decryption(const std::shared_ptr& decryption) { + file_decryption_ = decryption; + } + + FileDecryptionProperties* file_decryption() { return file_decryption_.get(); } + private: ::arrow::MemoryPool* pool_; int64_t buffer_size_; bool buffered_stream_enabled_; + std::shared_ptr file_decryption_; }; ReaderProperties PARQUET_EXPORT default_reader_properties(); @@ -90,6 +216,10 @@ static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION = ParquetVersion::PARQUET_1_0; static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION; static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED; +static constexpr Encryption::type DEFAULT_ENCRYPTION_ALGORITHM = Encryption::AES_GCM_V1; +static constexpr int32_t MAXIMAL_KEY_METADATA_LENGTH = 256; +static constexpr int32_t MAXIMAL_AAD_METADATA_LENGTH = 256; +static constexpr bool DEFAULT_ENCRYPT_THE_REST = true; class PARQUET_EXPORT ColumnProperties { public: @@ -138,6 +268,187 @@ class PARQUET_EXPORT ColumnProperties { size_t max_stats_size_; }; +class PARQUET_EXPORT FileEncryptionProperties { + public: + class Builder { + public: + Builder() : algorithm_(DEFAULT_ENCRYPTION_ALGORITHM), uniform_encryption_(true) {} + + Builder(const std::string& key) + : algorithm_(DEFAULT_ENCRYPTION_ALGORITHM), uniform_encryption_(true) { + DCHECK(key.length() == 16 || key.length() == 24 || key.length() == 32); + footer_key_ = key; + } + + Builder* algorithm(Encryption::type algorithm) { + algorithm_ = algorithm; + return this; + } + + Builder* footer_key(const std::string& key) { + DCHECK(key.length() == 16 || key.length() == 24 || key.length() == 32); + footer_key_ = key; + 
return this; + } + + Builder* footer_key_metadata(const std::string& key_metadata) { + DCHECK(!footer_key_.empty()); + DCHECK(!key_metadata.empty() && key_metadata.length() < MAXIMAL_KEY_METADATA_LENGTH); + footer_key_metadata_ = key_metadata; + return this; + } + + Builder* aad(const std::string& aad) { + DCHECK(!aad.empty()); + aad_ = aad; + return this; + } + + Builder* aad_metadata(const std::string& aad_metadata) { + DCHECK(!aad_.empty()); + DCHECK(!aad_metadata.empty() && aad_metadata.length() < MAXIMAL_AAD_METADATA_LENGTH); + aad_metadata_ = aad_metadata; + return this; + } + + /** + * encrypt_the_rest will define if other columns (not defined in columns argument) + * will be encrypted or not + * if encrypt_the_rest = true, other columns will be encrypted with footer key + * else, other columns will be unencrypted + */ + Builder* column_properties(const std::map>& column_properties, + bool encrypt_the_rest = DEFAULT_ENCRYPT_THE_REST) { + encrypt_the_rest_ = encrypt_the_rest; + column_properties_ = column_properties; + + if (!footer_key_.empty()) { + uniform_encryption_ = true; + + for (const auto& col : column_properties) { + if (col.second->key().compare(footer_key_) != 0) { + uniform_encryption_ = false; + break; + } + } + } else { + if (encrypt_the_rest) { + throw ParquetException("Encrypt the rest with null footer key"); + } + bool all_are_unencrypted = true; + for (const auto& col : column_properties) { + if (col.second->encrypted()) { + if (col.second->key().empty()) { + throw ParquetException("Encrypt column with null footer key"); + } + all_are_unencrypted = false; + } + } + + if (all_are_unencrypted) { + throw ParquetException("Footer and all columns unencrypted"); + } + } + return this; + } + + std::shared_ptr build() { + std::shared_ptr footer_encryption; + if (!footer_key_.empty()) { + footer_encryption.reset(new EncryptionProperties(algorithm_, footer_key_, aad_)); + } + return std::make_shared(footer_encryption, footer_key_metadata_, + 
aad_metadata_, uniform_encryption_, column_properties_, encrypt_the_rest_); + } + + private: + Encryption::type algorithm_; + std::string footer_key_; + std::string footer_key_metadata_; + + std::string aad_; + std::string aad_metadata_; + + bool uniform_encryption_; + + std::map> column_properties_; + bool encrypt_the_rest_; + }; + + FileEncryptionProperties(const std::shared_ptr& footer_encryption, + const std::string& footer_key_metadata, const std::string& aad_metadata, + bool uniform_encryption, + const std::map>& column_properties, + bool encrypt_the_rest) + : footer_encryption_(footer_encryption) + , footer_key_metadata_(footer_key_metadata) + , aad_metadata_(aad_metadata) + , uniform_encryption_(uniform_encryption) + , column_properties_(column_properties) + , encrypt_the_rest_(encrypt_the_rest) {} + + std::shared_ptr GetFooterEncryptionProperties() { + return footer_encryption_; + } + + const std::string& footer_key_metadata() const { return footer_key_metadata_; } + + const std::string& aad_metadata() const { return aad_metadata_; } + + std::shared_ptr GetColumnCryptoMetaData( + const std::shared_ptr& path) { + // uniform encryption + if (uniform_encryption_) { + return ColumnEncryptionProperties::Builder(path->ToDotString(), true).build(); + } + + // non-uniform encryption + std::string path_str = path->ToDotString(); + if (column_properties_.find(path_str) != column_properties_.end()) { + return column_properties_[path_str]; + } + + // encrypted with footer key + if (encrypt_the_rest_) { + return ColumnEncryptionProperties::Builder(path->ToDotString(), true).build(); + } + + // unencrypted + return ColumnEncryptionProperties::Builder(path->ToDotString(), false).build(); + } + + std::shared_ptr GetColumnEncryptionProperties( + const std::shared_ptr& path) { + // uniform encryption + if (uniform_encryption_) { + return footer_encryption_; + } + + // non-uniform encryption + std::string path_str = path->ToDotString(); + if 
(column_properties_.find(path_str) != column_properties_.end()) { + return std::make_shared( + footer_encryption_->algorithm(), column_properties_[path_str]->key(), footer_encryption_->aad()); + } + + if (encrypt_the_rest_) { + return footer_encryption_; + } + + return nullptr; + } + + private: + std::shared_ptr footer_encryption_; + std::string footer_key_metadata_; + std::string aad_metadata_; + + bool uniform_encryption_; + + std::map> column_properties_; + bool encrypt_the_rest_; +}; + class PARQUET_EXPORT WriterProperties { public: class Builder { @@ -278,6 +589,11 @@ class PARQUET_EXPORT WriterProperties { return this->compression(path->ToDotString(), codec); } + Builder* encryption(const std::shared_ptr& file_encryption) { + file_encryption_ = file_encryption; + return this; + } + Builder* enable_statistics() { default_column_properties_.set_statistics_enabled(true); return this; @@ -323,10 +639,10 @@ class PARQUET_EXPORT WriterProperties { for (const auto& item : statistics_enabled_) get(item.first).set_statistics_enabled(item.second); - return std::shared_ptr( - new WriterProperties(pool_, dictionary_pagesize_limit_, write_batch_size_, - max_row_group_length_, pagesize_, version_, created_by_, - default_column_properties_, column_properties)); + return std::shared_ptr(new WriterProperties( + pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_, + pagesize_, version_, created_by_, std::move(file_encryption_), + default_column_properties_, column_properties)); } private: @@ -337,6 +653,7 @@ class PARQUET_EXPORT WriterProperties { int64_t pagesize_; ParquetVersion::type version_; std::string created_by_; + std::shared_ptr file_encryption_; // Settings used for each column unless overridden in any of the maps below ColumnProperties default_column_properties_; @@ -360,6 +677,18 @@ class PARQUET_EXPORT WriterProperties { inline std::string created_by() const { return parquet_created_by_; } + inline FileEncryptionProperties* 
file_encryption() const { + return parquet_file_encryption_.get(); + } + + inline std::shared_ptr footer_encryption() const { + if (parquet_file_encryption_ == nullptr) { + return nullptr; + } else { + return parquet_file_encryption_->GetFooterEncryptionProperties(); + } + } + inline Encoding::type dictionary_index_encoding() const { if (parquet_version_ == ParquetVersion::PARQUET_1_0) { return Encoding::PLAIN_DICTIONARY; @@ -403,11 +732,30 @@ class PARQUET_EXPORT WriterProperties { return column_properties(path).max_statistics_size(); } + std::shared_ptr column_encryption_props( + const std::shared_ptr& path) const { + if (parquet_file_encryption_) { + return parquet_file_encryption_->GetColumnCryptoMetaData(path); + } else { + return nullptr; + } + } + + std::shared_ptr encryption( + const std::shared_ptr& path) const { + if (parquet_file_encryption_) { + return parquet_file_encryption_->GetColumnEncryptionProperties(path); + } else { + return nullptr; + } + } + private: explicit WriterProperties( ::arrow::MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size, int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type version, const std::string& created_by, + std::shared_ptr file_encryption, const ColumnProperties& default_column_properties, const std::unordered_map& column_properties) : pool_(pool), @@ -417,6 +765,7 @@ class PARQUET_EXPORT WriterProperties { pagesize_(pagesize), parquet_version_(version), parquet_created_by_(created_by), + parquet_file_encryption_(file_encryption), default_column_properties_(default_column_properties), column_properties_(column_properties) {} @@ -427,6 +776,7 @@ class PARQUET_EXPORT WriterProperties { int64_t pagesize_; ParquetVersion::type parquet_version_; std::string parquet_created_by_; + std::shared_ptr parquet_file_encryption_; ColumnProperties default_column_properties_; std::unordered_map column_properties_; }; @@ -435,4 +785,4 @@ std::shared_ptr PARQUET_EXPORT 
default_writer_properties(); } // namespace parquet -#endif // PARQUET_COLUMN_PROPERTIES_H +#endif // PARQUET_COLUMN_PROPERTIES_H \ No newline at end of file diff --git a/src/parquet/thrift.h b/src/parquet/thrift.h index ec7ac906..d759836b 100644 --- a/src/parquet/thrift.h +++ b/src/parquet/thrift.h @@ -40,6 +40,8 @@ #include "parquet/exception.h" #include "parquet/parquet_types.h" +#include "parquet/types.h" +#include "parquet/util/crypto.h" #include "parquet/util/logging.h" #include "parquet/util/memory.h" @@ -77,6 +79,16 @@ static inline Compression::type FromThrift(format::CompressionCodec::type type) return static_cast(type); } +static inline EncryptionAlgorithm FromThrift(format::EncryptionAlgorithm encryption) { + if (encryption.__isset.AES_GCM_V1) { + return EncryptionAlgorithm{Encryption::AES_GCM_V1, + encryption.AES_GCM_V1.aad_metadata}; + } else { + return EncryptionAlgorithm{Encryption::AES_GCM_CTR_V1, + encryption.AES_GCM_CTR_V1.aad_metadata}; + } +} + static inline format::Type::type ToThrift(Type::type type) { return static_cast(type); } @@ -99,6 +111,20 @@ static inline format::CompressionCodec::type ToThrift(Compression::type type) { return static_cast(type); } +static inline format::EncryptionAlgorithm ToThrift(EncryptionAlgorithm encryption) { + format::EncryptionAlgorithm encryption_algorithm; + if (encryption.algorithm == Encryption::AES_GCM_V1) { + encryption_algorithm.__isset.AES_GCM_V1 = true; + encryption_algorithm.AES_GCM_V1 = format::AesGcmV1(); + encryption_algorithm.AES_GCM_V1.aad_metadata = encryption.aad_metadata; + } else { + encryption_algorithm.__isset.AES_GCM_CTR_V1 = true; + encryption_algorithm.AES_GCM_CTR_V1 = format::AesGcmCtrV1(); + encryption_algorithm.AES_GCM_CTR_V1.aad_metadata = encryption.aad_metadata; + } + return encryption_algorithm; +} + // ---------------------------------------------------------------------- // Thrift struct serialization / deserialization utilities @@ -106,31 +132,57 @@ static inline 
format::CompressionCodec::type ToThrift(Compression::type type) { // all the bytes needed to store the thrift message. On return, len will be // set to the actual length of the header. template -inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deserialized_msg) { - // Deserialize msg bytes into c++ thrift msg using memory transport. - shared_ptr tmem_transport( - new apache::thrift::transport::TMemoryBuffer(const_cast(buf), *len)); - apache::thrift::protocol::TCompactProtocolFactoryT< - apache::thrift::transport::TMemoryBuffer> - tproto_factory; - shared_ptr tproto = - tproto_factory.getProtocol(tmem_transport); - try { - deserialized_msg->read(tproto.get()); - } catch (std::exception& e) { - std::stringstream ss; - ss << "Couldn't deserialize thrift: " << e.what() << "\n"; - throw ParquetException(ss.str()); +inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deserialized_msg, + const EncryptionProperties* encryption = nullptr) { + if (encryption == nullptr) { + shared_ptr tmem_transport( + new apache::thrift::transport::TMemoryBuffer(const_cast(buf), *len)); + apache::thrift::protocol::TCompactProtocolFactoryT< + apache::thrift::transport::TMemoryBuffer> + tproto_factory; + shared_ptr tproto = + tproto_factory.getProtocol(tmem_transport); + try { + deserialized_msg->read(tproto.get()); + } catch (std::exception& e) { + std::stringstream ss; + ss << "Couldn't deserialize thrift: " << e.what() << "\n"; + throw ParquetException(ss.str()); + } + uint32_t bytes_left = tmem_transport->available_read(); + *len = *len - bytes_left; + } else { + // first 4 bytes for length + uint8_t clenBytes[4]; + memcpy(clenBytes, buf, 4); + + uint32_t clen = *(reinterpret_cast(clenBytes)); + + // decrypt + std::vector decrypted_buffer(encryption->CalculatePlainSize(clen)); + + uint32_t decrypted_buffer_len = parquet_encryption::Decrypt( + encryption->algorithm(), true, &buf[4], clen, encryption->key_bytes(), + encryption->key_length(), 
encryption->aad_bytes(), encryption->aad_length(), + decrypted_buffer.data()); + + if (decrypted_buffer_len <= 0) { + throw ParquetException("Couldn't decrypt buffer\n"); + } + + DeserializeThriftMsg(decrypted_buffer.data(), &decrypted_buffer_len, + deserialized_msg); + + *len = 4 + clen; } - uint32_t bytes_left = tmem_transport->available_read(); - *len = *len - bytes_left; } // Serialize obj into a buffer. The result is returned as a string. // The arguments are the object to be serialized and // the expected size of the serialized object template -inline int64_t SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) { +inline int64_t SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out, + const EncryptionProperties* encryption = nullptr) { shared_ptr mem_buffer( new apache::thrift::transport::TMemoryBuffer(len)); apache::thrift::protocol::TCompactProtocolFactoryT< @@ -150,8 +202,22 @@ inline int64_t SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) { uint8_t* out_buffer; uint32_t out_length; mem_buffer->getBuffer(&out_buffer, &out_length); - out->Write(out_buffer, out_length); - return out_length; + if (encryption == nullptr) { + out->Write(out_buffer, out_length); + + return out_length; + } else { + std::vector cipher_buffer(encryption->CalculateCipherSize(len)); + int cipher_buffer_len = parquet_encryption::Encrypt( + encryption->algorithm(), true, out_buffer, out_length, encryption->key_bytes(), + encryption->key_length(), encryption->aad_bytes(), encryption->aad_length(), + cipher_buffer.data()); + + out->Write(reinterpret_cast(&cipher_buffer_len), 4); + out->Write(cipher_buffer.data(), cipher_buffer_len); + + return cipher_buffer_len + 4; + } } } // namespace parquet diff --git a/src/parquet/types.h b/src/parquet/types.h index 10789cbf..36c01de3 100644 --- a/src/parquet/types.h +++ b/src/parquet/types.h @@ -117,6 +117,64 @@ struct Encryption { enum type { AES_GCM_V1 = 0, AES_GCM_CTR_V1 = 1 }; }; +struct EncryptionAlgorithm { + 
Encryption::type algorithm; + std::string aad_metadata; +}; + +class PARQUET_EXPORT EncryptionProperties { + private: + static inline uint8_t* str2bytes(const std::string& str) { + if (str.empty()) return nullptr; + + char* cbytes = const_cast(str.c_str()); + return reinterpret_cast(cbytes); + } + + public: + EncryptionProperties() = default; + EncryptionProperties(Encryption::type algorithm, const std::string& key, + const std::string& aad = "") + : algorithm_(algorithm), key_(key), aad_(aad) {} + + ~EncryptionProperties() { key_.replace(0, key_.length(), key_.length(), '\0'); } + + int key_length() const { return static_cast(key_.length()); } + uint8_t* key_bytes() const { return str2bytes(key_); } + + void aad(const std::string& aad) { aad_ = aad; } + int aad_length() const { return static_cast(aad_.length()); } + uint8_t* aad_bytes() const { return str2bytes(aad_); } + + Encryption::type algorithm() const { return algorithm_; } + + const std::string& key() const { return key_; } + const std::string& aad() const { return aad_; } + + uint32_t CalculateCipherSize(uint32_t plain_len) const { + if (algorithm_ == Encryption::AES_GCM_V1) { + return plain_len + 28; + } else if (algorithm_ == Encryption::AES_GCM_CTR_V1) { + return plain_len + 16; + } + return plain_len; + } + + uint32_t CalculatePlainSize(uint32_t cipher_len) const { + if (algorithm_ == Encryption::AES_GCM_V1) { + return cipher_len - 28; + } else if (algorithm_ == Encryption::AES_GCM_CTR_V1) { + return cipher_len - 16; + } + return cipher_len; + } + + private: + Encryption::type algorithm_; // encryption algorithm + std::string key_; // encryption key, should have 16, 24, 32-byte length + std::string aad_; // encryption additional authenticated data +}; + // parquet::PageType struct PageType { enum type { DATA_PAGE, INDEX_PAGE, DICTIONARY_PAGE, DATA_PAGE_V2 };