From b2cf315b2afd5175198329b3b24ce6d1fd297e40 Mon Sep 17 00:00:00 2001
From: Konstantin Vedernikov <75157521+scanhex12@users.noreply.github.com>
Date: Sat, 5 Jul 2025 10:51:23 +0000
Subject: [PATCH] Merge pull request #83132 from scanhex12/timestamps_glue

Support `TimestampTZ` in Glue catalog
---
 src/Databases/DataLake/DatabaseDataLake.cpp    |   7 +-
 src/Databases/DataLake/GlueCatalog.cpp         | 163 +++++++++++++++---
 src/Databases/DataLake/GlueCatalog.h           |  19 +-
 tests/integration/test_database_glue/test.py   |  49 ++++++
 .../integration/test_database_iceberg/test.py  |  49 ++++++
 5 files changed, 258 insertions(+), 29 deletions(-)

diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp
index 7a686c2bb04b..bbe23cbb53cf 100644
--- a/src/Databases/DataLake/DatabaseDataLake.cpp
+++ b/src/Databases/DataLake/DatabaseDataLake.cpp
@@ -153,11 +153,10 @@ std::shared_ptr DatabaseDataLake::getCatalog() const
         case DB::DatabaseDataLakeCatalogType::GLUE:
         {
             catalog_impl = std::make_shared<DataLake::GlueCatalog>(
-                settings[DatabaseDataLakeSetting::aws_access_key_id].value,
-                settings[DatabaseDataLakeSetting::aws_secret_access_key].value,
-                settings[DatabaseDataLakeSetting::region].value,
                 url,
-                Context::getGlobalContextInstance());
+                Context::getGlobalContextInstance(),
+                settings,
+                table_engine_definition);
             break;
         }
         case DB::DatabaseDataLakeCatalogType::ICEBERG_HIVE:
diff --git a/src/Databases/DataLake/GlueCatalog.cpp b/src/Databases/DataLake/GlueCatalog.cpp
index 59d0135fd790..74c468cf29e7 100644
--- a/src/Databases/DataLake/GlueCatalog.cpp
+++ b/src/Databases/DataLake/GlueCatalog.cpp
@@ -1,4 +1,6 @@
 #include
+#include
+#include

 #if USE_AWS_S3 && USE_AVRO
@@ -8,6 +10,7 @@
 #include
 #include
+#include
 #include
 #include
@@ -29,12 +32,19 @@
 #include
 #include
 #include
+#include
+#include
 #include
 #include
 #include
+#include
+#include
+#include
+#include

 namespace DB::ErrorCodes
 {
+    extern const int BAD_ARGUMENTS;
     extern const int DATALAKE_DATABASE_ERROR;
 }
@@ -54,20 +64,36 @@ namespace DB::StorageObjectStorageSetting
     extern const StorageObjectStorageSettingsString iceberg_metadata_file_path;
 }

+namespace DB::DatabaseDataLakeSetting
+{
+    extern const DatabaseDataLakeSettingsString storage_endpoint;
+    extern const DatabaseDataLakeSettingsString aws_access_key_id;
+    extern const DatabaseDataLakeSettingsString aws_secret_access_key;
+    extern const DatabaseDataLakeSettingsString region;
+}
+
+namespace CurrentMetrics
+{
+    extern const Metric MarkCacheBytes;
+    extern const Metric MarkCacheFiles;
+}
+
 namespace DataLake
 {

 GlueCatalog::GlueCatalog(
-    const String & access_key_id,
-    const String & secret_access_key,
-    const String & region_,
     const String & endpoint,
-    DB::ContextPtr context_)
+    DB::ContextPtr context_,
+    const DB::DatabaseDataLakeSettings & settings_,
+    DB::ASTPtr table_engine_definition_)
     : ICatalog("")
     , DB::WithContext(context_)
-    , log(getLogger("GlueCatalog(" + region_ + ")"))
-    , credentials(access_key_id, secret_access_key)
-    , region(region_)
+    , log(getLogger("GlueCatalog(" + settings_[DB::DatabaseDataLakeSetting::region].value + ")"))
+    , credentials(settings_[DB::DatabaseDataLakeSetting::aws_access_key_id].value, settings_[DB::DatabaseDataLakeSetting::aws_secret_access_key].value)
+    , region(settings_[DB::DatabaseDataLakeSetting::region].value)
+    , settings(settings_)
+    , table_engine_definition(table_engine_definition_)
+    , metadata_objects(CurrentMetrics::MarkCacheBytes, CurrentMetrics::MarkCacheFiles, 1024)
 {
     DB::S3::CredentialsConfiguration creds_config;
     creds_config.use_environment_credentials = true;
@@ -280,23 +306,10 @@ void GlueCatalog::getTableMetadata(
                 database_name + "." + table_name, message_part, "ICEBERG"));
         }

-        if (result.requiresSchema())
-        {
-            DB::NamesAndTypesList schema;
-            auto columns = table_outcome.GetStorageDescriptor().GetColumns();
-            for (const auto & column : columns)
-            {
-                const auto column_params = column.GetParameters();
-                bool can_be_nullable = column_params.contains("iceberg.field.optional") && column_params.at("iceberg.field.optional") == "true";
-                schema.push_back({column.GetName(), getType(column.GetType(), can_be_nullable)});
-            }
-            result.setSchema(schema);
-        }
-
         if (result.requiresCredentials())
             setCredentials(result);

-        if (result.requiresDataLakeSpecificProperties())
+        auto setup_specific_properties = [&]
         {
             const auto & table_params = table_outcome.GetParameters();
             if (table_params.contains("metadata_location"))
@@ -309,6 +322,38 @@ void GlueCatalog::getTableMetadata(
                     "It means that it's unreadable with Glue catalog in ClickHouse, readable tables must have 'metadata_location' in table parameters",
                     database_name + "." + table_name));
             }
+        };
+
+        if (result.requiresDataLakeSpecificProperties())
+            setup_specific_properties();
+
+        if (result.requiresSchema())
+        {
+            DB::NamesAndTypesList schema;
+            auto columns = table_outcome.GetStorageDescriptor().GetColumns();
+            for (const auto & column : columns)
+            {
+                const auto column_params = column.GetParameters();
+                bool can_be_nullable = column_params.contains("iceberg.field.optional") && column_params.at("iceberg.field.optional") == "true";
+
+                /// Skip fields that are not "current" (for example, renamed ones). The Glue API returns
+                /// non-current fields as well: after "RENAME COLUMN a TO new_a" it reports both a and new_a,
+                /// with a marked as a non-current field.
+                if (column_params.contains("iceberg.field.current") && column_params.at("iceberg.field.current") == "false")
+                    continue;
+
+                String column_type = column.GetType();
+                if (column_type == "timestamp")
+                {
+                    if (!result.requiresDataLakeSpecificProperties())
+                        setup_specific_properties();
+                    if (classifyTimestampTZ(column.GetName(), result))
+                        column_type = "timestamptz";
+                }
+
+                schema.push_back({column.GetName(), getType(column_type, can_be_nullable)});
+            }
+            result.setSchema(schema);
         }
     }
     else
@@ -347,6 +392,82 @@ bool GlueCatalog::empty() const
     return true;
 }

+bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const
+{
+    String metadata_path;
+    if (auto table_specific_properties = table_metadata.getDataLakeSpecificProperties();
+        table_specific_properties.has_value())
+    {
+        metadata_path = table_specific_properties->iceberg_metadata_file_location;
+        if (metadata_path.starts_with("s3:/"))
+            metadata_path = metadata_path.substr(5);
+
+        // Drop the bucket name from the path
+        std::size_t pos = metadata_path.find('/');
+        if (pos != std::string::npos)
+            metadata_path = metadata_path.substr(pos + 1);
+    }
+    else
+        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Metadata specific properties should be defined");
+
+    if (!metadata_objects.get(metadata_path))
+    {
+        DB::ASTStorage * storage = table_engine_definition->as<DB::ASTStorage>();
+        DB::ASTs args = storage->engine->arguments->children;
+
+        auto table_endpoint = settings[DB::DatabaseDataLakeSetting::storage_endpoint].value;
+        if (args.empty())
+            args.emplace_back(std::make_shared<DB::ASTLiteral>(table_endpoint));
+        else
+            args[0] = std::make_shared<DB::ASTLiteral>(table_endpoint);
+
+        if (args.size() == 1 && table_metadata.hasStorageCredentials())
+        {
+            auto storage_credentials = table_metadata.getStorageCredentials();
+            if (storage_credentials)
+                storage_credentials->addCredentialsToEngineArgs(args);
+        }
+
+        auto storage_settings = std::make_shared<DB::StorageObjectStorageSettings>();
+        storage_settings->loadFromSettingsChanges(settings.allChanged());
+        auto configuration = std::make_shared<DB::StorageS3IcebergConfiguration>(storage_settings);
+        DB::StorageObjectStorage::Configuration::initialize(*configuration, args, getContext(), false);
+
+        auto object_storage = configuration->createObjectStorage(getContext(), true);
+        const auto & read_settings = getContext()->getReadSettings();
+
+        DB::StoredObject metadata_stored_object(metadata_path);
+        auto read_buf = object_storage->readObject(metadata_stored_object, read_settings);
+        String metadata_file;
+        readString(metadata_file, *read_buf);
+
+        Poco::JSON::Parser parser;
+        Poco::Dynamic::Var result = parser.parse(metadata_file);
+        auto metadata_object = result.extract<Poco::JSON::Object::Ptr>();
+        metadata_objects.set(metadata_path, std::make_shared<Poco::JSON::Object::Ptr>(metadata_object));
+    }
+    auto metadata_object = *metadata_objects.get(metadata_path);
+    auto current_schema_id = metadata_object->getValue<Int32>("current-schema-id");
+    auto schemas = metadata_object->getArray(Iceberg::f_schemas);
+    for (size_t i = 0; i < schemas->size(); ++i)
+    {
+        auto schema = schemas->getObject(static_cast<UInt32>(i));
+        if (schema->getValue<Int32>("schema-id") == current_schema_id)
+        {
+            auto fields = schema->getArray(Iceberg::f_fields);
+            for (size_t j = 0; j < fields->size(); ++j)
+            {
+                auto field = fields->getObject(static_cast<UInt32>(j));
+                if (field->getValue<String>(Iceberg::f_name) == column_name)
+                    return field->getValue<String>(Iceberg::f_type) == Iceberg::f_timestamptz;
+            }
+        }
+    }
+
+    return false;
+}
+
+
 }

 #endif
diff --git a/src/Databases/DataLake/GlueCatalog.h b/src/Databases/DataLake/GlueCatalog.h
index 259a22059b27..5e4b4f6a1def 100644
--- a/src/Databases/DataLake/GlueCatalog.h
+++ b/src/Databases/DataLake/GlueCatalog.h
@@ -7,6 +7,10 @@
 #include
 #include
 #include
+#include
+
+#include
+#include

 namespace Aws::Glue
 {
@@ -20,11 +24,10 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
 {
 public:
     GlueCatalog(
-        const String & access_key_id,
-        const String & secret_access_key,
-        const String & region,
         const String & endpoint,
-        DB::ContextPtr context_);
+        DB::ContextPtr context_,
+        const DB::DatabaseDataLakeSettings & settings_,
+        DB::ASTPtr table_engine_definition_);

     ~GlueCatalog() override;
@@ -60,10 +63,18 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
     const LoggerPtr log;
     Aws::Auth::AWSCredentials credentials;
     std::string region;
+    DB::DatabaseDataLakeSettings settings;
+    DB::ASTPtr table_engine_definition;

     DataLake::ICatalog::Namespaces getDatabases(const std::string & prefix, size_t limit = 0) const;
     DB::Names getTablesForDatabase(const std::string & db_name, size_t limit = 0) const;
     void setCredentials(TableMetadata & metadata) const;
+
+    /// The Glue catalog does not store detailed type information for timestamp columns, such as whether a column is timestamp or timestamptz.
+    /// This method determines the actual type of a timestamp column.
+    bool classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const;
+
+    mutable DB::CacheBase<String, Poco::JSON::Object::Ptr> metadata_objects;
 };

 }
diff --git a/tests/integration/test_database_glue/test.py b/tests/integration/test_database_glue/test.py
index 7680d9a10ff8..6c2af05ce3f7 100644
--- a/tests/integration/test_database_glue/test.py
+++ b/tests/integration/test_database_glue/test.py
@@ -7,6 +7,7 @@
 import pyarrow as pa
 import pytest
 import urllib3
+import pytz
 from datetime import datetime, timedelta
 from minio import Minio
 from pyiceberg.catalog import load_catalog
@@ -22,6 +23,7 @@
     StringType,
     StructType,
     TimestampType,
+    TimestamptzType,
     MapType,
     DecimalType,
 )
@@ -345,3 +347,50 @@ def test_hide_sensitive_info(started_cluster):
     )
     assert "SECRET_1" not in node.query(f"SHOW CREATE DATABASE {CATALOG_NAME}")
     assert "SECRET_2" not in node.query(f"SHOW CREATE DATABASE {CATALOG_NAME}")
+
+
+def test_timestamps(started_cluster):
+    node = started_cluster.instances["node1"]
+
+    test_ref = f"test_list_tables_{uuid.uuid4()}"
+    table_name = f"{test_ref}_table"
+    root_namespace = f"{test_ref}_namespace"
+
+    catalog = load_catalog_impl(started_cluster)
+    catalog.create_namespace(root_namespace)
+
+    schema = Schema(
+        NestedField(
+            field_id=1, name="timestamp", field_type=TimestampType(), required=False
+        ),
+        NestedField(
+            field_id=2,
+            name="timestamptz",
+            field_type=TimestamptzType(),
+            required=False,
+        ),
+    )
+    table = create_table(catalog, root_namespace, table_name, schema)
+
+    create_clickhouse_glue_database(started_cluster, node, CATALOG_NAME)
+
+    data = [
+        {
+            "timestamp": datetime(2024, 1, 1, hour=12, minute=0, second=0, microsecond=0),
+            "timestamptz": datetime(
+                2024,
+                1,
+                1,
+                hour=12,
+                minute=0,
+                second=0,
+                microsecond=0,
+                tzinfo=pytz.timezone("UTC"),
+            )
+        }
+    ]
+    df = pa.Table.from_pylist(data)
+    table.append(df)
+
+    assert node.query(f"SHOW CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}`") == f"CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}`\\n(\\n    `timestamp` Nullable(DateTime64(6)),\\n    `timestamptz` Nullable(DateTime64(6, \\'UTC\\'))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse-glue/data/\\', \\'minio\\', \\'[HIDDEN]\\')\n"
+    assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`") == "2024-01-01 12:00:00.000000\t2024-01-01 12:00:00.000000\n"
diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py
index aab38abf5256..6c0cb6a6e508 100644
--- a/tests/integration/test_database_iceberg/test.py
+++ b/tests/integration/test_database_iceberg/test.py
@@ -11,6 +11,7 @@
 import pytest
 import requests
 import urllib3
+import pytz
 from minio import Minio
 from pyiceberg.catalog import load_catalog
 from pyiceberg.partitioning import PartitionField, PartitionSpec
@@ -24,6 +25,7 @@
     StringType,
     StructType,
     TimestampType,
+    TimestamptzType
 )
 from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm
@@ -373,3 +375,50 @@ def record(key):
     assert 'aaa\naaa\naaa' == node.query(f"SELECT symbol FROM {CATALOG_NAME}.`{namespace}.{table_name}`").strip()
     assert 'bbb\nbbb\nbbb' == node.query(f"SELECT symbol FROM {CATALOG_NAME}.`{namespace}.{table_name_2}`").strip()
+
+
+def test_timestamps(started_cluster):
+    node = started_cluster.instances["node1"]
+
+    test_ref = f"test_list_tables_{uuid.uuid4()}"
+    table_name = f"{test_ref}_table"
+    root_namespace = f"{test_ref}_namespace"
+
+    catalog = load_catalog_impl(started_cluster)
+    catalog.create_namespace(root_namespace)
+
+    schema = Schema(
+        NestedField(
+            field_id=1, name="timestamp", field_type=TimestampType(), required=False
+        ),
+        NestedField(
+            field_id=2,
+            name="timestamptz",
+            field_type=TimestamptzType(),
+            required=False,
+        ),
+    )
+    table = create_table(catalog, root_namespace, table_name, schema)
+
+    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
+
+    data = [
+        {
+            "timestamp": datetime(2024, 1, 1, hour=12, minute=0, second=0, microsecond=0),
+            "timestamptz": datetime(
+                2024,
+                1,
+                1,
+                hour=12,
+                minute=0,
+                second=0,
+                microsecond=0,
+                tzinfo=pytz.timezone("UTC"),
+            )
+        }
+    ]
+    df = pa.Table.from_pylist(data)
+    table.append(df)
+
+    assert node.query(f"SHOW CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}`") == f"CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}`\\n(\\n    `timestamp` Nullable(DateTime64(6)),\\n    `timestamptz` Nullable(DateTime64(6, \\'UTC\\'))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse-rest/data/\\', \\'minio\\', \\'[HIDDEN]\\')\n"
+    assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`") == "2024-01-01 12:00:00.000000\t2024-01-01 12:00:00.000000\n"
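
The classification performed by classifyTimestampTZ reduces to a single lookup in the table's Iceberg metadata JSON: take the schema whose "schema-id" equals the top-level "current-schema-id", find the field by name, and check whether its declared type is "timestamptz". Below is a minimal Python sketch of that lookup; the helper name is hypothetical and it assumes the metadata JSON has already been fetched from object storage, which the C++ code above does through the configured object-storage reader.

import json

def is_timestamptz(metadata_json: str, column_name: str) -> bool:
    """Return True if column_name is declared as timestamptz
    in the current schema of an Iceberg metadata document."""
    metadata = json.loads(metadata_json)
    current_schema_id = metadata["current-schema-id"]
    for schema in metadata["schemas"]:
        if schema["schema-id"] != current_schema_id:
            continue
        for field in schema["fields"]:
            if field["name"] == column_name:
                return field["type"] == "timestamptz"
    return False

Columns that Glue reports as plain timestamp are mapped to DateTime64(6), while columns classified as timestamptz become DateTime64(6, 'UTC'), which is what the two integration tests above assert.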