From 0c6002fb9bfa2261dfa9c4d83f1a954ceec5096b Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Tue, 22 Mar 2022 10:33:51 +0530 Subject: [PATCH 01/22] feat: flag in dataset writer for creating dir --- cpp/src/arrow/dataset/dataset_writer.cc | 12 +++++++----- cpp/src/arrow/dataset/file_base.h | 2 ++ 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index 3af44b36a9b..77997bebc5c 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -355,7 +355,9 @@ class DatasetWriterDirectoryQueue : public util::AsyncDestroyable { std::move(directory), std::move(prefix), std::move(schema), write_options, writer_state); RETURN_NOT_OK(task_group->AddTask(dir_queue->on_closed())); - dir_queue->PrepareDirectory(); + if (write_options.create_dir) { + dir_queue->PrepareDirectory(); + } ARROW_ASSIGN_OR_RAISE(dir_queue->current_filename_, dir_queue->GetNextFilename()); // std::move required to make RTools 3.5 mingw compiler happy return std::move(dir_queue); @@ -457,9 +459,9 @@ class DatasetWriter::DatasetWriterImpl : public util::AsyncDestroyable { if (!directory.empty()) { auto full_path = fs::internal::ConcatAbstractPath(write_options_.base_dir, directory); - return DoWriteRecordBatch(std::move(batch), full_path, prefix); + return DoWriteRecordBatch(std::move(batch), full_path, prefix, write_options_.create_dir); } else { - return DoWriteRecordBatch(std::move(batch), write_options_.base_dir, prefix); + return DoWriteRecordBatch(std::move(batch), write_options_.base_dir, prefix, write_options_.create_dir); } } @@ -478,7 +480,7 @@ class DatasetWriter::DatasetWriterImpl : public util::AsyncDestroyable { } Future<> DoWriteRecordBatch(std::shared_ptr batch, - const std::string& directory, const std::string& prefix) { + const std::string& directory, const std::string& prefix, const bool& create_dir) { ARROW_ASSIGN_OR_RAISE( auto dir_queue_itr, ::arrow::internal::GetOrInsertGenerated( @@ -518,7 +520,7 @@ class DatasetWriter::DatasetWriterImpl : public util::AsyncDestroyable { if (batch) { return backpressure.Then([this, batch, directory, prefix] { - return DoWriteRecordBatch(batch, directory, prefix); + return DoWriteRecordBatch(batch, directory, prefix, create_dir); }); } return Future<>::MakeFinished(); diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index dc0ce5cf192..aa628860720 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -387,6 +387,8 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions { /// Controls what happens if an output directory already exists. ExistingDataBehavior existing_data_behavior = ExistingDataBehavior::kError; + bool create_dir = true; + /// Callback to be invoked against all FileWriters before /// they are finalized with FileWriter::Finish(). std::function writer_pre_finish = [](FileWriter*) { From 0b8902b5cb888e021f886de1e3cd6c7b802159ad Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Mon, 11 Apr 2022 10:18:01 +0530 Subject: [PATCH 02/22] test: testing put only limited s3 policy --- cpp/src/arrow/dataset/dataset_writer.cc | 11 +- python/pyarrow/_dataset.pyx | 3 + python/pyarrow/dataset.py | 4 +- python/pyarrow/includes/libarrow_dataset.pxd | 2 + python/pyarrow/tests/conftest.py | 22 ++++ python/pyarrow/tests/test_dataset.py | 50 ++++++++++ python/pyarrow/tests/test_fs.py | 100 +------------------ python/pyarrow/tests/util.py | 85 ++++++++++++++++ 8 files changed, 172 insertions(+), 105 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index 77997bebc5c..4c29b2e8347 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -459,9 +459,11 @@ class DatasetWriter::DatasetWriterImpl : public util::AsyncDestroyable { if (!directory.empty()) { auto full_path = fs::internal::ConcatAbstractPath(write_options_.base_dir, directory); - return DoWriteRecordBatch(std::move(batch), full_path, prefix, write_options_.create_dir); + return DoWriteRecordBatch(std::move(batch), full_path, prefix, + write_options_.create_dir); } else { - return DoWriteRecordBatch(std::move(batch), write_options_.base_dir, prefix, write_options_.create_dir); + return DoWriteRecordBatch(std::move(batch), write_options_.base_dir, prefix, + write_options_.create_dir); } } @@ -480,7 +482,8 @@ class DatasetWriter::DatasetWriterImpl : public util::AsyncDestroyable { } Future<> DoWriteRecordBatch(std::shared_ptr batch, - const std::string& directory, const std::string& prefix, const bool& create_dir) { + const std::string& directory, const std::string& prefix, + const bool& create_dir) { ARROW_ASSIGN_OR_RAISE( auto dir_queue_itr, ::arrow::internal::GetOrInsertGenerated( @@ -519,7 +522,7 @@ class DatasetWriter::DatasetWriterImpl : public util::AsyncDestroyable { } if (batch) { - return backpressure.Then([this, batch, directory, prefix] { + return backpressure.Then([this, batch, directory, prefix, create_dir] { return DoWriteRecordBatch(batch, directory, prefix, create_dir); }); } diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 3a4d3ad4c37..efa736939ec 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -24,6 +24,7 @@ from cython.operator cimport dereference as deref import collections import os import warnings +from libcpp cimport bool import pyarrow as pa from pyarrow.lib cimport * @@ -2683,6 +2684,7 @@ def _filesystemdataset_write( int max_rows_per_file, int min_rows_per_group, int max_rows_per_group, + bool create_dir ): """ CFileSystemDataset.Write wrapper @@ -2715,6 +2717,7 @@ def _filesystemdataset_write( ("existing_data_behavior must be one of 'error', ", "'overwrite_or_ignore' or 'delete_matching'") ) + c_options.create_dir = create_dir if file_visitor is not None: visit_args = {'base_dir': c_options.base_dir, diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 955374693f1..afdecf9e0f3 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -753,7 +753,7 @@ def write_dataset(data, base_dir, basename_template=None, format=None, max_partitions=None, max_open_files=None, max_rows_per_file=None, min_rows_per_group=None, max_rows_per_group=None, file_visitor=None, - existing_data_behavior='error'): + existing_data_behavior='error', create_dir=True): """ Write a dataset to a given format and partitioning. @@ -928,5 +928,5 @@ def file_visitor(written_file): scanner, base_dir, basename_template, filesystem, partitioning, file_options, max_partitions, file_visitor, existing_data_behavior, max_open_files, max_rows_per_file, - min_rows_per_group, max_rows_per_group + min_rows_per_group, max_rows_per_group, create_dir ) diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index cd9ca2a4086..76184745035 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -18,6 +18,7 @@ # distutils: language = c++ from libcpp.unordered_map cimport unordered_map +from libcpp cimport bool from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * @@ -223,6 +224,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: function[cb_writer_finish_internal] writer_pre_finish function[cb_writer_finish_internal] writer_post_finish ExistingDataBehavior existing_data_behavior + bool create_dir uint32_t max_open_files uint64_t max_rows_per_file uint64_t min_rows_per_group diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py index 466b1647fdd..03e062235de 100644 --- a/python/pyarrow/tests/conftest.py +++ b/python/pyarrow/tests/conftest.py @@ -16,6 +16,7 @@ # under the License. import os +import sys import pathlib import subprocess from tempfile import TemporaryDirectory @@ -25,6 +26,7 @@ from pyarrow.util import find_free_port from pyarrow import Codec +from pyarrow.tests.util import _configure_limited_user # setup hypothesis profiles @@ -311,3 +313,23 @@ def s3_server(s3_connection): finally: if proc is not None: proc.kill() + + +@pytest.fixture(scope='session') +def limited_s3_user(request, s3_server): + def _limited_s3_user(policy): + if sys.platform == 'win32': + # Can't rely on FileNotFound check because + # there is sometimes an mc command on Windows + # which is unrelated to the minio mc + pytest.skip('The mc command is not installed on Windows') + request.config.pyarrow.requires('s3') + tempdir = s3_server['tempdir'] + host, port, access_key, secret_key = s3_server['connection'] + address = '{}:{}'.format(host, port) + if not _configure_limited_user(tempdir, address, + access_key, secret_key, policy): + pytest.skip( + 'Could not locate mc command to configure limited user') + + return _limited_s3_user diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 878478ad6c1..bd41b310bad 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -25,6 +25,7 @@ import tempfile import threading import time +from venv import create import numpy as np import pytest @@ -4334,6 +4335,55 @@ def test_write_dataset_s3(s3_example_simple): assert result.equals(table) +_minio_put_only_policy = """{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "s3:PutObject", + "s3:GetObjectVersion" + ], + "Resource": [ + "arn:aws:s3:::*" + ] + } + ] +}""" + + +@pytest.mark.parquet +@pytest.mark.s3 +def test_write_dataset_s3_put_only(s3_server, limited_s3_user): + from pyarrow.fs import S3FileSystem + + # write dataset with s3 filesystem + limited_s3_user(_minio_put_only_policy) + host, port, _, _ = s3_server['connection'] + fs = S3FileSystem( + access_key='limited', + secret_key='limited123', + endpoint_override='{}:{}'.format(host, port), + scheme='http' + ) + # fs.create_dir('existing-bucket/test') + table = pa.table([ + pa.array(range(20)), pa.array(np.random.randn(20)), + pa.array(np.repeat(['a', 'b'], 10))], + names=["f1", "f2", "part"] + ) + # writing with filesystem object with create_dir flag set to false + ds.write_dataset( + table, "'existing-bucket/test/", filesystem=fs, + format="feather", create_dir=False + ) + # check roundtrip + result = ds.dataset( + "'existing-bucket/test/", filesystem=fs, format="ipc", + ).to_table() + assert result.equals(table) + + @pytest.mark.parquet def test_dataset_null_to_dictionary_cast(tempdir, dataset_reader): # ARROW-12420 diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index 33c91933bf5..86826e32ead 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -261,104 +261,6 @@ def subtree_s3fs(request, s3fs): }""" -def _run_mc_command(mcdir, *args): - full_args = ['mc', '-C', mcdir] + list(args) - proc = subprocess.Popen(full_args, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, encoding='utf-8') - retval = proc.wait(10) - cmd_str = ' '.join(full_args) - print(f'Cmd: {cmd_str}') - print(f' Return: {retval}') - print(f' Stdout: {proc.stdout.read()}') - print(f' Stderr: {proc.stderr.read()}') - if retval != 0: - raise ChildProcessError("Could not run mc") - - -def _wait_for_minio_startup(mcdir, address, access_key, secret_key): - start = time.time() - while time.time() - start < 10: - try: - _run_mc_command(mcdir, 'alias', 'set', 'myminio', - f'http://{address}', access_key, secret_key) - return - except ChildProcessError: - time.sleep(1) - raise Exception("mc command could not connect to local minio") - - -def _ensure_minio_component_version(component, minimum_year): - full_args = [component, '--version'] - proc = subprocess.Popen(full_args, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, encoding='utf-8') - if proc.wait(10) != 0: - return False - stdout = proc.stdout.read() - pattern = component + r' version RELEASE\.(\d+)-.*' - version_match = re.search(pattern, stdout) - if version_match: - version_year = version_match.group(1) - return int(version_year) >= minimum_year - else: - return False - - -def _configure_limited_user(tmpdir, address, access_key, secret_key): - """ - Attempts to use the mc command to configure the minio server - with a special user limited:limited123 which does not have - permission to create buckets. This mirrors some real life S3 - configurations where users are given strict permissions. - - Arrow S3 operations should still work in such a configuration - (e.g. see ARROW-13685) - """ - try: - if not _ensure_minio_component_version('mc', 2021): - # mc version is too old for the capabilities we need - return False - if not _ensure_minio_component_version('minio', 2021): - # minio version is too old for the capabilities we need - return False - mcdir = os.path.join(tmpdir, 'mc') - os.mkdir(mcdir) - policy_path = os.path.join(tmpdir, 'limited-buckets-policy.json') - with open(policy_path, mode='w') as policy_file: - policy_file.write(_minio_limited_policy) - # The s3_server fixture starts the minio process but - # it takes a few moments for the process to become available - _wait_for_minio_startup(mcdir, address, access_key, secret_key) - # These commands create a limited user with a specific - # policy and creates a sample bucket for that user to - # write to - _run_mc_command(mcdir, 'admin', 'policy', 'add', - 'myminio/', 'no-create-buckets', policy_path) - _run_mc_command(mcdir, 'admin', 'user', 'add', - 'myminio/', 'limited', 'limited123') - _run_mc_command(mcdir, 'admin', 'policy', 'set', - 'myminio', 'no-create-buckets', 'user=limited') - _run_mc_command(mcdir, 'mb', 'myminio/existing-bucket') - return True - except FileNotFoundError: - # If mc is not found, skip these tests - return False - - -@pytest.fixture(scope='session') -def limited_s3_user(request, s3_server): - if sys.platform == 'win32': - # Can't rely on FileNotFound check because - # there is sometimes an mc command on Windows - # which is unrelated to the minio mc - pytest.skip('The mc command is not installed on Windows') - request.config.pyarrow.requires('s3') - tempdir = s3_server['tempdir'] - host, port, access_key, secret_key = s3_server['connection'] - address = '{}:{}'.format(host, port) - if not _configure_limited_user(tempdir, address, access_key, secret_key): - pytest.skip('Could not locate mc command to configure limited user') - - @pytest.fixture def hdfs(request, hdfs_connection): request.config.pyarrow.requires('hdfs') @@ -533,7 +435,7 @@ def skip_fsspec_s3fs(fs): @pytest.mark.s3 def test_s3fs_limited_permissions_create_bucket(s3_server, limited_s3_user): from pyarrow.fs import S3FileSystem - + limited_s3_user(_minio_limited_policy) host, port, _, _ = s3_server['connection'] fs = S3FileSystem( diff --git a/python/pyarrow/tests/util.py b/python/pyarrow/tests/util.py index 9d22e6e6a29..efbe4baac27 100644 --- a/python/pyarrow/tests/util.py +++ b/python/pyarrow/tests/util.py @@ -30,6 +30,8 @@ import string import subprocess import sys +import re +import time import pytest @@ -350,3 +352,86 @@ def signal_wakeup_fd(*, warn_on_full_buffer=False): signal.set_wakeup_fd(old_fd) r.close() w.close() + + +def _ensure_minio_component_version(component, minimum_year): + full_args = [component, '--version'] + proc = subprocess.Popen(full_args, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, encoding='utf-8') + if proc.wait(10) != 0: + return False + stdout = proc.stdout.read() + pattern = component + r' version RELEASE\.(\d+)-.*' + version_match = re.search(pattern, stdout) + if version_match: + version_year = version_match.group(1) + return int(version_year) >= minimum_year + else: + return False + + +def _wait_for_minio_startup(mcdir, address, access_key, secret_key): + start = time.time() + while time.time() - start < 10: + try: + _run_mc_command(mcdir, 'alias', 'set', 'myminio', + f'http://{address}', access_key, secret_key) + return + except ChildProcessError: + time.sleep(1) + raise Exception("mc command could not connect to local minio") + + +def _run_mc_command(mcdir, *args): + full_args = ['mc', '-C', mcdir] + list(args) + proc = subprocess.Popen(full_args, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, encoding='utf-8') + retval = proc.wait(10) + cmd_str = ' '.join(full_args) + print(f'Cmd: {cmd_str}') + print(f' Return: {retval}') + print(f' Stdout: {proc.stdout.read()}') + print(f' Stderr: {proc.stderr.read()}') + if retval != 0: + raise ChildProcessError("Could not run mc") + + +def _configure_limited_user(tmpdir, address, access_key, secret_key, policy): + """ + Attempts to use the mc command to configure the minio server + with a special user limited:limited123 which does not have + permission to create buckets. This mirrors some real life S3 + configurations where users are given strict permissions. + + Arrow S3 operations should still work in such a configuration + (e.g. see ARROW-13685) + """ + try: + if not _ensure_minio_component_version('mc', 2021): + # mc version is too old for the capabilities we need + return False + if not _ensure_minio_component_version('minio', 2021): + # minio version is too old for the capabilities we need + return False + mcdir = os.path.join(tmpdir, 'mc') + os.mkdir(mcdir) + policy_path = os.path.join(tmpdir, 'limited-buckets-policy.json') + with open(policy_path, mode='w') as policy_file: + policy_file.write(policy) + # The s3_server fixture starts the minio process but + # it takes a few moments for the process to become available + _wait_for_minio_startup(mcdir, address, access_key, secret_key) + # These commands create a limited user with a specific + # policy and creates a sample bucket for that user to + # write to + _run_mc_command(mcdir, 'admin', 'policy', 'add', + 'myminio/', 'no-create-buckets', policy_path) + _run_mc_command(mcdir, 'admin', 'user', 'add', + 'myminio/', 'limited', 'limited123') + _run_mc_command(mcdir, 'admin', 'policy', 'set', + 'myminio', 'no-create-buckets', 'user=limited') + _run_mc_command(mcdir, 'mb', 'myminio/existing-bucket') + return True + except FileNotFoundError: + # If mc is not found, skip these tests + return False From 3473c51b1aa5b27a95280501c86b99e9b4b5f429 Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Tue, 12 Apr 2022 03:54:23 +0530 Subject: [PATCH 03/22] fix: PrepareDirectory for create_dir flag --- cpp/src/arrow/dataset/dataset_writer.cc | 6 ++---- python/pyarrow/tests/test_dataset.py | 8 ++++---- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index 4c29b2e8347..33d18b9072a 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -328,7 +328,7 @@ class DatasetWriterDirectoryQueue : public util::AsyncDestroyable { uint64_t rows_written() const { return rows_written_; } void PrepareDirectory() { - if (directory_.empty()) { + if (directory_.empty() || !write_options_.create_dir) { init_future_ = Future<>::MakeFinished(); } else { if (write_options_.existing_data_behavior == @@ -355,9 +355,7 @@ class DatasetWriterDirectoryQueue : public util::AsyncDestroyable { std::move(directory), std::move(prefix), std::move(schema), write_options, writer_state); RETURN_NOT_OK(task_group->AddTask(dir_queue->on_closed())); - if (write_options.create_dir) { - dir_queue->PrepareDirectory(); - } + dir_queue->PrepareDirectory(); ARROW_ASSIGN_OR_RAISE(dir_queue->current_filename_, dir_queue->GetNextFilename()); // std::move required to make RTools 3.5 mingw compiler happy return std::move(dir_queue); diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index bd41b310bad..a53360f3b97 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -4342,6 +4342,7 @@ def test_write_dataset_s3(s3_example_simple): "Effect": "Allow", "Action": [ "s3:PutObject", + "s3:ListBucket", "s3:GetObjectVersion" ], "Resource": [ @@ -4358,7 +4359,6 @@ def test_write_dataset_s3_put_only(s3_server, limited_s3_user): from pyarrow.fs import S3FileSystem # write dataset with s3 filesystem - limited_s3_user(_minio_put_only_policy) host, port, _, _ = s3_server['connection'] fs = S3FileSystem( access_key='limited', @@ -4366,7 +4366,7 @@ def test_write_dataset_s3_put_only(s3_server, limited_s3_user): endpoint_override='{}:{}'.format(host, port), scheme='http' ) - # fs.create_dir('existing-bucket/test') + limited_s3_user(_minio_put_only_policy) table = pa.table([ pa.array(range(20)), pa.array(np.random.randn(20)), pa.array(np.repeat(['a', 'b'], 10))], @@ -4374,12 +4374,12 @@ def test_write_dataset_s3_put_only(s3_server, limited_s3_user): ) # writing with filesystem object with create_dir flag set to false ds.write_dataset( - table, "'existing-bucket/test/", filesystem=fs, + table, "existing-bucket", filesystem=fs, format="feather", create_dir=False ) # check roundtrip result = ds.dataset( - "'existing-bucket/test/", filesystem=fs, format="ipc", + "existing-bucket", filesystem=fs, format="ipc", ).to_table() assert result.equals(table) From 472cac7a8cebd519b716ae6e5981f6e6eefec8ff Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Fri, 15 Apr 2022 03:36:53 +0530 Subject: [PATCH 04/22] fix: lint issue for unused modules --- python/pyarrow/tests/test_dataset.py | 1 - python/pyarrow/tests/test_fs.py | 4 ---- 2 files changed, 5 deletions(-) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index a53360f3b97..7bffbe6d8f2 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -25,7 +25,6 @@ import tempfile import threading import time -from venv import create import numpy as np import pytest diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index 86826e32ead..cf9dc8d24a2 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -20,10 +20,6 @@ import os import pathlib import pickle -import re -import subprocess -import sys -import time import pytest import weakref From 2ff6233c9a021f566e51a72934da14c835a9d255 Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Fri, 15 Apr 2022 03:54:15 +0530 Subject: [PATCH 05/22] feat: moved limited_s3_user to util --- python/pyarrow/tests/conftest.py | 19 ------------------- python/pyarrow/tests/test_dataset.py | 5 +++-- python/pyarrow/tests/test_fs.py | 4 ++-- python/pyarrow/tests/util.py | 18 ++++++++++++++++++ 4 files changed, 23 insertions(+), 23 deletions(-) diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py index 03e062235de..9841d0505bf 100644 --- a/python/pyarrow/tests/conftest.py +++ b/python/pyarrow/tests/conftest.py @@ -314,22 +314,3 @@ def s3_server(s3_connection): if proc is not None: proc.kill() - -@pytest.fixture(scope='session') -def limited_s3_user(request, s3_server): - def _limited_s3_user(policy): - if sys.platform == 'win32': - # Can't rely on FileNotFound check because - # there is sometimes an mc command on Windows - # which is unrelated to the minio mc - pytest.skip('The mc command is not installed on Windows') - request.config.pyarrow.requires('s3') - tempdir = s3_server['tempdir'] - host, port, access_key, secret_key = s3_server['connection'] - address = '{}:{}'.format(host, port) - if not _configure_limited_user(tempdir, address, - access_key, secret_key, policy): - pytest.skip( - 'Could not locate mc command to configure limited user') - - return _limited_s3_user diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 7bffbe6d8f2..c549fcaea1b 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -35,7 +35,8 @@ import pyarrow.feather import pyarrow.fs as fs from pyarrow.tests.util import (change_cwd, _filesystem_uri, - FSProtocolClass, ProxyHandler) + FSProtocolClass, ProxyHandler, + limited_s3_user) try: import pandas as pd @@ -4354,7 +4355,7 @@ def test_write_dataset_s3(s3_example_simple): @pytest.mark.parquet @pytest.mark.s3 -def test_write_dataset_s3_put_only(s3_server, limited_s3_user): +def test_write_dataset_s3_put_only(s3_server): from pyarrow.fs import S3FileSystem # write dataset with s3 filesystem diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index cf9dc8d24a2..a156934421f 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -26,7 +26,7 @@ import pyarrow as pa from pyarrow.tests.test_io import assert_file_not_found -from pyarrow.tests.util import _filesystem_uri, ProxyHandler +from pyarrow.tests.util import _filesystem_uri, ProxyHandler, limited_s3_user from pyarrow.fs import (FileType, FileInfo, FileSelector, FileSystem, LocalFileSystem, SubTreeFileSystem, _MockFileSystem, @@ -429,7 +429,7 @@ def skip_fsspec_s3fs(fs): @pytest.mark.s3 -def test_s3fs_limited_permissions_create_bucket(s3_server, limited_s3_user): +def test_s3fs_limited_permissions_create_bucket(s3_server): from pyarrow.fs import S3FileSystem limited_s3_user(_minio_limited_policy) host, port, _, _ = s3_server['connection'] diff --git a/python/pyarrow/tests/util.py b/python/pyarrow/tests/util.py index efbe4baac27..e29a89600bf 100644 --- a/python/pyarrow/tests/util.py +++ b/python/pyarrow/tests/util.py @@ -435,3 +435,21 @@ def _configure_limited_user(tmpdir, address, access_key, secret_key, policy): except FileNotFoundError: # If mc is not found, skip these tests return False + +def limited_s3_user(request, s3_server): + def _limited_s3_user(policy): + if sys.platform == 'win32': + # Can't rely on FileNotFound check because + # there is sometimes an mc command on Windows + # which is unrelated to the minio mc + pytest.skip('The mc command is not installed on Windows') + request.config.pyarrow.requires('s3') + tempdir = s3_server['tempdir'] + host, port, access_key, secret_key = s3_server['connection'] + address = '{}:{}'.format(host, port) + if not _configure_limited_user(tempdir, address, + access_key, secret_key, policy): + pytest.skip( + 'Could not locate mc command to configure limited user') + + return _limited_s3_user From 9f63cd5d1501dd17fa99ebbf3fd3e535841795ee Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Fri, 15 Apr 2022 03:56:40 +0530 Subject: [PATCH 06/22] feat: using c_bool instead of bool for create_dir --- python/pyarrow/includes/libarrow_dataset.pxd | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 76184745035..64e59e05613 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -18,7 +18,7 @@ # distutils: language = c++ from libcpp.unordered_map cimport unordered_map -from libcpp cimport bool +from libcpp cimport bool as c_bool from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * @@ -224,7 +224,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: function[cb_writer_finish_internal] writer_pre_finish function[cb_writer_finish_internal] writer_post_finish ExistingDataBehavior existing_data_behavior - bool create_dir + c_bool create_dir uint32_t max_open_files uint64_t max_rows_per_file uint64_t min_rows_per_group From f6a01ae41f0e8ce8924bea3f3ee461c1123965e6 Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Fri, 15 Apr 2022 19:24:15 +0530 Subject: [PATCH 07/22] test: test with expected failure for create_dir flag set to true --- python/pyarrow/tests/conftest.py | 19 +++++++++++++++++++ python/pyarrow/tests/test_dataset.py | 12 +++++++++--- python/pyarrow/tests/test_fs.py | 4 ++-- python/pyarrow/tests/util.py | 3 +-- 4 files changed, 31 insertions(+), 7 deletions(-) diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py index 9841d0505bf..03e062235de 100644 --- a/python/pyarrow/tests/conftest.py +++ b/python/pyarrow/tests/conftest.py @@ -314,3 +314,22 @@ def s3_server(s3_connection): if proc is not None: proc.kill() + +@pytest.fixture(scope='session') +def limited_s3_user(request, s3_server): + def _limited_s3_user(policy): + if sys.platform == 'win32': + # Can't rely on FileNotFound check because + # there is sometimes an mc command on Windows + # which is unrelated to the minio mc + pytest.skip('The mc command is not installed on Windows') + request.config.pyarrow.requires('s3') + tempdir = s3_server['tempdir'] + host, port, access_key, secret_key = s3_server['connection'] + address = '{}:{}'.format(host, port) + if not _configure_limited_user(tempdir, address, + access_key, secret_key, policy): + pytest.skip( + 'Could not locate mc command to configure limited user') + + return _limited_s3_user diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index c549fcaea1b..0f396a957f4 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -25,6 +25,7 @@ import tempfile import threading import time +from venv import create import numpy as np import pytest @@ -35,8 +36,7 @@ import pyarrow.feather import pyarrow.fs as fs from pyarrow.tests.util import (change_cwd, _filesystem_uri, - FSProtocolClass, ProxyHandler, - limited_s3_user) + FSProtocolClass, ProxyHandler) try: import pandas as pd @@ -4355,7 +4355,7 @@ def test_write_dataset_s3(s3_example_simple): @pytest.mark.parquet @pytest.mark.s3 -def test_write_dataset_s3_put_only(s3_server): +def test_write_dataset_s3_put_only(s3_server, limited_s3_user): from pyarrow.fs import S3FileSystem # write dataset with s3 filesystem @@ -4382,6 +4382,12 @@ def test_write_dataset_s3_put_only(s3_server): "existing-bucket", filesystem=fs, format="ipc", ).to_table() assert result.equals(table) + + with pytest.raises(OSError, match="Access Denied"): + ds.write_dataset( + table, "existing-bucket", filesystem=fs, + format="feather", create_dir=True + ) @pytest.mark.parquet diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index a156934421f..cf9dc8d24a2 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -26,7 +26,7 @@ import pyarrow as pa from pyarrow.tests.test_io import assert_file_not_found -from pyarrow.tests.util import _filesystem_uri, ProxyHandler, limited_s3_user +from pyarrow.tests.util import _filesystem_uri, ProxyHandler from pyarrow.fs import (FileType, FileInfo, FileSelector, FileSystem, LocalFileSystem, SubTreeFileSystem, _MockFileSystem, @@ -429,7 +429,7 @@ def skip_fsspec_s3fs(fs): @pytest.mark.s3 -def test_s3fs_limited_permissions_create_bucket(s3_server): +def test_s3fs_limited_permissions_create_bucket(s3_server, limited_s3_user): from pyarrow.fs import S3FileSystem limited_s3_user(_minio_limited_policy) host, port, _, _ = s3_server['connection'] diff --git a/python/pyarrow/tests/util.py b/python/pyarrow/tests/util.py index e29a89600bf..4ecd1fff1a7 100644 --- a/python/pyarrow/tests/util.py +++ b/python/pyarrow/tests/util.py @@ -436,14 +436,13 @@ def _configure_limited_user(tmpdir, address, access_key, secret_key, policy): # If mc is not found, skip these tests return False -def limited_s3_user(request, s3_server): +def limited_s3_user(s3_server): def _limited_s3_user(policy): if sys.platform == 'win32': # Can't rely on FileNotFound check because # there is sometimes an mc command on Windows # which is unrelated to the minio mc pytest.skip('The mc command is not installed on Windows') - request.config.pyarrow.requires('s3') tempdir = s3_server['tempdir'] host, port, access_key, secret_key = s3_server['connection'] address = '{}:{}'.format(host, port) From 21dba2514770ee9f5ff63b9f3419402a852157c2 Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Sat, 16 Apr 2022 00:01:48 +0530 Subject: [PATCH 08/22] docs: docstring explaining test for s3 with put only policy --- python/pyarrow/tests/test_dataset.py | 5 +++++ python/pyarrow/tests/util.py | 17 ----------------- 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 0f396a957f4..bd9ef8dfe8d 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -4356,6 +4356,11 @@ def test_write_dataset_s3(s3_example_simple): @pytest.mark.parquet @pytest.mark.s3 def test_write_dataset_s3_put_only(s3_server, limited_s3_user): + # [ARROW-15892] Testing the create_dir flag which will restrict + # creating a new directory for writing a dataset. This is + # required while writing a dataset in s3 where we have very + # limited permissions and thus we can directly write the dataset + # without creating a directory. from pyarrow.fs import S3FileSystem # write dataset with s3 filesystem diff --git a/python/pyarrow/tests/util.py b/python/pyarrow/tests/util.py index 4ecd1fff1a7..efbe4baac27 100644 --- a/python/pyarrow/tests/util.py +++ b/python/pyarrow/tests/util.py @@ -435,20 +435,3 @@ def _configure_limited_user(tmpdir, address, access_key, secret_key, policy): except FileNotFoundError: # If mc is not found, skip these tests return False - -def limited_s3_user(s3_server): - def _limited_s3_user(policy): - if sys.platform == 'win32': - # Can't rely on FileNotFound check because - # there is sometimes an mc command on Windows - # which is unrelated to the minio mc - pytest.skip('The mc command is not installed on Windows') - tempdir = s3_server['tempdir'] - host, port, access_key, secret_key = s3_server['connection'] - address = '{}:{}'.format(host, port) - if not _configure_limited_user(tempdir, address, - access_key, secret_key, policy): - pytest.skip( - 'Could not locate mc command to configure limited user') - - return _limited_s3_user From a40aef0242a93632fd7a196d1c013cbc63fbb54b Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Sat, 16 Apr 2022 00:06:17 +0530 Subject: [PATCH 09/22] fix: python lint --- cpp/src/arrow/dataset/dataset_writer.cc | 13 +++++-------- cpp/src/arrow/dataset/file_base.h | 1 + python/pyarrow/dataset.py | 2 ++ python/pyarrow/tests/test_dataset.py | 9 ++++----- 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index 33d18b9072a..4fc0d814f36 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -457,11 +457,9 @@ class DatasetWriter::DatasetWriterImpl : public util::AsyncDestroyable { if (!directory.empty()) { auto full_path = fs::internal::ConcatAbstractPath(write_options_.base_dir, directory); - return DoWriteRecordBatch(std::move(batch), full_path, prefix, - write_options_.create_dir); + return DoWriteRecordBatch(std::move(batch), full_path, prefix); } else { - return DoWriteRecordBatch(std::move(batch), write_options_.base_dir, prefix, - write_options_.create_dir); + return DoWriteRecordBatch(std::move(batch), write_options_.base_dir, prefix); } } @@ -480,8 +478,7 @@ class DatasetWriter::DatasetWriterImpl : public util::AsyncDestroyable { } Future<> DoWriteRecordBatch(std::shared_ptr batch, - const std::string& directory, const std::string& prefix, - const bool& create_dir) { + const std::string& directory, const std::string& prefix) { ARROW_ASSIGN_OR_RAISE( auto dir_queue_itr, ::arrow::internal::GetOrInsertGenerated( @@ -520,8 +517,8 @@ class DatasetWriter::DatasetWriterImpl : public util::AsyncDestroyable { } if (batch) { - return backpressure.Then([this, batch, directory, prefix, create_dir] { - return DoWriteRecordBatch(batch, directory, prefix, create_dir); + return backpressure.Then([this, batch, directory, prefix] { + return DoWriteRecordBatch(batch, directory, prefix); }); } return Future<>::MakeFinished(); diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index aa628860720..80f5dc2eca7 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -387,6 +387,7 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions { /// Controls what happens if an output directory already exists. ExistingDataBehavior existing_data_behavior = ExistingDataBehavior::kError; + /// Flag to restrict or allow creating directory for writing the dataset. bool create_dir = true; /// Callback to be invoked against all FileWriters before diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index afdecf9e0f3..63ff6c02c2d 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -852,6 +852,8 @@ def file_visitor(written_file): dataset. The first time each partition directory is encountered the entire directory will be deleted. This allows you to overwrite old partitions completely. + create_dir : bool, default True + Flag to restrict or allow creating directory while writing a dataset. """ from pyarrow.fs import _resolve_filesystem_and_path diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index bd9ef8dfe8d..8f64fa480d3 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -25,7 +25,6 @@ import tempfile import threading import time -from venv import create import numpy as np import pytest @@ -4356,11 +4355,11 @@ def test_write_dataset_s3(s3_example_simple): @pytest.mark.parquet @pytest.mark.s3 def test_write_dataset_s3_put_only(s3_server, limited_s3_user): - # [ARROW-15892] Testing the create_dir flag which will restrict + # [ARROW-15892] Testing the create_dir flag which will restrict # creating a new directory for writing a dataset. This is - # required while writing a dataset in s3 where we have very + # required while writing a dataset in s3 where we have very # limited permissions and thus we can directly write the dataset - # without creating a directory. + # without creating a directory. from pyarrow.fs import S3FileSystem # write dataset with s3 filesystem @@ -4387,7 +4386,7 @@ def test_write_dataset_s3_put_only(s3_server, limited_s3_user): "existing-bucket", filesystem=fs, format="ipc", ).to_table() assert result.equals(table) - + with pytest.raises(OSError, match="Access Denied"): ds.write_dataset( table, "existing-bucket", filesystem=fs, From 3c00819a8ed036da5b607f91404f4c6b7c78ce3c Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Tue, 19 Apr 2022 10:00:32 +0530 Subject: [PATCH 10/22] fix: limited_s3_user used as a function and moved to util.py --- python/pyarrow/tests/conftest.py | 22 ---------------------- python/pyarrow/tests/test_dataset.py | 6 +++--- python/pyarrow/tests/test_fs.py | 6 +++--- python/pyarrow/tests/util.py | 15 +++++++++++++++ 4 files changed, 21 insertions(+), 28 deletions(-) diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py index 03e062235de..466b1647fdd 100644 --- a/python/pyarrow/tests/conftest.py +++ b/python/pyarrow/tests/conftest.py @@ -16,7 +16,6 @@ # under the License. import os -import sys import pathlib import subprocess from tempfile import TemporaryDirectory @@ -26,7 +25,6 @@ from pyarrow.util import find_free_port from pyarrow import Codec -from pyarrow.tests.util import _configure_limited_user # setup hypothesis profiles @@ -313,23 +311,3 @@ def s3_server(s3_connection): finally: if proc is not None: proc.kill() - - -@pytest.fixture(scope='session') -def limited_s3_user(request, s3_server): - def _limited_s3_user(policy): - if sys.platform == 'win32': - # Can't rely on FileNotFound check because - # there is sometimes an mc command on Windows - # which is unrelated to the minio mc - pytest.skip('The mc command is not installed on Windows') - request.config.pyarrow.requires('s3') - tempdir = s3_server['tempdir'] - host, port, access_key, secret_key = s3_server['connection'] - address = '{}:{}'.format(host, port) - if not _configure_limited_user(tempdir, address, - access_key, secret_key, policy): - pytest.skip( - 'Could not locate mc command to configure limited user') - - return _limited_s3_user diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 8f64fa480d3..d2506b67feb 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -35,7 +35,7 @@ import pyarrow.feather import pyarrow.fs as fs from pyarrow.tests.util import (change_cwd, _filesystem_uri, - FSProtocolClass, ProxyHandler) + FSProtocolClass, ProxyHandler, limited_s3_user) try: import pandas as pd @@ -4354,7 +4354,7 @@ def test_write_dataset_s3(s3_example_simple): @pytest.mark.parquet @pytest.mark.s3 -def test_write_dataset_s3_put_only(s3_server, limited_s3_user): +def test_write_dataset_s3_put_only(s3_server): # [ARROW-15892] Testing the create_dir flag which will restrict # creating a new directory for writing a dataset. This is # required while writing a dataset in s3 where we have very @@ -4370,7 +4370,7 @@ def test_write_dataset_s3_put_only(s3_server, limited_s3_user): endpoint_override='{}:{}'.format(host, port), scheme='http' ) - limited_s3_user(_minio_put_only_policy) + limited_s3_user(s3_server, _minio_put_only_policy) table = pa.table([ pa.array(range(20)), pa.array(np.random.randn(20)), pa.array(np.repeat(['a', 'b'], 10))], diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index cf9dc8d24a2..681450af6f9 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -26,7 +26,7 @@ import pyarrow as pa from pyarrow.tests.test_io import assert_file_not_found -from pyarrow.tests.util import _filesystem_uri, ProxyHandler +from pyarrow.tests.util import _filesystem_uri, ProxyHandler, limited_s3_user from pyarrow.fs import (FileType, FileInfo, FileSelector, FileSystem, LocalFileSystem, SubTreeFileSystem, _MockFileSystem, @@ -429,9 +429,9 @@ def skip_fsspec_s3fs(fs): @pytest.mark.s3 -def test_s3fs_limited_permissions_create_bucket(s3_server, limited_s3_user): +def test_s3fs_limited_permissions_create_bucket(s3_server): from pyarrow.fs import S3FileSystem - limited_s3_user(_minio_limited_policy) + limited_s3_user(s3_server, _minio_limited_policy) host, port, _, _ = s3_server['connection'] fs = S3FileSystem( diff --git a/python/pyarrow/tests/util.py b/python/pyarrow/tests/util.py index efbe4baac27..2feaaa1a36c 100644 --- a/python/pyarrow/tests/util.py +++ b/python/pyarrow/tests/util.py @@ -435,3 +435,18 @@ def _configure_limited_user(tmpdir, address, access_key, secret_key, policy): except FileNotFoundError: # If mc is not found, skip these tests return False + + +def limited_s3_user(s3_server, policy): + if sys.platform == 'win32': + # Can't rely on FileNotFound check because + # there is sometimes an mc command on Windows + # which is unrelated to the minio mc + pytest.skip('The mc command is not installed on Windows') + tempdir = s3_server['tempdir'] + host, port, access_key, secret_key = s3_server['connection'] + address = '{}:{}'.format(host, port) + if not _configure_limited_user(tempdir, address, + access_key, secret_key, policy): + pytest.skip( + 'Could not locate mc command to configure limited user') From 587bb3e9715b05ffc0c721aed5c8c619c5669904 Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Tue, 19 Apr 2022 17:38:03 +0530 Subject: [PATCH 11/22] fix: avoid creating dir if already present in _configure_limited_user --- python/pyarrow/tests/util.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyarrow/tests/util.py b/python/pyarrow/tests/util.py index 2feaaa1a36c..d8f7b22888d 100644 --- a/python/pyarrow/tests/util.py +++ b/python/pyarrow/tests/util.py @@ -414,7 +414,8 @@ def _configure_limited_user(tmpdir, address, access_key, secret_key, policy): # minio version is too old for the capabilities we need return False mcdir = os.path.join(tmpdir, 'mc') - os.mkdir(mcdir) + if not os.path.exists(mcdir): + os.mkdir(mcdir) policy_path = os.path.join(tmpdir, 'limited-buckets-policy.json') with open(policy_path, mode='w') as policy_file: policy_file.write(policy) From fc39613d62499847f8447b24e8cc614c680185ae Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Wed, 20 Apr 2022 00:55:10 +0530 Subject: [PATCH 12/22] docs: changed docstring for create_dir in FileSystemDatasetWriteOptions --- cpp/src/arrow/dataset/file_base.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 80f5dc2eca7..db363526c19 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -387,7 +387,8 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions { /// Controls what happens if an output directory already exists. ExistingDataBehavior existing_data_behavior = ExistingDataBehavior::kError; - /// Flag to restrict or allow creating directory for writing the dataset. + /// Whether to attempt creating the dataset directory. + /// This can be set to false to work around limited permissions on some filesystems. bool create_dir = true; /// Callback to be invoked against all FileWriters before From 16526067d0c576d632683aeaf234611f6361ce47 Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Wed, 20 Apr 2022 00:59:56 +0530 Subject: [PATCH 13/22] refactor: merged limited_s3_user into _configure_limited_user and renamed as _configure_s3_limited_user --- python/pyarrow/tests/test_dataset.py | 4 ++-- python/pyarrow/tests/test_fs.py | 4 ++-- python/pyarrow/tests/util.py | 34 ++++++++++++---------------- 3 files changed, 19 insertions(+), 23 deletions(-) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index d2506b67feb..5cb8b7902b5 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -35,7 +35,7 @@ import pyarrow.feather import pyarrow.fs as fs from pyarrow.tests.util import (change_cwd, _filesystem_uri, - FSProtocolClass, ProxyHandler, limited_s3_user) + FSProtocolClass, ProxyHandler, _configure_s3_limited_user) try: import pandas as pd @@ -4370,7 +4370,7 @@ def test_write_dataset_s3_put_only(s3_server): endpoint_override='{}:{}'.format(host, port), scheme='http' ) - limited_s3_user(s3_server, _minio_put_only_policy) + _configure_s3_limited_user(s3_server, _minio_put_only_policy) table = pa.table([ pa.array(range(20)), pa.array(np.random.randn(20)), pa.array(np.repeat(['a', 'b'], 10))], diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index 681450af6f9..0e377c36620 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -26,7 +26,7 @@ import pyarrow as pa from pyarrow.tests.test_io import assert_file_not_found -from pyarrow.tests.util import _filesystem_uri, ProxyHandler, limited_s3_user +from pyarrow.tests.util import _filesystem_uri, ProxyHandler, _configure_s3_limited_user from pyarrow.fs import (FileType, FileInfo, FileSelector, FileSystem, LocalFileSystem, SubTreeFileSystem, _MockFileSystem, @@ -431,7 +431,7 @@ def skip_fsspec_s3fs(fs): @pytest.mark.s3 def test_s3fs_limited_permissions_create_bucket(s3_server): from pyarrow.fs import S3FileSystem - limited_s3_user(s3_server, _minio_limited_policy) + _configure_s3_limited_user(s3_server, _minio_limited_policy) host, port, _, _ = s3_server['connection'] fs = S3FileSystem( diff --git a/python/pyarrow/tests/util.py b/python/pyarrow/tests/util.py index d8f7b22888d..41b0096a58e 100644 --- a/python/pyarrow/tests/util.py +++ b/python/pyarrow/tests/util.py @@ -396,7 +396,7 @@ def _run_mc_command(mcdir, *args): raise ChildProcessError("Could not run mc") -def _configure_limited_user(tmpdir, address, access_key, secret_key, policy): +def _configure_s3_limited_user(s3_server, policy): """ Attempts to use the mc command to configure the minio server with a special user limited:limited123 which does not have @@ -406,6 +406,16 @@ def _configure_limited_user(tmpdir, address, access_key, secret_key, policy): Arrow S3 operations should still work in such a configuration (e.g. see ARROW-13685) """ + + if sys.platform == 'win32': + # Can't rely on FileNotFound check because + # there is sometimes an mc command on Windows + # which is unrelated to the minio mc + pytest.skip('The mc command is not installed on Windows') + tempdir = s3_server['tempdir'] + host, port, access_key, secret_key = s3_server['connection'] + address = '{}:{}'.format(host, port) + try: if not _ensure_minio_component_version('mc', 2021): # mc version is too old for the capabilities we need @@ -413,10 +423,10 @@ def _configure_limited_user(tmpdir, address, access_key, secret_key, policy): if not _ensure_minio_component_version('minio', 2021): # minio version is too old for the capabilities we need return False - mcdir = os.path.join(tmpdir, 'mc') + mcdir = os.path.join(tempdir, 'mc') if not os.path.exists(mcdir): os.mkdir(mcdir) - policy_path = os.path.join(tmpdir, 'limited-buckets-policy.json') + policy_path = os.path.join(tempdir, 'limited-buckets-policy.json') with open(policy_path, mode='w') as policy_file: policy_file.write(policy) # The s3_server fixture starts the minio process but @@ -432,22 +442,8 @@ def _configure_limited_user(tmpdir, address, access_key, secret_key, policy): _run_mc_command(mcdir, 'admin', 'policy', 'set', 'myminio', 'no-create-buckets', 'user=limited') _run_mc_command(mcdir, 'mb', 'myminio/existing-bucket') - return True - except FileNotFoundError: - # If mc is not found, skip these tests - return False - -def limited_s3_user(s3_server, policy): - if sys.platform == 'win32': - # Can't rely on FileNotFound check because - # there is sometimes an mc command on Windows - # which is unrelated to the minio mc - pytest.skip('The mc command is not installed on Windows') - tempdir = s3_server['tempdir'] - host, port, access_key, secret_key = s3_server['connection'] - address = '{}:{}'.format(host, port) - if not _configure_limited_user(tempdir, address, - access_key, secret_key, policy): + except FileNotFoundError: pytest.skip( 'Could not locate mc command to configure limited user') + From 58a0e511e0de69c819674754d0a3ca840ac4fe0a Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Wed, 20 Apr 2022 01:02:13 +0530 Subject: [PATCH 14/22] refactor: stdlib imports in alphabetical order in util.py --- python/pyarrow/tests/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/tests/util.py b/python/pyarrow/tests/util.py index 41b0096a58e..9749819bdf7 100644 --- a/python/pyarrow/tests/util.py +++ b/python/pyarrow/tests/util.py @@ -25,12 +25,12 @@ import numpy as np import os import random +import re import signal import socket import string import subprocess import sys -import re import time import pytest From ab9a79299f3873718feadf1fa0e8c2650266b895 Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Thu, 21 Apr 2022 05:02:58 +0530 Subject: [PATCH 15/22] docs: change docstring of create_dir in dataset.py and file_base.h --- cpp/src/arrow/dataset/file_base.h | 4 ++-- python/pyarrow/dataset.py | 3 ++- python/pyarrow/tests/test_dataset.py | 3 ++- python/pyarrow/tests/test_fs.py | 3 ++- python/pyarrow/tests/util.py | 3 +-- 5 files changed, 9 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index db363526c19..debb26fd4d9 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -387,8 +387,8 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions { /// Controls what happens if an output directory already exists. ExistingDataBehavior existing_data_behavior = ExistingDataBehavior::kError; - /// Whether to attempt creating the dataset directory. - /// This can be set to false to work around limited permissions on some filesystems. + /// \brief If false the dataset writer will not create directories + /// This is mainly intended for filesystems that do not require directories such as S3. bool create_dir = true; /// Callback to be invoked against all FileWriters before diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 63ff6c02c2d..b22a3d032f0 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -853,7 +853,8 @@ def file_visitor(written_file): the entire directory will be deleted. This allows you to overwrite old partitions completely. create_dir : bool, default True - Flag to restrict or allow creating directory while writing a dataset. + If False, directories will not be created. This can be useful for + filesystems that do not require directories. """ from pyarrow.fs import _resolve_filesystem_and_path diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 5cb8b7902b5..7bf2ccdf8e5 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -35,7 +35,8 @@ import pyarrow.feather import pyarrow.fs as fs from pyarrow.tests.util import (change_cwd, _filesystem_uri, - FSProtocolClass, ProxyHandler, _configure_s3_limited_user) + FSProtocolClass, ProxyHandler, + _configure_s3_limited_user) try: import pandas as pd diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index 0e377c36620..4fd72704a71 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -26,7 +26,8 @@ import pyarrow as pa from pyarrow.tests.test_io import assert_file_not_found -from pyarrow.tests.util import _filesystem_uri, ProxyHandler, _configure_s3_limited_user +from pyarrow.tests.util import (_filesystem_uri, ProxyHandler, + _configure_s3_limited_user) from pyarrow.fs import (FileType, FileInfo, FileSelector, FileSystem, LocalFileSystem, SubTreeFileSystem, _MockFileSystem, diff --git a/python/pyarrow/tests/util.py b/python/pyarrow/tests/util.py index 9749819bdf7..a61d70ddcb8 100644 --- a/python/pyarrow/tests/util.py +++ b/python/pyarrow/tests/util.py @@ -406,7 +406,7 @@ def _configure_s3_limited_user(s3_server, policy): Arrow S3 operations should still work in such a configuration (e.g. see ARROW-13685) """ - + if sys.platform == 'win32': # Can't rely on FileNotFound check because # there is sometimes an mc command on Windows @@ -446,4 +446,3 @@ def _configure_s3_limited_user(s3_server, policy): except FileNotFoundError: pytest.skip( 'Could not locate mc command to configure limited user') - From bd28ec1df79f996c2fe984ae4cf1d1d1d93b9bbc Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Fri, 22 Apr 2022 20:33:57 +0530 Subject: [PATCH 16/22] test: hive partitioning in test_write_dataset_s3_put_only --- python/pyarrow/tests/test_dataset.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 7bf2ccdf8e5..f6ab30886fe 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -4377,21 +4377,25 @@ def test_write_dataset_s3_put_only(s3_server): pa.array(np.repeat(['a', 'b'], 10))], names=["f1", "f2", "part"] ) + part = ds.partitioning(pa.schema([("part", pa.string())]), flavor="hive") + # writing with filesystem object with create_dir flag set to false ds.write_dataset( table, "existing-bucket", filesystem=fs, - format="feather", create_dir=False + format="feather", create_dir=False, partitioning=part, + existing_data_behavior='overwrite_or_ignore' ) # check roundtrip result = ds.dataset( - "existing-bucket", filesystem=fs, format="ipc", + "existing-bucket", filesystem=fs, format="ipc", partitioning="hive" ).to_table() assert result.equals(table) with pytest.raises(OSError, match="Access Denied"): ds.write_dataset( table, "existing-bucket", filesystem=fs, - format="feather", create_dir=True + format="feather", create_dir=True, + existing_data_behavior='overwrite_or_ignore' ) From 83aecbdc79f592b574b5de73e8fa772848481e4c Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Fri, 22 Apr 2022 20:40:32 +0530 Subject: [PATCH 17/22] fix: remove dir if already exist in _configure_s3_limited_user --- python/pyarrow/tests/util.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/tests/util.py b/python/pyarrow/tests/util.py index a61d70ddcb8..ef796557808 100644 --- a/python/pyarrow/tests/util.py +++ b/python/pyarrow/tests/util.py @@ -26,6 +26,7 @@ import os import random import re +import shutil import signal import socket import string @@ -424,8 +425,9 @@ def _configure_s3_limited_user(s3_server, policy): # minio version is too old for the capabilities we need return False mcdir = os.path.join(tempdir, 'mc') - if not os.path.exists(mcdir): - os.mkdir(mcdir) + if os.path.exists(mcdir): + shutil.rmtree(mcdir) + os.mkdir(mcdir) policy_path = os.path.join(tempdir, 'limited-buckets-policy.json') with open(policy_path, mode='w') as policy_file: policy_file.write(policy) From e181b9d8e75212511de51cfde08e10169d3f6957 Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Mon, 25 Apr 2022 14:54:10 +0530 Subject: [PATCH 18/22] fix: added flag --ignore-existing in mc mb command --- python/pyarrow/tests/util.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyarrow/tests/util.py b/python/pyarrow/tests/util.py index ef796557808..d5c66c59905 100644 --- a/python/pyarrow/tests/util.py +++ b/python/pyarrow/tests/util.py @@ -443,7 +443,8 @@ def _configure_s3_limited_user(s3_server, policy): 'myminio/', 'limited', 'limited123') _run_mc_command(mcdir, 'admin', 'policy', 'set', 'myminio', 'no-create-buckets', 'user=limited') - _run_mc_command(mcdir, 'mb', 'myminio/existing-bucket') + _run_mc_command(mcdir, 'mb', 'myminio/existing-bucket', + '--ignore-existing') except FileNotFoundError: pytest.skip( From 9ae65f34a7245115a7fe4f4fc17de30262f2b6e5 Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Mon, 25 Apr 2022 14:58:29 +0530 Subject: [PATCH 19/22] review: removed _ensure_minio_component_version review: refactored to start try block from _wait_for_minio_startup --- python/pyarrow/tests/util.py | 38 ++++++++---------------------------- 1 file changed, 8 insertions(+), 30 deletions(-) diff --git a/python/pyarrow/tests/util.py b/python/pyarrow/tests/util.py index d5c66c59905..306cb04dd1b 100644 --- a/python/pyarrow/tests/util.py +++ b/python/pyarrow/tests/util.py @@ -25,7 +25,6 @@ import numpy as np import os import random -import re import shutil import signal import socket @@ -355,22 +354,6 @@ def signal_wakeup_fd(*, warn_on_full_buffer=False): w.close() -def _ensure_minio_component_version(component, minimum_year): - full_args = [component, '--version'] - proc = subprocess.Popen(full_args, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, encoding='utf-8') - if proc.wait(10) != 0: - return False - stdout = proc.stdout.read() - pattern = component + r' version RELEASE\.(\d+)-.*' - version_match = re.search(pattern, stdout) - if version_match: - version_year = version_match.group(1) - return int(version_year) >= minimum_year - else: - return False - - def _wait_for_minio_startup(mcdir, address, access_key, secret_key): start = time.time() while time.time() - start < 10: @@ -417,20 +400,15 @@ def _configure_s3_limited_user(s3_server, policy): host, port, access_key, secret_key = s3_server['connection'] address = '{}:{}'.format(host, port) + mcdir = os.path.join(tempdir, 'mc') + if os.path.exists(mcdir): + shutil.rmtree(mcdir) + os.mkdir(mcdir) + policy_path = os.path.join(tempdir, 'limited-buckets-policy.json') + + with open(policy_path, mode='w') as policy_file: + policy_file.write(policy) try: - if not _ensure_minio_component_version('mc', 2021): - # mc version is too old for the capabilities we need - return False - if not _ensure_minio_component_version('minio', 2021): - # minio version is too old for the capabilities we need - return False - mcdir = os.path.join(tempdir, 'mc') - if os.path.exists(mcdir): - shutil.rmtree(mcdir) - os.mkdir(mcdir) - policy_path = os.path.join(tempdir, 'limited-buckets-policy.json') - with open(policy_path, mode='w') as policy_file: - policy_file.write(policy) # The s3_server fixture starts the minio process but # it takes a few moments for the process to become available _wait_for_minio_startup(mcdir, address, access_key, secret_key) From b1ece5f09b7f908929775223b87ad430ec75cb47 Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Mon, 25 Apr 2022 16:13:27 +0530 Subject: [PATCH 20/22] review: keep _ensure_minio_component_version and skip test if this fails --- python/pyarrow/tests/util.py | 40 ++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/python/pyarrow/tests/util.py b/python/pyarrow/tests/util.py index 306cb04dd1b..d91146ca393 100644 --- a/python/pyarrow/tests/util.py +++ b/python/pyarrow/tests/util.py @@ -25,6 +25,7 @@ import numpy as np import os import random +import re import shutil import signal import socket @@ -354,6 +355,22 @@ def signal_wakeup_fd(*, warn_on_full_buffer=False): w.close() +def _ensure_minio_component_version(component, minimum_year): + full_args = [component, '--version'] + proc = subprocess.Popen(full_args, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, encoding='utf-8') + if proc.wait(10) != 0: + return False + stdout = proc.stdout.read() + pattern = component + r' version RELEASE\.(\d+)-.*' + version_match = re.search(pattern, stdout) + if version_match: + version_year = version_match.group(1) + return int(version_year) >= minimum_year + else: + return False + + def _wait_for_minio_startup(mcdir, address, access_key, secret_key): start = time.time() while time.time() - start < 10: @@ -400,15 +417,16 @@ def _configure_s3_limited_user(s3_server, policy): host, port, access_key, secret_key = s3_server['connection'] address = '{}:{}'.format(host, port) - mcdir = os.path.join(tempdir, 'mc') - if os.path.exists(mcdir): - shutil.rmtree(mcdir) - os.mkdir(mcdir) - policy_path = os.path.join(tempdir, 'limited-buckets-policy.json') - - with open(policy_path, mode='w') as policy_file: - policy_file.write(policy) - try: + # ensuring version of mc and minio for the capabilities we need + if (_ensure_minio_component_version('mc', 2021) and + _ensure_minio_component_version('minio', 2021)): + mcdir = os.path.join(tempdir, 'mc') + if os.path.exists(mcdir): + shutil.rmtree(mcdir) + os.mkdir(mcdir) + policy_path = os.path.join(tempdir, 'limited-buckets-policy.json') + with open(policy_path, mode='w') as policy_file: + policy_file.write(policy) # The s3_server fixture starts the minio process but # it takes a few moments for the process to become available _wait_for_minio_startup(mcdir, address, access_key, secret_key) @@ -423,7 +441,3 @@ def _configure_s3_limited_user(s3_server, policy): 'myminio', 'no-create-buckets', 'user=limited') _run_mc_command(mcdir, 'mb', 'myminio/existing-bucket', '--ignore-existing') - - except FileNotFoundError: - pytest.skip( - 'Could not locate mc command to configure limited user') From 247bec27fb225f6c6b379d9b3ad7150b56bc28bd Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Mon, 25 Apr 2022 17:56:53 +0530 Subject: [PATCH 21/22] test: skip limited user s3 test if _ensure_minio_component_version() fails --- python/pyarrow/tests/test_dataset.py | 4 ++- python/pyarrow/tests/test_fs.py | 3 +- python/pyarrow/tests/util.py | 51 +++++++++++++++------------- 3 files changed, 32 insertions(+), 26 deletions(-) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index f6ab30886fe..65483779155 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -4371,7 +4371,9 @@ def test_write_dataset_s3_put_only(s3_server): endpoint_override='{}:{}'.format(host, port), scheme='http' ) - _configure_s3_limited_user(s3_server, _minio_put_only_policy) + if not _configure_s3_limited_user(s3_server, _minio_put_only_policy): + pytest.skip("Configuring limited s3 user failed") + table = pa.table([ pa.array(range(20)), pa.array(np.random.randn(20)), pa.array(np.repeat(['a', 'b'], 10))], diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index 4fd72704a71..755c587c0f3 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -432,7 +432,8 @@ def skip_fsspec_s3fs(fs): @pytest.mark.s3 def test_s3fs_limited_permissions_create_bucket(s3_server): from pyarrow.fs import S3FileSystem - _configure_s3_limited_user(s3_server, _minio_limited_policy) + if not _configure_s3_limited_user(s3_server, _minio_limited_policy): + pytest.skip("Configuring limited s3 user failed") host, port, _, _ = s3_server['connection'] fs = S3FileSystem( diff --git a/python/pyarrow/tests/util.py b/python/pyarrow/tests/util.py index d91146ca393..109dd687ec1 100644 --- a/python/pyarrow/tests/util.py +++ b/python/pyarrow/tests/util.py @@ -408,6 +408,11 @@ def _configure_s3_limited_user(s3_server, policy): (e.g. see ARROW-13685) """ + # ensuring version of mc and minio for the capabilities we need + if not (_ensure_minio_component_version('mc', 2021) and + _ensure_minio_component_version('minio', 2021)): + return False + if sys.platform == 'win32': # Can't rely on FileNotFound check because # there is sometimes an mc command on Windows @@ -417,27 +422,25 @@ def _configure_s3_limited_user(s3_server, policy): host, port, access_key, secret_key = s3_server['connection'] address = '{}:{}'.format(host, port) - # ensuring version of mc and minio for the capabilities we need - if (_ensure_minio_component_version('mc', 2021) and - _ensure_minio_component_version('minio', 2021)): - mcdir = os.path.join(tempdir, 'mc') - if os.path.exists(mcdir): - shutil.rmtree(mcdir) - os.mkdir(mcdir) - policy_path = os.path.join(tempdir, 'limited-buckets-policy.json') - with open(policy_path, mode='w') as policy_file: - policy_file.write(policy) - # The s3_server fixture starts the minio process but - # it takes a few moments for the process to become available - _wait_for_minio_startup(mcdir, address, access_key, secret_key) - # These commands create a limited user with a specific - # policy and creates a sample bucket for that user to - # write to - _run_mc_command(mcdir, 'admin', 'policy', 'add', - 'myminio/', 'no-create-buckets', policy_path) - _run_mc_command(mcdir, 'admin', 'user', 'add', - 'myminio/', 'limited', 'limited123') - _run_mc_command(mcdir, 'admin', 'policy', 'set', - 'myminio', 'no-create-buckets', 'user=limited') - _run_mc_command(mcdir, 'mb', 'myminio/existing-bucket', - '--ignore-existing') + mcdir = os.path.join(tempdir, 'mc') + if os.path.exists(mcdir): + shutil.rmtree(mcdir) + os.mkdir(mcdir) + policy_path = os.path.join(tempdir, 'limited-buckets-policy.json') + with open(policy_path, mode='w') as policy_file: + policy_file.write(policy) + # The s3_server fixture starts the minio process but + # it takes a few moments for the process to become available + _wait_for_minio_startup(mcdir, address, access_key, secret_key) + # These commands create a limited user with a specific + # policy and creates a sample bucket for that user to + # write to + _run_mc_command(mcdir, 'admin', 'policy', 'add', + 'myminio/', 'no-create-buckets', policy_path) + _run_mc_command(mcdir, 'admin', 'user', 'add', + 'myminio/', 'limited', 'limited123') + _run_mc_command(mcdir, 'admin', 'policy', 'set', + 'myminio', 'no-create-buckets', 'user=limited') + _run_mc_command(mcdir, 'mb', 'myminio/existing-bucket', + '--ignore-existing') + return True From 0d6e0fe2eeb581be08d44011bc95a4aeacf3a782 Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Mon, 25 Apr 2022 20:28:34 +0530 Subject: [PATCH 22/22] fix: FileNotFoundError Exception and skipping in windows --- python/pyarrow/tests/test_dataset.py | 3 +- python/pyarrow/tests/test_fs.py | 3 +- python/pyarrow/tests/util.py | 67 +++++++++++++++------------- 3 files changed, 37 insertions(+), 36 deletions(-) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 65483779155..2e705f7a38c 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -4371,8 +4371,7 @@ def test_write_dataset_s3_put_only(s3_server): endpoint_override='{}:{}'.format(host, port), scheme='http' ) - if not _configure_s3_limited_user(s3_server, _minio_put_only_policy): - pytest.skip("Configuring limited s3 user failed") + _configure_s3_limited_user(s3_server, _minio_put_only_policy) table = pa.table([ pa.array(range(20)), pa.array(np.random.randn(20)), diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index 755c587c0f3..4fd72704a71 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -432,8 +432,7 @@ def skip_fsspec_s3fs(fs): @pytest.mark.s3 def test_s3fs_limited_permissions_create_bucket(s3_server): from pyarrow.fs import S3FileSystem - if not _configure_s3_limited_user(s3_server, _minio_limited_policy): - pytest.skip("Configuring limited s3 user failed") + _configure_s3_limited_user(s3_server, _minio_limited_policy) host, port, _, _ = s3_server['connection'] fs = S3FileSystem( diff --git a/python/pyarrow/tests/util.py b/python/pyarrow/tests/util.py index 109dd687ec1..ddeca128791 100644 --- a/python/pyarrow/tests/util.py +++ b/python/pyarrow/tests/util.py @@ -368,7 +368,7 @@ def _ensure_minio_component_version(component, minimum_year): version_year = version_match.group(1) return int(version_year) >= minimum_year else: - return False + raise FileNotFoundError("minio component older than the minimum year") def _wait_for_minio_startup(mcdir, address, access_key, secret_key): @@ -408,39 +408,42 @@ def _configure_s3_limited_user(s3_server, policy): (e.g. see ARROW-13685) """ - # ensuring version of mc and minio for the capabilities we need - if not (_ensure_minio_component_version('mc', 2021) and - _ensure_minio_component_version('minio', 2021)): - return False - if sys.platform == 'win32': # Can't rely on FileNotFound check because # there is sometimes an mc command on Windows # which is unrelated to the minio mc pytest.skip('The mc command is not installed on Windows') - tempdir = s3_server['tempdir'] - host, port, access_key, secret_key = s3_server['connection'] - address = '{}:{}'.format(host, port) - - mcdir = os.path.join(tempdir, 'mc') - if os.path.exists(mcdir): - shutil.rmtree(mcdir) - os.mkdir(mcdir) - policy_path = os.path.join(tempdir, 'limited-buckets-policy.json') - with open(policy_path, mode='w') as policy_file: - policy_file.write(policy) - # The s3_server fixture starts the minio process but - # it takes a few moments for the process to become available - _wait_for_minio_startup(mcdir, address, access_key, secret_key) - # These commands create a limited user with a specific - # policy and creates a sample bucket for that user to - # write to - _run_mc_command(mcdir, 'admin', 'policy', 'add', - 'myminio/', 'no-create-buckets', policy_path) - _run_mc_command(mcdir, 'admin', 'user', 'add', - 'myminio/', 'limited', 'limited123') - _run_mc_command(mcdir, 'admin', 'policy', 'set', - 'myminio', 'no-create-buckets', 'user=limited') - _run_mc_command(mcdir, 'mb', 'myminio/existing-bucket', - '--ignore-existing') - return True + + try: + # ensuring version of mc and minio for the capabilities we need + _ensure_minio_component_version('mc', 2021) + _ensure_minio_component_version('minio', 2021) + + tempdir = s3_server['tempdir'] + host, port, access_key, secret_key = s3_server['connection'] + address = '{}:{}'.format(host, port) + + mcdir = os.path.join(tempdir, 'mc') + if os.path.exists(mcdir): + shutil.rmtree(mcdir) + os.mkdir(mcdir) + policy_path = os.path.join(tempdir, 'limited-buckets-policy.json') + with open(policy_path, mode='w') as policy_file: + policy_file.write(policy) + # The s3_server fixture starts the minio process but + # it takes a few moments for the process to become available + _wait_for_minio_startup(mcdir, address, access_key, secret_key) + # These commands create a limited user with a specific + # policy and creates a sample bucket for that user to + # write to + _run_mc_command(mcdir, 'admin', 'policy', 'add', + 'myminio/', 'no-create-buckets', policy_path) + _run_mc_command(mcdir, 'admin', 'user', 'add', + 'myminio/', 'limited', 'limited123') + _run_mc_command(mcdir, 'admin', 'policy', 'set', + 'myminio', 'no-create-buckets', 'user=limited') + _run_mc_command(mcdir, 'mb', 'myminio/existing-bucket', + '--ignore-existing') + + except FileNotFoundError: + pytest.skip("Configuring limited s3 user failed")