Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 137 additions & 69 deletions cpp/src/arrow/filesystem/gcsfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ struct GcsCredentials {
namespace {

namespace gcs = google::cloud::storage;
using GcsCode = google::cloud::StatusCode;

// Change the default upload buffer size. In general, sending larger buffers is more
// efficient with GCS, as each buffer requires a roundtrip to the service. With formatted
Expand Down Expand Up @@ -75,6 +76,13 @@ struct GcsPath {
return path;
}

// Returns the path one level above this one, inside the same bucket. When the object
// name has no parent component the result is built from the bucket name alone, with an
// empty object name.
GcsPath parent() const {
  const auto parent_object = internal::GetAbstractPathParent(object).first;
  if (!parent_object.empty()) {
    return GcsPath{internal::ConcatAbstractPath(bucket, parent_object), bucket,
                   parent_object};
  }
  return GcsPath{bucket, bucket, ""};
}

bool empty() const { return bucket.empty() && object.empty(); }

bool operator==(const GcsPath& other) const {
Expand Down Expand Up @@ -310,108 +318,133 @@ class GcsFileSystem::Impl {
// Looks up the FileInfo for a single path. A path with an empty object name is
// resolved from the bucket metadata; any other path is resolved from the object
// metadata.
// NOTE(review): this span is a rendered diff with the +/- markers stripped, so a
// superseded statement and its replacement appear back to back (consecutive `return`s).
// Presumably the later line of each pair is the current code — confirm against the
// real file.
Result<FileInfo> GetFileInfo(const GcsPath& path) {
if (path.object.empty()) {
auto meta = client_.GetBucketMetadata(path.bucket);
return GetFileInfoDirectory(path, std::move(meta).status());
return GetFileInfoBucket(path, std::move(meta).status());
}
auto meta = client_.GetObjectMetadata(path.bucket, path.object);
// NOTE(review): the trailing-slash test below looks like part of the pre-change code
// (directories detected by name rather than by marker metadata) — confirm.
if (path.object.back() == '/') {
return GetFileInfoDirectory(path, std::move(meta).status());
}
return GetFileInfoFile(path, meta);
return GetFileInfoObject(path, meta);
}

Result<FileInfoVector> GetFileInfo(const FileSelector& select) {
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(select.base_dir));
// Adding the trailing '/' avoids problems with files named 'a', 'ab', 'ac' where GCS
// would return all of them if the prefix is 'a'.
const auto canonical = internal::EnsureTrailingSlash(p.object);
const auto max_depth = internal::Depth(canonical) + select.max_recursion;
auto prefix = p.object.empty() ? gcs::Prefix() : gcs::Prefix(canonical);
auto delimiter = select.recursive ? gcs::Delimiter() : gcs::Delimiter("/");
bool found_directory = false;
FileInfoVector result;
for (auto const& o : client_.ListObjects(p.bucket, prefix, delimiter)) {
if (!o.ok()) {
if (select.allow_not_found &&
o.status().code() == google::cloud::StatusCode::kNotFound) {
continue;
return result;
}
return internal::ToArrowStatus(o.status());
}
found_directory = true;
// Skip the directory itself from the results, and any result that is "too deep"
// into the recursion.
if (o->name() == p.object || internal::Depth(o->name()) > max_depth) {
continue;
}
auto path = internal::ConcatAbstractPath(o->bucket(), o->name());
if (o->name().back() == '/') {
result.push_back(
FileInfo(internal::EnsureTrailingSlash(path), FileType::Directory));
continue;
}
result.push_back(ToFileInfo(path, *o));
}
if (!found_directory && !select.allow_not_found) {
return Status::IOError("No such file or directory '", select.base_dir, "'");
// Finding any elements indicates the directory was found.
if (!result.empty() || select.allow_not_found) {
return result;
}
// To find out if the directory exists we need to perform an additional query.
ARROW_ASSIGN_OR_RAISE(auto directory, GetFileInfo(p));
if (directory.IsDirectory()) return result;
if (directory.IsFile()) {
return Status::IOError("Cannot use file '", select.base_dir, "' as a directory");
}
return result;
return Status::IOError("No such file or directory '", select.base_dir, "'");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the error message accurate? If base_dir is an existing file, do we still get this message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

}

// GCS does not have directories or folders. But folders can be emulated (with some
// limitations) using marker objects. That and listing with prefixes creates the
// illusion of folders.
google::cloud::Status CreateDirMarker(const std::string& bucket,
util::string_view name) {
google::cloud::StatusOr<gcs::ObjectMetadata> CreateDirMarker(const std::string& bucket,
util::string_view name) {
// Make the name canonical.
const auto canonical = internal::EnsureTrailingSlash(name);
return client_
.InsertObject(bucket, canonical, std::string(),
gcs::WithObjectMetadata(gcs::ObjectMetadata().upsert_metadata(
"arrow/gcsfs", "directory")))
.status();
const auto canonical = internal::RemoveTrailingSlash(name).to_string();
google::cloud::StatusOr<gcs::ObjectMetadata> object = client_.InsertObject(
bucket, canonical, std::string(),
gcs::WithObjectMetadata(
gcs::ObjectMetadata().upsert_metadata("arrow/gcsfs", "directory")),
gcs::IfGenerationMatch(0));
if (object) return object;
if (object.status().code() == GcsCode::kFailedPrecondition) {
// The marker already exists, find out if it is a directory or a file.
return client_.GetObjectMetadata(bucket, canonical);
}
return object;
}

google::cloud::Status CreateDirMarkerRecursive(const std::string& bucket,
const std::string& object) {
using GcsCode = google::cloud::StatusCode;
// Builds the IOError returned when an existing object that is not a directory marker
// is found where a directory is required. The message quotes the path obtained by
// joining the object's bucket and name.
static Status NotDirectoryError(const gcs::ObjectMetadata& o) {
  const auto conflicting_path = internal::ConcatAbstractPath(o.bucket(), o.name());
  return Status::IOError("Cannot create directory, it conflicts with an existing file '",
                         conflicting_path, "'");
}

Status CreateDirMarkerRecursive(const std::string& bucket, const std::string& name) {
auto get_parent = [](std::string const& path) {
return std::move(internal::GetAbstractPathParent(path).first);
};
// Maybe counterintuitively we create the markers from the most nested and up. Because
// GCS does not have directories creating `a/b/c` will succeed, even if `a/` or `a/b/`
// does not exist. In the common case, where `a/b/` may already exist, it is more
// efficient to just create `a/b/c/` and then find out that `a/b/` was already there.
// In the case where none exists, it does not matter which order we follow.
auto status = CreateDirMarker(bucket, object);
if (status.code() == GcsCode::kAlreadyExists) return {};
if (status.code() == GcsCode::kNotFound) {
// Missing bucket, create it first ...
status = client_.CreateBucket(bucket, gcs::BucketMetadata()).status();
if (status.code() != GcsCode::kOk && status.code() != GcsCode::kAlreadyExists) {
return status;
// Find the list of missing parents. In the process we discover if any elements in
// the path are files, this is unavoidable as GCS does not really have directories.
std::vector<std::string> missing_parents;
auto dir = name;
for (; !dir.empty(); dir = get_parent(dir)) {
auto o = client_.GetObjectMetadata(bucket, dir);
if (o) {
if (IsDirectory(*o)) break;
return NotDirectoryError(*o);
}
missing_parents.push_back(dir);
}

for (auto parent = get_parent(object); !parent.empty(); parent = get_parent(parent)) {
status = CreateDirMarker(bucket, parent);
if (status.code() == GcsCode::kAlreadyExists) {
break;
if (dir.empty()) {
// We could not find any of the parent directories in the bucket, the last step is
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this consistent with what the S3 connector does? Bucket creation seems too heavyweight an operation to risk triggering accidentally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this is what you do for s3 too:

// Create object
if (recursive) {
// Ensure bucket exists
ARROW_ASSIGN_OR_RAISE(bool bucket_exists, impl_->BucketExists(path.bucket));
if (!bucket_exists) {
RETURN_NOT_OK(impl_->CreateBucket(path.bucket));
}

// to find out if the bucket exists, and if necessary, create it
google::cloud::StatusOr<gcs::BucketMetadata> b = client_.GetBucketMetadata(bucket);
if (!b) {
if (b.status().code() == GcsCode::kNotFound) {
b = client_.CreateBucket(bucket, gcs::BucketMetadata());
}
if (!b) return internal::ToArrowStatus(b.status());
}
if (!status.ok()) {
return status;
}

// Note that the list of parents are sorted from deepest to most shallow, this is
// convenient because as soon as we find a directory we can stop the iteration.
for (auto const& d : missing_parents) {
auto o = CreateDirMarker(bucket, d);
if (o) {
if (IsDirectory(*o)) continue;
// This is probably a race condition, something created a file before we managed
// to create the directories.
return NotDirectoryError(*o);
}
}
return {};
return Status::OK();
}

// Creates a single directory level. A path with an empty object name creates the
// bucket; otherwise a directory marker object is inserted through CreateDirMarker().
// NOTE(review): this span is a rendered diff with the +/- markers stripped, so the
// replaced `return` and the code that supersedes it both appear below — confirm
// against the real file which lines are live.
Status CreateDir(const GcsPath& p) {
if (p.object.empty()) {
return internal::ToArrowStatus(
client_.CreateBucket(p.bucket, gcs::BucketMetadata()).status());
}
return internal::ToArrowStatus(CreateDirMarker(p.bucket, p.object));
// When the path has a parent inside the bucket, that parent must be a directory marker.
auto parent = p.parent();
if (!parent.object.empty()) {
auto o = client_.GetObjectMetadata(p.bucket, parent.object);
// NOTE(review): `o` is dereferenced without first checking that the lookup succeeded.
// If the parent object is missing, or the request fails, `*o` holds no value.
// Presumably an `if (!o) return internal::ToArrowStatus(o.status());` guard belongs
// here — confirm against the google::cloud::StatusOr contract.
if (!IsDirectory(*o)) return NotDirectoryError(*o);
}
return internal::ToArrowStatus(CreateDirMarker(p.bucket, p.object).status());
}

// Creates the directory by delegating to CreateDirMarkerRecursive() with the path's
// bucket and object name.
// NOTE(review): rendered diff with the +/- markers stripped — the first `return` looks
// like the superseded line and the second its replacement; confirm against the real
// file.
Status CreateDirRecursive(const GcsPath& p) {
return internal::ToArrowStatus(CreateDirMarkerRecursive(p.bucket, p.object));
return CreateDirMarkerRecursive(p.bucket, p.object);
}

Status DeleteDir(const GcsPath& p, const io::IOContext& io_context) {
Expand All @@ -423,20 +456,26 @@ class GcsFileSystem::Impl {
}

Status DeleteDirContents(const GcsPath& p, const io::IOContext& io_context) {
// If the directory marker exists, it better be a directory.
auto dir = client_.GetObjectMetadata(p.bucket, p.object);
if (dir && !IsDirectory(*dir)) return NotDirectoryError(*dir);

// Deleting large directories can be fairly slow, we need to parallelize the
// operation.
const auto& canonical =
p.object.empty() ? p.object : internal::EnsureTrailingSlash(p.object);
auto async_delete =
[&p, this](const google::cloud::StatusOr<gcs::ObjectMetadata>& o) -> Status {
[&, this](const google::cloud::StatusOr<gcs::ObjectMetadata>& o) -> Status {
if (!o) return internal::ToArrowStatus(o.status());
// The list includes the directory, skip it. DeleteDir() takes care of it.
if (o->bucket() == p.bucket && o->name() == p.object) return {};
if (o->bucket() == p.bucket && o->name() == canonical) return Status::OK();
return internal::ToArrowStatus(
client_.DeleteObject(o->bucket(), o->name(), gcs::Generation(o->generation())));
};

std::vector<Future<>> submitted;
// This iterates over all the objects, and schedules parallel deletes.
auto prefix = p.object.empty() ? gcs::Prefix() : gcs::Prefix(p.object);
auto prefix = p.object.empty() ? gcs::Prefix() : gcs::Prefix(canonical);
for (const auto& o : client_.ListObjects(p.bucket, prefix)) {
submitted.push_back(DeferNotOk(io_context.executor()->Submit(async_delete, o)));
}
Expand All @@ -445,29 +484,47 @@ class GcsFileSystem::Impl {
}

// Deletes the object named by `p`, refusing with an IOError when the path refers to a
// directory.
// NOTE(review): rendered diff with the +/- markers stripped. The first `if` (trailing
// slash test) and its string-concatenating IOError look like the superseded code, and
// the metadata-based check after it looks like the replacement. The braces do not
// balance as rendered — confirm against the real file.
Status DeleteFile(const GcsPath& p) {
if (!p.object.empty() && p.object.back() == '/') {
return Status::IOError("The given path (" + p.full_path +
") is a directory, use DeleteDir");
if (!p.object.empty()) {
// Fetch the object's metadata so directory markers can be told apart from files; a
// failed lookup is returned to the caller as-is.
auto stat = client_.GetObjectMetadata(p.bucket, p.object);
if (!stat) return internal::ToArrowStatus(stat.status());
if (IsDirectory(*stat)) {
return Status::IOError("The given path '", p.full_path,
"' is a directory, use DeleteDir");
}
}
return internal::ToArrowStatus(client_.DeleteObject(p.bucket, p.object));
}

// Moves a single object by copying it to `dest` with CopyFile() and then removing the
// source with DeleteFile(). These are two separate calls, so a failure in the delete
// step leaves both objects in place.
// NOTE(review): rendered diff with the +/- markers stripped — superseded statements and
// their replacements appear back to back, and the braces do not balance as rendered.
// Confirm against the real file which lines are live.
Status Move(const GcsPath& src, const GcsPath& dest) {
if (src.full_path.empty() || src.object.empty() ||
src.object.back() == internal::kSep) {
// Moving a path onto itself is a no-op.
if (src == dest) return Status::OK();
if (src.object.empty()) {
return Status::IOError(
"Moving directories or buckets cannot be implemented in GCS. You provided (" +
src.full_path + ") as a source for Move()");
"Moving directories or buckets cannot be implemented in GCS. You provided (",
src.full_path, ") as a source for Move()");
}
// The destination must not be an existing directory.
ARROW_ASSIGN_OR_RAISE(auto info, GetFileInfo(dest));
if (info.IsDirectory()) {
return Status::IOError("Attempting to Move() to an existing directory");
// NOTE(review): `info` describes `dest`, so the message below quotes the destination
// path in the position where the thing being moved is named. Presumably
// `src.full_path` was intended — confirm.
return Status::IOError("Attempting to Move() '", info.path(),
"' to an existing directory");
}
// The source must exist and be a regular file.
ARROW_ASSIGN_OR_RAISE(auto src_info, GetFileInfo(src));
if (!src_info.IsFile()) {
return Status::IOError("Cannot move source '", src.full_path,
"' the object does not exist or does not represent a file");
}
RETURN_NOT_OK(CopyFile(src, dest));
return DeleteFile(src);
}

Status CopyFile(const GcsPath& src, const GcsPath& dest) {
auto parent = dest.parent();
if (!parent.object.empty()) {
ARROW_ASSIGN_OR_RAISE(auto parent_info, GetFileInfo(parent));
if (parent_info.IsFile()) {
return Status::IOError("Cannot use file '", parent.full_path,
"' as a destination directory");
}
}
auto metadata =
client_.RewriteObjectBlocking(src.bucket, src.object, dest.bucket, dest.object);
return internal::ToArrowStatus(metadata.status());
Expand Down Expand Up @@ -505,20 +562,23 @@ class GcsFileSystem::Impl {
}

private:
static Result<FileInfo> GetFileInfoDirectory(const GcsPath& path,
const google::cloud::Status& status) {
using ::google::cloud::StatusCode;
auto canonical = internal::EnsureTrailingSlash(path.full_path);
// An object is treated as a directory marker when it carries the "arrow/gcsfs"
// metadata key and that key's value is "directory".
static bool IsDirectory(const gcs::ObjectMetadata& o) {
  if (!o.has_metadata("arrow/gcsfs")) return false;
  return o.metadata("arrow/gcsfs") == "directory";
}

// Translates the status of a bucket metadata lookup into a FileInfo: an OK status
// yields a Directory entry, kNotFound yields a NotFound entry, and any other status is
// converted to an Arrow error.
// NOTE(review): rendered diff with the +/- markers stripped — each branch shows two
// consecutive `return`s. The ones using `canonical` look like the superseded lines
// (`canonical` is not declared in this function as shown), and the ones using
// `path.bucket` look like the replacements. Confirm against the real file.
static Result<FileInfo> GetFileInfoBucket(const GcsPath& path,
const google::cloud::Status& status) {
if (status.ok()) {
return FileInfo(canonical, FileType::Directory);
return FileInfo(path.bucket, FileType::Directory);
}
using ::google::cloud::StatusCode;
if (status.code() == StatusCode::kNotFound) {
return FileInfo(canonical, FileType::NotFound);
return FileInfo(path.bucket, FileType::NotFound);
}
return internal::ToArrowStatus(status);
}

static Result<FileInfo> GetFileInfoFile(
static Result<FileInfo> GetFileInfoObject(
const GcsPath& path, const google::cloud::StatusOr<gcs::ObjectMetadata>& meta) {
if (meta.ok()) {
return ToFileInfo(path.full_path, *meta);
Expand All @@ -532,6 +592,9 @@ class GcsFileSystem::Impl {

static FileInfo ToFileInfo(const std::string& full_path,
const gcs::ObjectMetadata& meta) {
if (IsDirectory(meta)) {
return FileInfo(full_path, FileType::Directory);
}
auto info = FileInfo(full_path, FileType::File);
info.set_size(static_cast<int64_t>(meta.size()));
// An object has multiple "time" attributes, including the time when its data was
Expand Down Expand Up @@ -659,8 +722,9 @@ Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(

Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
const FileInfo& info) {
if (!info.IsFile()) {
return Status::IOError("Only files can be opened as input streams");
if (info.IsDirectory()) {
return Status::IOError("Cannot open directory '", info.path(),
"' as an input stream");
}
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
return impl_->OpenInputStream(p.bucket, p.object, gcs::Generation(),
Expand Down Expand Up @@ -688,6 +752,10 @@ Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(

Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
const FileInfo& info) {
if (info.IsDirectory()) {
return Status::IOError("Cannot open directory '", info.path(),
"' as an input stream");
}
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
auto metadata = impl_->GetObjectMetadata(p);
ARROW_GCS_RETURN_NOT_OK(metadata.status());
Expand Down
9 changes: 2 additions & 7 deletions cpp/src/arrow/filesystem/gcsfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ struct ARROW_EXPORT GcsOptions {
static GcsOptions FromServiceAccountCredentials(const std::string& json_object);
};

// - TODO(ARROW-1231) - review this documentation before closing the bug.
/// \brief GCS-backed FileSystem implementation.
///
/// GCS (Google Cloud Storage - https://cloud.google.com/storage) is a scalable object
Expand All @@ -120,12 +119,8 @@ struct ARROW_EXPORT GcsOptions {
/// - All buckets are treated as directories at the "root"
/// - Creating a root directory results in a new bucket being created, this may be slower
/// than most GCS operations.
/// - Any object with a name ending with a slash (`/`) character is treated as a
/// directory.
/// - The class creates marker objects for a directory, using a trailing slash in the
/// marker names. For debugging purposes, the metadata of these marker objects indicate
/// that they are markers created by this class. The class does not rely on this
/// annotation.
/// - The class creates marker objects for a directory, using a metadata attribute to
/// annotate the file.
/// - GCS can list all the objects with a given prefix, this is used to emulate listing
/// of directories.
/// - In object lists GCS can summarize all the objects with a common prefix as a single
Expand Down
Loading