From 38f47072d56890758abce4ed86b1eb26c1448c48 Mon Sep 17 00:00:00 2001 From: HonahX Date: Fri, 5 Apr 2024 21:37:05 -0700 Subject: [PATCH 1/5] test --- pyiceberg/catalog/hive.py | 2 +- tests/conftest.py | 24 +++++++++++++++++++++++- tests/integration/test_writes.py | 29 +++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 2 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index fe6a4fe027..2dff6408f6 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -184,7 +184,7 @@ def _annotate_namespace(database: HiveDatabase, properties: Properties) -> HiveD DateType: "date", TimeType: "string", TimestampType: "timestamp", - TimestamptzType: "timestamp", + TimestamptzType: "timestamp with my bad timezone", StringType: "string", UUIDType: "string", BinaryType: "binary", diff --git a/tests/conftest.py b/tests/conftest.py index 89f432ecbc..30f4cb8d30 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2006,6 +2006,20 @@ def session_catalog() -> Catalog: ) +@pytest.fixture(scope="session") +def session_catalog_hive() -> Catalog: + return load_catalog( + "local", + **{ + "type": "hive", + "uri": "http://localhost:9083", + "s3.endpoint": "http://localhost:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password" + }, + ) + + @pytest.fixture(scope="session") def spark() -> "SparkSession": import importlib.metadata @@ -2018,7 +2032,8 @@ def spark() -> "SparkSession": os.environ["PYSPARK_SUBMIT_ARGS"] = ( f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version}," - f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version} pyspark-shell" + f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version}," + f" pyspark-shell" ) os.environ["AWS_REGION"] = "us-east-1" os.environ["AWS_ACCESS_KEY_ID"] = "admin" @@ -2037,6 +2052,13 @@ def spark() -> "SparkSession": .config("spark.sql.catalog.integration.s3.endpoint", "http://localhost:9000") .config("spark.sql.catalog.integration.s3.path-style-access", "true") .config("spark.sql.defaultCatalog", "integration") + .config("spark.sql.catalog.hive", "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog.hive.type", "hive") + .config("spark.sql.catalog.hive.uri", "http://localhost:9083") + .config("spark.sql.catalog.hive.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") + .config("spark.sql.catalog.hive.warehouse", "s3://warehouse/hive/") + .config("spark.sql.catalog.hive.s3.endpoint", "http://localhost:9000") + .config("spark.sql.catalog.hive.s3.path-style-access", "true") .getOrCreate() ) diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index e950fb43b1..e1b68ca55b 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -34,8 +34,10 @@ from pytest_mock.plugin import MockerFixture from pyiceberg.catalog import Catalog +from pyiceberg.catalog.hive import HiveCatalog from pyiceberg.catalog.sql import SqlCatalog from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.io.pyarrow import pyarrow_to_schema from pyiceberg.schema import Schema from pyiceberg.table import Table, TableProperties, _dataframe_to_data_files from pyiceberg.typedef import Properties @@ -53,6 +55,7 @@ TimestampType, TimestamptzType, ) +from pyiceberg.utils.datetime import timestamptz_to_micros TEST_DATA_WITH_NULL = { 'bool': [False, None, True], @@ -832,3 +835,29 @@ def get_metadata_entries_count(identifier: str) -> int: tbl.transaction().set_properties({"test": "2"}).commit_transaction() tbl.append(arrow_table_with_null) assert get_metadata_entries_count(identifier) == 4 + + +@pytest.mark.integration +def test_hive_storage_descriptor(session_catalog_hive: HiveCatalog, table_schema_with_all_types: Schema, spark: SparkSession) -> None: + try: + session_catalog_hive.drop_table(("default", "test_storage_descriptor")) + except NoSuchTableError: + pass # Just to make sure that the table doesn't exist + pa_table = pa.table({ + 'timestamp_with_timezone': pa.array( + [timestamptz_to_micros('2023-01-01T12:00:00+00:00'), timestamptz_to_micros('2023-01-02T12:00:00+00:00')], type=pa.timestamp('us', tz='UTC') + ) + }) + pa_table_2 = pa.table( + { + 'bool': pa.array([True, False], type=pa.bool_()), + } + ) + iceberg_schema = Schema(NestedField(field_id=1, name="timestamp_with_timezone", field_type=TimestamptzType(), required=False)) + iceberg_schema_2 = Schema(NestedField(field_id=1, name="bool", field_type=BooleanType(), required=False)) + table = session_catalog_hive.create_table(identifier=("default", "test_storage_descriptor"), schema=iceberg_schema_2) + table.append(pa_table_2) + + print(session_catalog_hive.load_table(("default", "test_storage_descriptor")).scan().to_pandas()) + + spark.sql("SELECT * FROM hive.default.test_storage_descriptor").show() From 23766bd86ff6a2b43f30e8b1c36da83009c43eee Mon Sep 17 00:00:00 2001 From: HonahX Date: Sun, 7 Apr 2024 18:32:02 -0700 Subject: [PATCH 2/5] add integration test --- pyiceberg/catalog/hive.py | 2 +- tests/conftest.py | 2 +- tests/integration/test_writes.py | 37 ++++++++++++-------------------- 3 files changed, 16 insertions(+), 25 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 2dff6408f6..14b76c3537 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -184,7 +184,7 @@ def _annotate_namespace(database: HiveDatabase, properties: Properties) -> HiveD DateType: "date", TimeType: "string", TimestampType: "timestamp", - TimestamptzType: "timestamp with my bad timezone", + TimestamptzType: "timestamp with local time zone", StringType: "string", UUIDType: "string", BinaryType: "binary", diff --git a/tests/conftest.py b/tests/conftest.py index 30f4cb8d30..bde3c07130 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2015,7 +2015,7 @@ def session_catalog_hive() -> Catalog: "uri": "http://localhost:9083", "s3.endpoint": "http://localhost:9000", "s3.access-key-id": "admin", - "s3.secret-access-key": "password" + "s3.secret-access-key": "password", }, ) diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index e1b68ca55b..7cced4ede7 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -37,7 +37,6 @@ from pyiceberg.catalog.hive import HiveCatalog from pyiceberg.catalog.sql import SqlCatalog from pyiceberg.exceptions import NoSuchTableError -from pyiceberg.io.pyarrow import pyarrow_to_schema from pyiceberg.schema import Schema from pyiceberg.table import Table, TableProperties, _dataframe_to_data_files from pyiceberg.typedef import Properties @@ -55,7 +54,6 @@ TimestampType, TimestamptzType, ) -from pyiceberg.utils.datetime import timestamptz_to_micros TEST_DATA_WITH_NULL = { 'bool': [False, None, True], @@ -838,26 +836,19 @@ def get_metadata_entries_count(identifier: str) -> int: @pytest.mark.integration -def test_hive_storage_descriptor(session_catalog_hive: HiveCatalog, table_schema_with_all_types: Schema, spark: SparkSession) -> None: - try: - session_catalog_hive.drop_table(("default", "test_storage_descriptor")) - except NoSuchTableError: - pass # Just to make sure that the table doesn't exist - pa_table = pa.table({ - 'timestamp_with_timezone': pa.array( - [timestamptz_to_micros('2023-01-01T12:00:00+00:00'), timestamptz_to_micros('2023-01-02T12:00:00+00:00')], type=pa.timestamp('us', tz='UTC') - ) - }) - pa_table_2 = pa.table( - { - 'bool': pa.array([True, False], type=pa.bool_()), - } +@pytest.mark.parametrize("format_version", [1, 2]) +def test_hive_catalog_storage_descriptor( + session_catalog_hive: HiveCatalog, + pa_schema: pa.Schema, + arrow_table_with_null: pa.Table, + spark: SparkSession, + format_version: int, +) -> None: + tbl = _create_table( + session_catalog_hive, "default.test_storage_descriptor", {"format-version": format_version}, [arrow_table_with_null] ) - iceberg_schema = Schema(NestedField(field_id=1, name="timestamp_with_timezone", field_type=TimestamptzType(), required=False)) - iceberg_schema_2 = Schema(NestedField(field_id=1, name="bool", field_type=BooleanType(), required=False)) - table = session_catalog_hive.create_table(identifier=("default", "test_storage_descriptor"), schema=iceberg_schema_2) - table.append(pa_table_2) - - print(session_catalog_hive.load_table(("default", "test_storage_descriptor")).scan().to_pandas()) - spark.sql("SELECT * FROM hive.default.test_storage_descriptor").show() + # check if pyiceberg can read the table + assert len(tbl.scan().to_arrow()) == 3 + # check if spark can read the table + assert spark.sql("SELECT * FROM hive.default.test_storage_descriptor").count() == 3 From 504cf7d91af5d5f8e7ab99debc490dd944764510 Mon Sep 17 00:00:00 2001 From: HonahX Date: Sun, 7 Apr 2024 19:19:35 -0700 Subject: [PATCH 3/5] add Hive2 compatible --- pyiceberg/catalog/glue.py | 3 ++- pyiceberg/catalog/hive.py | 27 +++++++++++++++++++++++---- pyiceberg/table/__init__.py | 6 ++++++ tests/catalog/test_hive.py | 13 +++++++++++-- 4 files changed, 42 insertions(+), 7 deletions(-) diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 93177022aa..c3c2fdafc6 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -65,6 +65,7 @@ from pyiceberg.table import ( CommitTableRequest, CommitTableResponse, + PropertyUtil, Table, update_table_metadata, ) @@ -344,7 +345,7 @@ def _update_glue_table(self, database_name: str, table_name: str, table_input: T self.glue.update_table( DatabaseName=database_name, TableInput=table_input, - SkipArchive=self.properties.get(GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT), + SkipArchive=PropertyUtil.property_as_bool(self.properties, GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT), VersionId=version_id, ) except self.glue.exceptions.EntityNotFoundException as e: diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 14b76c3537..b504da9a73 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -74,7 +74,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, TableProperties, update_table_metadata +from pyiceberg.table import CommitTableRequest, CommitTableResponse, PropertyUtil, Table, TableProperties, update_table_metadata from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -107,6 +107,10 @@ COMMENT = "comment" OWNER = "owner" +# If set to true, HiveCatalog will operate in Hive2 compatibility mode +HIVE2_COMPATIBLE = "hive.hive2-compatible" +HIVE2_COMPATIBLE_DEFAULT = False + class _HiveClient: """Helper class to nicely open and close the transport.""" @@ -136,10 +140,15 @@ def __exit__( self._transport.close() -def _construct_hive_storage_descriptor(schema: Schema, location: Optional[str]) -> StorageDescriptor: +def _construct_hive_storage_descriptor( + schema: Schema, location: Optional[str], hive2_compatible: bool = False +) -> StorageDescriptor: ser_de_info = SerDeInfo(serializationLib="org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") return StorageDescriptor( - [FieldSchema(field.name, visit(field.field_type, SchemaToHiveConverter()), field.doc) for field in schema.fields], + [ + FieldSchema(field.name, visit(field.field_type, SchemaToHiveConverter(hive2_compatible)), field.doc) + for field in schema.fields + ], location, "org.apache.hadoop.mapred.FileInputFormat", "org.apache.hadoop.mapred.FileOutputFormat", @@ -193,6 +202,11 @@ def _annotate_namespace(database: HiveDatabase, properties: Properties) -> HiveD class SchemaToHiveConverter(SchemaVisitor[str]): + hive2_compatible: bool + + def __init__(self, hive2_compatible: bool): + self.hive2_compatible = hive2_compatible + def schema(self, schema: Schema, struct_result: str) -> str: return struct_result @@ -212,6 +226,9 @@ def map(self, map_type: MapType, key_result: str, value_result: str) -> str: def primitive(self, primitive: PrimitiveType) -> str: if isinstance(primitive, DecimalType): return f"decimal({primitive.precision},{primitive.scale})" + elif self.hive2_compatible and isinstance(primitive, TimestamptzType): + # Hive2 doesn't support timestamp with local time zone + return "timestamp" else: return HIVE_PRIMITIVE_TYPES[type(primitive)] @@ -300,7 +317,9 @@ def create_table( owner=properties[OWNER] if properties and OWNER in properties else getpass.getuser(), createTime=current_time_millis // 1000, lastAccessTime=current_time_millis // 1000, - sd=_construct_hive_storage_descriptor(schema, location), + sd=_construct_hive_storage_descriptor( + schema, location, PropertyUtil.property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT) + ), tableType=EXTERNAL_TABLE, parameters=_construct_parameters(metadata_location), ) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index e183d82775..5914da51a9 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -249,6 +249,12 @@ def property_as_int(properties: Dict[str, str], property_name: str, default: Opt else: return default + @staticmethod + def property_as_bool(properties: Dict[str, str], property_name: str, default: bool) -> bool: + if value := properties.get(property_name): + return value.lower() == "true" + return default + class Transaction: _table: Table diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index f0b5df79d7..a8c904d646 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -194,9 +194,14 @@ def test_check_number_of_namespaces(table_schema_simple: Schema) -> None: catalog.create_table("table", schema=table_schema_simple) +@pytest.mark.parametrize("hive2_compatible", [True, False]) @patch("time.time", MagicMock(return_value=12345)) -def test_create_table(table_schema_with_all_types: Schema, hive_database: HiveDatabase, hive_table: HiveTable) -> None: +def test_create_table( + table_schema_with_all_types: Schema, hive_database: HiveDatabase, hive_table: HiveTable, hive2_compatible: bool +) -> None: catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL) + if hive2_compatible: + catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL, **{"hive.hive2-compatible": "true"}) catalog._client = MagicMock() catalog._client.__enter__().create_table.return_value = None @@ -229,7 +234,11 @@ def test_create_table(table_schema_with_all_types: Schema, hive_database: HiveDa FieldSchema(name='date', type='date', comment=None), FieldSchema(name='time', type='string', comment=None), FieldSchema(name='timestamp', type='timestamp', comment=None), - FieldSchema(name='timestamptz', type='timestamp', comment=None), + FieldSchema( + name='timestamptz', + type='timestamp' if hive2_compatible else 'timestamp with local time zone', + comment=None, + ), FieldSchema(name='string', type='string', comment=None), FieldSchema(name='uuid', type='string', comment=None), FieldSchema(name='fixed', type='binary', comment=None), From 79a5996323ff47cde10306d2b707355ad23b8ff4 Mon Sep 17 00:00:00 2001 From: HonahX Date: Sun, 7 Apr 2024 19:29:00 -0700 Subject: [PATCH 4/5] add doc --- mkdocs/docs/configuration.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 93b198c328..72516e02f1 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -232,6 +232,18 @@ catalog: s3.secret-access-key: password ``` +In case of Hive 2.x: + +```yaml +catalog: + default: + uri: thrift://localhost:9083 + hive.hive2-compatible: true + s3.endpoint: http://localhost:9000 + s3.access-key-id: admin + s3.secret-access-key: password +``` + ## Glue Catalog Your AWS credentials can be passed directly through the Python API. From 1450a4fd8f8dd486217391aaa3971a54b0e46c8f Mon Sep 17 00:00:00 2001 From: HonahX Date: Sun, 7 Apr 2024 19:33:04 -0700 Subject: [PATCH 5/5] fix lint --- tests/conftest.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index bde3c07130..c80b7bc11a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2032,8 +2032,7 @@ def spark() -> "SparkSession": os.environ["PYSPARK_SUBMIT_ARGS"] = ( f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version}," - f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version}," - f" pyspark-shell" + f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version} pyspark-shell" ) os.environ["AWS_REGION"] = "us-east-1" os.environ["AWS_ACCESS_KEY_ID"] = "admin"