diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index 93b198c328..72516e02f1 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -232,6 +232,18 @@ catalog:
     s3.secret-access-key: password
 ```
 
+When using Hive 2.x, make sure to set the compatibility flag:
+
+```yaml
+catalog:
+  default:
+    uri: thrift://localhost:9083
+    hive.hive2-compatible: true
+    s3.endpoint: http://localhost:9000
+    s3.access-key-id: admin
+    s3.secret-access-key: password
+```
+
 ## Glue Catalog
 
 Your AWS credentials can be passed directly through the Python API.
diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py
index 93177022aa..c3c2fdafc6 100644
--- a/pyiceberg/catalog/glue.py
+++ b/pyiceberg/catalog/glue.py
@@ -65,6 +65,7 @@
 from pyiceberg.table import (
     CommitTableRequest,
     CommitTableResponse,
+    PropertyUtil,
     Table,
     update_table_metadata,
 )
@@ -344,7 +345,7 @@ def _update_glue_table(self, database_name: str, table_name: str, table_input: T
             self.glue.update_table(
                 DatabaseName=database_name,
                 TableInput=table_input,
-                SkipArchive=self.properties.get(GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT),
+                SkipArchive=PropertyUtil.property_as_bool(self.properties, GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT),
                 VersionId=version_id,
             )
         except self.glue.exceptions.EntityNotFoundException as e:
diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py
index fe6a4fe027..b504da9a73 100644
--- a/pyiceberg/catalog/hive.py
+++ b/pyiceberg/catalog/hive.py
@@ -74,7 +74,7 @@
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema, SchemaVisitor, visit
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, TableProperties, update_table_metadata
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, PropertyUtil, Table, TableProperties, update_table_metadata
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
 from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -107,6 +107,10 @@
 COMMENT = "comment"
 OWNER = "owner"
 
+# If set to true, HiveCatalog will operate in Hive2 compatibility mode
+HIVE2_COMPATIBLE = "hive.hive2-compatible"
+HIVE2_COMPATIBLE_DEFAULT = False
+
 
 class _HiveClient:
     """Helper class to nicely open and close the transport."""
@@ -136,10 +140,15 @@ def __exit__(
         self._transport.close()
 
 
-def _construct_hive_storage_descriptor(schema: Schema, location: Optional[str]) -> StorageDescriptor:
+def _construct_hive_storage_descriptor(
+    schema: Schema, location: Optional[str], hive2_compatible: bool = False
+) -> StorageDescriptor:
     ser_de_info = SerDeInfo(serializationLib="org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
     return StorageDescriptor(
-        [FieldSchema(field.name, visit(field.field_type, SchemaToHiveConverter()), field.doc) for field in schema.fields],
+        [
+            FieldSchema(field.name, visit(field.field_type, SchemaToHiveConverter(hive2_compatible)), field.doc)
+            for field in schema.fields
+        ],
         location,
         "org.apache.hadoop.mapred.FileInputFormat",
         "org.apache.hadoop.mapred.FileOutputFormat",
@@ -184,7 +193,7 @@ def _annotate_namespace(database: HiveDatabase, properties: Properties) -> HiveD
     DateType: "date",
     TimeType: "string",
     TimestampType: "timestamp",
-    TimestamptzType: "timestamp",
+    TimestamptzType: "timestamp with local time zone",
     StringType: "string",
     UUIDType: "string",
     BinaryType: "binary",
@@ -193,6 +202,11 @@ def _annotate_namespace(database: HiveDatabase, properties: Properties) -> HiveD
 
 
 class SchemaToHiveConverter(SchemaVisitor[str]):
+    hive2_compatible: bool
+
+    def __init__(self, hive2_compatible: bool):
+        self.hive2_compatible = hive2_compatible
+
     def schema(self, schema: Schema, struct_result: str) -> str:
         return struct_result
 
@@ -212,6 +226,9 @@ def map(self, map_type: MapType, key_result: str, value_result: str) -> str:
     def primitive(self, primitive: PrimitiveType) -> str:
         if isinstance(primitive, DecimalType):
             return f"decimal({primitive.precision},{primitive.scale})"
+        elif self.hive2_compatible and isinstance(primitive, TimestamptzType):
+            # Hive2 doesn't support timestamp with local time zone
+            return "timestamp"
         else:
             return HIVE_PRIMITIVE_TYPES[type(primitive)]
 
@@ -300,7 +317,9 @@ def create_table(
             owner=properties[OWNER] if properties and OWNER in properties else getpass.getuser(),
             createTime=current_time_millis // 1000,
             lastAccessTime=current_time_millis // 1000,
-            sd=_construct_hive_storage_descriptor(schema, location),
+            sd=_construct_hive_storage_descriptor(
+                schema, location, PropertyUtil.property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT)
+            ),
             tableType=EXTERNAL_TABLE,
             parameters=_construct_parameters(metadata_location),
         )
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index e183d82775..5914da51a9 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -249,6 +249,12 @@ def property_as_int(properties: Dict[str, str], property_name: str, default: Opt
         else:
             return default
 
+    @staticmethod
+    def property_as_bool(properties: Dict[str, str], property_name: str, default: bool) -> bool:
+        if value := properties.get(property_name):
+            return value.lower() == "true"
+        return default
+
 
 class Transaction:
     _table: Table
diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py
index f0b5df79d7..a8c904d646 100644
--- a/tests/catalog/test_hive.py
+++ b/tests/catalog/test_hive.py
@@ -194,9 +194,14 @@ def test_check_number_of_namespaces(table_schema_simple: Schema) -> None:
         catalog.create_table("table", schema=table_schema_simple)
 
 
+@pytest.mark.parametrize("hive2_compatible", [True, False])
 @patch("time.time", MagicMock(return_value=12345))
-def test_create_table(table_schema_with_all_types: Schema, hive_database: HiveDatabase, hive_table: HiveTable) -> None:
+def test_create_table(
+    table_schema_with_all_types: Schema, hive_database: HiveDatabase, hive_table: HiveTable, hive2_compatible: bool
+) -> None:
     catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
+    if hive2_compatible:
+        catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL, **{"hive.hive2-compatible": "true"})
 
     catalog._client = MagicMock()
     catalog._client.__enter__().create_table.return_value = None
@@ -229,7 +234,11 @@ def test_create_table(table_schema_with_all_types: Schema, hive_database: HiveDa
             FieldSchema(name='date', type='date', comment=None),
             FieldSchema(name='time', type='string', comment=None),
             FieldSchema(name='timestamp', type='timestamp', comment=None),
-            FieldSchema(name='timestamptz', type='timestamp', comment=None),
+            FieldSchema(
+                name='timestamptz',
+                type='timestamp' if hive2_compatible else 'timestamp with local time zone',
+                comment=None,
+            ),
             FieldSchema(name='string', type='string', comment=None),
             FieldSchema(name='uuid', type='string', comment=None),
             FieldSchema(name='fixed', type='binary', comment=None),
diff --git a/tests/conftest.py b/tests/conftest.py
index 89f432ecbc..c80b7bc11a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2006,6 +2006,20 @@ def session_catalog() -> Catalog:
     )
 
 
+@pytest.fixture(scope="session")
+def session_catalog_hive() -> Catalog:
+    return load_catalog(
+        "local",
+        **{
+            "type": "hive",
+            "uri": "http://localhost:9083",
+            "s3.endpoint": "http://localhost:9000",
+            "s3.access-key-id": "admin",
+            "s3.secret-access-key": "password",
+        },
+    )
+
+
 @pytest.fixture(scope="session")
 def spark() -> "SparkSession":
     import importlib.metadata
@@ -2037,6 +2051,13 @@ def spark() -> "SparkSession":
         .config("spark.sql.catalog.integration.s3.endpoint", "http://localhost:9000")
         .config("spark.sql.catalog.integration.s3.path-style-access", "true")
         .config("spark.sql.defaultCatalog", "integration")
+        .config("spark.sql.catalog.hive", "org.apache.iceberg.spark.SparkCatalog")
+        .config("spark.sql.catalog.hive.type", "hive")
+        .config("spark.sql.catalog.hive.uri", "http://localhost:9083")
+        .config("spark.sql.catalog.hive.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
+        .config("spark.sql.catalog.hive.warehouse", "s3://warehouse/hive/")
+        .config("spark.sql.catalog.hive.s3.endpoint", "http://localhost:9000")
+        .config("spark.sql.catalog.hive.s3.path-style-access", "true")
         .getOrCreate()
     )
diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py
index e950fb43b1..7cced4ede7 100644
--- a/tests/integration/test_writes.py
+++ b/tests/integration/test_writes.py
@@ -34,6 +34,7 @@
 from pytest_mock.plugin import MockerFixture
 
 from pyiceberg.catalog import Catalog
+from pyiceberg.catalog.hive import HiveCatalog
 from pyiceberg.catalog.sql import SqlCatalog
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.schema import Schema
@@ -832,3 +833,22 @@ def get_metadata_entries_count(identifier: str) -> int:
     tbl.transaction().set_properties({"test": "2"}).commit_transaction()
     tbl.append(arrow_table_with_null)
     assert get_metadata_entries_count(identifier) == 4
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_hive_catalog_storage_descriptor(
+    session_catalog_hive: HiveCatalog,
+    pa_schema: pa.Schema,
+    arrow_table_with_null: pa.Table,
+    spark: SparkSession,
+    format_version: int,
+) -> None:
+    tbl = _create_table(
+        session_catalog_hive, "default.test_storage_descriptor", {"format-version": format_version}, [arrow_table_with_null]
+    )
+
+    # check if pyiceberg can read the table
+    assert len(tbl.scan().to_arrow()) == 3
+    # check if spark can read the table
+    assert spark.sql("SELECT * FROM hive.default.test_storage_descriptor").count() == 3
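
Taken together, the changes wire a single boolean catalog property through to the Hive schema conversion: `PropertyUtil.property_as_bool` parses the string-valued property, and `SchemaToHiveConverter` downgrades `timestamptz` columns to plain `timestamp` when the flag is set, since Hive 2.x does not support `timestamp with local time zone`. Below is a minimal sketch of how a user would exercise the new flag; the catalog name and connection URI are placeholders mirroring the documentation example added above, not required values:

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.catalog.hive import HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT
from pyiceberg.table import PropertyUtil

# Catalog properties are plain strings, so the flag is parsed case-insensitively:
# "true"/"True" -> True, any other value -> False, a missing key -> the default.
assert PropertyUtil.property_as_bool({HIVE2_COMPATIBLE: "True"}, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT) is True
assert PropertyUtil.property_as_bool({}, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT) is False

# With the flag set, create_table registers timestamptz columns in the
# metastore as "timestamp" instead of "timestamp with local time zone".
catalog = load_catalog(
    "default",
    **{
        "type": "hive",
        "uri": "thrift://localhost:9083",
        HIVE2_COMPATIBLE: "true",
    },
)
```

As a side effect, `GlueCatalog` now routes its existing `GLUE_SKIP_ARCHIVE` property through the same helper, so a string value such as `"false"` is parsed to a real boolean rather than being passed through to `update_table` as-is.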