Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cpp/src/arrow/filesystem/gcsfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,7 @@ Status GcsFileSystem::CopyFile(const std::string& src, const std::string& dest)

Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
const std::string& path) {
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(path));
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path));
return impl_->OpenInputStream(p, gcs::Generation(), gcs::ReadFromOffset());
}
Expand All @@ -897,12 +898,14 @@ Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
return Status::IOError("Cannot open directory '", info.path(),
"' as an input stream");
}
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(info.path()));
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
return impl_->OpenInputStream(p, gcs::Generation(), gcs::ReadFromOffset());
}

Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
const std::string& path) {
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(path));
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path));
auto metadata = impl_->GetObjectMetadata(p);
ARROW_GCS_RETURN_NOT_OK(metadata.status());
Expand All @@ -924,6 +927,7 @@ Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
return Status::IOError("Cannot open directory '", info.path(),
"' as an input stream");
}
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(info.path()));
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
auto metadata = impl_->GetObjectMetadata(p);
ARROW_GCS_RETURN_NOT_OK(metadata.status());
Expand All @@ -941,6 +945,7 @@ Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(

Result<std::shared_ptr<io::OutputStream>> GcsFileSystem::OpenOutputStream(
const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(path));
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path));
return impl_->OpenOutputStream(p, metadata);
}
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/filesystem/mockfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ class MockFileSystem::Impl {
Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
const std::string& path, bool append,
const std::shared_ptr<const KeyValueMetadata>& metadata) {
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(path));
auto parts = SplitAbstractPath(path);
RETURN_NOT_OK(ValidateAbstractPathParts(parts));

Expand Down Expand Up @@ -412,6 +413,7 @@ class MockFileSystem::Impl {
}

Result<std::shared_ptr<io::BufferReader>> OpenInputReader(const std::string& path) {
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(path));
auto parts = SplitAbstractPath(path);
RETURN_NOT_OK(ValidateAbstractPathParts(parts));

Expand Down Expand Up @@ -727,6 +729,7 @@ Result<std::shared_ptr<io::OutputStream>> MockFileSystem::OpenOutputStream(

Result<std::shared_ptr<io::OutputStream>> MockFileSystem::OpenAppendStream(
const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(path));
RETURN_NOT_OK(ValidatePath(path));
auto guard = impl_->lock_guard();

Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/filesystem/path_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <regex>

#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/util_internal.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"
Expand Down Expand Up @@ -139,6 +140,13 @@ util::string_view RemoveLeadingSlash(util::string_view key) {
return key;
}

Status AssertNoTrailingSlash(util::string_view key) {
if (key.back() == '/') {
return NotAFile(key);
}
return Status::OK();
}

Result<std::string> MakeAbstractPathRelative(const std::string& base,
const std::string& path) {
if (base.empty() || base.front() != kSep) {
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/filesystem/path_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ std::string EnsureTrailingSlash(util::string_view s);
ARROW_EXPORT
util::string_view RemoveTrailingSlash(util::string_view s);

ARROW_EXPORT
Status AssertNoTrailingSlash(util::string_view s);

ARROW_EXPORT
bool IsAncestorOf(util::string_view ancestor, util::string_view descendant);

Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2169,6 +2169,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const std::string& s,
S3FileSystem* fs) {
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(s));
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));

Expand All @@ -2179,6 +2180,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const FileInfo& info,
S3FileSystem* fs) {
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(info.path()));
if (info.type() == FileType::NotFound) {
return ::arrow::fs::internal::PathNotFound(info.path());
}
Expand Down Expand Up @@ -2537,6 +2539,7 @@ Result<std::shared_ptr<io::RandomAccessFile>> S3FileSystem::OpenInputFile(

Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
const std::string& s, const std::shared_ptr<const KeyValueMetadata>& metadata) {
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(s));
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));

Expand Down
33 changes: 33 additions & 0 deletions cpp/src/arrow/filesystem/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,9 @@ void GenericFileSystemTest::TestOpenOutputStream(FileSystem* fs) {

ASSERT_RAISES(Invalid, stream->Write("x")); // Stream is closed

// Trailing slash rejected
ASSERT_RAISES(IOError, fs->OpenOutputStream("CD/ghi/"));

// Storing metadata along file
auto metadata = KeyValueMetadata::Make({"Content-Type", "Content-Language"},
{"x-arrow/filesystem-test", "fr_FR"});
Expand Down Expand Up @@ -931,6 +934,9 @@ void GenericFileSystemTest::TestOpenAppendStream(FileSystem* fs) {

std::shared_ptr<io::OutputStream> stream;

// Trailing slash rejected
ASSERT_RAISES(IOError, fs->OpenAppendStream("abc/"));

if (allow_append_to_new_file()) {
ASSERT_OK_AND_ASSIGN(stream, fs->OpenAppendStream("abc"));
} else {
Expand Down Expand Up @@ -974,6 +980,9 @@ void GenericFileSystemTest::TestOpenInputStream(FileSystem* fs) {
ASSERT_OK(stream->Close());
ASSERT_RAISES(Invalid, stream->Read(1)); // Stream is closed

// Trailing slash rejected
ASSERT_RAISES(IOError, fs->OpenInputStream("AB/abc/"));

// File does not exist
AssertRaisesWithErrno(ENOENT, fs->OpenInputStream("AB/def"));
AssertRaisesWithErrno(ENOENT, fs->OpenInputStream("def"));
Expand Down Expand Up @@ -1009,6 +1018,15 @@ void GenericFileSystemTest::TestOpenInputStreamWithFileInfo(FileSystem* fs) {
info.set_type(FileType::Unknown);
AssertRaisesWithErrno(ENOENT, fs->OpenInputStream(info));

// Trailing slash rejected
auto maybe_info = fs->GetFileInfo("AB/abc/");
if (maybe_info.ok()) {
ASSERT_OK_AND_ASSIGN(info, maybe_info);
ASSERT_RAISES(IOError, fs->OpenInputStream(info));
} else {
ASSERT_RAISES(IOError, maybe_info);
}

// Cannot open directory
ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo("AB"));
ASSERT_RAISES(IOError, fs->OpenInputStream(info));
Expand Down Expand Up @@ -1044,6 +1062,9 @@ void GenericFileSystemTest::TestOpenInputFile(FileSystem* fs) {
ASSERT_OK(file->Close());
ASSERT_RAISES(Invalid, file->ReadAt(1, 1)); // Stream is closed

// Trailing slash rejected
ASSERT_RAISES(IOError, fs->OpenInputFile("AB/abc/"));

// File does not exist
AssertRaisesWithErrno(ENOENT, fs->OpenInputFile("AB/def"));
AssertRaisesWithErrno(ENOENT, fs->OpenInputFile("def"));
Expand All @@ -1067,6 +1088,9 @@ void GenericFileSystemTest::TestOpenInputFileAsync(FileSystem* fs) {

// File does not exist
AssertRaisesWithErrno(ENOENT, fs->OpenInputFileAsync("AB/def").result());

// Trailing slash rejected
ASSERT_RAISES(IOError, fs->OpenInputFileAsync("AB/abc/").result());
}

void GenericFileSystemTest::TestOpenInputFileWithFileInfo(FileSystem* fs) {
Expand Down Expand Up @@ -1096,6 +1120,15 @@ void GenericFileSystemTest::TestOpenInputFileWithFileInfo(FileSystem* fs) {
info.set_type(FileType::Unknown);
AssertRaisesWithErrno(ENOENT, fs->OpenInputFile(info));

// Trailing slash rejected
auto maybe_info = fs->GetFileInfo("AB/abc/");
if (maybe_info.ok()) {
ASSERT_OK_AND_ASSIGN(info, maybe_info);
ASSERT_RAISES(IOError, fs->OpenInputFile(info));
} else {
ASSERT_RAISES(IOError, maybe_info);
}

// Cannot open directory
ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo("AB"));
ASSERT_RAISES(IOError, fs->OpenInputFile(info));
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/filesystem/util_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,21 @@ Status CopyStream(const std::shared_ptr<io::InputStream>& src,
return Status::OK();
}

Status PathNotFound(const std::string& path) {
Status PathNotFound(util::string_view path) {
return Status::IOError("Path does not exist '", path, "'")
.WithDetail(StatusDetailFromErrno(ENOENT));
}

Status NotADir(const std::string& path) {
Status NotADir(util::string_view path) {
return Status::IOError("Not a directory: '", path, "'")
.WithDetail(StatusDetailFromErrno(ENOTDIR));
}

Status NotAFile(const std::string& path) {
Status NotAFile(util::string_view path) {
return Status::IOError("Not a regular file: '", path, "'");
}

Status InvalidDeleteDirContents(const std::string& path) {
Status InvalidDeleteDirContents(util::string_view path) {
return Status::Invalid(
"DeleteDirContents called on invalid path '", path, "'. ",
"If you wish to delete the root directory's contents, call DeleteRootDirContents.");
Expand Down
9 changes: 5 additions & 4 deletions cpp/src/arrow/filesystem/util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "arrow/filesystem/filesystem.h"
#include "arrow/io/interfaces.h"
#include "arrow/status.h"
#include "arrow/util/string_view.h"
#include "arrow/util/visibility.h"

namespace arrow {
Expand All @@ -38,16 +39,16 @@ Status CopyStream(const std::shared_ptr<io::InputStream>& src,
const io::IOContext& io_context);

ARROW_EXPORT
Status PathNotFound(const std::string& path);
Status PathNotFound(util::string_view path);

ARROW_EXPORT
Status NotADir(const std::string& path);
Status NotADir(util::string_view path);

ARROW_EXPORT
Status NotAFile(const std::string& path);
Status NotAFile(util::string_view path);

ARROW_EXPORT
Status InvalidDeleteDirContents(const std::string& path);
Status InvalidDeleteDirContents(util::string_view path);

/// \brief Return files matching the glob pattern on the filesystem
///
Expand Down
12 changes: 8 additions & 4 deletions r/R/io.R
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@ mmap_open <- function(path, mode = c("read", "write", "readwrite")) {
make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem = NULL) {
if (inherits(file, "SubTreeFileSystem")) {
filesystem <- file$base_fs
file <- file$base_path
# SubTreeFileSystem adds a slash to base_path, but filesystems will reject file names
# with trailing slashes, so we need to remove it here.
file <- sub("/$", "", file$base_path)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm probably misreading this, but is this treating any SubTreeFileSystem as pointing to a local file path?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By "filesystems" I meant the Arrow FileSystem classes, not the local file system. Does that clarify your confusion? Or are you asking something else?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I am asking something else. It seems that this code is replacing file (a FileSystem instance) with a file path, is that right? And below, the file path file will be treated as a local filesystem path?

Copy link
Member Author

@wjones127 wjones127 Jul 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see. Yeah I just blindly updated this without thinking about what happens downstream 🤦

Below it will go through the code path where is.string(file) and !is.null(filesystem) are both TRUE, so it will later call file <- filesystem$OpenInputFile(file). So it went from SubTreeFilesystem to a CharacterVector to a <whatever OpenInputFile returns>. Wow that is some very dynamic typing 😵

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the path should be treated as correct filesystem since we extracted that out in the line before.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see, I had missed the filesystem <- file$base_fs. So I guess my last question is: why are we calling make_readable_file with a SubTreeFileSystem as the first argument?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The expectation is that users will call FileSystem$path() to specify the location to write/read to:

fs <- S3FileSystem$create()
write_parquet(my_tab, fs$path("my/path/to"))

fs$path returns a SubTreeFileSystem as a convenient way to bundle a path and filesystem object in a single object. See discussion: #8351 (comment)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. That's unfortunate, but that's not this PR's business. Thanks for the details!

}
if (is.string(file)) {
if (is_url(file)) {
Expand Down Expand Up @@ -303,7 +305,9 @@ make_output_stream <- function(x, filesystem = NULL, compression = NULL) {

if (inherits(x, "SubTreeFileSystem")) {
filesystem <- x$base_fs
x <- x$base_path
# SubTreeFileSystem adds a slash to base_path, but filesystems will reject file names
# with trailing slashes, so we need to remove it here.
x <- sub("/$", "", x$base_path)
} else if (is_url(x)) {
fs_and_path <- FileSystem$from_uri(x)
filesystem <- fs_and_path$fs
Expand All @@ -317,7 +321,7 @@ make_output_stream <- function(x, filesystem = NULL, compression = NULL) {

assert_that(is.string(x))
if (is.null(filesystem) && is_compressed(compression)) {
CompressedOutputStream$create(x) ##compressed local
CompressedOutputStream$create(x) ## compressed local
} else if (is.null(filesystem) && !is_compressed(compression)) {
FileOutputStream$create(x) ## uncompressed local
} else if (!is.null(filesystem) && is_compressed(compression)) {
Expand All @@ -333,7 +337,7 @@ detect_compression <- function(path) {
}

# Remove any trailing slashes, which FileSystem$from_uri may add
path <- gsub("/$", "", path)
path <- sub("/$", "", path)

switch(tools::file_ext(path),
bz2 = "bz2",
Expand Down