diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc
index 3af44b36a9b..4fc0d814f36 100644
--- a/cpp/src/arrow/dataset/dataset_writer.cc
+++ b/cpp/src/arrow/dataset/dataset_writer.cc
@@ -328,7 +328,7 @@ class DatasetWriterDirectoryQueue : public util::AsyncDestroyable {
   uint64_t rows_written() const { return rows_written_; }
 
   void PrepareDirectory() {
-    if (directory_.empty()) {
+    if (directory_.empty() || !write_options_.create_dir) {
       init_future_ = Future<>::MakeFinished();
     } else {
       if (write_options_.existing_data_behavior ==
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index dc0ce5cf192..debb26fd4d9 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -387,6 +387,10 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
   /// Controls what happens if an output directory already exists.
   ExistingDataBehavior existing_data_behavior = ExistingDataBehavior::kError;
 
+  /// \brief If false, the dataset writer will not create directories.
+  /// This is mainly intended for filesystems that do not require directories, such as S3.
+  bool create_dir = true;
+
   /// Callback to be invoked against all FileWriters before
   /// they are finalized with FileWriter::Finish().
   std::function<Status(FileWriter*)> writer_pre_finish = [](FileWriter*) {
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 3a4d3ad4c37..efa736939ec 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -24,6 +24,7 @@ from cython.operator cimport dereference as deref
 import collections
 import os
 import warnings
+from libcpp cimport bool
 
 import pyarrow as pa
 from pyarrow.lib cimport *
@@ -2683,6 +2684,7 @@ def _filesystemdataset_write(
     int max_rows_per_file,
     int min_rows_per_group,
     int max_rows_per_group,
+    bool create_dir
 ):
     """
     CFileSystemDataset.Write wrapper
@@ -2715,6 +2717,7 @@ def _filesystemdataset_write(
             ("existing_data_behavior must be one of 'error', ",
              "'overwrite_or_ignore' or 'delete_matching'")
         )
+    c_options.create_dir = create_dir
 
     if file_visitor is not None:
         visit_args = {'base_dir': c_options.base_dir,
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 955374693f1..b22a3d032f0 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -753,7 +753,7 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
                   max_partitions=None, max_open_files=None,
                   max_rows_per_file=None, min_rows_per_group=None,
                   max_rows_per_group=None, file_visitor=None,
-                  existing_data_behavior='error'):
+                  existing_data_behavior='error', create_dir=True):
     """
     Write a dataset to a given format and partitioning.
 
@@ -852,6 +852,9 @@ def file_visitor(written_file):
         dataset. The first time each partition directory is encountered
         the entire directory will be deleted. This allows you to overwrite
         old partitions completely.
+    create_dir : bool, default True
+        If False, directories will not be created. This can be useful for
+        filesystems that do not require directories.
     """
     from pyarrow.fs import _resolve_filesystem_and_path
 
@@ -928,5 +931,5 @@ def file_visitor(written_file):
         scanner, base_dir, basename_template, filesystem, partitioning,
         file_options, max_partitions, file_visitor,
         existing_data_behavior, max_open_files, max_rows_per_file,
-        min_rows_per_group, max_rows_per_group
+        min_rows_per_group, max_rows_per_group, create_dir
     )
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index cd9ca2a4086..64e59e05613 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -18,6 +18,7 @@
 # distutils: language = c++
 
 from libcpp.unordered_map cimport unordered_map
+from libcpp cimport bool as c_bool
 
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport *
@@ -223,6 +224,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         function[cb_writer_finish_internal] writer_pre_finish
         function[cb_writer_finish_internal] writer_post_finish
         ExistingDataBehavior existing_data_behavior
+        c_bool create_dir
         uint32_t max_open_files
        uint64_t max_rows_per_file
         uint64_t min_rows_per_group
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 878478ad6c1..2e705f7a38c 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -35,7 +35,8 @@
 import pyarrow.feather
 import pyarrow.fs as fs
 from pyarrow.tests.util import (change_cwd, _filesystem_uri,
-                                FSProtocolClass, ProxyHandler)
+                                FSProtocolClass, ProxyHandler,
+                                _configure_s3_limited_user)
 
 try:
     import pandas as pd
@@ -4334,6 +4335,71 @@ def test_write_dataset_s3(s3_example_simple):
     assert result.equals(table)
 
 
+_minio_put_only_policy = """{
+    "Version": "2012-10-17",
+    "Statement": [
+        {
+            "Effect": "Allow",
+            "Action": [
+                "s3:PutObject",
+                "s3:ListBucket",
+                "s3:GetObjectVersion"
+            ],
+            "Resource": [
+                "arn:aws:s3:::*"
+            ]
+        }
+    ]
+}"""
+
+
+@pytest.mark.parquet
+@pytest.mark.s3
+def test_write_dataset_s3_put_only(s3_server):
+    # [ARROW-15892] Test the create_dir flag, which stops the dataset
+    # writer from creating directories. This is needed when writing a
+    # dataset to S3 with very limited permissions: the bucket must
+    # already exist, and the dataset is then written directly into it
+    # without creating any directories.
+    from pyarrow.fs import S3FileSystem
+
+    # write dataset with s3 filesystem
+    host, port, _, _ = s3_server['connection']
+    fs = S3FileSystem(
+        access_key='limited',
+        secret_key='limited123',
+        endpoint_override='{}:{}'.format(host, port),
+        scheme='http'
+    )
+    _configure_s3_limited_user(s3_server, _minio_put_only_policy)
+
+    table = pa.table([
+        pa.array(range(20)), pa.array(np.random.randn(20)),
+        pa.array(np.repeat(['a', 'b'], 10))],
+        names=["f1", "f2", "part"]
+    )
+    part = ds.partitioning(pa.schema([("part", pa.string())]), flavor="hive")
+
+    # write with the create_dir flag set to False
+    ds.write_dataset(
+        table, "existing-bucket", filesystem=fs,
+        format="feather", create_dir=False, partitioning=part,
+        existing_data_behavior='overwrite_or_ignore'
+    )
+    # check roundtrip
+    result = ds.dataset(
+        "existing-bucket", filesystem=fs, format="ipc", partitioning="hive"
+    ).to_table()
+    assert result.equals(table)
+
+    with pytest.raises(OSError, match="Access Denied"):
+        ds.write_dataset(
+            table, "existing-bucket", filesystem=fs,
+            format="feather", create_dir=True,
+            existing_data_behavior='overwrite_or_ignore'
+        )
+
+
 @pytest.mark.parquet
 def test_dataset_null_to_dictionary_cast(tempdir, dataset_reader):
     # ARROW-12420
diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py
index 33c91933bf5..4fd72704a71 100644
--- a/python/pyarrow/tests/test_fs.py
+++ b/python/pyarrow/tests/test_fs.py
@@ -20,17 +20,14 @@
 import os
 import pathlib
 import pickle
-import re
-import subprocess
-import sys
-import time
 
 import pytest
 import weakref
 
 import pyarrow as pa
 from pyarrow.tests.test_io import assert_file_not_found
-from pyarrow.tests.util import _filesystem_uri, ProxyHandler
+from pyarrow.tests.util import (_filesystem_uri, ProxyHandler,
+                                _configure_s3_limited_user)
 
 from pyarrow.fs import (FileType, FileInfo, FileSelector, FileSystem,
                         LocalFileSystem, SubTreeFileSystem, _MockFileSystem,
@@ -261,104 +258,6 @@ def subtree_s3fs(request, s3fs):
 }"""
 
 
-def _run_mc_command(mcdir, *args):
-    full_args = ['mc', '-C', mcdir] + list(args)
-    proc = subprocess.Popen(full_args, stdout=subprocess.PIPE,
-                            stderr=subprocess.PIPE, encoding='utf-8')
-    retval = proc.wait(10)
-    cmd_str = ' '.join(full_args)
-    print(f'Cmd: {cmd_str}')
-    print(f'  Return: {retval}')
-    print(f'  Stdout: {proc.stdout.read()}')
-    print(f'  Stderr: {proc.stderr.read()}')
-    if retval != 0:
-        raise ChildProcessError("Could not run mc")
-
-
-def _wait_for_minio_startup(mcdir, address, access_key, secret_key):
-    start = time.time()
-    while time.time() - start < 10:
-        try:
-            _run_mc_command(mcdir, 'alias', 'set', 'myminio',
-                            f'http://{address}', access_key, secret_key)
-            return
-        except ChildProcessError:
-            time.sleep(1)
-    raise Exception("mc command could not connect to local minio")
-
-
-def _ensure_minio_component_version(component, minimum_year):
-    full_args = [component, '--version']
-    proc = subprocess.Popen(full_args, stdout=subprocess.PIPE,
-                            stderr=subprocess.PIPE, encoding='utf-8')
-    if proc.wait(10) != 0:
-        return False
-    stdout = proc.stdout.read()
-    pattern = component + r' version RELEASE\.(\d+)-.*'
-    version_match = re.search(pattern, stdout)
-    if version_match:
-        version_year = version_match.group(1)
-        return int(version_year) >= minimum_year
-    else:
-        return False
-
-
-def _configure_limited_user(tmpdir, address, access_key, secret_key):
-    """
-    Attempts to use the mc command to configure the minio server
-    with a special user limited:limited123 which does not have
-    permission to create buckets. This mirrors some real life S3
-    configurations where users are given strict permissions.
-
-    Arrow S3 operations should still work in such a configuration
-    (e.g. see ARROW-13685)
-    """
-    try:
-        if not _ensure_minio_component_version('mc', 2021):
-            # mc version is too old for the capabilities we need
-            return False
-        if not _ensure_minio_component_version('minio', 2021):
-            # minio version is too old for the capabilities we need
-            return False
-        mcdir = os.path.join(tmpdir, 'mc')
-        os.mkdir(mcdir)
-        policy_path = os.path.join(tmpdir, 'limited-buckets-policy.json')
-        with open(policy_path, mode='w') as policy_file:
-            policy_file.write(_minio_limited_policy)
-        # The s3_server fixture starts the minio process but
-        # it takes a few moments for the process to become available
-        _wait_for_minio_startup(mcdir, address, access_key, secret_key)
-        # These commands create a limited user with a specific
-        # policy and creates a sample bucket for that user to
-        # write to
-        _run_mc_command(mcdir, 'admin', 'policy', 'add',
-                        'myminio/', 'no-create-buckets', policy_path)
-        _run_mc_command(mcdir, 'admin', 'user', 'add',
-                        'myminio/', 'limited', 'limited123')
-        _run_mc_command(mcdir, 'admin', 'policy', 'set',
-                        'myminio', 'no-create-buckets', 'user=limited')
-        _run_mc_command(mcdir, 'mb', 'myminio/existing-bucket')
-        return True
-    except FileNotFoundError:
-        # If mc is not found, skip these tests
-        return False
-
-
-@pytest.fixture(scope='session')
-def limited_s3_user(request, s3_server):
-    if sys.platform == 'win32':
-        # Can't rely on FileNotFound check because
-        # there is sometimes an mc command on Windows
-        # which is unrelated to the minio mc
-        pytest.skip('The mc command is not installed on Windows')
-    request.config.pyarrow.requires('s3')
-    tempdir = s3_server['tempdir']
-    host, port, access_key, secret_key = s3_server['connection']
-    address = '{}:{}'.format(host, port)
-    if not _configure_limited_user(tempdir, address, access_key, secret_key):
-        pytest.skip('Could not locate mc command to configure limited user')
-
-
 @pytest.fixture
 def hdfs(request, hdfs_connection):
     request.config.pyarrow.requires('hdfs')
@@ -531,9 +430,9 @@ def skip_fsspec_s3fs(fs):
 
 
 @pytest.mark.s3
-def test_s3fs_limited_permissions_create_bucket(s3_server, limited_s3_user):
+def test_s3fs_limited_permissions_create_bucket(s3_server):
     from pyarrow.fs import S3FileSystem
-
+    _configure_s3_limited_user(s3_server, _minio_limited_policy)
     host, port, _, _ = s3_server['connection']
     fs = S3FileSystem(
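The put-only policy above grants s3:PutObject and s3:ListBucket but not s3:CreateBucket, which is why the final write_dataset call with create_dir=True fails. A small sketch of that permission split, assuming the minio fixture's endpoint and the limited user provisioned by the helper in util.py below (the endpoint here is an illustrative placeholder):

    from pyarrow.fs import S3FileSystem

    # Assumed endpoint; the s3_server fixture picks a real host/port
    fs = S3FileSystem(access_key="limited", secret_key="limited123",
                      endpoint_override="localhost:9000", scheme="http")

    # Allowed: putting an object into a bucket that already exists
    with fs.open_output_stream("existing-bucket/probe.txt") as f:
        f.write(b"ok")

    # Denied: creating a top-level directory means creating a bucket,
    # which requires s3:CreateBucket
    try:
        fs.create_dir("new-bucket")
    except OSError as exc:
        print("expected:", exc)
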
diff --git a/python/pyarrow/tests/util.py b/python/pyarrow/tests/util.py
index 9d22e6e6a29..ddeca128791 100644
--- a/python/pyarrow/tests/util.py
+++ b/python/pyarrow/tests/util.py
@@ -25,11 +25,14 @@
 import numpy as np
 import os
 import random
+import re
+import shutil
 import signal
 import socket
 import string
 import subprocess
 import sys
+import time
 
 import pytest
@@ -350,3 +353,97 @@ def signal_wakeup_fd(*, warn_on_full_buffer=False):
         signal.set_wakeup_fd(old_fd)
         r.close()
         w.close()
+
+
+def _ensure_minio_component_version(component, minimum_year):
+    full_args = [component, '--version']
+    proc = subprocess.Popen(full_args, stdout=subprocess.PIPE,
+                            stderr=subprocess.PIPE, encoding='utf-8')
+    if proc.wait(10) != 0:
+        raise FileNotFoundError(f"Could not run {component}")
+    stdout = proc.stdout.read()
+    pattern = component + r' version RELEASE\.(\d+)-.*'
+    version_match = re.search(pattern, stdout)
+    if version_match is None:
+        raise FileNotFoundError(
+            f"Could not determine the version of {component}")
+    if int(version_match.group(1)) < minimum_year:
+        raise FileNotFoundError(
+            f"{component} is older than the minimum year {minimum_year}")
+
+
+def _wait_for_minio_startup(mcdir, address, access_key, secret_key):
+    start = time.time()
+    while time.time() - start < 10:
+        try:
+            _run_mc_command(mcdir, 'alias', 'set', 'myminio',
+                            f'http://{address}', access_key, secret_key)
+            return
+        except ChildProcessError:
+            time.sleep(1)
+    raise Exception("mc command could not connect to local minio")
+
+
+def _run_mc_command(mcdir, *args):
+    full_args = ['mc', '-C', mcdir] + list(args)
+    proc = subprocess.Popen(full_args, stdout=subprocess.PIPE,
+                            stderr=subprocess.PIPE, encoding='utf-8')
+    retval = proc.wait(10)
+    cmd_str = ' '.join(full_args)
+    print(f'Cmd: {cmd_str}')
+    print(f'  Return: {retval}')
+    print(f'  Stdout: {proc.stdout.read()}')
+    print(f'  Stderr: {proc.stderr.read()}')
+    if retval != 0:
+        raise ChildProcessError("Could not run mc")
+
+
+def _configure_s3_limited_user(s3_server, policy):
+    """
+    Attempts to use the mc command to configure the minio server
+    with a special user limited:limited123 restricted to the given
+    policy. This mirrors real-life S3 configurations where users
+    are given strict permissions.
+
+    Arrow S3 operations should still work in such a configuration
+    (e.g. see ARROW-13685)
+    """
+
+    if sys.platform == 'win32':
+        # Can't rely on the FileNotFoundError check because
+        # there is sometimes an mc command on Windows
+        # which is unrelated to the minio mc
+        pytest.skip('The mc command is not installed on Windows')
+
+    try:
+        # Ensure mc and minio are recent enough for the admin
+        # commands used below
+        _ensure_minio_component_version('mc', 2021)
+        _ensure_minio_component_version('minio', 2021)
+
+        tempdir = s3_server['tempdir']
+        host, port, access_key, secret_key = s3_server['connection']
+        address = '{}:{}'.format(host, port)
+
+        mcdir = os.path.join(tempdir, 'mc')
+        if os.path.exists(mcdir):
+            shutil.rmtree(mcdir)
+        os.mkdir(mcdir)
+        policy_path = os.path.join(tempdir, 'limited-buckets-policy.json')
+        with open(policy_path, mode='w') as policy_file:
+            policy_file.write(policy)
+        # The s3_server fixture starts the minio process but
+        # it takes a few moments for the process to become available
+        _wait_for_minio_startup(mcdir, address, access_key, secret_key)
+        # These commands create a limited user with the given
+        # policy and create a sample bucket for that user to
+        # write to
+        _run_mc_command(mcdir, 'admin', 'policy', 'add',
+                        'myminio/', 'no-create-buckets', policy_path)
+        _run_mc_command(mcdir, 'admin', 'user', 'add',
+                        'myminio/', 'limited', 'limited123')
+        _run_mc_command(mcdir, 'admin', 'policy', 'set',
+                        'myminio', 'no-create-buckets', 'user=limited')
+        _run_mc_command(mcdir, 'mb', 'myminio/existing-bucket',
+                        '--ignore-existing')
+    except FileNotFoundError:
+        pytest.skip("Configuring limited s3 user failed")
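Because _configure_s3_limited_user now takes the policy as a parameter, resets its mc configuration directory on each call, and passes --ignore-existing to mb, other tests can provision the limited user with whatever policy they need. A hypothetical sketch of such reuse; the policy JSON and test name here are illustrative only, not part of this patch:

    import pytest
    from pyarrow.tests.util import _configure_s3_limited_user

    # Illustrative read-only policy; any valid minio policy JSON works
    _read_only_policy = """{
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:ListBucket"],
                "Resource": ["arn:aws:s3:::*"]
            }
        ]
    }"""


    @pytest.mark.s3
    def test_read_only_user(s3_server):
        # Re-provisions the 'limited' user; the helper itself skips the
        # test if mc/minio are missing, too old, or cannot be configured
        _configure_s3_limited_user(s3_server, _read_only_policy)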