Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 143 additions & 7 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "arrow/filesystem/util_internal.h"
#include "arrow/result.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/formatting.h"
#include "arrow/util/future.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
Expand Down Expand Up @@ -150,13 +151,148 @@ Status ErrorToStatus(const std::string& prefix,
return Status::IOError(prefix, " Azure Error: ", exception.what());
}

template <typename ObjectResult>
std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult& result) {
auto md = std::make_shared<KeyValueMetadata>();
for (auto prop : result) {
md->Append(prop.first, prop.second);
template <typename ArrowType>
std::string FormatValue(typename TypeTraits<ArrowType>::CType value) {
struct StringAppender {
std::string string;
Status operator()(std::string_view view) {
string.append(view.data(), view.size());
return Status::OK();
}
} appender;
arrow::internal::StringFormatter<ArrowType> formatter;
ARROW_UNUSED(formatter(value, appender));
return appender.string;
}

std::shared_ptr<const KeyValueMetadata> PropertiesToMetadata(
const Azure::Storage::Blobs::Models::BlobProperties& properties) {
auto metadata = std::make_shared<KeyValueMetadata>();
// Not supported yet:
// * properties.ObjectReplicationSourceProperties
// * properties.Metadata
//
// They may have the same key defined in the following
// metadata->Append() list. If we have duplicated key in metadata,
// the first value may be only used by users because
// KeyValueMetadata::Get() returns the first found value. Note that
// users can use all values by using KeyValueMetadata::keys() and
// KeyValueMetadata::values().
if (properties.ImmutabilityPolicy.HasValue()) {
metadata->Append("Immutability-Policy-Expires-On",
properties.ImmutabilityPolicy.Value().ExpiresOn.ToString());
metadata->Append("Immutability-Policy-Mode",
properties.ImmutabilityPolicy.Value().PolicyMode.ToString());
}
metadata->Append("Content-Type", properties.HttpHeaders.ContentType);
metadata->Append("Content-Encoding", properties.HttpHeaders.ContentEncoding);
metadata->Append("Content-Language", properties.HttpHeaders.ContentLanguage);
const auto& content_hash = properties.HttpHeaders.ContentHash.Value;
metadata->Append("Content-Hash", HexEncode(content_hash.data(), content_hash.size()));
metadata->Append("Content-Disposition", properties.HttpHeaders.ContentDisposition);
metadata->Append("Cache-Control", properties.HttpHeaders.CacheControl);
metadata->Append("Last-Modified", properties.LastModified.ToString());
metadata->Append("Created-On", properties.CreatedOn.ToString());
if (properties.ObjectReplicationDestinationPolicyId.HasValue()) {
metadata->Append("Object-Replication-Destination-Policy-Id",
properties.ObjectReplicationDestinationPolicyId.Value());
}
metadata->Append("Blob-Type", properties.BlobType.ToString());
if (properties.CopyCompletedOn.HasValue()) {
metadata->Append("Copy-Completed-On", properties.CopyCompletedOn.Value().ToString());
}
if (properties.CopyStatusDescription.HasValue()) {
metadata->Append("Copy-Status-Description", properties.CopyStatusDescription.Value());
}
if (properties.CopyId.HasValue()) {
metadata->Append("Copy-Id", properties.CopyId.Value());
}
if (properties.CopyProgress.HasValue()) {
metadata->Append("Copy-Progress", properties.CopyProgress.Value());
}
if (properties.CopySource.HasValue()) {
metadata->Append("Copy-Source", properties.CopySource.Value());
}
if (properties.CopyStatus.HasValue()) {
metadata->Append("Copy-Status", properties.CopyStatus.Value().ToString());
}
if (properties.IsIncrementalCopy.HasValue()) {
metadata->Append("Is-Incremental-Copy",
FormatValue<BooleanType>(properties.IsIncrementalCopy.Value()));
}
if (properties.IncrementalCopyDestinationSnapshot.HasValue()) {
metadata->Append("Incremental-Copy-Destination-Snapshot",
properties.IncrementalCopyDestinationSnapshot.Value());
}
if (properties.LeaseDuration.HasValue()) {
metadata->Append("Lease-Duration", properties.LeaseDuration.Value().ToString());
}
if (properties.LeaseState.HasValue()) {
metadata->Append("Lease-State", properties.LeaseState.Value().ToString());
}
if (properties.LeaseStatus.HasValue()) {
metadata->Append("Lease-Status", properties.LeaseStatus.Value().ToString());
}
metadata->Append("Content-Length", FormatValue<Int64Type>(properties.BlobSize));
if (properties.ETag.HasValue()) {
metadata->Append("ETag", properties.ETag.ToString());
}
if (properties.SequenceNumber.HasValue()) {
metadata->Append("Sequence-Number",
FormatValue<Int64Type>(properties.SequenceNumber.Value()));
}
if (properties.CommittedBlockCount.HasValue()) {
metadata->Append("Committed-Block-Count",
FormatValue<Int32Type>(properties.CommittedBlockCount.Value()));
}
metadata->Append("IsServerEncrypted",
FormatValue<BooleanType>(properties.IsServerEncrypted));
if (properties.EncryptionKeySha256.HasValue()) {
const auto& sha256 = properties.EncryptionKeySha256.Value();
metadata->Append("Encryption-Key-Sha-256", HexEncode(sha256.data(), sha256.size()));
}
if (properties.EncryptionScope.HasValue()) {
metadata->Append("Encryption-Scope", properties.EncryptionScope.Value());
}
if (properties.AccessTier.HasValue()) {
metadata->Append("Access-Tier", properties.AccessTier.Value().ToString());
}
if (properties.IsAccessTierInferred.HasValue()) {
metadata->Append("Is-Access-Tier-Inferred",
FormatValue<BooleanType>(properties.IsAccessTierInferred.Value()));
}
if (properties.ArchiveStatus.HasValue()) {
metadata->Append("Archive-Status", properties.ArchiveStatus.Value().ToString());
}
if (properties.AccessTierChangedOn.HasValue()) {
metadata->Append("Access-Tier-Changed-On",
properties.AccessTierChangedOn.Value().ToString());
}
if (properties.VersionId.HasValue()) {
metadata->Append("Version-Id", properties.VersionId.Value());
}
if (properties.IsCurrentVersion.HasValue()) {
metadata->Append("Is-Current-Version",
FormatValue<BooleanType>(properties.IsCurrentVersion.Value()));
}
if (properties.TagCount.HasValue()) {
metadata->Append("Tag-Count", FormatValue<Int32Type>(properties.TagCount.Value()));
}
if (properties.ExpiresOn.HasValue()) {
metadata->Append("Expires-On", properties.ExpiresOn.Value().ToString());
}
if (properties.IsSealed.HasValue()) {
metadata->Append("Is-Sealed", FormatValue<BooleanType>(properties.IsSealed.Value()));
}
if (properties.RehydratePriority.HasValue()) {
metadata->Append("Rehydrate-Priority",
properties.RehydratePriority.Value().ToString());
}
if (properties.LastAccessedOn.HasValue()) {
metadata->Append("Last-Accessed-On", properties.LastAccessedOn.Value().ToString());
}
return md;
metadata->Append("Has-Legal-Hold", FormatValue<BooleanType>(properties.HasLegalHold));
return metadata;
}

class ObjectInputFile final : public io::RandomAccessFile {
Expand All @@ -176,7 +312,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
try {
auto properties = blob_client_->GetProperties();
content_length_ = properties.Value.BlobSize;
metadata_ = GetObjectMetadata(properties.Value.Metadata);
metadata_ = PropertiesToMetadata(properties.Value);
return Status::OK();
} catch (const Azure::Storage::StorageException& exception) {
if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
Expand Down
67 changes: 58 additions & 9 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
#include "arrow/testing/util.h"
#include "arrow/util/io_util.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/string.h"
#include "arrow/util/value_parsing.h"

namespace arrow {
using internal::TemporaryDir;
Expand Down Expand Up @@ -296,21 +298,68 @@ TEST_F(TestAzureFileSystem, OpenInputStreamTrailingSlash) {
ASSERT_RAISES(IOError, fs_->OpenInputStream(PreexistingObjectPath() + '/'));
}

TEST_F(TestAzureFileSystem, OpenInputStreamReadMetadata) {
const std::string object_name = "OpenInputStreamMetadataTest/simple.txt";

service_client_->GetBlobContainerClient(PreexistingContainerName())
.GetBlobClient(PreexistingObjectName())
.SetMetadata(Azure::Storage::Metadata{{"key0", "value0"}});
namespace {
std::shared_ptr<const KeyValueMetadata> NormalizerKeyValueMetadata(
std::shared_ptr<const KeyValueMetadata> metadata) {
auto normalized = std::make_shared<KeyValueMetadata>();
for (int64_t i = 0; i < metadata->size(); ++i) {
auto key = metadata->key(i);
auto value = metadata->value(i);
if (key == "Content-Hash") {
std::vector<uint8_t> output;
output.reserve(value.size() / 2);
if (ParseHexValues(value, output.data()).ok()) {
// Valid value
value = std::string(value.size(), 'F');
}
} else if (key == "Last-Modified" || key == "Created-On" ||
key == "Access-Tier-Changed-On") {
auto parser = TimestampParser::MakeISO8601();
int64_t output;
if ((*parser)(value.data(), value.size(), TimeUnit::NANO, &output)) {
// Valid value
value = "2023-10-31T08:15:20Z";
}
} else if (key == "ETag") {
if (internal::StartsWith(value, "\"") && internal::EndsWith(value, "\"")) {
// Valid value
value = "\"ETagValue\"";
}
}
normalized->Append(key, value);
}
return normalized;
}
}; // namespace

TEST_F(TestAzureFileSystem, OpenInputStreamReadMetadata) {
std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));

std::shared_ptr<const KeyValueMetadata> actual;
ASSERT_OK_AND_ASSIGN(actual, stream->ReadMetadata());
// TODO(GH-38330): This is asserting that the user defined metadata is returned but this
// is probably not the correct behaviour.
ASSERT_OK_AND_EQ("value0", actual->Get("key0"));
ASSERT_EQ(
"\n"
"-- metadata --\n"
"Content-Type: application/octet-stream\n"
"Content-Encoding: \n"
"Content-Language: \n"
"Content-Hash: FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF\n"
"Content-Disposition: \n"
"Cache-Control: \n"
"Last-Modified: 2023-10-31T08:15:20Z\n"
"Created-On: 2023-10-31T08:15:20Z\n"
"Blob-Type: BlockBlob\n"
"Lease-State: available\n"
"Lease-Status: unlocked\n"
"Content-Length: 447\n"
"ETag: \"ETagValue\"\n"
"IsServerEncrypted: true\n"
"Access-Tier: Hot\n"
"Is-Access-Tier-Inferred: true\n"
"Access-Tier-Changed-On: 2023-10-31T08:15:20Z\n"
"Has-Legal-Hold: false",
NormalizerKeyValueMetadata(actual)->ToString());
}

TEST_F(TestAzureFileSystem, OpenInputStreamClosed) {
Expand Down