Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 53 additions & 68 deletions cpp/src/parquet/file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ using arrow::internal::AddWithOverflow;

namespace parquet {

using ::arrow::Future;
using ::arrow::Result;
using ::arrow::Status;

namespace {
bool IsColumnChunkFullyDictionaryEncoded(const ColumnChunkMetaData& col) {
// Check the encoding_stats to see if all data pages are dictionary encoded.
Expand Down Expand Up @@ -398,7 +402,7 @@ class SerializedFile : public ParquetFileReader::Contents {
PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges));
}

::arrow::Result<std::vector<::arrow::io::ReadRange>> GetReadRanges(
Result<std::vector<::arrow::io::ReadRange>> GetReadRanges(
const std::vector<int>& row_groups, const std::vector<int>& column_indices,
int64_t hole_size_limit, int64_t range_size_limit) {
std::vector<::arrow::io::ReadRange> ranges;
Expand All @@ -413,10 +417,10 @@ class SerializedFile : public ParquetFileReader::Contents {
range_size_limit);
}

::arrow::Future<> WhenBuffered(const std::vector<int>& row_groups,
const std::vector<int>& column_indices) const {
Future<> WhenBuffered(const std::vector<int>& row_groups,
const std::vector<int>& column_indices) const {
if (!cached_source_) {
return ::arrow::Status::Invalid("Must call PreBuffer before WhenBuffered");
return Status::Invalid("Must call PreBuffer before WhenBuffered");
}
std::vector<::arrow::io::ReadRange> ranges;
for (int row : row_groups) {
Expand Down Expand Up @@ -465,23 +469,8 @@ class SerializedFile : public ParquetFileReader::Contents {
// Fall through
}

const uint32_t read_metadata_len = ParseUnencryptedFileMetadata(
metadata_buffer, metadata_len, std::move(file_decryptor));
auto file_decryption_properties = properties_.file_decryption_properties();
if (is_encrypted_footer) {
// Nothing else to do here.
return;
} else if (!file_metadata_->is_encryption_algorithm_set()) { // Non encrypted file.
if (file_decryption_properties != nullptr) {
if (!file_decryption_properties->plaintext_files_allowed()) {
throw ParquetException("Applying decryption properties on plaintext file");
}
}
} else {
// Encrypted file with plaintext footer mode.
ParseMetaDataOfEncryptedFileWithPlaintextFooter(
file_decryption_properties, metadata_buffer, metadata_len, read_metadata_len);
}
ParseMetaDataFinal(std::move(metadata_buffer), metadata_len, is_encrypted_footer,
std::move(file_decryptor));
}

// Validate the source size and get the initial read size.
Expand Down Expand Up @@ -522,16 +511,15 @@ class SerializedFile : public ParquetFileReader::Contents {
}

// Does not throw.
::arrow::Future<> ParseMetaDataAsync() {
Future<> ParseMetaDataAsync() {
int64_t footer_read_size;
BEGIN_PARQUET_CATCH_EXCEPTIONS
footer_read_size = GetFooterReadSize();
END_PARQUET_CATCH_EXCEPTIONS
// Assumes this is kept alive externally
return source_->ReadAsync(source_size_ - footer_read_size, footer_read_size)
.Then([this,
footer_read_size](const std::shared_ptr<::arrow::Buffer>& footer_buffer)
-> ::arrow::Future<> {
.Then([this, footer_read_size](
const std::shared_ptr<::arrow::Buffer>& footer_buffer) -> Future<> {
uint32_t metadata_len;
BEGIN_PARQUET_CATCH_EXCEPTIONS
metadata_len = ParseFooterLength(footer_buffer, footer_read_size);
Expand All @@ -557,7 +545,7 @@ class SerializedFile : public ParquetFileReader::Contents {
}

// Continuation
::arrow::Future<> ParseMaybeEncryptedMetaDataAsync(
Future<> ParseMaybeEncryptedMetaDataAsync(
std::shared_ptr<::arrow::Buffer> footer_buffer,
std::shared_ptr<::arrow::Buffer> metadata_buffer, int64_t footer_read_size,
uint32_t metadata_len) {
Expand All @@ -580,26 +568,30 @@ class SerializedFile : public ParquetFileReader::Contents {
file_decryptor = std::move(file_decryptor)](
const std::shared_ptr<::arrow::Buffer>& metadata_buffer) {
// Continue and read the file footer
return ParseMetaDataFinal(metadata_buffer, metadata_len, is_encrypted_footer,
file_decryptor);
BEGIN_PARQUET_CATCH_EXCEPTIONS
ParseMetaDataFinal(metadata_buffer, metadata_len, is_encrypted_footer,
file_decryptor);
END_PARQUET_CATCH_EXCEPTIONS
return Status::OK();
});
}
return ParseMetaDataFinal(std::move(metadata_buffer), metadata_len,
is_encrypted_footer, std::move(file_decryptor));
BEGIN_PARQUET_CATCH_EXCEPTIONS
ParseMetaDataFinal(std::move(metadata_buffer), metadata_len, is_encrypted_footer,
std::move(file_decryptor));
END_PARQUET_CATCH_EXCEPTIONS
return Status::OK();
}

// Continuation
::arrow::Status ParseMetaDataFinal(
std::shared_ptr<::arrow::Buffer> metadata_buffer, uint32_t metadata_len,
const bool is_encrypted_footer,
std::shared_ptr<InternalFileDecryptor> file_decryptor) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
void ParseMetaDataFinal(std::shared_ptr<::arrow::Buffer> metadata_buffer,
uint32_t metadata_len, const bool is_encrypted_footer,
std::shared_ptr<InternalFileDecryptor> file_decryptor) {
const uint32_t read_metadata_len = ParseUnencryptedFileMetadata(
metadata_buffer, metadata_len, std::move(file_decryptor));
auto file_decryption_properties = properties_.file_decryption_properties();
if (is_encrypted_footer) {
// Nothing else to do here.
return ::arrow::Status::OK();
return;
} else if (!file_metadata_->is_encryption_algorithm_set()) { // Non encrypted file.
if (file_decryption_properties != nullptr) {
if (!file_decryption_properties->plaintext_files_allowed()) {
Expand All @@ -611,8 +603,6 @@ class SerializedFile : public ParquetFileReader::Contents {
ParseMetaDataOfEncryptedFileWithPlaintextFooter(
file_decryption_properties, metadata_buffer, metadata_len, read_metadata_len);
}
END_PARQUET_CATCH_EXCEPTIONS
return ::arrow::Status::OK();
}

private:
Expand Down Expand Up @@ -704,23 +694,18 @@ void SerializedFile::ParseMetaDataOfEncryptedFileWithPlaintextFooter(
EncryptionAlgorithm algo = file_metadata_->encryption_algorithm();
// Handle AAD prefix
std::string file_aad = HandleAadPrefix(file_decryption_properties, algo);
// Set the InternalFileDecryptor in the metadata as well, as it's used
// for signature verification and for ColumnChunkMetaData creation.
auto file_decryptor = std::make_shared<InternalFileDecryptor>(
file_decryption_properties, file_aad, algo.algorithm,
file_metadata_->footer_signing_key_metadata(), properties_.memory_pool());
// set the InternalFileDecryptor in the metadata as well, as it's used
// for signature verification and for ColumnChunkMetaData creation.
file_metadata_->set_file_decryptor(std::move(file_decryptor));

if (file_decryption_properties->check_plaintext_footer_integrity()) {
if (metadata_len - read_metadata_len !=
(parquet::encryption::kGcmTagLength + parquet::encryption::kNonceLength)) {
throw ParquetInvalidOrCorruptedFileException(
"Failed reading metadata for encryption signature (requested ",
parquet::encryption::kGcmTagLength + parquet::encryption::kNonceLength,
" bytes but have ", metadata_len - read_metadata_len, " bytes)");
}

if (!file_metadata_->VerifySignature(metadata_buffer->data() + read_metadata_len)) {
auto serialized_metadata =
metadata_buffer->span_as<uint8_t>().subspan(0, read_metadata_len);
auto signature = metadata_buffer->span_as<uint8_t>().subspan(read_metadata_len);
if (!file_metadata_->VerifySignature(serialized_metadata, signature)) {
throw ParquetInvalidOrCorruptedFileException(
"Parquet crypto signature verification failed");
}
Expand Down Expand Up @@ -804,7 +789,7 @@ std::unique_ptr<ParquetFileReader::Contents> ParquetFileReader::Contents::Open(
return result;
}

::arrow::Future<std::unique_ptr<ParquetFileReader::Contents>>
Future<std::unique_ptr<ParquetFileReader::Contents>>
ParquetFileReader::Contents::OpenAsync(std::shared_ptr<ArrowInputFile> source,
const ReaderProperties& props,
std::shared_ptr<FileMetaData> metadata) {
Expand All @@ -815,7 +800,7 @@ ParquetFileReader::Contents::OpenAsync(std::shared_ptr<ArrowInputFile> source,
if (metadata == nullptr) {
// TODO(ARROW-12259): workaround since we have Future<(move-only type)>
struct {
::arrow::Result<std::unique_ptr<ParquetFileReader::Contents>> operator()() {
Result<std::unique_ptr<ParquetFileReader::Contents>> operator()() {
return std::move(result);
}

Expand All @@ -825,7 +810,7 @@ ParquetFileReader::Contents::OpenAsync(std::shared_ptr<ArrowInputFile> source,
return file->ParseMetaDataAsync().Then(std::move(Continuation));
} else {
file->set_metadata(std::move(metadata));
return ::arrow::Future<std::unique_ptr<ParquetFileReader::Contents>>::MakeFinished(
return Future<std::unique_ptr<ParquetFileReader::Contents>>::MakeFinished(
std::move(result));
}
END_PARQUET_CATCH_EXCEPTIONS
Expand Down Expand Up @@ -855,24 +840,24 @@ std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
return Open(std::move(source), props, std::move(metadata));
}

::arrow::Future<std::unique_ptr<ParquetFileReader>> ParquetFileReader::OpenAsync(
Future<std::unique_ptr<ParquetFileReader>> ParquetFileReader::OpenAsync(
std::shared_ptr<::arrow::io::RandomAccessFile> source, const ReaderProperties& props,
std::shared_ptr<FileMetaData> metadata) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
auto fut = SerializedFile::OpenAsync(std::move(source), props, std::move(metadata));
// TODO(ARROW-12259): workaround since we have Future<(move-only type)>
auto completed = ::arrow::Future<std::unique_ptr<ParquetFileReader>>::Make();
fut.AddCallback([fut, completed](
const ::arrow::Result<std::unique_ptr<ParquetFileReader::Contents>>&
contents) mutable {
if (!contents.ok()) {
completed.MarkFinished(contents.status());
return;
}
std::unique_ptr<ParquetFileReader> result = std::make_unique<ParquetFileReader>();
result->Open(fut.MoveResult().MoveValueUnsafe());
completed.MarkFinished(std::move(result));
});
auto completed = Future<std::unique_ptr<ParquetFileReader>>::Make();
fut.AddCallback(
[fut, completed](
const Result<std::unique_ptr<ParquetFileReader::Contents>>& contents) mutable {
if (!contents.ok()) {
completed.MarkFinished(contents.status());
return;
}
std::unique_ptr<ParquetFileReader> result = std::make_unique<ParquetFileReader>();
result->Open(fut.MoveResult().MoveValueUnsafe());
completed.MarkFinished(std::move(result));
});
return completed;
END_PARQUET_CATCH_EXCEPTIONS
}
Expand Down Expand Up @@ -919,7 +904,7 @@ void ParquetFileReader::PreBuffer(const std::vector<int>& row_groups,
file->PreBuffer(row_groups, column_indices, ctx, options);
}

::arrow::Result<std::vector<::arrow::io::ReadRange>> ParquetFileReader::GetReadRanges(
Result<std::vector<::arrow::io::ReadRange>> ParquetFileReader::GetReadRanges(
const std::vector<int>& row_groups, const std::vector<int>& column_indices,
int64_t hole_size_limit, int64_t range_size_limit) {
// Access private methods here
Expand All @@ -929,8 +914,8 @@ ::arrow::Result<std::vector<::arrow::io::ReadRange>> ParquetFileReader::GetReadR
range_size_limit);
}

::arrow::Future<> ParquetFileReader::WhenBuffered(
const std::vector<int>& row_groups, const std::vector<int>& column_indices) const {
Future<> ParquetFileReader::WhenBuffered(const std::vector<int>& row_groups,
const std::vector<int>& column_indices) const {
// Access private methods here
SerializedFile* file =
::arrow::internal::checked_cast<SerializedFile*>(contents_.get());
Expand Down
42 changes: 42 additions & 0 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,43 @@ class FileMetaData::FileMetaDataImpl {
tag, encryption::kGcmTagLength);
}

bool VerifySignature(std::span<const uint8_t> serialized_metadata,
std::span<const uint8_t> signature) {
Comment on lines +837 to +838
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I implemented this as a method of FileMetaData but the only member it uses is the FileDecryptor, so perhaps this should be moved elsewhere (or made static?).

// Verify decryption properties are set
if (file_decryptor_ == nullptr) {
throw ParquetException("Decryption not set properly. cannot verify signature");
}

if (signature.size() != encryption::kGcmTagLength + encryption::kNonceLength) {
throw ParquetInvalidOrCorruptedFileException(
"Invalid footer encryption signature (expected ",
encryption::kGcmTagLength + encryption::kNonceLength, " bytes, got ",
signature.size(), ")");
}

// Encrypt plaintext serialized metadata so as to compute its signature
auto nonce = signature.subspan(0, encryption::kNonceLength);
auto tag = signature.subspan(encryption::kNonceLength);
const SecureString& key = file_decryptor_->GetFooterKey();
const std::string& aad = encryption::CreateFooterAad(file_decryptor_->file_aad());

auto aes_encryptor = encryption::AesEncryptor::Make(file_decryptor_->algorithm(),
static_cast<int>(key.size()),
true, false /*write_length*/);

std::shared_ptr<Buffer> encrypted_buffer =
AllocateBuffer(file_decryptor_->pool(),
aes_encryptor->CiphertextLength(serialized_metadata.size()));
int32_t encrypted_len = aes_encryptor->SignedFooterEncrypt(
serialized_metadata, key.as_span(), str2span(aad), nonce,
encrypted_buffer->mutable_span_as<uint8_t>());
DCHECK_EQ(encrypted_len, encrypted_buffer->size());
// Check computed signature against expected
return 0 ==
memcmp(encrypted_buffer->data() + encrypted_len - encryption::kGcmTagLength,
tag.data(), encryption::kGcmTagLength);
}

inline uint32_t size() const { return metadata_len_; }
inline int num_columns() const { return schema_.num_columns(); }
inline int64_t num_rows() const { return metadata_->num_rows; }
Expand Down Expand Up @@ -1083,6 +1120,11 @@ bool FileMetaData::VerifySignature(const void* signature) {
return impl_->VerifySignature(signature);
}

bool FileMetaData::VerifySignature(std::span<const uint8_t> serialized_metadata,
std::span<const uint8_t> signature) {
return impl_->VerifySignature(serialized_metadata, signature);
}

uint32_t FileMetaData::size() const { return impl_->size(); }

int FileMetaData::num_columns() const { return impl_->num_columns(); }
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/parquet/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <map>
#include <memory>
#include <optional>
#include <span>
#include <string>
#include <vector>

Expand Down Expand Up @@ -322,9 +323,12 @@ class PARQUET_EXPORT FileMetaData {
EncryptionAlgorithm encryption_algorithm() const;
const std::string& footer_signing_key_metadata() const;

/// \brief Verify signature of FileMetaData when file is encrypted but footer
/// is not encrypted (plaintext footer).
PARQUET_DEPRECATED("Deprecated in 24.0.0. Use the two-argument overload instead.")
bool VerifySignature(const void* signature);
Comment on lines +326 to 327
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opted to deprecate this but we might also remove it, as it seems this API is meant for internal use? @adamreeve @EnricoMi

/// \brief Verify footer signature when file is encrypted but footer is not
/// encrypted (plaintext footer).
bool VerifySignature(std::span<const uint8_t> serialized_metadata,
std::span<const uint8_t> signature);

void WriteTo(::arrow::io::OutputStream* dst,
const std::shared_ptr<Encryptor>& encryptor = NULLPTR) const;
Expand Down
Loading