python/pyarrow/tests/test_parquet.py (82 changes: 76 additions & 6 deletions)
@@ -64,6 +64,9 @@ def datadir(datadir):
     "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)])
 parametrize_legacy_dataset_skip_buffer = pytest.mark.parametrize(
     "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)])
+parametrize_legacy_dataset_fixed = pytest.mark.parametrize(
+    "use_legacy_dataset", [pytest.param(True, marks=pytest.mark.xfail),
+                           pytest.param(False, marks=pytest.mark.dataset)])


 def deterministic_row_order(use_legacy_dataset, chunk_size=-1):
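
A note on the new marker: parametrize_legacy_dataset_fixed runs each decorated test twice, once per value of use_legacy_dataset. The True (legacy) case is marked xfail because these are behaviours the legacy implementation is known to get wrong, while the False case carries the suite's custom "dataset" mark (presumably used to gate tests on the new pyarrow.dataset module). A minimal sketch of how the parametrization expands; the test name and assertion here are hypothetical:

    import pytest

    parametrize_legacy_dataset_fixed = pytest.mark.parametrize(
        "use_legacy_dataset", [pytest.param(True, marks=pytest.mark.xfail),
                               pytest.param(False, marks=pytest.mark.dataset)])

    @parametrize_legacy_dataset_fixed
    def test_example(use_legacy_dataset):
        # Collected as test_example[True] (expected failure) and
        # test_example[False] (runs under the "dataset" mark).
        assert use_legacy_dataset in (True, False)
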
@@ -1715,7 +1718,7 @@ def test_read_partitioned_columns_selection(tempdir, use_legacy_dataset):

 @pytest.mark.pandas
 @parametrize_legacy_dataset
-def test_equivalency(tempdir, use_legacy_dataset):
+def test_filters_equivalency(tempdir, use_legacy_dataset):
     fs = LocalFileSystem.get_instance()
     base_path = tempdir

@@ -1803,7 +1806,7 @@ def test_equivalency(tempdir, use_legacy_dataset):

 @pytest.mark.pandas
 @parametrize_legacy_dataset
-def test_cutoff_exclusive_integer(tempdir, use_legacy_dataset):
+def test_filters_cutoff_exclusive_integer(tempdir, use_legacy_dataset):
     fs = LocalFileSystem.get_instance()
     base_path = tempdir

@@ -1845,7 +1848,7 @@ def test_cutoff_exclusive_integer(tempdir, use_legacy_dataset):
     raises=(TypeError, AssertionError),
     reason='Loss of type information in creation of categoricals.'
 )
-def test_cutoff_exclusive_datetime(tempdir, use_legacy_dataset):
+def test_filters_cutoff_exclusive_datetime(tempdir, use_legacy_dataset):
     fs = LocalFileSystem.get_instance()
     base_path = tempdir

@@ -1890,7 +1893,7 @@ def test_cutoff_exclusive_datetime(tempdir, use_legacy_dataset):

 @pytest.mark.pandas
 @parametrize_legacy_dataset
-def test_inclusive_integer(tempdir, use_legacy_dataset):
+def test_filters_inclusive_integer(tempdir, use_legacy_dataset):
     fs = LocalFileSystem.get_instance()
     base_path = tempdir

@@ -1926,7 +1929,7 @@ def test_inclusive_integer(tempdir, use_legacy_dataset):

 @pytest.mark.pandas
 @parametrize_legacy_dataset
-def test_inclusive_set(tempdir, use_legacy_dataset):
+def test_filters_inclusive_set(tempdir, use_legacy_dataset):
     fs = LocalFileSystem.get_instance()
     base_path = tempdir

@@ -1964,7 +1967,7 @@ def test_inclusive_set(tempdir, use_legacy_dataset):

 @pytest.mark.pandas
 @parametrize_legacy_dataset
-def test_invalid_pred_op(tempdir, use_legacy_dataset):
+def test_filters_invalid_pred_op(tempdir, use_legacy_dataset):
     fs = LocalFileSystem.get_instance()
     base_path = tempdir

@@ -2009,6 +2012,32 @@ def test_invalid_pred_op(tempdir, use_legacy_dataset):
                           use_legacy_dataset=use_legacy_dataset)


+@pytest.mark.pandas
+@parametrize_legacy_dataset_fixed
+def test_filters_invalid_column(tempdir, use_legacy_dataset):
+    # ARROW-5572 - raise error on invalid name in filter specification
+    # works with new dataset / xfail with legacy implementation
+    fs = LocalFileSystem.get_instance()
+    base_path = tempdir
+
+    integer_keys = [0, 1, 2, 3, 4]
+    partition_spec = [['integers', integer_keys]]
+    N = 5
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'integers': np.array(integer_keys, dtype='i4'),
+    }, columns=['index', 'integers'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    msg = "Field named 'non_existent_column' not found"
+    with pytest.raises(ValueError, match=msg):
+        pq.ParquetDataset(base_path, filesystem=fs,
+                          filters=[('non_existent_column', '<', 3), ],
+                          use_legacy_dataset=use_legacy_dataset).read()
+
+
 @pytest.mark.pandas
 def test_filters_read_table(tempdir):
     # test that filters keyword is passed through in read_table
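
For context, filters are expressed as (column, op, value) tuples. Against the partitioned dataset built in the test above, a filter on a real column prunes partitions, while the misspelled name now raises ValueError instead of being silently ignored. A usage sketch, assuming the same fs, base_path, and partition layout as in test_filters_invalid_column (read_table shown for comparison, since it forwards the same filters keyword to the dataset machinery):

    import pyarrow.parquet as pq

    # Valid column: partitions with integers >= 3 are pruned away.
    table = pq.ParquetDataset(
        base_path, filesystem=fs,
        filters=[('integers', '<', 3)],
        use_legacy_dataset=False).read()
    assert table.num_rows == 3  # one row each for partitions 0, 1, 2

    # The same keyword is accepted by read_table directly.
    table = pq.read_table(base_path, filters=[('integers', '<', 3)])
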
@@ -2041,6 +2070,33 @@ def test_filters_read_table(tempdir):
     assert table.num_rows == 3


+@pytest.mark.pandas
+@parametrize_legacy_dataset_fixed
+def test_partition_keys_with_underscores(tempdir, use_legacy_dataset):
+    # ARROW-5666 - partition field values with underscores preserve underscores
+    # xfail with legacy dataset -> they get interpreted as integers
+    fs = LocalFileSystem.get_instance()
+    base_path = tempdir
+
+    string_keys = ["2019_2", "2019_3"]
+    partition_spec = [
+        ['year_week', string_keys],
+    ]
+    N = 2
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'year_week': np.array(string_keys, dtype='object'),
+    }, columns=['index', 'year_week'])
+
+    _generate_partition_directories(fs, base_path, partition_spec, df)
+
+    dataset = pq.ParquetDataset(
+        base_path, use_legacy_dataset=use_legacy_dataset)
+    result = dataset.read()
+    assert result.column("year_week").to_pylist() == string_keys
+
+
 @pytest.fixture
 def s3_bucket(request, s3_connection, s3_server):
     boto3 = pytest.importorskip('boto3')
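
Why the legacy reader mangles keys like "2019_2": ARROW-5666 points at the partition-value coercion step, which tries int() on each directory key, and since Python 3.6 (PEP 515) int() accepts underscores as digit separators, so the conversion silently succeeds instead of falling back to the string. A sketch of the suspected mechanism; coerce_key is illustrative, not the actual library helper:

    # PEP 515: underscores are legal digit separators, so int() succeeds.
    assert int("2019_2") == 20192

    def coerce_key(value):
        # Illustrative naive coercion of a partition directory key; a key
        # that happens to parse as an integer loses its underscores.
        try:
            return int(value)
        except ValueError:
            return value

    assert coerce_key("2019_2") == 20192      # underscores silently dropped
    assert coerce_key("week-2") == "week-2"   # non-numeric keys pass through
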
@@ -2586,6 +2642,20 @@ def test_ignore_no_private_directories_path_list(
     _assert_dataset_paths(dataset, paths, use_legacy_dataset)


+@parametrize_legacy_dataset_fixed
+def test_empty_directory(tempdir, use_legacy_dataset):
+    # ARROW-5310 - reading empty directory
+    # fails with legacy implementation
+    empty_dir = tempdir / 'dataset'
+    empty_dir.mkdir()
+
+    dataset = pq.ParquetDataset(
+        empty_dir, use_legacy_dataset=use_legacy_dataset)
+    result = dataset.read()
+    assert result.num_rows == 0
+    assert result.num_columns == 0
+
+
 @pytest.mark.pandas
 @parametrize_legacy_dataset
 def test_multiindex_duplicate_values(tempdir, use_legacy_dataset):
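
For reference, the empty-directory behaviour the new test pins down can be reproduced standalone. A minimal sketch, assuming a pyarrow build with the new datasets module enabled (the legacy path raised on such input):

    import pathlib
    import tempfile

    import pyarrow.parquet as pq

    with tempfile.TemporaryDirectory() as d:
        empty_dir = pathlib.Path(d) / 'dataset'
        empty_dir.mkdir()
        result = pq.ParquetDataset(
            str(empty_dir), use_legacy_dataset=False).read()
        # An empty dataset reads back as an empty table, not an error.
        assert result.num_rows == 0 and result.num_columns == 0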