Closed
22 commits
0c6002f - feat: flag in dataset writer for creating dir (sanjibansg, Mar 22, 2022)
0b8902b - test: testing put only limited s3 policy (sanjibansg, Apr 11, 2022)
3473c51 - fix: PrepareDirectory for create_dir flag (sanjibansg, Apr 11, 2022)
472cac7 - fix: lint issue for unused modules (sanjibansg, Apr 14, 2022)
2ff6233 - feat: moved limited_s3_user to util (sanjibansg, Apr 14, 2022)
9f63cd5 - feat: using c_bool instead of bool for create_dir (sanjibansg, Apr 14, 2022)
f6a01ae - test: test with expected failure for create_dir flag set to true (sanjibansg, Apr 15, 2022)
21dba25 - docs: docstring explaining test for s3 with put only policy (sanjibansg, Apr 15, 2022)
a40aef0 - fix: python lint (sanjibansg, Apr 15, 2022)
3c00819 - fix: limited_s3_user used as a function and moved to util.py (sanjibansg, Apr 19, 2022)
587bb3e - fix: avoid creating dir if already present in _configure_limited_user (sanjibansg, Apr 19, 2022)
fc39613 - docs: changed docstring for create_dir in FileSystemDatasetWriteOptions (sanjibansg, Apr 19, 2022)
1652606 - refactor: merged limited_s3_user into _configure_limited_user and ren… (sanjibansg, Apr 19, 2022)
58a0e51 - refactor: stdlib imports in alphabetical order in util.py (sanjibansg, Apr 19, 2022)
ab9a792 - docs: change docstring of create_dir in dataset.py and file_base.h (sanjibansg, Apr 20, 2022)
bd28ec1 - test: hive partitioning in test_write_dataset_s3_put_only (sanjibansg, Apr 22, 2022)
83aecbd - fix: remove dir if already exist in _configure_s3_limited_user (sanjibansg, Apr 22, 2022)
e181b9d - fix: added flag --ignore-existing in mc mb command (sanjibansg, Apr 25, 2022)
9ae65f3 - review: removed _ensure_minio_component_version (sanjibansg, Apr 25, 2022)
b1ece5f - review: keep _ensure_minio_component_version and skip test if this fails (sanjibansg, Apr 25, 2022)
247bec2 - test: skip limited user s3 test if _ensure_minio_component_version() … (sanjibansg, Apr 25, 2022)
0d6e0fe - fix: FileNotFoundError Exception and skipping in windows (sanjibansg, Apr 25, 2022)
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/dataset_writer.cc
@@ -328,7 +328,7 @@ class DatasetWriterDirectoryQueue : public util::AsyncDestroyable {
uint64_t rows_written() const { return rows_written_; }

void PrepareDirectory() {
if (directory_.empty()) {
if (directory_.empty() || !write_options_.create_dir) {
init_future_ = Future<>::MakeFinished();
} else {
if (write_options_.existing_data_behavior ==
4 changes: 4 additions & 0 deletions cpp/src/arrow/dataset/file_base.h
@@ -387,6 +387,10 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
/// Controls what happens if an output directory already exists.
ExistingDataBehavior existing_data_behavior = ExistingDataBehavior::kError;

/// \brief If false, the dataset writer will not create directories.
/// This is mainly intended for filesystems that do not require directories, such as S3.
bool create_dir = true;

/// Callback to be invoked against all FileWriters before
/// they are finalized with FileWriter::Finish().
std::function<Status(FileWriter*)> writer_pre_finish = [](FileWriter*) {
3 changes: 3 additions & 0 deletions python/pyarrow/_dataset.pyx
@@ -24,6 +24,7 @@ from cython.operator cimport dereference as deref
import collections
import os
import warnings
from libcpp cimport bool

import pyarrow as pa
from pyarrow.lib cimport *
@@ -2683,6 +2684,7 @@ def _filesystemdataset_write(
int max_rows_per_file,
int min_rows_per_group,
int max_rows_per_group,
bool create_dir
):
"""
CFileSystemDataset.Write wrapper
@@ -2715,6 +2717,7 @@
("existing_data_behavior must be one of 'error', ",
"'overwrite_or_ignore' or 'delete_matching'")
)
c_options.create_dir = create_dir

if file_visitor is not None:
visit_args = {'base_dir': c_options.base_dir,
7 changes: 5 additions & 2 deletions python/pyarrow/dataset.py
@@ -753,7 +753,7 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
max_partitions=None, max_open_files=None,
max_rows_per_file=None, min_rows_per_group=None,
max_rows_per_group=None, file_visitor=None,
existing_data_behavior='error'):
existing_data_behavior='error', create_dir=True):
"""
Write a dataset to a given format and partitioning.

@@ -852,6 +852,9 @@ def file_visitor(written_file):
dataset. The first time each partition directory is encountered
the entire directory will be deleted. This allows you to overwrite
old partitions completely.
create_dir : bool, default True
If False, directories will not be created. This can be useful for
filesystems that do not require directories.
"""
from pyarrow.fs import _resolve_filesystem_and_path

@@ -928,5 +931,5 @@ def file_visitor(written_file):
scanner, base_dir, basename_template, filesystem, partitioning,
file_options, max_partitions, file_visitor, existing_data_behavior,
max_open_files, max_rows_per_file,
min_rows_per_group, max_rows_per_group
min_rows_per_group, max_rows_per_group, create_dir
)
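
As a rough usage sketch (not part of the diff), the new create_dir keyword can be used as follows. The endpoint, credentials, and bucket name are placeholders; the sketch assumes the target bucket already exists and the credentials only allow putting objects into it:

    import pyarrow as pa
    import pyarrow.dataset as ds
    from pyarrow.fs import S3FileSystem

    # Placeholder endpoint and credentials for an S3-compatible store where
    # the user can only put objects into a bucket that already exists.
    fs = S3FileSystem(access_key="limited", secret_key="limited123",
                      endpoint_override="localhost:9000", scheme="http")

    table = pa.table({"f1": list(range(4)), "part": ["a", "a", "b", "b"]})
    part = ds.partitioning(pa.schema([("part", pa.string())]), flavor="hive")

    ds.write_dataset(
        table, "existing-bucket", filesystem=fs, format="feather",
        partitioning=part,
        create_dir=False,  # skip CreateDir calls; S3 has no real directories
        existing_data_behavior="overwrite_or_ignore",
    )

With create_dir=False, PrepareDirectory in dataset_writer.cc finishes immediately instead of calling CreateDir, which is what allows the put-only test further below to pass.
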
2 changes: 2 additions & 0 deletions python/pyarrow/includes/libarrow_dataset.pxd
@@ -18,6 +18,7 @@
# distutils: language = c++

from libcpp.unordered_map cimport unordered_map
from libcpp cimport bool as c_bool

from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
@@ -223,6 +224,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
function[cb_writer_finish_internal] writer_pre_finish
function[cb_writer_finish_internal] writer_post_finish
ExistingDataBehavior existing_data_behavior
c_bool create_dir
uint32_t max_open_files
uint64_t max_rows_per_file
uint64_t min_rows_per_group
68 changes: 67 additions & 1 deletion python/pyarrow/tests/test_dataset.py
@@ -35,7 +35,8 @@
import pyarrow.feather
import pyarrow.fs as fs
from pyarrow.tests.util import (change_cwd, _filesystem_uri,
FSProtocolClass, ProxyHandler)
FSProtocolClass, ProxyHandler,
_configure_s3_limited_user)

try:
import pandas as pd
@@ -4334,6 +4335,71 @@ def test_write_dataset_s3(s3_example_simple):
assert result.equals(table)


_minio_put_only_policy = """{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:ListBucket",
"s3:GetObjectVersion"
],
"Resource": [
"arn:aws:s3:::*"
]
}
]
}"""


@pytest.mark.parquet
@pytest.mark.s3
def test_write_dataset_s3_put_only(s3_server):
# [ARROW-15892] Test the create_dir flag, which stops the dataset writer
# from creating a new directory when writing a dataset. This is needed
# when writing a dataset to S3 with very limited permissions, where the
# dataset can be written directly without creating a directory first.
from pyarrow.fs import S3FileSystem

# write dataset with s3 filesystem
host, port, _, _ = s3_server['connection']
fs = S3FileSystem(
access_key='limited',
secret_key='limited123',
endpoint_override='{}:{}'.format(host, port),
scheme='http'
)
_configure_s3_limited_user(s3_server, _minio_put_only_policy)

table = pa.table([
pa.array(range(20)), pa.array(np.random.randn(20)),
pa.array(np.repeat(['a', 'b'], 10))],
names=["f1", "f2", "part"]
)
part = ds.partitioning(pa.schema([("part", pa.string())]), flavor="hive")

# writing with filesystem object with create_dir flag set to false
ds.write_dataset(
table, "existing-bucket", filesystem=fs,
format="feather", create_dir=False, partitioning=part,
existing_data_behavior='overwrite_or_ignore'
)
Review thread on this hunk:

Member:
This test doesn't do any partitioning?

Contributor Author:
I don't think we can use Hive or Directory partitioning here, since create_dir is kept as False. We could use Filename partitioning, but while trying that I noticed there is probably an issue with Filename partitioning: the values of the field being partitioned on are not retrieved correctly and all come back as null. I am investigating the cause; if it is a simple fix, I can push it in this PR.

Member:
Hmm, if we can't do anything other than filename partitioning, then is it worth fixing this issue? @westonpace What do you think?

Member:
As for the filename partitioning bug, better to file a separate JIRA IMHO.

Member:
The original ask was for a case that didn't have any partitioning. If an S3 user has a partitioning, they shouldn't run into the original issue, because all CreateDir calls will be for bucket + path. So this flag is only needed to enable the very specific case where partitioning is not used.

That being said, I think Hive and Directory partitioning should still work with create_dir=False when using S3 or a similar filesystem that does not require directories to be created in advance. So the flag still has some general value for users who want to avoid creating marker directories in their S3 repository.

We could also solve this problem by modifying s3fs.cc so that it does not try to create the bucket if it already exists. That would add a BucketExists call to the path here.

Member:
I see. So should we add a test to check that it works anyway?

Contributor Author:
I have modified the test to use Hive partitioning with existing_data_behavior set to 'overwrite_or_ignore'. But I am not sure why the Python CI tests are failing with a ChildProcessError.

# check roundtrip
result = ds.dataset(
"existing-bucket", filesystem=fs, format="ipc", partitioning="hive"
).to_table()
assert result.equals(table)

with pytest.raises(OSError, match="Access Denied"):
ds.write_dataset(
table, "existing-bucket", filesystem=fs,
format="feather", create_dir=True,
existing_data_behavior='overwrite_or_ignore'
)


@pytest.mark.parquet
def test_dataset_null_to_dictionary_cast(tempdir, dataset_reader):
# ARROW-12420
109 changes: 4 additions & 105 deletions python/pyarrow/tests/test_fs.py
@@ -20,17 +20,14 @@
import os
import pathlib
import pickle
import re
import subprocess
import sys
import time

import pytest
import weakref

import pyarrow as pa
from pyarrow.tests.test_io import assert_file_not_found
from pyarrow.tests.util import _filesystem_uri, ProxyHandler
from pyarrow.tests.util import (_filesystem_uri, ProxyHandler,
_configure_s3_limited_user)

from pyarrow.fs import (FileType, FileInfo, FileSelector, FileSystem,
LocalFileSystem, SubTreeFileSystem, _MockFileSystem,
@@ -261,104 +258,6 @@ def subtree_s3fs(request, s3fs):
}"""


def _run_mc_command(mcdir, *args):
full_args = ['mc', '-C', mcdir] + list(args)
proc = subprocess.Popen(full_args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, encoding='utf-8')
retval = proc.wait(10)
cmd_str = ' '.join(full_args)
print(f'Cmd: {cmd_str}')
print(f' Return: {retval}')
print(f' Stdout: {proc.stdout.read()}')
print(f' Stderr: {proc.stderr.read()}')
if retval != 0:
raise ChildProcessError("Could not run mc")


def _wait_for_minio_startup(mcdir, address, access_key, secret_key):
start = time.time()
while time.time() - start < 10:
try:
_run_mc_command(mcdir, 'alias', 'set', 'myminio',
f'http://{address}', access_key, secret_key)
return
except ChildProcessError:
time.sleep(1)
raise Exception("mc command could not connect to local minio")


def _ensure_minio_component_version(component, minimum_year):
full_args = [component, '--version']
proc = subprocess.Popen(full_args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, encoding='utf-8')
if proc.wait(10) != 0:
return False
stdout = proc.stdout.read()
pattern = component + r' version RELEASE\.(\d+)-.*'
version_match = re.search(pattern, stdout)
if version_match:
version_year = version_match.group(1)
return int(version_year) >= minimum_year
else:
return False


def _configure_limited_user(tmpdir, address, access_key, secret_key):
"""
Attempts to use the mc command to configure the minio server
with a special user limited:limited123 which does not have
permission to create buckets. This mirrors some real life S3
configurations where users are given strict permissions.

Arrow S3 operations should still work in such a configuration
(e.g. see ARROW-13685)
"""
try:
if not _ensure_minio_component_version('mc', 2021):
# mc version is too old for the capabilities we need
return False
if not _ensure_minio_component_version('minio', 2021):
# minio version is too old for the capabilities we need
return False
mcdir = os.path.join(tmpdir, 'mc')
os.mkdir(mcdir)
policy_path = os.path.join(tmpdir, 'limited-buckets-policy.json')
with open(policy_path, mode='w') as policy_file:
policy_file.write(_minio_limited_policy)
# The s3_server fixture starts the minio process but
# it takes a few moments for the process to become available
_wait_for_minio_startup(mcdir, address, access_key, secret_key)
# These commands create a limited user with a specific
# policy and creates a sample bucket for that user to
# write to
_run_mc_command(mcdir, 'admin', 'policy', 'add',
'myminio/', 'no-create-buckets', policy_path)
_run_mc_command(mcdir, 'admin', 'user', 'add',
'myminio/', 'limited', 'limited123')
_run_mc_command(mcdir, 'admin', 'policy', 'set',
'myminio', 'no-create-buckets', 'user=limited')
_run_mc_command(mcdir, 'mb', 'myminio/existing-bucket')
return True
except FileNotFoundError:
# If mc is not found, skip these tests
return False


@pytest.fixture(scope='session')
def limited_s3_user(request, s3_server):
if sys.platform == 'win32':
# Can't rely on FileNotFound check because
# there is sometimes an mc command on Windows
# which is unrelated to the minio mc
pytest.skip('The mc command is not installed on Windows')
request.config.pyarrow.requires('s3')
tempdir = s3_server['tempdir']
host, port, access_key, secret_key = s3_server['connection']
address = '{}:{}'.format(host, port)
if not _configure_limited_user(tempdir, address, access_key, secret_key):
pytest.skip('Could not locate mc command to configure limited user')


@pytest.fixture
def hdfs(request, hdfs_connection):
request.config.pyarrow.requires('hdfs')
@@ -531,9 +430,9 @@ def skip_fsspec_s3fs(fs):


@pytest.mark.s3
def test_s3fs_limited_permissions_create_bucket(s3_server, limited_s3_user):
def test_s3fs_limited_permissions_create_bucket(s3_server):
from pyarrow.fs import S3FileSystem

_configure_s3_limited_user(s3_server, _minio_limited_policy)
host, port, _, _ = s3_server['connection']

fs = S3FileSystem(