Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/optional.h"
#include "arrow/util/string.h"
#include "arrow/util/task_group.h"
#include "arrow/util/thread_pool.h"

Expand Down Expand Up @@ -353,6 +354,12 @@ Result<S3Options> S3Options::FromUri(const Uri& uri, std::string* out_path) {
options.scheme = kv.second;
} else if (kv.first == "endpoint_override") {
options.endpoint_override = kv.second;
} else if (kv.first == "allow_bucket_creation") {
ARROW_ASSIGN_OR_RAISE(options.allow_bucket_creation,
::arrow::internal::ParseBoolean(kv.second));
} else if (kv.first == "allow_bucket_deletion") {
ARROW_ASSIGN_OR_RAISE(options.allow_bucket_deletion,
::arrow::internal::ParseBoolean(kv.second));
} else {
return Status::Invalid("Unexpected query parameter in S3 URI: '", kv.first, "'");
}
Expand All @@ -376,6 +383,8 @@ Result<S3Options> S3Options::FromUri(const std::string& uri_string,
bool S3Options::Equals(const S3Options& other) const {
return (region == other.region && endpoint_override == other.endpoint_override &&
scheme == other.scheme && background_writes == other.background_writes &&
allow_bucket_creation == other.allow_bucket_creation &&
allow_bucket_deletion == other.allow_bucket_deletion &&
credentials_kind == other.credentials_kind &&
proxy_options.Equals(other.proxy_options) &&
GetAccessKey() == other.GetAccessKey() &&
Expand Down Expand Up @@ -1677,6 +1686,27 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

// Create a bucket. Successful if bucket already exists.
Status CreateBucket(const std::string& bucket) {
// Check bucket exists first.
{
S3Model::HeadBucketRequest req;
req.SetBucket(ToAwsString(bucket));
auto outcome = client_->HeadBucket(req);

if (outcome.IsSuccess()) {
return Status::OK();
} else if (!IsNotFound(outcome.GetError())) {
return ErrorToStatus(
std::forward_as_tuple("When creating bucket '", bucket, "': "),
outcome.GetError());
}

if (!options().allow_bucket_creation) {
return Status::IOError(
"Bucket '", bucket, "' not found. ",
"To create buckets, enable the allow_bucket_creation option.");
}
}

S3Model::CreateBucketConfiguration config;
S3Model::CreateBucketRequest req;
auto _region = region();
Expand Down Expand Up @@ -2373,13 +2403,16 @@ Status S3FileSystem::DeleteDir(const std::string& s) {
return Status::NotImplemented("Cannot delete all S3 buckets");
}
RETURN_NOT_OK(impl_->DeleteDirContentsAsync(path.bucket, path.key).status());
if (path.key.empty()) {
if (path.key.empty() && options().allow_bucket_deletion) {
// Delete bucket
S3Model::DeleteBucketRequest req;
req.SetBucket(ToAwsString(path.bucket));
return OutcomeToStatus(
std::forward_as_tuple("When deleting bucket '", path.bucket, "': "),
impl_->client_->DeleteBucket(req));
} else if (path.key.empty()) {
return Status::IOError("Would delete bucket '", path.bucket, "'. ",
"To delete buckets, enable the allow_bucket_deletion option.");
} else {
// Delete "directory"
RETURN_NOT_OK(impl_->DeleteObject(path.bucket, path.key + kSep));
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/arrow/filesystem/s3fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,17 @@ struct ARROW_EXPORT S3Options {
/// Whether OutputStream writes will be issued in the background, without blocking.
bool background_writes = true;

/// Whether to allow creation of buckets
///
/// When S3FileSystem creates new buckets, it does not pass any non-default settings.
/// In AWS S3, the bucket and all objects will not be publicly visible, and there
/// will be no bucket policies and no resource tags. To have more control over how
/// buckets are created, use a different API to create them.
bool allow_bucket_creation = false;

/// Whether to allow deletion of buckets
bool allow_bucket_deletion = false;

/// \brief Default metadata for OpenOutputStream.
///
/// This will be ignored if non-empty metadata is passed to OpenOutputStream.
Expand Down
22 changes: 22 additions & 0 deletions cpp/src/arrow/filesystem/s3fs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ class TestS3FS : public S3TestMixin {
public:
void SetUp() override {
S3TestMixin::SetUp();
// Most tests will create buckets
options_.allow_bucket_creation = true;
options_.allow_bucket_deletion = true;
MakeFileSystem();
// Set up test bucket
{
Expand Down Expand Up @@ -1124,6 +1127,25 @@ TEST_F(TestS3FS, FileSystemFromUri) {
AssertFileInfo(fs.get(), path, FileType::File, 8);
}

TEST_F(TestS3FS, NoCreateDeleteBucket) {
  // Set up a bucket that the deletion attempt below will target.
  ASSERT_OK(fs_->CreateDir("test-no-delete"));

  // Rebuild the filesystem with bucket-level operations disabled.
  options_.allow_bucket_creation = false;
  options_.allow_bucket_deletion = false;
  MakeFileSystem();

  // Creating a bucket-level directory must now be rejected.
  auto create_status = fs_->CreateDir("test-no-create");
  ASSERT_RAISES(IOError, create_status);
  ASSERT_THAT(create_status.message(),
              ::testing::HasSubstr("Bucket 'test-no-create' not found"));

  // Deleting a bucket-level directory must also be rejected.
  auto delete_status = fs_->DeleteDir("test-no-delete");
  ASSERT_RAISES(IOError, delete_status);
  ASSERT_THAT(delete_status.message(),
              ::testing::HasSubstr("Would delete bucket 'test-no-delete'"));
}

// Simple retry strategy that records errors encountered and its emitted retry delays
class TestRetryStrategy : public S3RetryStrategy {
public:
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/util/string.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,5 +187,15 @@ util::optional<std::string> Replace(util::string_view s, util::string_view token
s.substr(token_start + token.size()).to_string();
}

// Parse a boolean from its textual form.
// Accepts "1" / "true" (case-insensitive) as true and "0" / "false"
// (case-insensitive) as false; anything else yields Status::Invalid.
Result<bool> ParseBoolean(util::string_view value) {
  if (value == "1" || AsciiEqualsCaseInsensitive(value, "true")) {
    return true;
  }
  if (value == "0" || AsciiEqualsCaseInsensitive(value, "false")) {
    return false;
  }
  return Status::Invalid("String is not a valid boolean value: '", value, "'.");
}

} // namespace internal
} // namespace arrow
8 changes: 8 additions & 0 deletions cpp/src/arrow/util/string.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <string>
#include <vector>

#include "arrow/result.h"
#include "arrow/util/optional.h"
#include "arrow/util/string_view.h"
#include "arrow/util/visibility.h"
Expand Down Expand Up @@ -75,5 +76,12 @@ ARROW_EXPORT
util::optional<std::string> Replace(util::string_view s, util::string_view token,
util::string_view replacement);

/// \brief Get boolean value from string
///
/// If "1", "true" (case-insensitive), returns true
/// If "0", "false" (case-insensitive), returns false
/// Otherwise, returns Status::Invalid
ARROW_EXPORT
arrow::Result<bool> ParseBoolean(util::string_view value);
} // namespace internal
} // namespace arrow
22 changes: 20 additions & 2 deletions python/pyarrow/_s3fs.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ cdef class S3FileSystem(FileSystem):
Note: S3 buckets are special and the operations available on them may be
limited or more expensive than desired.

When S3FileSystem creates new buckets (assuming allow_bucket_creation is
True), it does not pass any non-default settings. In AWS S3, the bucket and
all objects will be not publicly visible, and will have no bucket policies
and no resource tags. To have more control over how buckets are created,
use a different API to create them.

Parameters
----------
access_key : str, default None
Expand Down Expand Up @@ -150,6 +156,12 @@ cdef class S3FileSystem(FileSystem):
S3FileSystem(proxy_options={'scheme': 'http', 'host': 'localhost',
'port': 8020, 'username': 'username',
'password': 'password'})
allow_bucket_creation : bool, default False
Whether to allow CreateDir at the bucket-level. This option may also be
passed in a URI query parameter.
allow_bucket_deletion : bool, default False
Whether to allow DeleteDir at the bucket-level. This option may also be
passed in a URI query parameter.
"""

cdef:
Expand All @@ -159,7 +171,8 @@ cdef class S3FileSystem(FileSystem):
bint anonymous=False, region=None, scheme=None,
endpoint_override=None, bint background_writes=True,
default_metadata=None, role_arn=None, session_name=None,
external_id=None, load_frequency=900, proxy_options=None):
external_id=None, load_frequency=900, proxy_options=None,
allow_bucket_creation=False, allow_bucket_deletion=False):
cdef:
CS3Options options
shared_ptr[CS3FileSystem] wrapped
Expand Down Expand Up @@ -253,6 +266,9 @@ cdef class S3FileSystem(FileSystem):
"'proxy_options': expected 'dict' or 'str', "
f"got {type(proxy_options)} instead.")

options.allow_bucket_creation = allow_bucket_creation
options.allow_bucket_deletion = allow_bucket_deletion

with nogil:
wrapped = GetResultValue(CS3FileSystem.Make(options))

Expand Down Expand Up @@ -295,14 +311,16 @@ cdef class S3FileSystem(FileSystem):
external_id=frombytes(opts.external_id),
load_frequency=opts.load_frequency,
background_writes=opts.background_writes,
allow_bucket_creation=opts.allow_bucket_creation,
allow_bucket_deletion=opts.allow_bucket_deletion,
default_metadata=pyarrow_wrap_metadata(opts.default_metadata),
proxy_options={'scheme': frombytes(opts.proxy_options.scheme),
'host': frombytes(opts.proxy_options.host),
'port': opts.proxy_options.port,
'username': frombytes(
opts.proxy_options.username),
'password': frombytes(
opts.proxy_options.password)}
opts.proxy_options.password)},
),)
)

Expand Down
2 changes: 2 additions & 0 deletions python/pyarrow/includes/libarrow_fs.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
c_string endpoint_override
c_string scheme
c_bool background_writes
c_bool allow_bucket_creation
c_bool allow_bucket_deletion
shared_ptr[const CKeyValueMetadata] default_metadata
c_string role_arn
c_string session_name
Expand Down
39 changes: 35 additions & 4 deletions python/pyarrow/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2495,6 +2495,7 @@ def s3_example_simple(s3_server):
host, port, access_key, secret_key = s3_server['connection']
uri = (
"s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}"
"&allow_bucket_creation=True"
.format(access_key, secret_key, host, port)
)

Expand Down Expand Up @@ -2557,9 +2558,10 @@ def test_open_dataset_from_s3_with_filesystem_uri(s3_server):
host, port, access_key, secret_key = s3_server['connection']
bucket = 'theirbucket'
path = 'nested/folder/data.parquet'
uri = "s3://{}:{}@{}/{}?scheme=http&endpoint_override={}:{}".format(
access_key, secret_key, bucket, path, host, port
)
uri = "s3://{}:{}@{}/{}?scheme=http&endpoint_override={}:{}"\
"&allow_bucket_creation=true".format(
access_key, secret_key, bucket, path, host, port
)

fs, path = FileSystem.from_uri(uri)
assert path == 'theirbucket/nested/folder/data.parquet'
Expand Down Expand Up @@ -4529,9 +4531,38 @@ def test_write_dataset_s3_put_only(s3_server):
).to_table()
assert result.equals(table)

# Passing create_dir is fine if the bucket already exists
ds.write_dataset(
table, "existing-bucket", filesystem=fs,
format="feather", create_dir=True, partitioning=part,
existing_data_behavior='overwrite_or_ignore'
)
# check roundtrip
result = ds.dataset(
"existing-bucket", filesystem=fs, format="ipc", partitioning="hive"
).to_table()
assert result.equals(table)

# Error enforced by filesystem
with pytest.raises(OSError,
match="Bucket 'non-existing-bucket' not found"):
ds.write_dataset(
table, "non-existing-bucket", filesystem=fs,
format="feather", create_dir=True,
existing_data_behavior='overwrite_or_ignore'
)

# Error enforced by minio / S3 service
fs = S3FileSystem(
access_key='limited',
secret_key='limited123',
endpoint_override='{}:{}'.format(host, port),
scheme='http',
allow_bucket_creation=True,
)
with pytest.raises(OSError, match="Access Denied"):
ds.write_dataset(
table, "existing-bucket", filesystem=fs,
table, "non-existing-bucket", filesystem=fs,
format="feather", create_dir=True,
existing_data_behavior='overwrite_or_ignore'
)
Expand Down
19 changes: 16 additions & 3 deletions python/pyarrow/tests/test_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ def s3fs(request, s3_server):
access_key=access_key,
secret_key=secret_key,
endpoint_override='{}:{}'.format(host, port),
scheme='http'
scheme='http',
allow_bucket_creation=True,
allow_bucket_deletion=True
)
fs.create_dir(bucket)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add tests in test_s3_options?


Expand Down Expand Up @@ -443,6 +445,12 @@ def test_s3fs_limited_permissions_create_bucket(s3_server):
)
fs.create_dir('existing-bucket/test')

with pytest.raises(pa.ArrowIOError, match="Bucket 'new-bucket' not found"):
fs.create_dir('new-bucket')

with pytest.raises(pa.ArrowIOError, match="Would delete bucket"):
fs.delete_dir('existing-bucket')


def test_file_info_constructor():
dt = datetime.fromtimestamp(1568799826, timezone.utc)
Expand Down Expand Up @@ -1036,6 +1044,10 @@ def test_s3_options():
assert isinstance(fs, S3FileSystem)
assert pickle.loads(pickle.dumps(fs)) == fs

fs = S3FileSystem(allow_bucket_creation=True, allow_bucket_deletion=True)
assert isinstance(fs, S3FileSystem)
assert pickle.loads(pickle.dumps(fs)) == fs

with pytest.raises(ValueError):
S3FileSystem(access_key='access')
with pytest.raises(ValueError):
Expand Down Expand Up @@ -1308,8 +1320,9 @@ def test_filesystem_from_uri_s3(s3_server):

host, port, access_key, secret_key = s3_server['connection']

uri = "s3://{}:{}@mybucket/foo/bar?scheme=http&endpoint_override={}:{}" \
.format(access_key, secret_key, host, port)
uri = "s3://{}:{}@mybucket/foo/bar?scheme=http&endpoint_override={}:{}"\
"&allow_bucket_creation=True" \
.format(access_key, secret_key, host, port)

fs, path = FileSystem.from_uri(uri)
assert isinstance(fs, S3FileSystem)
Expand Down
4 changes: 2 additions & 2 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading