Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 51 additions & 50 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -436,13 +436,11 @@ Result<S3Model::GetObjectResult> GetObjectRange(Aws::S3::S3Client* client,
}

// A RandomAccessFile that reads from a S3 object
class ObjectInputFile : public io::RandomAccessFile {
class ObjectInputFile final : public io::RandomAccessFile {
public:
ObjectInputFile(Aws::S3::S3Client* client, const S3Path& path)
: client_(client), path_(path) {}

ObjectInputFile(Aws::S3::S3Client* client, const S3Path& path, int64_t size)
: client_(client), path_(path), content_length_(size) {}
ObjectInputFile(std::shared_ptr<FileSystem> fs, Aws::S3::S3Client* client,
const S3Path& path, int64_t size = kNoSize)
: fs_(std::move(fs)), client_(client), path_(path), content_length_(size) {}

Status Init() {
// Issue a HEAD Object to get the content-length and ensure any
Expand Down Expand Up @@ -492,6 +490,8 @@ class ObjectInputFile : public io::RandomAccessFile {
// RandomAccessFile APIs

Status Close() override {
fs_.reset();
client_ = nullptr;
closed_ = true;
return Status::OK();
}
Expand Down Expand Up @@ -566,6 +566,7 @@ class ObjectInputFile : public io::RandomAccessFile {
}

protected:
std::shared_ptr<FileSystem> fs_; // Owner of S3Client
Aws::S3::S3Client* client_;
S3Path path_;
bool closed_ = false;
Expand All @@ -580,14 +581,14 @@ class ObjectInputFile : public io::RandomAccessFile {
static constexpr int64_t kMinimumPartUpload = 5 * 1024 * 1024;

// An OutputStream that writes to a S3 object
class ObjectOutputStream : public io::OutputStream {
class ObjectOutputStream final : public io::OutputStream {
protected:
struct UploadState;

public:
ObjectOutputStream(Aws::S3::S3Client* client, const S3Path& path,
const S3Options& options)
: client_(client), path_(path), options_(options) {}
ObjectOutputStream(std::shared_ptr<FileSystem> fs, Aws::S3::S3Client* client,
const S3Path& path, const S3Options& options)
: fs_(std::move(fs)), client_(client), path_(path), options_(options) {}

~ObjectOutputStream() override {
// For compliance with the rest of the IO stack, Close rather than Abort,
Expand Down Expand Up @@ -632,6 +633,8 @@ class ObjectOutputStream : public io::OutputStream {
outcome.GetError());
}
current_part_.reset();
fs_.reset();
client_ = nullptr;
closed_ = true;
return Status::OK();
}
Expand Down Expand Up @@ -677,6 +680,8 @@ class ObjectOutputStream : public io::OutputStream {
outcome.GetError());
}

fs_.reset();
client_ = nullptr;
closed_ = true;
return Status::OK();
}
Expand Down Expand Up @@ -849,6 +854,7 @@ class ObjectOutputStream : public io::OutputStream {
}

protected:
std::shared_ptr<FileSystem> fs_; // Owner of S3Client
Aws::S3::S3Client* client_;
S3Path path_;
const S3Options& options_;
Expand Down Expand Up @@ -1273,6 +1279,35 @@ class S3FileSystem::Impl {
}
return Status::OK();
}

Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const std::string& s,
S3FileSystem* fs) {
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));

auto ptr =
std::make_shared<ObjectInputFile>(fs->shared_from_this(), client_.get(), path);
RETURN_NOT_OK(ptr->Init());
return ptr;
}

Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const FileInfo& info,
S3FileSystem* fs) {
if (info.type() == FileType::NotFound) {
return ::arrow::fs::internal::PathNotFound(info.path());
}
if (info.type() != FileType::File && info.type() != FileType::Unknown) {
return ::arrow::fs::internal::NotAFile(info.path());
}

ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path()));
RETURN_NOT_OK(ValidateFilePath(path));

auto ptr = std::make_shared<ObjectInputFile>(fs->shared_from_this(), client_.get(),
path, info.size());
RETURN_NOT_OK(ptr->Init());
return ptr;
}
};

S3FileSystem::S3FileSystem(const S3Options& options) : impl_(new Impl{options}) {}
Expand Down Expand Up @@ -1528,65 +1563,31 @@ Status S3FileSystem::CopyFile(const std::string& src, const std::string& dest) {

Result<std::shared_ptr<io::InputStream>> S3FileSystem::OpenInputStream(
const std::string& s) {
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));

auto ptr = std::make_shared<ObjectInputFile>(impl_->client_.get(), path);
RETURN_NOT_OK(ptr->Init());
return ptr;
return impl_->OpenInputFile(s, this);
}

Result<std::shared_ptr<io::InputStream>> S3FileSystem::OpenInputStream(
const FileInfo& info) {
if (info.type() == FileType::NotFound) {
return ::arrow::fs::internal::PathNotFound(info.path());
}
if (info.type() != FileType::File && info.type() != FileType::Unknown) {
return ::arrow::fs::internal::NotAFile(info.path());
}

ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path()));
RETURN_NOT_OK(ValidateFilePath(path));

auto ptr = std::make_shared<ObjectInputFile>(impl_->client_.get(), path, info.size());
RETURN_NOT_OK(ptr->Init());
return ptr;
return impl_->OpenInputFile(info, this);
}

Result<std::shared_ptr<io::RandomAccessFile>> S3FileSystem::OpenInputFile(
const std::string& s) {
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));

auto ptr = std::make_shared<ObjectInputFile>(impl_->client_.get(), path);
RETURN_NOT_OK(ptr->Init());
return ptr;
return impl_->OpenInputFile(s, this);
}

Result<std::shared_ptr<io::RandomAccessFile>> S3FileSystem::OpenInputFile(
const FileInfo& info) {
if (info.type() == FileType::NotFound) {
return ::arrow::fs::internal::PathNotFound(info.path());
}
if (info.type() != FileType::File && info.type() != FileType::Unknown) {
return ::arrow::fs::internal::NotAFile(info.path());
}

ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path()));
RETURN_NOT_OK(ValidateFilePath(path));

auto ptr = std::make_shared<ObjectInputFile>(impl_->client_.get(), path, info.size());
RETURN_NOT_OK(ptr->Init());
return ptr;
return impl_->OpenInputFile(info, this);
}

Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
const std::string& s) {
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));

auto ptr =
std::make_shared<ObjectOutputStream>(impl_->client_.get(), path, impl_->options_);
auto ptr = std::make_shared<ObjectOutputStream>(
shared_from_this(), impl_->client_.get(), path, impl_->options_);
RETURN_NOT_OK(ptr->Init());
return ptr;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/filesystem/s3fs_narrative_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ void PrintError(const std::string& context_msg, const Result<T>& result) {
void ClearBucket(int argc, char** argv) {
auto fs = MakeFileSystem();

ASSERT_OK(fs->DeleteDirContents(""));
ASSERT_OK(fs->DeleteRootDirContents());
}

void TestBucket(int argc, char** argv) {
Expand Down
22 changes: 22 additions & 0 deletions cpp/src/arrow/filesystem/s3fs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,16 @@ class TestS3FS : public S3TestMixin {
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/newfile1"));
ASSERT_OK(stream->Close());
AssertObjectContents(client_.get(), "bucket", "newfile1", "");

// Open file and then lose filesystem reference
ASSERT_EQ(fs_.use_count(), 1); // needed for test to work
std::weak_ptr<S3FileSystem> weak_fs(fs_);
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/newfile5"));
fs_.reset();
ASSERT_FALSE(weak_fs.expired());
ASSERT_OK(stream->Write("some data"));
ASSERT_OK(stream->Close());
ASSERT_TRUE(weak_fs.expired());
}

void TestOpenOutputStreamAbort() {
Expand Down Expand Up @@ -682,11 +692,23 @@ TEST_F(TestS3FS, OpenInputStream) {
AssertBufferEqual(*buf, "sub data");
ASSERT_OK_AND_ASSIGN(buf, stream->Read(100));
AssertBufferEqual(*buf, "");
ASSERT_OK(stream->Close());

// "Directories"
ASSERT_RAISES(IOError, fs_->OpenInputStream("bucket/emptydir"));
ASSERT_RAISES(IOError, fs_->OpenInputStream("bucket/somedir"));
ASSERT_RAISES(IOError, fs_->OpenInputStream("bucket"));

// Open file and then lose filesystem reference
ASSERT_EQ(fs_.use_count(), 1); // needed for test to work
std::weak_ptr<S3FileSystem> weak_fs(fs_);
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream("bucket/somefile"));
fs_.reset();
ASSERT_FALSE(weak_fs.expired());
ASSERT_OK_AND_ASSIGN(buf, stream->Read(10));
AssertBufferEqual(*buf, "some data");
ASSERT_OK(stream->Close());
ASSERT_TRUE(weak_fs.expired());
}

TEST_F(TestS3FS, OpenInputFile) {
Expand Down
21 changes: 10 additions & 11 deletions python/pyarrow/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -3593,20 +3593,19 @@ def test_parquet_writer_filesystem_s3(s3_example_fs):
tm.assert_frame_equal(result, df)


# TODO segfaulting (ARROW-9814?)
# @pytest.mark.pandas
# @pytest.mark.s3
# def test_parquet_writer_filesystem_s3_uri(s3_example_fs):
# df = _test_dataframe(100)
# table = pa.Table.from_pandas(df, preserve_index=False)
@pytest.mark.pandas
@pytest.mark.s3
def test_parquet_writer_filesystem_s3_uri(s3_example_fs):
df = _test_dataframe(100)
table = pa.Table.from_pandas(df, preserve_index=False)

# fs, uri, path = s3_example_fs
fs, uri, path = s3_example_fs

# with pq.ParquetWriter(uri, table.schema, version='2.0') as writer:
# writer.write_table(table)
with pq.ParquetWriter(uri, table.schema, version='2.0') as writer:
writer.write_table(table)

# result = _read_table(path, filesystem=fs).to_pandas()
# tm.assert_frame_equal(result, df)
result = _read_table(path, filesystem=fs).to_pandas()
tm.assert_frame_equal(result, df)


@pytest.mark.pandas
Expand Down