Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ci/scripts/install_minio.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,6 @@ elif [[ ${version} != "latest" ]]; then
fi

# Download the minio server and mc client binaries into ${prefix}/bin and make
# them executable. Quote every expansion: an unquoted ${prefix} would undergo
# word splitting and globbing if the install prefix ever contains spaces or
# shell metacharacters (shellcheck SC2086).
wget -nv -P "${prefix}/bin" "https://dl.min.io/server/minio/release/${platform}-${arch}/minio"
wget -nv -P "${prefix}/bin" "https://dl.min.io/client/mc/release/${platform}-${arch}/mc"
chmod +x "${prefix}/bin/minio"
chmod +x "${prefix}/bin/mc"
22 changes: 21 additions & 1 deletion cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1530,6 +1530,23 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}
}

// Tests to see if a bucket exists.
// Returns true/false on a definitive answer from HeadBucket, or an error
// Status when the request fails for any reason other than "not found".
Result<bool> BucketExists(const std::string& bucket) {
  S3Model::HeadBucketRequest req;
  req.SetBucket(ToAwsString(bucket));

  const auto outcome = client_->HeadBucket(req);
  if (outcome.IsSuccess()) {
    return true;
  }
  const auto& error = outcome.GetError();
  if (IsNotFound(error)) {
    return false;
  }
  // Any other failure (permissions, networking, ...) is surfaced to the caller
  return ErrorToStatus(
      std::forward_as_tuple("When testing for existence of bucket '", bucket, "': "),
      error);
}

// Create a bucket. Successful if bucket already exists.
Status CreateBucket(const std::string& bucket) {
S3Model::CreateBucketConfiguration config;
Expand Down Expand Up @@ -2159,7 +2176,10 @@ Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
// Create object
if (recursive) {
// Ensure bucket exists
RETURN_NOT_OK(impl_->CreateBucket(path.bucket));
ARROW_ASSIGN_OR_RAISE(bool bucket_exists, impl_->BucketExists(path.bucket));
if (!bucket_exists) {
RETURN_NOT_OK(impl_->CreateBucket(path.bucket));
}
// Ensure that all parents exist, then the directory itself
std::string parent_key;
for (const auto& part : path.key_parts) {
Expand Down
6 changes: 5 additions & 1 deletion python/pyarrow/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,11 @@ def s3_server(s3_connection):
except OSError:
pytest.skip('`minio` command cannot be located')
else:
yield proc
yield {
'connection': s3_connection,
'process': proc,
'tempdir': tempdir
}
finally:
if proc is not None:
proc.kill()
12 changes: 6 additions & 6 deletions python/pyarrow/tests/parquet/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ def datadir(base_datadir):


@pytest.fixture
def s3_bucket(request, s3_connection, s3_server):
def s3_bucket(s3_server):
boto3 = pytest.importorskip('boto3')
botocore = pytest.importorskip('botocore')

host, port, access_key, secret_key = s3_connection
host, port, access_key, secret_key = s3_server['connection']
s3 = boto3.resource(
's3',
endpoint_url='http://{}:{}'.format(host, port),
Expand All @@ -49,10 +49,10 @@ def s3_bucket(request, s3_connection, s3_server):


@pytest.fixture
def s3_example_s3fs(s3_connection, s3_server, s3_bucket):
def s3_example_s3fs(s3_server, s3_bucket):
s3fs = pytest.importorskip('s3fs')

host, port, access_key, secret_key = s3_connection
host, port, access_key, secret_key = s3_server['connection']
fs = s3fs.S3FileSystem(
key=access_key,
secret=secret_key,
Expand All @@ -72,10 +72,10 @@ def s3_example_s3fs(s3_connection, s3_server, s3_bucket):


@pytest.fixture
def s3_example_fs(s3_connection, s3_server):
def s3_example_fs(s3_server):
from pyarrow.fs import FileSystem

host, port, access_key, secret_key = s3_connection
host, port, access_key, secret_key = s3_server['connection']
uri = (
"s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}"
.format(access_key, secret_key, host, port)
Expand Down
8 changes: 4 additions & 4 deletions python/pyarrow/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2242,11 +2242,11 @@ def test_dataset_partitioned_dictionary_type_reconstruct(tempdir):


@pytest.fixture
def s3_example_simple(s3_connection, s3_server):
def s3_example_simple(s3_server):
from pyarrow.fs import FileSystem
import pyarrow.parquet as pq

host, port, access_key, secret_key = s3_connection
host, port, access_key, secret_key = s3_server['connection']
uri = (
"s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}"
.format(access_key, secret_key, host, port)
Expand Down Expand Up @@ -2305,11 +2305,11 @@ def test_open_dataset_from_uri_s3_fsspec(s3_example_simple):

@pytest.mark.parquet
@pytest.mark.s3
def test_open_dataset_from_s3_with_filesystem_uri(s3_connection, s3_server):
def test_open_dataset_from_s3_with_filesystem_uri(s3_server):
from pyarrow.fs import FileSystem
import pyarrow.parquet as pq

host, port, access_key, secret_key = s3_connection
host, port, access_key, secret_key = s3_server['connection']
bucket = 'theirbucket'
path = 'nested/folder/data.parquet'
uri = "s3://{}:{}@{}/{}?scheme=http&endpoint_override={}:{}".format(
Expand Down
127 changes: 121 additions & 6 deletions python/pyarrow/tests/test_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import os
import pathlib
import pickle
import subprocess
import sys
import time

import pytest
import weakref
Expand Down Expand Up @@ -263,11 +265,11 @@ def subtree_localfs(request, tempdir, localfs):


@pytest.fixture
def s3fs(request, s3_connection, s3_server):
def s3fs(request, s3_server):
request.config.pyarrow.requires('s3')
from pyarrow.fs import S3FileSystem

host, port, access_key, secret_key = s3_connection
host, port, access_key, secret_key = s3_server['connection']
bucket = 'pyarrow-filesystem/'

fs = S3FileSystem(
Expand Down Expand Up @@ -298,6 +300,104 @@ def subtree_s3fs(request, s3fs):
)


_minio_limited_policy = """{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:ListAllMyBuckets",
"s3:PutObject",
"s3:GetObject",
"s3:ListBucket",
"s3:PutObjectTagging",
"s3:DeleteObject",
"s3:GetObjectVersion"
],
"Resource": [
"arn:aws:s3:::*"
]
}
]
}"""


def _run_mc_command(mcdir, *args):
    """Run the minio ``mc`` client with the given arguments.

    Parameters
    ----------
    mcdir : str
        Directory passed to ``mc -C`` holding the client's configuration.
    *args : str
        The mc subcommand and its arguments.

    Raises
    ------
    ChildProcessError
        If mc exits with a nonzero status.
    FileNotFoundError
        If the mc executable is not on PATH (propagated from subprocess).
    subprocess.TimeoutExpired
        If the command does not finish within 10 seconds.
    """
    full_args = ['mc', '-C', mcdir] + list(args)
    # Use subprocess.run with capture_output rather than Popen(stdout=PIPE) +
    # wait(): waiting without draining the pipes can deadlock once the child
    # fills the OS pipe buffer (the pitfall documented for Popen.wait()).
    result = subprocess.run(full_args, capture_output=True,
                            encoding='utf-8', timeout=10)
    cmd_str = ' '.join(full_args)
    print(f'Cmd: {cmd_str}')
    print(f'  Return: {result.returncode}')
    print(f'  Stdout: {result.stdout}')
    print(f'  Stderr: {result.stderr}')
    if result.returncode != 0:
        raise ChildProcessError("Could not run mc")


def _wait_for_minio_startup(mcdir, address, access_key, secret_key):
    """Poll the local minio server until it accepts an mc alias, or time out.

    Registers the server as the ``myminio`` alias once it is reachable.
    Retries for roughly 10 seconds, sleeping 1 second between attempts.

    Raises
    ------
    Exception
        If the server did not become reachable within the window.
    FileNotFoundError
        If the mc executable is missing (propagated so callers can skip).
    """
    # Use a monotonic clock for the deadline: time.time() can jump forwards or
    # backwards with NTP/system clock adjustments, which would distort the
    # timeout window.
    deadline = time.monotonic() + 10
    while time.monotonic() < deadline:
        try:
            _run_mc_command(mcdir, 'alias', 'set', 'myminio',
                            f'http://{address}', access_key, secret_key)
            return
        except ChildProcessError:
            # Server not up yet; back off briefly and retry
            time.sleep(1)
    raise Exception("mc command could not connect to local minio")


def _configure_limited_user(tmpdir, address, access_key, secret_key):
    """
    Attempts to use the mc command to configure the minio server
    with a special user limited:limited123 which does not have
    permission to create buckets. This mirrors some real life S3
    configurations where users are given strict permissions.

    Arrow S3 operations should still work in such a configuration
    (e.g. see ARROW-13685)

    Returns True on success, False when the mc executable is unavailable.
    """
    config_dir = os.path.join(tmpdir, 'mc')
    policy_path = os.path.join(tmpdir, 'limited-buckets-policy.json')
    try:
        os.mkdir(config_dir)
        with open(policy_path, mode='w') as policy_file:
            policy_file.write(_minio_limited_policy)
        # The s3_server fixture starts the minio process but it takes a few
        # moments for the process to become available
        _wait_for_minio_startup(config_dir, address, access_key, secret_key)
        # Create the restricted policy, the limited user attached to it, and a
        # pre-existing bucket that user is allowed to write into
        setup_commands = [
            ('admin', 'policy', 'add',
             'myminio/', 'no-create-buckets', policy_path),
            ('admin', 'user', 'add',
             'myminio/', 'limited', 'limited123'),
            ('admin', 'policy', 'set',
             'myminio', 'no-create-buckets', 'user=limited'),
            ('mb', 'myminio/existing-bucket'),
        ]
        for command in setup_commands:
            _run_mc_command(config_dir, *command)
        return True
    except FileNotFoundError:
        # If mc is not found, skip these tests
        return False


@pytest.fixture(scope='session')
def limited_s3_user(request, s3_server):
    """Session-scoped fixture that provisions a 'limited'/'limited123' minio
    user without bucket-creation rights, skipping when that isn't possible."""
    if sys.platform == 'win32':
        # Can't rely on FileNotFound check because
        # there is sometimes an mc command on Windows
        # which is unrelated to the minio mc
        pytest.skip('The mc command is not installed on Windows')
    request.config.pyarrow.requires('s3')
    host, port, access_key, secret_key = s3_server['connection']
    configured = _configure_limited_user(s3_server['tempdir'],
                                         f'{host}:{port}',
                                         access_key, secret_key)
    if not configured:
        pytest.skip('Could not locate mc command to configure limited user')


@pytest.fixture
def hdfs(request, hdfs_connection):
request.config.pyarrow.requires('hdfs')
Expand Down Expand Up @@ -345,13 +445,13 @@ def py_fsspec_memoryfs(request, tempdir):


@pytest.fixture
def py_fsspec_s3fs(request, s3_connection, s3_server):
def py_fsspec_s3fs(request, s3_server):
s3fs = pytest.importorskip("s3fs")
if (sys.version_info < (3, 7) and
Version(s3fs.__version__) >= Version("0.5")):
pytest.skip("s3fs>=0.5 version is async and requires Python >= 3.7")

host, port, access_key, secret_key = s3_connection
host, port, access_key, secret_key = s3_server['connection']
bucket = 'pyarrow-filesystem/'

fs = s3fs.S3FileSystem(
Expand Down Expand Up @@ -473,6 +573,21 @@ def skip_fsspec_s3fs(fs):
pytest.xfail(reason="Not working with fsspec's s3fs")


@pytest.mark.s3
def test_s3fs_limited_permissions_create_bucket(s3_server, limited_s3_user):
    """ARROW-13685: create_dir inside an already-existing bucket must succeed
    even for a user who lacks permission to create buckets."""
    from pyarrow.fs import S3FileSystem

    host, port, _, _ = s3_server['connection']
    limited_fs = S3FileSystem(access_key='limited',
                              secret_key='limited123',
                              scheme='http',
                              endpoint_override=f'{host}:{port}')
    # Must not attempt (and fail) a CreateBucket call on the existing bucket
    limited_fs.create_dir('existing-bucket/test')


def test_file_info_constructor():
dt = datetime.fromtimestamp(1568799826, timezone.utc)

Expand Down Expand Up @@ -1319,10 +1434,10 @@ def test_filesystem_from_path_object(path):


@pytest.mark.s3
def test_filesystem_from_uri_s3(s3_connection, s3_server):
def test_filesystem_from_uri_s3(s3_server):
from pyarrow.fs import S3FileSystem

host, port, access_key, secret_key = s3_connection
host, port, access_key, secret_key = s3_server['connection']

uri = "s3://{}:{}@mybucket/foo/bar?scheme=http&endpoint_override={}:{}" \
.format(access_key, secret_key, host, port)
Expand Down