From 3e02022bca9bdbf44d055a754b769cdc91c3aa5b Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 16 Apr 2024 23:55:45 +0200 Subject: [PATCH 1/2] Test: Add test to partition on field with a dot This is not allowed in Avro, just checking if it works. --- tests/integration/test_reads.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index ee9b17e438..76086fef04 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -41,6 +41,7 @@ from pyiceberg.io.pyarrow import pyarrow_to_schema from pyiceberg.schema import Schema from pyiceberg.table import Table +from pyiceberg.transforms import IdentityTransform from pyiceberg.types import ( BooleanType, IntegerType, @@ -473,6 +474,31 @@ def test_sanitize_character(catalog: Catalog) -> None: assert arrow_table.schema.names[0] == table_test_table_sanitized_character.schema().fields[0].name +@pytest.mark.integration +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')]) +def test_sanitize_character_partitioned(catalog: Catalog) -> None: + table_name = "default.test_table_partitioned_sanitized_character" + try: + catalog.drop_table(table_name) + except NoSuchTableError: + pass + + tbl = catalog.create_table( + identifier=table_name, schema=Schema(NestedField(field_id=1, name="some.id", type=IntegerType(), required=True)) + ) + + import pyarrow as pa + + with tbl.update_spec() as upd: + upd.add_field("some.id", IdentityTransform()) + + ids = pa.Table.from_arrays([range(22)], schema=pa.schema([pa.field("some.id", pa.int32(), nullable=False)])) + + tbl.append(ids) + + assert len(tbl.scan().to_arrow()) == 22 + + @pytest.mark.integration @pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')]) def test_null_list_and_map(catalog: Catalog) -> None: From da2fd2fdbc1e3602ab325d08f0bc90f0fcec4d8e Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Thu, 25 Apr 2024 10:00:14 +0200 Subject: [PATCH 2/2] Use `_create_table` instead --- tests/integration/test_reads.py | 26 -------------------- tests/integration/test_writes/test_writes.py | 26 ++++++++++++++++++++ tests/integration/test_writes/utils.py | 17 +++++-------- 3 files changed, 32 insertions(+), 37 deletions(-) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 76086fef04..ee9b17e438 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -41,7 +41,6 @@ from pyiceberg.io.pyarrow import pyarrow_to_schema from pyiceberg.schema import Schema from pyiceberg.table import Table -from pyiceberg.transforms import IdentityTransform from pyiceberg.types import ( BooleanType, IntegerType, @@ -474,31 +473,6 @@ def test_sanitize_character(catalog: Catalog) -> None: assert arrow_table.schema.names[0] == table_test_table_sanitized_character.schema().fields[0].name -@pytest.mark.integration -@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')]) -def test_sanitize_character_partitioned(catalog: Catalog) -> None: - table_name = "default.test_table_partitioned_sanitized_character" - try: - catalog.drop_table(table_name) - except NoSuchTableError: - pass - - tbl = catalog.create_table( - identifier=table_name, schema=Schema(NestedField(field_id=1, name="some.id", type=IntegerType(), required=True)) - ) - - import pyarrow as pa - - with tbl.update_spec() as upd: - upd.add_field("some.id", IdentityTransform()) - - ids = pa.Table.from_arrays([range(22)], schema=pa.schema([pa.field("some.id", pa.int32(), nullable=False)])) - - tbl.append(ids) - - assert len(tbl.scan().to_arrow()) == 22 - - @pytest.mark.integration @pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')]) def test_null_list_and_map(catalog: Catalog) -> None: diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 775a6f9d42..06a922dc02 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -36,7 +36,11 @@ from pyiceberg.catalog.hive import HiveCatalog from pyiceberg.catalog.sql import SqlCatalog from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.schema import Schema from pyiceberg.table import TableProperties, _dataframe_to_data_files +from pyiceberg.transforms import IdentityTransform +from pyiceberg.types import IntegerType, NestedField from tests.conftest import TEST_DATA_WITH_NULL from utils import _create_table @@ -789,3 +793,25 @@ def test_hive_catalog_storage_descriptor( assert len(tbl.scan().to_arrow()) == 3 # check if spark can read the table assert spark.sql("SELECT * FROM hive.default.test_storage_descriptor").count() == 3 + + +@pytest.mark.integration +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')]) +def test_sanitize_character_partitioned(catalog: Catalog) -> None: + table_name = "default.test_table_partitioned_sanitized_character" + try: + catalog.drop_table(table_name) + except NoSuchTableError: + pass + + tbl = _create_table( + session_catalog=catalog, + identifier=table_name, + schema=Schema(NestedField(field_id=1, name="some.id", type=IntegerType(), required=True)), + partition_spec=PartitionSpec( + PartitionField(source_id=1, field_id=1000, name="some.id_identity", transform=IdentityTransform()) + ), + data=[pa.Table.from_arrays([range(22)], schema=pa.schema([pa.field("some.id", pa.int32(), nullable=False)]))], + ) + + assert len(tbl.scan().to_arrow()) == 22 diff --git a/tests/integration/test_writes/utils.py b/tests/integration/test_writes/utils.py index 742b1e14fc..9f1f6df043 100644 --- a/tests/integration/test_writes/utils.py +++ b/tests/integration/test_writes/utils.py @@ -21,10 +21,10 @@ from pyiceberg.catalog import Catalog from pyiceberg.exceptions import NoSuchTableError -from pyiceberg.partitioning import PartitionSpec +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import Table -from pyiceberg.typedef import Properties +from pyiceberg.typedef import EMPTY_DICT, Properties from pyiceberg.types import ( BinaryType, BooleanType, @@ -62,9 +62,9 @@ def _create_table( session_catalog: Catalog, identifier: str, - properties: Properties, + properties: Properties = EMPTY_DICT, data: Optional[List[pa.Table]] = None, - partition_spec: Optional[PartitionSpec] = None, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, schema: Union[Schema, "pa.Schema"] = TABLE_SCHEMA, ) -> Table: try: @@ -72,14 +72,9 @@ def _create_table( except NoSuchTableError: pass - if partition_spec: - tbl = session_catalog.create_table( - identifier=identifier, schema=schema, properties=properties, partition_spec=partition_spec - ) - else: - tbl = session_catalog.create_table(identifier=identifier, schema=schema, properties=properties) + tbl = session_catalog.create_table(identifier=identifier, schema=schema, properties=properties, partition_spec=partition_spec) - if data: + if data is not None: for d in data: tbl.append(d)