Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 210 additions & 2 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace fs {
// -----------------------------------------------------------------------
// AzureOptions Implementation

AzureOptions::AzureOptions() {}
AzureOptions::AzureOptions() = default;

bool AzureOptions::Equals(const AzureOptions& other) const {
return (account_dfs_url == other.account_dfs_url &&
Expand Down Expand Up @@ -820,6 +820,209 @@ class AzureFileSystem::Impl {
}
}

private:
template <typename OnContainer>
Status VisitContainers(const Azure::Core::Context& context,
OnContainer&& on_container) const {
Azure::Storage::Blobs::ListBlobContainersOptions options;
try {
auto container_list_response =
blob_service_client_->ListBlobContainers(options, context);
for (; container_list_response.HasPage();
container_list_response.MoveToNextPage(context)) {
for (const auto& container : container_list_response.BlobContainers) {
RETURN_NOT_OK(on_container(container));
}
}
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus("Failed to list account containers.", exception);
}
return Status::OK();
}

static FileInfo FileInfoFromBlob(const std::string& container,
const Azure::Storage::Blobs::Models::BlobItem& blob) {
auto path = internal::ConcatAbstractPath(container, blob.Name);
if (internal::HasTrailingSlash(blob.Name)) {
return DirectoryFileInfoFromPath(path);
}
FileInfo info{std::move(path), FileType::File};
info.set_size(blob.BlobSize);
info.set_mtime(std::chrono::system_clock::time_point{blob.Details.LastModified});
return info;
}

static FileInfo DirectoryFileInfoFromPath(const std::string& path) {
return FileInfo{std::string{internal::RemoveTrailingSlash(path)},
FileType::Directory};
}

static std::string_view BasenameView(std::string_view s) {
DCHECK(!internal::HasTrailingSlash(s));
auto offset = s.find_last_of(internal::kSep);
auto result = (offset == std::string_view::npos) ? s : s.substr(offset);
DCHECK(!result.empty() && result.back() != internal::kSep);
return result;
}

/// \brief List the blobs at the root of a container or some dir in a container.
///
/// \pre container_client is the client for the container named like the first
/// segment of select.base_dir.
Status GetFileInfoWithSelectorFromContainer(
const Azure::Storage::Blobs::BlobContainerClient& container_client,
const Azure::Core::Context& context, Azure::Nullable<int32_t> page_size_hint,
const FileSelector& select, FileInfoVector* acc_results) {
ARROW_ASSIGN_OR_RAISE(auto base_location, AzureLocation::FromString(select.base_dir));

bool found = false;
Azure::Storage::Blobs::ListBlobsOptions options;
if (internal::IsEmptyPath(base_location.path)) {
// If the base_dir is the root of the container, then we want to list all blobs in
// the container and the Prefix should be empty and not even include the trailing
// slash because the container itself represents the `<container>/` directory.
options.Prefix = {};
found = true; // Unless the container itself is not found later!
} else {
options.Prefix = internal::EnsureTrailingSlash(base_location.path);
}
options.PageSizeHint = page_size_hint;
options.Include = Azure::Storage::Blobs::Models::ListBlobsIncludeFlags::Metadata;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General question: was it considered to have a namespace alias at the top of this file such as namespace BlobsModels = Azure::Storage::Blobs::Models;?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something I prefer to do once all PRs land because it would be a noisy change.


auto recurse = [&](const std::string& blob_prefix) noexcept -> Status {
if (select.recursive && select.max_recursion > 0) {
FileSelector sub_select;
sub_select.base_dir = internal::ConcatAbstractPath(
base_location.container, internal::RemoveTrailingSlash(blob_prefix));
sub_select.allow_not_found = true;
sub_select.recursive = true;
sub_select.max_recursion = select.max_recursion - 1;
return GetFileInfoWithSelectorFromContainer(
container_client, context, page_size_hint, sub_select, acc_results);
}
return Status::OK();
};

auto process_blob =
[&](const Azure::Storage::Blobs::Models::BlobItem& blob) noexcept {
// blob.Name has trailing slash only when Prefix is an empty
// directory marker blob for the directory we're listing
// from, and we should skip it.
if (!internal::HasTrailingSlash(blob.Name)) {
acc_results->push_back(FileInfoFromBlob(base_location.container, blob));
}
};
auto process_prefix = [&](const std::string& prefix) noexcept -> Status {
const auto path = internal::ConcatAbstractPath(base_location.container, prefix);
acc_results->push_back(DirectoryFileInfoFromPath(path));
return recurse(prefix);
};

try {
auto list_response =
container_client.ListBlobsByHierarchy(/*delimiter=*/"/", options, context);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use ListBlobs() instead when select.max_recursion == INT32_MAX as an optimization?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would require that we build a trie of all the blob names to list them hierarchically. I'm open to doing it, but I think we can delay doing it until there is clear indication that it would help a concrete use-case.

for (; list_response.HasPage(); list_response.MoveToNextPage(context)) {
if (list_response.Blobs.empty() && list_response.BlobPrefixes.empty()) {
continue;
}
found = true;
// Blob and BlobPrefixes are sorted by name, so we can merge-iterate
// them to ensure returned results are all sorted.
size_t blob_index = 0;
size_t blob_prefix_index = 0;
while (blob_index < list_response.Blobs.size() &&
blob_prefix_index < list_response.BlobPrefixes.size()) {
const auto& blob = list_response.Blobs[blob_index];
const auto& prefix = list_response.BlobPrefixes[blob_prefix_index];
const int cmp = blob.Name.compare(prefix);
if (cmp < 0) {
process_blob(blob);
blob_index += 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
blob_index += 1;
++blob_index;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both styles are found in the codebase and the += 1 is the only way to do it in modern languages like Rust and Swift.

i++ is ok in for loops, but ++blob_index by itself doesn't stand out enough alone when in a line by itself.

} else if (cmp > 0) {
RETURN_NOT_OK(process_prefix(prefix));
blob_prefix_index += 1;
} else {
DCHECK_EQ(blob.Name, prefix);
RETURN_NOT_OK(process_prefix(prefix));
blob_index += 1;
blob_prefix_index += 1;
// If the container has an empty dir marker blob and another blob starting
// with this blob name as a prefix, the blob doesn't appear in the listing
// that also contains the prefix, so AFAICT this branch in unreachable. The
// code above is kept just in case, but if this DCHECK(false) is ever reached,
// we should refactor this loop to ensure no duplicate entries are ever
// reported.
DCHECK(false)
<< "Unexpected blob/prefix name collision on the same listing request";
}
}
for (; blob_index < list_response.Blobs.size(); blob_index++) {
process_blob(list_response.Blobs[blob_index]);
}
for (; blob_prefix_index < list_response.BlobPrefixes.size();
blob_prefix_index++) {
RETURN_NOT_OK(process_prefix(list_response.BlobPrefixes[blob_prefix_index]));
}
}
} catch (const Azure::Storage::StorageException& exception) {
if (exception.ErrorCode == "ContainerNotFound") {
found = false;
} else {
return internal::ExceptionToStatus(
"Failed to list blobs in a directory: " + select.base_dir + ": " +
container_client.GetUrl(),
exception);
}
}

return found || select.allow_not_found
? Status::OK()
: ::arrow::fs::internal::PathNotFound(select.base_dir);
}

public:
Status GetFileInfoWithSelector(const Azure::Core::Context& context,
Azure::Nullable<int32_t> page_size_hint,
const FileSelector& select,
FileInfoVector* acc_results) {
ARROW_ASSIGN_OR_RAISE(auto base_location, AzureLocation::FromString(select.base_dir));

if (base_location.container.empty()) {
// Without a container, the base_location is equivalent to the filesystem
// root -- `/`. FileSelector::allow_not_found doesn't matter in this case
// because the root always exists.
auto on_container =
[&](const Azure::Storage::Blobs::Models::BlobContainerItem& container) {
// Deleted containers are not listed by ListContainers.
DCHECK(!container.IsDeleted);

// Every container is considered a directory.
FileInfo info{container.Name, FileType::Directory};
info.set_mtime(
std::chrono::system_clock::time_point{container.Details.LastModified});
acc_results->push_back(std::move(info));

// Recurse into containers (subdirectories) if requested.
if (select.recursive && select.max_recursion > 0) {
FileSelector sub_select;
sub_select.base_dir = container.Name;
sub_select.allow_not_found = true;
sub_select.recursive = true;
sub_select.max_recursion = select.max_recursion - 1;
ARROW_RETURN_NOT_OK(GetFileInfoWithSelector(context, page_size_hint,
sub_select, acc_results));
}
return Status::OK();
};
return VisitContainers(context, std::move(on_container));
}

auto container_client =
blob_service_client_->GetBlobContainerClient(base_location.container);
return GetFileInfoWithSelectorFromContainer(container_client, context, page_size_hint,
select, acc_results);
}

Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const AzureLocation& location,
AzureFileSystem* fs) {
RETURN_NOT_OK(ValidateFileLocation(location));
Expand Down Expand Up @@ -1196,7 +1399,12 @@ Result<FileInfo> AzureFileSystem::GetFileInfo(const std::string& path) {
}

Result<FileInfoVector> AzureFileSystem::GetFileInfo(const FileSelector& select) {
return Status::NotImplemented("The Azure FileSystem is not fully implemented");
Azure::Core::Context context;
Azure::Nullable<int32_t> page_size_hint; // unspecified
Comment on lines +1402 to +1403
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't object them but why do you want to specify them explicitly?
Our other methods don't specify them explicitly. (They use the default value.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want all the wiring to be in place if we need to specify the page_size_hint which I'm pretty sure we will need to tweak when this is used in practice.

FileInfoVector results;
RETURN_NOT_OK(
impl_->GetFileInfoWithSelector(context, page_size_hint, select, &results));
return {std::move(results)};
}

Status AzureFileSystem::CreateDir(const std::string& path, bool recursive) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/filesystem/azurefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class ARROW_EXPORT AzureFileSystem : public FileSystem {
const AzureOptions& options, const io::IOContext& = io::default_io_context());

private:
explicit AzureFileSystem(const AzureOptions& options, const io::IOContext& io_context);
AzureFileSystem(const AzureOptions& options, const io::IOContext& io_context);

class Impl;
std::unique_ptr<Impl> impl_;
Expand Down
Loading