diff --git a/ci/scripts/install_minio.sh b/ci/scripts/install_minio.sh index 42f7ce040e0..4b61a35b208 100755 --- a/ci/scripts/install_minio.sh +++ b/ci/scripts/install_minio.sh @@ -49,4 +49,6 @@ elif [[ ${version} != "latest" ]]; then fi wget -nv -P ${prefix}/bin https://dl.min.io/server/minio/release/${platform}-${arch}/minio +wget -nv -P ${prefix}/bin https://dl.min.io/client/mc/release/${platform}-${arch}/mc chmod +x ${prefix}/bin/minio +chmod +x ${prefix}/bin/mc diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 3c443ed5518..314abdf3393 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -1530,6 +1530,23 @@ class S3FileSystem::Impl : public std::enable_shared_from_this BucketExists(const std::string& bucket) { + S3Model::HeadBucketRequest req; + req.SetBucket(ToAwsString(bucket)); + + auto outcome = client_->HeadBucket(req); + if (!outcome.IsSuccess()) { + if (!IsNotFound(outcome.GetError())) { + return ErrorToStatus(std::forward_as_tuple( + "When testing for existence of bucket '", bucket, "': "), + outcome.GetError()); + } + return false; + } + return true; + } + // Create a bucket. Successful if bucket already exists. 
Status CreateBucket(const std::string& bucket) { S3Model::CreateBucketConfiguration config; @@ -2159,7 +2176,10 @@ Status S3FileSystem::CreateDir(const std::string& s, bool recursive) { // Create object if (recursive) { // Ensure bucket exists - RETURN_NOT_OK(impl_->CreateBucket(path.bucket)); + ARROW_ASSIGN_OR_RAISE(bool bucket_exists, impl_->BucketExists(path.bucket)); + if (!bucket_exists) { + RETURN_NOT_OK(impl_->CreateBucket(path.bucket)); + } // Ensure that all parents exist, then the directory itself std::string parent_key; for (const auto& part : path.key_parts) { diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py index 40836867f5f..8fa520b9398 100644 --- a/python/pyarrow/tests/conftest.py +++ b/python/pyarrow/tests/conftest.py @@ -292,7 +292,11 @@ def s3_server(s3_connection): except OSError: pytest.skip('`minio` command cannot be located') else: - yield proc + yield { + 'connection': s3_connection, + 'process': proc, + 'tempdir': tempdir + } finally: if proc is not None: proc.kill() diff --git a/python/pyarrow/tests/parquet/conftest.py b/python/pyarrow/tests/parquet/conftest.py index ead9affc3cc..1e75493cdae 100644 --- a/python/pyarrow/tests/parquet/conftest.py +++ b/python/pyarrow/tests/parquet/conftest.py @@ -26,11 +26,11 @@ def datadir(base_datadir): @pytest.fixture -def s3_bucket(request, s3_connection, s3_server): +def s3_bucket(s3_server): boto3 = pytest.importorskip('boto3') botocore = pytest.importorskip('botocore') - host, port, access_key, secret_key = s3_connection + host, port, access_key, secret_key = s3_server['connection'] s3 = boto3.resource( 's3', endpoint_url='http://{}:{}'.format(host, port), @@ -49,10 +49,10 @@ def s3_bucket(request, s3_connection, s3_server): @pytest.fixture -def s3_example_s3fs(s3_connection, s3_server, s3_bucket): +def s3_example_s3fs(s3_server, s3_bucket): s3fs = pytest.importorskip('s3fs') - host, port, access_key, secret_key = s3_connection + host, port, access_key, secret_key = 
s3_server['connection'] fs = s3fs.S3FileSystem( key=access_key, secret=secret_key, @@ -72,10 +72,10 @@ def s3_example_s3fs(s3_connection, s3_server, s3_bucket): @pytest.fixture -def s3_example_fs(s3_connection, s3_server): +def s3_example_fs(s3_server): from pyarrow.fs import FileSystem - host, port, access_key, secret_key = s3_connection + host, port, access_key, secret_key = s3_server['connection'] uri = ( "s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}" .format(access_key, secret_key, host, port) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 5cbe0d9ab10..900cd650db2 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -2242,11 +2242,11 @@ def test_dataset_partitioned_dictionary_type_reconstruct(tempdir): @pytest.fixture -def s3_example_simple(s3_connection, s3_server): +def s3_example_simple(s3_server): from pyarrow.fs import FileSystem import pyarrow.parquet as pq - host, port, access_key, secret_key = s3_connection + host, port, access_key, secret_key = s3_server['connection'] uri = ( "s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}" .format(access_key, secret_key, host, port) @@ -2305,11 +2305,11 @@ def test_open_dataset_from_uri_s3_fsspec(s3_example_simple): @pytest.mark.parquet @pytest.mark.s3 -def test_open_dataset_from_s3_with_filesystem_uri(s3_connection, s3_server): +def test_open_dataset_from_s3_with_filesystem_uri(s3_server): from pyarrow.fs import FileSystem import pyarrow.parquet as pq - host, port, access_key, secret_key = s3_connection + host, port, access_key, secret_key = s3_server['connection'] bucket = 'theirbucket' path = 'nested/folder/data.parquet' uri = "s3://{}:{}@{}/{}?scheme=http&endpoint_override={}:{}".format( diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index 16e73a1e286..684d89f5b0d 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ 
-20,7 +20,9 @@ import os import pathlib import pickle +import subprocess import sys +import time import pytest import weakref @@ -263,11 +265,11 @@ def subtree_localfs(request, tempdir, localfs): @pytest.fixture -def s3fs(request, s3_connection, s3_server): +def s3fs(request, s3_server): request.config.pyarrow.requires('s3') from pyarrow.fs import S3FileSystem - host, port, access_key, secret_key = s3_connection + host, port, access_key, secret_key = s3_server['connection'] bucket = 'pyarrow-filesystem/' fs = S3FileSystem( @@ -298,6 +300,104 @@ def subtree_s3fs(request, s3fs): ) +_minio_limited_policy = """{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "s3:ListAllMyBuckets", + "s3:PutObject", + "s3:GetObject", + "s3:ListBucket", + "s3:PutObjectTagging", + "s3:DeleteObject", + "s3:GetObjectVersion" + ], + "Resource": [ + "arn:aws:s3:::*" + ] + } + ] +}""" + + +def _run_mc_command(mcdir, *args): + full_args = ['mc', '-C', mcdir] + list(args) + proc = subprocess.Popen(full_args, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, encoding='utf-8') + retval = proc.wait(10) + cmd_str = ' '.join(full_args) + print(f'Cmd: {cmd_str}') + print(f' Return: {retval}') + print(f' Stdout: {proc.stdout.read()}') + print(f' Stderr: {proc.stderr.read()}') + if retval != 0: + raise ChildProcessError("Could not run mc") + + +def _wait_for_minio_startup(mcdir, address, access_key, secret_key): + start = time.time() + while time.time() - start < 10: + try: + _run_mc_command(mcdir, 'alias', 'set', 'myminio', + f'http://{address}', access_key, secret_key) + return + except ChildProcessError: + time.sleep(1) + raise Exception("mc command could not connect to local minio") + + +def _configure_limited_user(tmpdir, address, access_key, secret_key): + """ + Attempts to use the mc command to configure the minio server + with a special user limited:limited123 which does not have + permission to create buckets. 
This mirrors some real-life S3 + configurations where users are given strict permissions. + + Arrow S3 operations should still work in such a configuration + (e.g. see ARROW-13685) + """ + try: + mcdir = os.path.join(tmpdir, 'mc') + os.mkdir(mcdir) + policy_path = os.path.join(tmpdir, 'limited-buckets-policy.json') + with open(policy_path, mode='w') as policy_file: + policy_file.write(_minio_limited_policy) + # The s3_server fixture starts the minio process but + # it takes a few moments for the process to become available + _wait_for_minio_startup(mcdir, address, access_key, secret_key) + # These commands create a limited user with a specific + # policy and create a sample bucket for that user to + # write to + _run_mc_command(mcdir, 'admin', 'policy', 'add', + 'myminio/', 'no-create-buckets', policy_path) + _run_mc_command(mcdir, 'admin', 'user', 'add', + 'myminio/', 'limited', 'limited123') + _run_mc_command(mcdir, 'admin', 'policy', 'set', + 'myminio', 'no-create-buckets', 'user=limited') + _run_mc_command(mcdir, 'mb', 'myminio/existing-bucket') + return True + except FileNotFoundError: + # If mc is not found, skip these tests + return False + + +@pytest.fixture(scope='session') +def limited_s3_user(request, s3_server): + if sys.platform == 'win32': + # Can't rely on FileNotFound check because + # there is sometimes an mc command on Windows + # which is unrelated to the minio mc + pytest.skip('The mc command is not installed on Windows') + request.config.pyarrow.requires('s3') + tempdir = s3_server['tempdir'] + host, port, access_key, secret_key = s3_server['connection'] + address = '{}:{}'.format(host, port) + if not _configure_limited_user(tempdir, address, access_key, secret_key): + pytest.skip('Could not locate mc command to configure limited user') + + @pytest.fixture def hdfs(request, hdfs_connection): request.config.pyarrow.requires('hdfs') @@ -345,13 +445,13 @@ def py_fsspec_memoryfs(request, tempdir): @pytest.fixture -def py_fsspec_s3fs(request, 
s3_connection, s3_server): +def py_fsspec_s3fs(request, s3_server): s3fs = pytest.importorskip("s3fs") if (sys.version_info < (3, 7) and Version(s3fs.__version__) >= Version("0.5")): pytest.skip("s3fs>=0.5 version is async and requires Python >= 3.7") - host, port, access_key, secret_key = s3_connection + host, port, access_key, secret_key = s3_server['connection'] bucket = 'pyarrow-filesystem/' fs = s3fs.S3FileSystem( @@ -473,6 +573,21 @@ def skip_fsspec_s3fs(fs): pytest.xfail(reason="Not working with fsspec's s3fs") +@pytest.mark.s3 +def test_s3fs_limited_permissions_create_bucket(s3_server, limited_s3_user): + from pyarrow.fs import S3FileSystem + + host, port, _, _ = s3_server['connection'] + + fs = S3FileSystem( + access_key='limited', + secret_key='limited123', + endpoint_override='{}:{}'.format(host, port), + scheme='http' + ) + fs.create_dir('existing-bucket/test') + + def test_file_info_constructor(): dt = datetime.fromtimestamp(1568799826, timezone.utc) @@ -1319,10 +1434,10 @@ def test_filesystem_from_path_object(path): @pytest.mark.s3 -def test_filesystem_from_uri_s3(s3_connection, s3_server): +def test_filesystem_from_uri_s3(s3_server): from pyarrow.fs import S3FileSystem - host, port, access_key, secret_key = s3_connection + host, port, access_key, secret_key = s3_server['connection'] uri = "s3://{}:{}@mybucket/foo/bar?scheme=http&endpoint_override={}:{}" \ .format(access_key, secret_key, host, port)